访问控制
企业需求 🔐
一个周一的上午,我收到一封企业客户的邮件:
“你好,我们的内部文档链接,不想让外部人员访问。能不能加个密码?”
另外,有些链接只想让公司内部的人访问,可以按IP限制吗?
我意识到,短链接服务已经不能只是简单的”点击即跳转”了。企业用户有强烈的安全需求——他们需要访问控制。
访问控制(Access Control)是保护资源不被未授权用户访问的机制。在短链接场景中,我需要实现多层防护:
- 密码保护:只有知道密码的人才能访问
- IP白名单:只允许特定IP地址访问
- 访问次数限制:只允许前N个人访问
- 时间窗口限制:只在特定时间段内可访问
让我逐一实现这些功能。
密码保护 🔑
为什么需要密码保护?
密码保护是最基础也是最常用的访问控制方式。企业客户用它来保护:
- 内部文档链接
- 限时优惠链接
- 私密分享的文件
- 测试环境的入口
我面临的第一个问题是:如何安全地存储密码?
数据库设计
我需要在 urls 表中添加密码相关字段:
-- 添加密码字段(存储哈希值)
ALTER TABLE urls ADD COLUMN password_hash VARCHAR(255);
-- 添加密码提示(可选,帮助用户回忆)
ALTER TABLE urls ADD COLUMN password_hint VARCHAR(255);
-- 添加密码验证失败的尝试次数(用于防暴力破解)
ALTER TABLE urls ADD COLUMN failed_attempts INTEGER DEFAULT 0;
-- 添加访问计数器
ALTER TABLE urls ADD COLUMN access_count INTEGER DEFAULT 0;
-- 添加最大访问次数限制
ALTER TABLE urls ADD COLUMN max_access_count INTEGER DEFAULT NULL;密码哈希:为什么不用 MD5?
在实现密码存储时,我需要考虑安全性。很多人会用 MD5 或 SHA1 来哈希密码:
# ❌ 错误做法:容易受到彩虹表攻击
import hashlib
def hash_password_bad(password):
return hashlib.md5(password.encode()).hexdigest()为什么不这样做?
MD5 和 SHA1 是快速哈希算法,设计初衷是快速计算校验和,而非安全存储密码。攻击者可以:
- 彩虹表攻击:预先计算常见密码的哈希值(如 “123456”、“password”),然后用查表方式破解
- 暴力破解:因为计算速度快,一秒钟可以尝试数十亿次
使用 bcrypt:正确的做法
我选择了 bcrypt——专门为密码哈希设计的算法:
# ✅ 正确做法:使用 bcrypt
import bcrypt
def hash_password(password: str) -> str:
"""生成密码哈希"""
# bcrypt 会自动生成盐值并包含在哈希结果中
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))bcrypt 的优势:
- 自动加盐:每次生成的哈希都不同,防止彩虹表攻击
- 计算成本可调:通过 work factor 参数控制计算时间,增加暴力破解成本
- 抗GPU攻击:算法设计使得GPU并行计算优势不明显
创建带密码的短链接
from flask import request, session, redirect, render_template
import bcrypt
import sqlite3
from datetime import datetime
def create_short_url_with_password(long_url: str, custom_password: str = None, hint: str = None):
"""创建带密码保护的短链接"""
# 生成短码
short_code = generate_short_code()
# 如果提供了密码,则进行哈希
password_hash = None
if custom_password:
password_hash = hash_password(custom_password)
# 存储到数据库
db.execute("""
INSERT INTO urls (
short_code, long_url, password_hash,
password_hint, created_at
) VALUES (?, ?, ?, ?, ?)
""", (short_code, long_url, password_hash, hint, datetime.now()))
return short_code
# 示例:创建一个带密码的短链接
short_code = create_short_url_with_password(
long_url="https://internal.company.com/docs/confidential",
custom_password="secret123",
hint="公司成立年份的后三位"
)
print(f"短链接已创建:https://short.ly/{short_code}")
print(f"密码提示:公司成立年份的后三位")访问流程:密码验证
当用户访问带密码的短链接时,我需要:
- 检查链接是否设置了密码
- 检查用户是否已经通过验证(Session)
- 如果未验证,显示密码输入页面
- 验证密码正确性,防止暴力破解
from flask import session
import time
@app.route('/<short_code>', methods=['GET'])
def redirect_with_auth(short_code: str):
"""带认证的短链接跳转"""
# 查询短链接信息
result = db.query("""
SELECT long_url, password_hash, password_hint,
access_count, max_access_count
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return render_template('404.html'), 404
url_info = result[0]
# 检查是否需要密码验证
if url_info['password_hash']:
# 检查 session 中是否已验证
auth_key = f'auth_{short_code}'
if not session.get(auth_key):
# 未验证,显示密码输入页面
return render_template(
'password_input.html',
short_code=short_code,
hint=url_info['password_hint']
)
# 检查访问次数限制
if url_info['max_access_count']:
if url_info['access_count'] >= url_info['max_access_count']:
return render_template('access_limit_exceeded.html'), 403
# 更新访问计数
db.execute("""
UPDATE urls
SET access_count = access_count + 1
WHERE short_code = ?
""", (short_code,))
# 跳转到目标链接
return redirect(url_info['long_url'], code=302)
@app.route('/<short_code>/auth', methods=['POST'])
def verify_password(short_code: str):
"""验证密码"""
password = request.form.get('password')
if not password:
return {"error": "请输入密码"}, 400
# 查询短链接的密码哈希
result = db.query("""
SELECT password_hash, failed_attempts,
last_failed_attempt
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return {"error": "短链接不存在"}, 404
url_info = result[0]
# 检查是否被锁定(防暴力破解)
if url_info['failed_attempts'] >= 5:
last_failed = url_info['last_failed_attempt']
if last_failed and (datetime.now() - last_failed).seconds < 300: # 5分钟锁定
return {"error": "尝试次数过多,请5分钟后再试"}, 429
# 验证密码
if verify_password(password, url_info['password_hash']):
# 密码正确,清除失败记录
db.execute("""
UPDATE urls
SET failed_attempts = 0, last_failed_attempt = NULL
WHERE short_code = ?
""", (short_code,))
# 设置 session
session[f'auth_{short_code}'] = True
session[f'auth_time_{short_code}'] = datetime.now().timestamp()
return redirect(f'/{short_code}')
else:
# 密码错误,记录失败次数
db.execute("""
UPDATE urls
SET failed_attempts = failed_attempts + 1,
last_failed_attempt = ?
WHERE short_code = ?
""", (datetime.now(), short_code))
remaining = 5 - (url_info['failed_attempts'] + 1)
return {
"error": "密码错误",
"remaining_attempts": remaining
}, 401密码输入页面
<!-- templates/password_input.html -->
<!DOCTYPE html>
<html>
<head>
<title>需要密码验证</title>
<style>
.password-container {
max-width: 400px;
margin: 100px auto;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.password-input {
width: 100%;
padding: 12px;
margin: 10px 0;
border: 1px solid #ddd;
border-radius: 4px;
}
.btn {
width: 100%;
padding: 12px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.hint {
color: #666;
font-size: 14px;
margin-bottom: 15px;
}
.error {
color: #dc3545;
margin-bottom: 10px;
}
</style>
</head>
<body>
<div class="password-container">
<h2>🔐 需要密码验证</h2>
{% if hint %}
<p class="hint">💡 提示:{{ hint }}</p>
{% endif %}
<form method="POST" action="/{{ short_code }}/auth">
<input
type="password"
name="password"
class="password-input"
placeholder="请输入密码"
required
autofocus
>
<button type="submit" class="btn">验证</button>
</form>
</div>
</body>
</html>IP 白名单 🌐
需求场景
密码保护可以防止未授权访问,但企业客户有更高的安全需求:
“我们的内部链接,只允许公司网络访问。能不能按IP限制?”
IP 白名单是更严格的访问控制方式,常用于:
- 内网资源保护:只允许公司内网IP访问
- 地域限制:只允许特定国家/地区的IP访问
- API访问控制:只允许合作伙伴服务器的IP调用
IP 地址的挑战
实现 IP 白名单有几个技术难点:
- 获取真实IP:需要处理代理、负载均衡器的情况
- IP范围匹配:单个IP不够,需要支持CIDR格式(如
192.168.1.0/24) - IPv4和IPv6兼容:需要同时支持两种协议
获取真实 IP
在 Flask 中,直接使用 request.remote_addr 可能不准确:
# ❌ 不准确:如果前面有代理,拿到的是代理IP
client_ip = request.remote_addr正确做法是检查 X-Forwarded-For 头:
def get_client_ip():
"""获取客户端真实IP"""
# 检查是否有代理
if request.headers.getlist("X-Forwarded-For"):
# X-Forwarded-For 可能包含多个IP,取第一个
return request.headers.getlist("X-Forwarded-For")[0]
else:
return request.remote_addrCIDR 匹配算法
企业客户不会只允许单个IP,而是整个IP段。CIDR(无类别域间路由)是标准表示法:
192.168.1.0/24:表示 192.168.1.0 到 192.168.1.255(256个IP)10.0.0.0/8:表示 10.0.0.0 到 10.255.255.255(1600万个IP)
我需要实现CIDR匹配算法:
import ipaddress
def is_ip_allowed(client_ip: str, allowed_ips: list) -> bool:
"""
检查IP是否在白名单中
Args:
client_ip: 客户端IP
allowed_ips: 允许的IP列表,支持单个IP或CIDR
Returns:
bool: 是否允许访问
"""
try:
client_ip_obj = ipaddress.ip_address(client_ip)
for allowed in allowed_ips:
# 检查是否是CIDR格式
if '/' in allowed:
# CIDR范围匹配
network = ipaddress.ip_network(allowed, strict=False)
if client_ip_obj in network:
return True
else:
# 单个IP匹配
if str(client_ip_obj) == allowed:
return True
return False
except ValueError:
# IP地址格式错误
return False
# 测试
print(is_ip_allowed("192.168.1.50", ["192.168.1.0/24"])) # True
print(is_ip_allowed("192.168.2.50", ["192.168.1.0/24"])) # False
print(is_ip_allowed("10.0.5.10", ["10.0.0.0/8"])) # True
print(is_ip_allowed("8.8.8.8", ["8.8.8.8"])) # True数据库设计
-- 添加IP白名单字段(JSON格式,存储多个IP/CIDR)
ALTER TABLE urls ADD COLUMN allowed_ips TEXT DEFAULT NULL;
-- 添加拒绝IP列表(黑名单)
ALTER TABLE urls ADD COLUMN denied_ips TEXT DEFAULT NULL;
-- 添加IP访问日志(用于审计)
CREATE TABLE ip_access_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
short_code VARCHAR(50) NOT NULL,
ip_address VARCHAR(45) NOT NULL,
access_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
allowed BOOLEAN NOT NULL,
FOREIGN KEY (short_code) REFERENCES urls(short_code)
);
-- 添加索引
CREATE INDEX idx_ip_logs_short_code ON ip_access_logs(short_code);
CREATE INDEX idx_ip_logs_time ON ip_access_logs(access_time);IP 白名单实现
import json
from flask import request, abort
@app.route('/<short_code>', methods=['GET'])
def redirect_with_ip_check(short_code: str):
"""带IP检查的短链接跳转"""
# 查询短链接信息
result = db.query("""
SELECT long_url, password_hash,
allowed_ips, denied_ips
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return render_template('404.html'), 404
url_info = result[0]
# 获取客户端真实IP
client_ip = get_client_ip()
# 先检查黑名单(优先级高)
if url_info['denied_ips']:
denied_ips = json.loads(url_info['denied_ips'])
if is_ip_allowed(client_ip, denied_ips):
# 记录访问日志
log_ip_access(short_code, client_ip, allowed=False)
abort(403, description="您的IP地址被禁止访问")
# 检查白名单
if url_info['allowed_ips']:
allowed_ips = json.loads(url_info['allowed_ips'])
if not is_ip_allowed(client_ip, allowed_ips):
# 记录访问日志
log_ip_access(short_code, client_ip, allowed=False)
abort(403, description="您的IP地址不在白名单中")
# 记录成功访问
log_ip_access(short_code, client_ip, allowed=True)
# 密码检查(如果设置了密码)
if url_info['password_hash']:
auth_key = f'auth_{short_code}'
if not session.get(auth_key):
return render_template('password_input.html', short_code=short_code)
# 跳转
return redirect(url_info['long_url'], code=302)
def log_ip_access(short_code: str, ip: str, allowed: bool):
"""记录IP访问日志"""
db.execute("""
INSERT INTO ip_access_logs (short_code, ip_address, allowed)
VALUES (?, ?, ?)
""", (short_code, ip, allowed))创建带IP限制的短链接
def create_short_url_with_ip_restriction(
long_url: str,
allowed_ips: list = None,
denied_ips: list = None
) -> str:
"""
创建带IP限制的短链接
Args:
long_url: 原始长链接
allowed_ips: 白名单(IP列表或CIDR)
denied_ips: 黑名单
Returns:
str: 短码
"""
short_code = generate_short_code()
# 将IP列表转换为JSON存储
allowed_ips_json = json.dumps(allowed_ips) if allowed_ips else None
denied_ips_json = json.dumps(denied_ips) if denied_ips else None
db.execute("""
INSERT INTO urls (
short_code, long_url,
allowed_ips, denied_ips,
created_at
) VALUES (?, ?, ?, ?, ?)
""", (short_code, long_url, allowed_ips_json, denied_ips_json, datetime.now()))
return short_code
# 示例:只允许公司内网访问
short_code = create_short_url_with_ip_restriction(
long_url="https://internal.company.com/dashboard",
allowed_ips=["192.168.1.0/24", "10.0.0.0/8"]
)
# 示例:禁止特定IP访问
short_code = create_short_url_with_ip_restriction(
long_url="https://example.com/sensitive",
denied_ips=["1.2.3.4", "5.6.7.0/24"]
)IP 访问统计
@app.route('/<short_code>/stats/ip')
def ip_access_stats(short_code: str):
"""IP访问统计"""
# 查询访问统计
stats = db.query("""
SELECT
ip_address,
COUNT(*) as access_count,
SUM(CASE WHEN allowed = 1 THEN 1 ELSE 0 END) as allowed_count,
SUM(CASE WHEN allowed = 0 THEN 1 ELSE 0 END) as denied_count,
MAX(access_time) as last_access
FROM ip_access_logs
WHERE short_code = ?
GROUP BY ip_address
ORDER BY access_count DESC
LIMIT 20
""", (short_code,))
return render_template('ip_stats.html', stats=stats)访问次数限制 🎫
需求场景
一个营销客户找到我:
“我们要发限时优惠券,只给前100名用户。能不能实现’前100人可以访问,其他人不行’?”
这就是访问次数限制的需求:
- 限量抢购:只允许前N人访问
- 试用期限制:免费版只能访问100次
- 配额管理:防止资源被过度消耗
数据库设计
我在密码保护部分已经添加了相关字段:
-- 访问计数器
ALTER TABLE urls ADD COLUMN access_count INTEGER DEFAULT 0;
-- 最大访问次数(NULL表示无限制)
ALTER TABLE urls ADD COLUMN max_access_count INTEGER DEFAULT NULL;
-- 达到限制后的操作:1=显示提示,2=跳转到备用链接
ALTER TABLE urls ADD COLUMN limit_action INTEGER DEFAULT 1;
-- 备用链接(当达到限制时跳转)
ALTER TABLE urls ADD COLUMN fallback_url TEXT DEFAULT NULL;访问次数限制实现
from flask import render_template
def check_access_limit(short_code: str) -> tuple:
"""
检查访问次数限制
Returns:
tuple: (allowed: bool, action: str)
"""
result = db.query("""
SELECT access_count, max_access_count,
limit_action, fallback_url
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return False, "not_found"
url_info = result[0]
# 如果没有设置限制,允许访问
if not url_info['max_access_count']:
return True, None
# 检查是否达到限制
if url_info['access_count'] >= url_info['max_access_count']:
action = url_info['limit_action']
fallback_url = url_info['fallback_url']
if action == 2 and fallback_url:
# 跳转到备用链接
return False, f"redirect:{fallback_url}"
else:
# 显示提示页面
return False, "show_limit_page"
return True, None
@app.route('/<short_code>', methods=['GET'])
def redirect_with_access_limit(short_code: str):
"""带访问次数限制的短链接跳转"""
# 检查访问次数限制
allowed, action = check_access_limit(short_code)
if not allowed:
if action.startswith("redirect:"):
# 跳转到备用链接
fallback_url = action.split(":", 1)[1]
return redirect(fallback_url, code=302)
elif action == "show_limit_page":
# 显示限制页面
return render_template('access_limit_exceeded.html')
else:
# 其他情况返回404
return render_template('404.html'), 404
# 查询链接信息
result = db.query("""
SELECT long_url, password_hash, allowed_ips
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return render_template('404.html'), 404
url_info = result[0]
# ... 其他检查(密码、IP等)...
# 更新访问计数
db.execute("""
UPDATE urls
SET access_count = access_count + 1
WHERE short_code = ?
""", (short_code,))
# 跳转
return redirect(url_info['long_url'], code=302)限制页面模板
<!-- templates/access_limit_exceeded.html -->
<!DOCTYPE html>
<html>
<head>
<title>访问次数已用完</title>
<style>
.limit-container {
max-width: 500px;
margin: 100px auto;
text-align: center;
padding: 40px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.icon {
font-size: 60px;
margin-bottom: 20px;
}
h1 {
color: #333;
margin-bottom: 15px;
}
p {
color: #666;
line-height: 1.6;
}
</style>
</head>
<body>
<div class="limit-container">
<div class="icon">🎫</div>
<h1>访问次数已达上限</h1>
<p>
抱歉,该短链接的访问次数已达到上限。<br>
可能是因为名额已满或资源配额已用完。
</p>
<p>
如有疑问,请联系链接创建者。
</p>
</div>
</body>
</html>创建限次访问的短链接
def create_short_url_with_access_limit(
long_url: str,
max_access: int,
fallback_url: str = None
) -> str:
"""
创建限次访问的短链接
Args:
long_url: 原始长链接
max_access: 最大访问次数
fallback_url: 达到限制后的备用链接
Returns:
str: 短码
"""
short_code = generate_short_code()
db.execute("""
INSERT INTO urls (
short_code, long_url,
max_access_count, fallback_url,
limit_action, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (short_code, long_url, max_access, fallback_url, 2, datetime.now()))
return short_code
# 示例:前100人可以访问
short_code = create_short_url_with_access_limit(
long_url="https://shop.com/coupon-early-bird",
max_access=100,
fallback_url="https://shop.com/coupon-expired"
)实时访问计数查询
@app.route('/<short_code>/stats/access')
def access_count_stats(short_code: str):
"""查询访问计数"""
result = db.query("""
SELECT
access_count,
max_access_count,
CASE
WHEN max_access_count IS NULL THEN '无限制'
WHEN access_count >= max_access_count THEN '已达上限'
ELSE '可用'
END as status,
CASE
WHEN max_access_count IS NOT NULL
THEN ROUND((access_count * 100.0 / max_access_count), 2)
ELSE NULL
END as usage_percentage
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return {"error": "短链接不存在"}, 404
return {
"short_code": short_code,
"access_count": result[0]['access_count'],
"max_access_count": result[0]['max_access_count'],
"status": result[0]['status'],
"usage_percentage": result[0]['usage_percentage']
}时间窗口限制 ⏰
需求场景
又有一个企业客户提出需求:
“我们的内部链接,只在工作时间可以访问,下班后不行。”
这就是时间窗口限制:
- 工作时间限制:只在周一到周五的9:00-18:00可访问
- 活动时间限制:限时促销,只在一个日期范围内有效
- 试用期限制:新用户注册后7天内可访问
数据库设计
-- 生效时间(UTC时间戳)
ALTER TABLE urls ADD COLUMN valid_from TIMESTAMP DEFAULT NULL;
-- 失效时间(UTC时间戳)
ALTER TABLE urls ADD COLUMN valid_until TIMESTAMP DEFAULT NULL;
-- 允许访问的时间段(JSON格式)
ALTER TABLE urls ADD COLUMN allowed_hours TEXT DEFAULT NULL;
-- 格式:{"monday": [{"start": "09:00", "end": "18:00"}], ...}
-- 时区设置
ALTER TABLE urls ADD COLUMN timezone VARCHAR(50) DEFAULT 'UTC';时间范围限制
from datetime import datetime, timezone
import pytz
def is_within_time_range(short_code: str) -> bool:
"""检查当前时间是否在允许的时间范围内"""
result = db.query("""
SELECT valid_from, valid_until, timezone
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return False
url_info = result[0]
# 获取当前时间(考虑时区)
tz = pytz.timezone(url_info['timezone'] or 'UTC')
now = datetime.now(tz)
# 检查生效时间
if url_info['valid_from']:
valid_from = url_info['valid_from']
if valid_from.tzinfo is None:
valid_from = pytz.utc.localize(valid_from)
if now < valid_from:
return False
# 检查失效时间
if url_info['valid_until']:
valid_until = url_info['valid_until']
if valid_until.tzinfo is None:
valid_until = pytz.utc.localize(valid_until)
if now > valid_until:
return False
return True每周时间段限制
import json
from datetime import time
def is_within_allowed_hours(short_code: str) -> bool:
"""检查当前时间是否在允许的每周时间段内"""
result = db.query("""
SELECT allowed_hours, timezone
FROM urls
WHERE short_code = ?
""", (short_code,))
if not result:
return True # 没有限制则允许
url_info = result[0]
if not url_info['allowed_hours']:
return True
# 获取当前时间
tz = pytz.timezone(url_info['timezone'] or 'UTC')
now = datetime.now(tz)
# 解析允许的时间段
allowed_hours = json.loads(url_info['allowed_hours'])
# 获取当前是周几(0=周一,6=周日)
weekday = now.weekday()
# 获取当前时间
current_time = now.time()
# 检查是否有定义的时间段
weekday_name = ['monday', 'tuesday', 'wednesday', 'thursday',
'friday', 'saturday', 'sunday'][weekday]
if weekday_name not in allowed_hours:
return False # 当天不允许访问
# 检查当前时间是否在任意一个允许的时间段内
for period in allowed_hours[weekday_name]:
start_time = time.fromisoformat(period['start'])
end_time = time.fromisoformat(period['end'])
if start_time <= current_time <= end_time:
return True
return False创建时间限制的短链接
from datetime import datetime, timedelta
def create_short_url_with_time_limit(
long_url: str,
valid_from: datetime = None,
valid_until: datetime = None,
allowed_hours: dict = None,
timezone: str = 'Asia/Shanghai'
) -> str:
"""
创建带时间限制的短链接
Args:
long_url: 原始长链接
valid_from: 生效时间
valid_until: 失效时间
allowed_hours: 每周允许的时间段
timezone: 时区
Returns:
str: 短码
"""
short_code = generate_short_code()
# 将时间段转为JSON
allowed_hours_json = json.dumps(allowed_hours) if allowed_hours else None
db.execute("""
INSERT INTO urls (
short_code, long_url,
valid_from, valid_until,
allowed_hours, timezone,
created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (short_code, long_url, valid_from, valid_until,
allowed_hours_json, timezone, datetime.now()))
return short_code
# 示例1:限时优惠券(2024年1月1日到1月7日)
short_code = create_short_url_with_time_limit(
long_url="https://shop.com/new-year-sale",
valid_from=datetime(2024, 1, 1, 0, 0, 0),
valid_until=datetime(2024, 1, 7, 23, 59, 59),
timezone='Asia/Shanghai'
)
# 示例2:只在工作时间访问
short_code = create_short_url_with_time_limit(
long_url="https://internal.company.com/dashboard",
allowed_hours={
'monday': [{'start': '09:00', 'end': '18:00'}],
'tuesday': [{'start': '09:00', 'end': '18:00'}],
'wednesday': [{'start': '09:00', 'end': '18:00'}],
'thursday': [{'start': '09:00', 'end': '18:00'}],
'friday': [{'start': '09:00', 'end': '18:00'}]
},
timezone='Asia/Shanghai'
)完整的时间检查流程
@app.route('/<short_code>', methods=['GET'])
def redirect_with_time_check(short_code: str):
"""带时间检查的短链接跳转"""
# 检查时间范围
if not is_within_time_range(short_code):
return render_template('invalid_time.html', reason="不在有效时间范围内")
# 检查每周时间段
if not is_within_allowed_hours(short_code):
return render_template('invalid_time.html', reason="当前时间不允许访问")
# ... 其他检查 ...
# 跳转
return redirect(url_info['long_url'], code=302)访问控制方案对比 📊
我已经实现了四种访问控制方案,每种都有其适用场景:
| 方案 | 实现难度 | 安全性 | 性能影响 | 适用场景 |
|---|---|---|---|---|
| 密码保护 | ⭐⭐ 中等 | ⭐⭐⭐ 中等 | ⭐⭐ 低 | 内部文档、私密分享 |
| IP白名单 | ⭐⭐ 中等 | ⭐⭐⭐⭐ 高 | ⭐⭐ 低 | 企业内网、地域限制 |
| 访问次数限制 | ⭐ 简单 | ⭐⭐ 低 | ⭐ 低 | 限量抢购、试用版 |
| 时间窗口限制 | ⭐⭐⭐ 较难 | ⭐⭐ 低 | ⭐ 低 | 限时活动、工作时间 |
组合使用
这些方案可以组合使用:
# 示例:企业内部链接
# - 只允许公司IP访问
# - 只在工作时间
# - 最多访问1000次
short_code = create_short_url(
long_url="https://internal.company.com/reports",
allowed_ips=["192.168.1.0/24"],
allowed_hours={"monday": [{"start": "09:00", "end": "18:00"}], ...},
max_access_count=1000
)性能考虑
访问控制会增加响应延迟,我做了优化:
- 数据库索引:在
short_code上建索引,快速查询 - 缓存Session:密码验证后缓存,避免重复查询
- IP匹配优化:使用高效的CIDR匹配算法
- 异步日志:访问日志异步写入,不影响主流程
下一步:批量管理 ⏭️
单个链接的访问控制已经实现了。但企业客户又提出新问题:
“我们有1000个内部链接,能不能批量设置IP白名单?” “每个月都要更新密码,能不能批量操作?”
单个链接管理已经不够了,我需要实现批量管理功能。
下一节,我会实现:
- 批量设置访问控制
- 批量更新密码
- 批量导入导出
- 权限组管理
(继续下一节:批量操作)