超额计费
场景
账单系统上线后,运行正常。
但运营团队发现了一些问题:
用户反馈:
1. 为什么我的账单有超额费用?
2. 超额费用是怎么计算的?
3. 我什么时候超的限?
技术问题:
1. 统计数据延迟(月度统计不是实时的)
2. 无法精确计算超额时间点
3. 用户无法实时查看已用量问题分析
我查了一下当前的计费逻辑:
# 当前计费方式(简化版)
total_calls = get_monthly_stats(user_id, month)
plan_limit = subscription['daily_limit'] * 30
overage = max(0, total_calls - plan_limit)
overage_fee = overage * 0.001问题:
- 使用月度统计表,数据有延迟
- 无法精确计算超额时间点
- 用户无法实时监控自己的用量
改进方案
实时用量追踪
不再依赖月度统计表,而是实时追踪:
# 实时用量追踪
def track_api_call_realtime(user_id):
"""实时追踪 API 调用"""
current_time = datetime.now()
current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# 使用 Redis 的 INCR 命令(原子操作)
cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'
# 增加计数
current_count = redis_client.incr(cache_key)
# 设置过期时间(2 个月)
redis_client.expire(cache_key, 60 * 24 * 60)
return current_count实时查询用量
@app.route('/api/billing/usage')
@require_api_key
def get_current_usage():
"""获取当前实时用量"""
current_time = datetime.now()
current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# 获取用户套餐限额
limits = get_user_plan_limits(request.user['id'])
# 计算月度限额
daily_limit = get_daily_limit(request.user['id'])
monthly_limit = daily_limit * 30
# 获取当前用量
cache_key = f'usage:{request.user["id"]}:{current_month.strftime("%Y-%m")}'
current_usage = int(redis_client.get(cache_key) or 0)
# 计算超额
overage = max(0, current_usage - monthly_limit)
# 计算预估费用
overage_fee = overage * 0.001
# 计算剩余天数
days_in_month = (current_time.replace(day=28) + timedelta(days=4)).day
days_remaining = days_in_month - current_time.day
return jsonify({
'period': {
'month': current_month.strftime('%Y-%m'),
'days_remaining': days_remaining
},
'usage': {
'current': current_usage,
'limit': monthly_limit,
'overage': overage,
'percentage': (current_usage / monthly_limit * 100) if monthly_limit > 0 else 0
},
'estimated_cost': {
'overage_fee': overage_fee
},
'plan': {
'name': limits['plan'],
'daily_limit': daily_limit
}
})超额预警
用量达到 80% 时预警
def check_usage_warning(user_id):
"""检查是否需要发送用量预警"""
current_time = datetime.now()
current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# 获取用户套餐
limits = get_user_plan_limits(user_id)
daily_limit = get_daily_limit(user_id)
monthly_limit = daily_limit * 30
# 获取当前用量
cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'
current_usage = int(redis_client.get(cache_key) or 0)
# 计算使用百分比
usage_percentage = (current_usage / monthly_limit * 100) if monthly_limit > 0 else 0
# 检查是否需要预警
warning_key = f'usage_warning:{user_id}:{current_month.strftime("%Y-%m")}'
if usage_percentage >= 80:
# 检查是否已经发送过预警
if not redis_client.exists(warning_key):
# 发送预警
send_usage_warning_email(user_id, current_usage, monthly_limit, usage_percentage)
# 标记已发送(7 天内不重复发送)
redis_client.setex(warning_key, 7 * 24 * 3600, '1')
if usage_percentage >= 100:
# 检查是否已经发送过超额通知
overage_key = f'usage_overage:{user_id}:{current_month.strftime("%Y-%m")}'
if not redis_client.exists(overage_key):
# 发送超额通知
send_overage_notification_email(user_id, current_usage, monthly_limit)
# 标记已发送(1 天内不重复发送)
redis_client.setex(overage_key, 24 * 3600, '1')
def send_usage_warning_email(user_id, current_usage, limit, percentage):
"""发送用量预警邮件"""
user = get_user_info(user_id)
subject = f'【用量预警】您的 API 调用量已达到{percentage:.0f}%'
body = f'''
尊敬的{user["name"]}:
您的 API 调用量已达到套餐限额的{percentage:.0f}%。
当前情况:
- 本月已用:{current_usage:,}次
- 套餐限额:{limit:,}次
- 使用比例:{percentage:.1f}%
建议:
1. 检查是否有异常调用
2. 考虑升级套餐以获得更高限额
3. 设置调用监控和告警
如果继续超出限额,将产生超额费用。
此致
API 平台团队
'''
send_email(to=user['email'], subject=subject, body=body)
def send_overage_notification_email(user_id, current_usage, limit):
"""发送超额通知邮件"""
user = get_user_info(user_id)
overage = current_usage - limit
estimated_fee = overage * 0.001
subject = f'【超额通知】您的 API 调用量已超出套餐限额'
body = f'''
尊敬的{user["name"]}:
您的 API 调用量已超出套餐限额。
当前情况:
- 本月已用:{current_usage:,}次
- 套餐限额:{limit:,}次
- 超出量:{overage:,}次
- 预估超额费用:¥{estimated_fee:.2f}
计费规则:
- 超出部分按¥0.001/次计费
- 月底统一结算
建议:
1. 立即检查调用日志
2. 考虑升级套餐以节省费用
3. 设置调用限流
此致
API 平台团队
'''
send_email(to=user['email'], subject=subject, body=body)超额限制
硬限额 vs 软限额
# 配置超额处理策略
OVERAGE_CONFIG = {
'free': {
'hard_limit': True, # 免费版:硬限额(超出后直接拒绝)
'max_overage': 0, # 最大超额:0
'action': 'reject' # 动作:拒绝请求
},
'basic': {
'hard_limit': False, # 付费版:软限额(允许超额,但收费)
'max_overage': 10000, # 最大超额:10000 次
'action': 'charge' # 动作:计费
},
'pro': {
'hard_limit': False,
'max_overage': 100000,
'action': 'charge'
},
'enterprise': {
'hard_limit': False,
'max_overage': -1, # -1 表示无限制
'action': 'charge'
}
}
def check_overage_limit(user_id):
"""检查超额限制"""
# 获取用户套餐
limits = get_user_plan_limits(user_id)
plan = limits['plan']
config = OVERAGE_CONFIG.get(plan, OVERAGE_CONFIG['basic'])
if not config['hard_limit']:
# 软限额,总是允许
return {
'allowed': True,
'action': 'charge'
}
# 硬限额
daily_limit = get_daily_limit(user_id)
monthly_limit = daily_limit * 30
# 获取当前用量
current_time = datetime.now()
current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'
current_usage = int(redis_client.get(cache_key) or 0)
if current_usage >= monthly_limit:
return {
'allowed': False,
'action': 'reject',
'reason': 'Monthly limit exceeded',
'upgrade_url': f'/pricing?upgrade=1'
}
return {
'allowed': True,
'action': 'allow'
}
# 在 API 调用中使用
@app.route('/api/weather')
@require_api_key
def get_weather():
# 检查超额限制
overage_check = check_overage_limit(request.user['id'])
if not overage_check['allowed']:
return jsonify({
'error': 'Monthly limit exceeded',
'message': '您已达到本月调用限额,请升级套餐或等待下月重置',
'upgrade_url': overage_check.get('upgrade_url')
}), 429
# 追踪调用
track_api_call_realtime(request.user['id'])
# ... 业务逻辑精确账单生成
使用实时数据生成账单
def generate_accurate_invoice(user_id, month):
"""使用实时数据生成精确账单"""
# 获取月份范围
if isinstance(month, str):
month = datetime.strptime(month, '%Y-%m').date()
period_start = month.replace(day=1)
period_end = (period_start + timedelta(days=32)).replace(day=1) - timedelta(days=1)
with get_db_connection() as conn:
cursor = conn.cursor()
# 获取用户订阅
cursor.execute(
'''SELECT us.*, p.* FROM user_subscriptions us
JOIN pricing_plans p ON us.plan_id = p.id
WHERE us.user_id = ?
AND us.current_period_start <= ?
AND us.current_period_end >= ?
AND us.status = 'active'
ORDER BY us.created_at DESC
LIMIT 1''',
(user_id, period_end, period_start)
)
subscription = cursor.fetchone()
if not subscription:
raise Exception(f'No active subscription for user {user_id}')
# 从 Redis 获取实时用量
cache_key = f'usage:{user_id}:{period_start.strftime("%Y-%m")}'
total_calls = int(redis_client.get(cache_key) or 0)
# 如果 Redis 中没有数据,从数据库统计
if total_calls == 0:
cursor.execute(
'''SELECT SUM(total_calls) as total_calls
FROM user_stats_monthly
WHERE user_id = ? AND month = ?''',
(user_id, period_start)
)
result = cursor.fetchone()
total_calls = result['total_calls'] or 0
# 计算超额
plan_limit = subscription['daily_limit'] * 30
overage = max(0, total_calls - plan_limit)
# 生成账单
cursor.execute(
'''INSERT INTO invoices
(user_id, subscription_id, billing_period_start, billing_period_end,
total_calls, plan_limit, overage_calls, status)
VALUES (?, ?, ?, ?, ?, ?, ?, 'draft')''',
(user_id, subscription['id'], period_start, period_end,
total_calls, plan_limit, overage)
)
invoice_id = cursor.lastrowid
# 添加基础费用
base_fee = subscription['price_monthly']
cursor.execute(
'''INSERT INTO invoice_items
(invoice_id, description, quantity, unit_price, amount, item_type)
VALUES (?, ?, 1, ?, ?, 'base_fee')''',
(invoice_id, f'{subscription["display_name"]} 月费', base_fee, base_fee)
)
# 添加超额费用
if overage > 0:
overage_unit_price = 0.001
overage_amount = overage * overage_unit_price
cursor.execute(
'''INSERT INTO invoice_items
(invoice_id, description, quantity, unit_price, amount, item_type)
VALUES (?, ?, ?, ?, ?, 'overage')''',
(invoice_id, f'超额调用 {overage:,} 次', overage, overage_unit_price, overage_amount)
)
# 计算总计
cursor.execute(
'''SELECT SUM(amount) as subtotal FROM invoice_items
WHERE invoice_id = ?''',
(invoice_id,)
)
subtotal = cursor.fetchone()['subtotal'] or 0
tax = subtotal * 0.06
total = subtotal + tax
# 更新账单
cursor.execute(
'''UPDATE invoices
SET subtotal = ?, tax = ?, total = ?
WHERE id = ?''',
(subtotal, tax, total, invoice_id)
)
conn.commit()
return invoice_id用量仪表盘
前端显示
<!-- usage-dashboard.html -->
<div class="usage-dashboard">
<h2>本月用量</h2>
<div class="usage-summary">
<div class="usage-bar">
<div class="usage-fill" style="width: 75%"></div>
</div>
<div class="usage-text">
已用 7,500 次 / 限额 10,000 次 (75%)
</div>
</div>
<div class="usage-details">
<div class="detail-item">
<span class="label">套餐</span>
<span class="value">基础版</span>
</div>
<div class="detail-item">
<span class="label">日限额</span>
<span class="value">10,000 次/天</span>
</div>
<div class="detail-item">
<span class="label">已用量</span>
<span class="value">7,500 次</span>
</div>
<div class="detail-item">
<span class="label">剩余量</span>
<span class="value">2,500 次</span>
</div>
<div class="detail-item">
<span class="label">超额预估</span>
<span class="value">¥0.00</span>
</div>
</div>
<div class="usage-chart">
<!-- 过去 30 天的调用趋势图 -->
</div>
</div>本节小结
✅ 完成的工作:
- 实现了实时用量追踪
- 实现了用量预警机制
- 实现了超额限制策略
- 改进了账单生成逻辑
- 提供了用量查询 API
✅ 用户体验提升:
- 实时查看用量
- 及时收到预警
- 了解超额费用
🎯 下一步: 我需要接入更多 API 来吸引更多用户。
