访问控制

企业需求 🔐

一个周一的上午,我收到一封企业客户的邮件:

“你好,我们的内部文档链接,不想让外部人员访问。能不能加个密码?”

另外,有些链接只想让公司内部的人访问,可以按IP限制吗?

我意识到,短链接服务已经不能只是简单的”点击即跳转”了。企业用户有强烈的安全需求——他们需要访问控制

访问控制(Access Control)是保护资源不被未授权用户访问的机制。在短链接场景中,我需要实现多层防护:

  • 密码保护:只有知道密码的人才能访问
  • IP白名单:只允许特定IP地址访问
  • 访问次数限制:只允许前N个人访问
  • 时间窗口限制:只在特定时间段内可访问

让我逐一实现这些功能。


密码保护 🔑

为什么需要密码保护?

密码保护是最基础也是最常用的访问控制方式。企业客户用它来保护:

  • 内部文档链接
  • 限时优惠链接
  • 私密分享的文件
  • 测试环境的入口

我面临的第一个问题是:如何安全地存储密码?

数据库设计

我需要在 urls 表中添加密码相关字段:

-- 添加密码字段(存储哈希值)
ALTER TABLE urls ADD COLUMN password_hash VARCHAR(255);

-- 添加密码提示(可选,帮助用户回忆)
ALTER TABLE urls ADD COLUMN password_hint VARCHAR(255);

-- 添加密码验证失败的尝试次数(用于防暴力破解)
ALTER TABLE urls ADD COLUMN failed_attempts INTEGER DEFAULT 0;

-- 添加访问计数器
ALTER TABLE urls ADD COLUMN access_count INTEGER DEFAULT 0;

-- 添加最大访问次数限制
ALTER TABLE urls ADD COLUMN max_access_count INTEGER DEFAULT NULL;

密码哈希:为什么不用 MD5?

在实现密码存储时,我需要考虑安全性。很多人会用 MD5 或 SHA1 来哈希密码:

# ❌ 错误做法:容易受到彩虹表攻击
import hashlib

def hash_password_bad(password):
    return hashlib.md5(password.encode()).hexdigest()

为什么不这样做?

MD5 和 SHA1 是快速哈希算法,设计初衷是快速计算校验和,而非安全存储密码。攻击者可以:

  1. 彩虹表攻击:预先计算常见密码的哈希值(如 “123456”、“password”),然后用查表方式破解
  2. 暴力破解:因为计算速度快,一秒钟可以尝试数十亿次

使用 bcrypt:正确的做法

我选择了 bcrypt——专门为密码哈希设计的算法:

# ✅ 正确做法:使用 bcrypt
import bcrypt

def hash_password(password: str) -> str:
    """生成密码哈希"""
    # bcrypt 会自动生成盐值并包含在哈希结果中
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
    return hashed.decode('utf-8')

def verify_password(password: str, hashed: str) -> bool:
    """验证密码"""
    return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

bcrypt 的优势:

  1. 自动加盐:每次生成的哈希都不同,防止彩虹表攻击
  2. 计算成本可调:通过 work factor 参数控制计算时间,增加暴力破解成本
  3. 抗GPU攻击:算法设计使得GPU并行计算优势不明显

创建带密码的短链接

from flask import request, session, redirect, render_template
import bcrypt
import sqlite3
from datetime import datetime

def create_short_url_with_password(long_url: str, custom_password: str = None, hint: str = None):
    """创建带密码保护的短链接"""
    
    # 生成短码
    short_code = generate_short_code()
    
    # 如果提供了密码,则进行哈希
    password_hash = None
    if custom_password:
        password_hash = hash_password(custom_password)
    
    # 存储到数据库
    db.execute("""
        INSERT INTO urls (
            short_code, long_url, password_hash, 
            password_hint, created_at
        ) VALUES (?, ?, ?, ?, ?)
    """, (short_code, long_url, password_hash, hint, datetime.now()))
    
    return short_code

# 示例:创建一个带密码的短链接
short_code = create_short_url_with_password(
    long_url="https://internal.company.com/docs/confidential",
    custom_password="secret123",
    hint="公司成立年份的后三位"
)
print(f"短链接已创建:https://short.ly/{short_code}")
print(f"密码提示:公司成立年份的后三位")

访问流程:密码验证

当用户访问带密码的短链接时,我需要:

  1. 检查链接是否设置了密码
  2. 检查用户是否已经通过验证(Session)
  3. 如果未验证,显示密码输入页面
  4. 验证密码正确性,防止暴力破解
from flask import session
import time

@app.route('/<short_code>', methods=['GET'])
def redirect_with_auth(short_code: str):
    """带认证的短链接跳转"""
    
    # 查询短链接信息
    result = db.query("""
        SELECT long_url, password_hash, password_hint, 
               access_count, max_access_count
        FROM urls 
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return render_template('404.html'), 404
    
    url_info = result[0]
    
    # 检查是否需要密码验证
    if url_info['password_hash']:
        # 检查 session 中是否已验证
        auth_key = f'auth_{short_code}'
        if not session.get(auth_key):
            # 未验证,显示密码输入页面
            return render_template(
                'password_input.html',
                short_code=short_code,
                hint=url_info['password_hint']
            )
    
    # 检查访问次数限制
    if url_info['max_access_count']:
        if url_info['access_count'] >= url_info['max_access_count']:
            return render_template('access_limit_exceeded.html'), 403
    
    # 更新访问计数
    db.execute("""
        UPDATE urls 
        SET access_count = access_count + 1 
        WHERE short_code = ?
    """, (short_code,))
    
    # 跳转到目标链接
    return redirect(url_info['long_url'], code=302)

@app.route('/<short_code>/auth', methods=['POST'])
def verify_password(short_code: str):
    """验证密码"""
    
    password = request.form.get('password')
    if not password:
        return {"error": "请输入密码"}, 400
    
    # 查询短链接的密码哈希
    result = db.query("""
        SELECT password_hash, failed_attempts, 
               last_failed_attempt
        FROM urls 
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return {"error": "短链接不存在"}, 404
    
    url_info = result[0]
    
    # 检查是否被锁定(防暴力破解)
    if url_info['failed_attempts'] >= 5:
        last_failed = url_info['last_failed_attempt']
        if last_failed and (datetime.now() - last_failed).seconds < 300:  # 5分钟锁定
            return {"error": "尝试次数过多,请5分钟后再试"}, 429
    
    # 验证密码
    if verify_password(password, url_info['password_hash']):
        # 密码正确,清除失败记录
        db.execute("""
            UPDATE urls 
            SET failed_attempts = 0, last_failed_attempt = NULL
            WHERE short_code = ?
        """, (short_code,))
        
        # 设置 session
        session[f'auth_{short_code}'] = True
        session[f'auth_time_{short_code}'] = datetime.now().timestamp()
        
        return redirect(f'/{short_code}')
    else:
        # 密码错误,记录失败次数
        db.execute("""
            UPDATE urls 
            SET failed_attempts = failed_attempts + 1,
                last_failed_attempt = ?
            WHERE short_code = ?
        """, (datetime.now(), short_code))
        
        remaining = 5 - (url_info['failed_attempts'] + 1)
        return {
            "error": "密码错误",
            "remaining_attempts": remaining
        }, 401

密码输入页面

<!-- templates/password_input.html -->
<!DOCTYPE html>
<html>
<head>
    <title>需要密码验证</title>
    <style>
        .password-container {
            max-width: 400px;
            margin: 100px auto;
            padding: 30px;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .password-input {
            width: 100%;
            padding: 12px;
            margin: 10px 0;
            border: 1px solid #ddd;
            border-radius: 4px;
        }
        .btn {
            width: 100%;
            padding: 12px;
            background: #007bff;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        .hint {
            color: #666;
            font-size: 14px;
            margin-bottom: 15px;
        }
        .error {
            color: #dc3545;
            margin-bottom: 10px;
        }
    </style>
</head>
<body>
    <div class="password-container">
        <h2>🔐 需要密码验证</h2>
        {% if hint %}
        <p class="hint">💡 提示:{{ hint }}</p>
        {% endif %}
        <form method="POST" action="/{{ short_code }}/auth">
            <input 
                type="password" 
                name="password" 
                class="password-input" 
                placeholder="请输入密码" 
                required
                autofocus
            >
            <button type="submit" class="btn">验证</button>
        </form>
    </div>
</body>
</html>

IP 白名单 🌐

需求场景

密码保护可以防止未授权访问,但企业客户有更高的安全需求:

“我们的内部链接,只允许公司网络访问。能不能按IP限制?”

IP 白名单是更严格的访问控制方式,常用于:

  • 内网资源保护:只允许公司内网IP访问
  • 地域限制:只允许特定国家/地区的IP访问
  • API访问控制:只允许合作伙伴服务器的IP调用

IP 地址的挑战

实现 IP 白名单有几个技术难点:

  1. 获取真实IP:需要处理代理、负载均衡器的情况
  2. IP范围匹配:单个IP不够,需要支持CIDR格式(如 192.168.1.0/24
  3. IPv4和IPv6兼容:需要同时支持两种协议

获取真实 IP

在 Flask 中,直接使用 request.remote_addr 可能不准确:

# ❌ 不准确:如果前面有代理,拿到的是代理IP
client_ip = request.remote_addr

正确做法是检查 X-Forwarded-For 头:

def get_client_ip():
    """获取客户端真实IP"""
    # 检查是否有代理
    if request.headers.getlist("X-Forwarded-For"):
        # X-Forwarded-For 可能包含多个IP,取第一个
        return request.headers.getlist("X-Forwarded-For")[0]
    else:
        return request.remote_addr

CIDR 匹配算法

企业客户不会只允许单个IP,而是整个IP段。CIDR(无类别域间路由)是标准表示法:

  • 192.168.1.0/24:表示 192.168.1.0 到 192.168.1.255(256个IP)
  • 10.0.0.0/8:表示 10.0.0.0 到 10.255.255.255(1600万个IP)

我需要实现CIDR匹配算法:

import ipaddress

def is_ip_allowed(client_ip: str, allowed_ips: list) -> bool:
    """
    检查IP是否在白名单中
    
    Args:
        client_ip: 客户端IP
        allowed_ips: 允许的IP列表,支持单个IP或CIDR
    
    Returns:
        bool: 是否允许访问
    """
    try:
        client_ip_obj = ipaddress.ip_address(client_ip)
        
        for allowed in allowed_ips:
            # 检查是否是CIDR格式
            if '/' in allowed:
                # CIDR范围匹配
                network = ipaddress.ip_network(allowed, strict=False)
                if client_ip_obj in network:
                    return True
            else:
                # 单个IP匹配
                if str(client_ip_obj) == allowed:
                    return True
        
        return False
        
    except ValueError:
        # IP地址格式错误
        return False

# 测试
print(is_ip_allowed("192.168.1.50", ["192.168.1.0/24"]))  # True
print(is_ip_allowed("192.168.2.50", ["192.168.1.0/24"]))  # False
print(is_ip_allowed("10.0.5.10", ["10.0.0.0/8"]))        # True
print(is_ip_allowed("8.8.8.8", ["8.8.8.8"]))              # True

数据库设计

-- 添加IP白名单字段(JSON格式,存储多个IP/CIDR)
ALTER TABLE urls ADD COLUMN allowed_ips TEXT DEFAULT NULL;

-- 添加拒绝IP列表(黑名单)
ALTER TABLE urls ADD COLUMN denied_ips TEXT DEFAULT NULL;

-- 添加IP访问日志(用于审计)
CREATE TABLE ip_access_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    short_code VARCHAR(50) NOT NULL,
    ip_address VARCHAR(45) NOT NULL,
    access_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    allowed BOOLEAN NOT NULL,
    FOREIGN KEY (short_code) REFERENCES urls(short_code)
);

-- 添加索引
CREATE INDEX idx_ip_logs_short_code ON ip_access_logs(short_code);
CREATE INDEX idx_ip_logs_time ON ip_access_logs(access_time);

IP 白名单实现

import json
from flask import request, abort

@app.route('/<short_code>', methods=['GET'])
def redirect_with_ip_check(short_code: str):
    """带IP检查的短链接跳转"""
    
    # 查询短链接信息
    result = db.query("""
        SELECT long_url, password_hash, 
               allowed_ips, denied_ips
        FROM urls 
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return render_template('404.html'), 404
    
    url_info = result[0]
    
    # 获取客户端真实IP
    client_ip = get_client_ip()
    
    # 先检查黑名单(优先级高)
    if url_info['denied_ips']:
        denied_ips = json.loads(url_info['denied_ips'])
        if is_ip_allowed(client_ip, denied_ips):
            # 记录访问日志
            log_ip_access(short_code, client_ip, allowed=False)
            abort(403, description="您的IP地址被禁止访问")
    
    # 检查白名单
    if url_info['allowed_ips']:
        allowed_ips = json.loads(url_info['allowed_ips'])
        if not is_ip_allowed(client_ip, allowed_ips):
            # 记录访问日志
            log_ip_access(short_code, client_ip, allowed=False)
            abort(403, description="您的IP地址不在白名单中")
    
    # 记录成功访问
    log_ip_access(short_code, client_ip, allowed=True)
    
    # 密码检查(如果设置了密码)
    if url_info['password_hash']:
        auth_key = f'auth_{short_code}'
        if not session.get(auth_key):
            return render_template('password_input.html', short_code=short_code)
    
    # 跳转
    return redirect(url_info['long_url'], code=302)

def log_ip_access(short_code: str, ip: str, allowed: bool):
    """记录IP访问日志"""
    db.execute("""
        INSERT INTO ip_access_logs (short_code, ip_address, allowed)
        VALUES (?, ?, ?)
    """, (short_code, ip, allowed))

创建带IP限制的短链接

def create_short_url_with_ip_restriction(
    long_url: str,
    allowed_ips: list = None,
    denied_ips: list = None
) -> str:
    """
    创建带IP限制的短链接
    
    Args:
        long_url: 原始长链接
        allowed_ips: 白名单(IP列表或CIDR)
        denied_ips: 黑名单
    
    Returns:
        str: 短码
    """
    short_code = generate_short_code()
    
    # 将IP列表转换为JSON存储
    allowed_ips_json = json.dumps(allowed_ips) if allowed_ips else None
    denied_ips_json = json.dumps(denied_ips) if denied_ips else None
    
    db.execute("""
        INSERT INTO urls (
            short_code, long_url, 
            allowed_ips, denied_ips, 
            created_at
        ) VALUES (?, ?, ?, ?, ?)
    """, (short_code, long_url, allowed_ips_json, denied_ips_json, datetime.now()))
    
    return short_code

# 示例:只允许公司内网访问
short_code = create_short_url_with_ip_restriction(
    long_url="https://internal.company.com/dashboard",
    allowed_ips=["192.168.1.0/24", "10.0.0.0/8"]
)

# 示例:禁止特定IP访问
short_code = create_short_url_with_ip_restriction(
    long_url="https://example.com/sensitive",
    denied_ips=["1.2.3.4", "5.6.7.0/24"]
)

IP 访问统计

@app.route('/<short_code>/stats/ip')
def ip_access_stats(short_code: str):
    """IP访问统计"""
    
    # 查询访问统计
    stats = db.query("""
        SELECT 
            ip_address,
            COUNT(*) as access_count,
            SUM(CASE WHEN allowed = 1 THEN 1 ELSE 0 END) as allowed_count,
            SUM(CASE WHEN allowed = 0 THEN 1 ELSE 0 END) as denied_count,
            MAX(access_time) as last_access
        FROM ip_access_logs
        WHERE short_code = ?
        GROUP BY ip_address
        ORDER BY access_count DESC
        LIMIT 20
    """, (short_code,))
    
    return render_template('ip_stats.html', stats=stats)

访问次数限制 🎫

需求场景

一个营销客户找到我:

“我们要发限时优惠券,只给前100名用户。能不能实现’前100人可以访问,其他人不行’?”

这就是访问次数限制的需求:

  • 限量抢购:只允许前N人访问
  • 试用期限制:免费版只能访问100次
  • 配额管理:防止资源被过度消耗

数据库设计

我在密码保护部分已经添加了相关字段:

-- 访问计数器
ALTER TABLE urls ADD COLUMN access_count INTEGER DEFAULT 0;

-- 最大访问次数(NULL表示无限制)
ALTER TABLE urls ADD COLUMN max_access_count INTEGER DEFAULT NULL;

-- 达到限制后的操作:1=显示提示,2=跳转到备用链接
ALTER TABLE urls ADD COLUMN limit_action INTEGER DEFAULT 1;

-- 备用链接(当达到限制时跳转)
ALTER TABLE urls ADD COLUMN fallback_url TEXT DEFAULT NULL;

访问次数限制实现

from flask import render_template

def check_access_limit(short_code: str) -> tuple:
    """
    检查访问次数限制
    
    Returns:
        tuple: (allowed: bool, action: str)
    """
    result = db.query("""
        SELECT access_count, max_access_count, 
               limit_action, fallback_url
        FROM urls
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return False, "not_found"
    
    url_info = result[0]
    
    # 如果没有设置限制,允许访问
    if not url_info['max_access_count']:
        return True, None
    
    # 检查是否达到限制
    if url_info['access_count'] >= url_info['max_access_count']:
        action = url_info['limit_action']
        fallback_url = url_info['fallback_url']
        
        if action == 2 and fallback_url:
            # 跳转到备用链接
            return False, f"redirect:{fallback_url}"
        else:
            # 显示提示页面
            return False, "show_limit_page"
    
    return True, None

@app.route('/<short_code>', methods=['GET'])
def redirect_with_access_limit(short_code: str):
    """带访问次数限制的短链接跳转"""
    
    # 检查访问次数限制
    allowed, action = check_access_limit(short_code)
    
    if not allowed:
        if action.startswith("redirect:"):
            # 跳转到备用链接
            fallback_url = action.split(":", 1)[1]
            return redirect(fallback_url, code=302)
        elif action == "show_limit_page":
            # 显示限制页面
            return render_template('access_limit_exceeded.html')
        else:
            # 其他情况返回404
            return render_template('404.html'), 404
    
    # 查询链接信息
    result = db.query("""
        SELECT long_url, password_hash, allowed_ips
        FROM urls
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return render_template('404.html'), 404
    
    url_info = result[0]
    
    # ... 其他检查(密码、IP等)...
    
    # 更新访问计数
    db.execute("""
        UPDATE urls 
        SET access_count = access_count + 1 
        WHERE short_code = ?
    """, (short_code,))
    
    # 跳转
    return redirect(url_info['long_url'], code=302)

限制页面模板

<!-- templates/access_limit_exceeded.html -->
<!DOCTYPE html>
<html>
<head>
    <title>访问次数已用完</title>
    <style>
        .limit-container {
            max-width: 500px;
            margin: 100px auto;
            text-align: center;
            padding: 40px;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .icon {
            font-size: 60px;
            margin-bottom: 20px;
        }
        h1 {
            color: #333;
            margin-bottom: 15px;
        }
        p {
            color: #666;
            line-height: 1.6;
        }
    </style>
</head>
<body>
    <div class="limit-container">
        <div class="icon">🎫</div>
        <h1>访问次数已达上限</h1>
        <p>
            抱歉,该短链接的访问次数已达到上限。<br>
            可能是因为名额已满或资源配额已用完。
        </p>
        <p>
            如有疑问,请联系链接创建者。
        </p>
    </div>
</body>
</html>

创建限次访问的短链接

def create_short_url_with_access_limit(
    long_url: str,
    max_access: int,
    fallback_url: str = None
) -> str:
    """
    创建限次访问的短链接
    
    Args:
        long_url: 原始长链接
        max_access: 最大访问次数
        fallback_url: 达到限制后的备用链接
    
    Returns:
        str: 短码
    """
    short_code = generate_short_code()
    
    db.execute("""
        INSERT INTO urls (
            short_code, long_url, 
            max_access_count, fallback_url,
            limit_action, created_at
        ) VALUES (?, ?, ?, ?, ?, ?)
    """, (short_code, long_url, max_access, fallback_url, 2, datetime.now()))
    
    return short_code

# 示例:前100人可以访问
short_code = create_short_url_with_access_limit(
    long_url="https://shop.com/coupon-early-bird",
    max_access=100,
    fallback_url="https://shop.com/coupon-expired"
)

实时访问计数查询

@app.route('/<short_code>/stats/access')
def access_count_stats(short_code: str):
    """查询访问计数"""
    
    result = db.query("""
        SELECT 
            access_count,
            max_access_count,
            CASE 
                WHEN max_access_count IS NULL THEN '无限制'
                WHEN access_count >= max_access_count THEN '已达上限'
                ELSE '可用'
            END as status,
            CASE 
                WHEN max_access_count IS NOT NULL 
                THEN ROUND((access_count * 100.0 / max_access_count), 2)
                ELSE NULL
            END as usage_percentage
        FROM urls
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return {"error": "短链接不存在"}, 404
    
    return {
        "short_code": short_code,
        "access_count": result[0]['access_count'],
        "max_access_count": result[0]['max_access_count'],
        "status": result[0]['status'],
        "usage_percentage": result[0]['usage_percentage']
    }

时间窗口限制 ⏰

需求场景

又有一个企业客户提出需求:

“我们的内部链接,只在工作时间可以访问,下班后不行。”

这就是时间窗口限制

  • 工作时间限制:只在周一到周五的9:00-18:00可访问
  • 活动时间限制:限时促销,只在一个日期范围内有效
  • 试用期限制:新用户注册后7天内可访问

数据库设计

-- 生效时间(UTC时间戳)
ALTER TABLE urls ADD COLUMN valid_from TIMESTAMP DEFAULT NULL;

-- 失效时间(UTC时间戳)
ALTER TABLE urls ADD COLUMN valid_until TIMESTAMP DEFAULT NULL;

-- 允许访问的时间段(JSON格式)
ALTER TABLE urls ADD COLUMN allowed_hours TEXT DEFAULT NULL;
-- 格式:{"monday": [{"start": "09:00", "end": "18:00"}], ...}

-- 时区设置
ALTER TABLE urls ADD COLUMN timezone VARCHAR(50) DEFAULT 'UTC';

时间范围限制

from datetime import datetime, timezone
import pytz

def is_within_time_range(short_code: str) -> bool:
    """检查当前时间是否在允许的时间范围内"""
    
    result = db.query("""
        SELECT valid_from, valid_until, timezone
        FROM urls
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return False
    
    url_info = result[0]
    
    # 获取当前时间(考虑时区)
    tz = pytz.timezone(url_info['timezone'] or 'UTC')
    now = datetime.now(tz)
    
    # 检查生效时间
    if url_info['valid_from']:
        valid_from = url_info['valid_from']
        if valid_from.tzinfo is None:
            valid_from = pytz.utc.localize(valid_from)
        if now < valid_from:
            return False
    
    # 检查失效时间
    if url_info['valid_until']:
        valid_until = url_info['valid_until']
        if valid_until.tzinfo is None:
            valid_until = pytz.utc.localize(valid_until)
        if now > valid_until:
            return False
    
    return True

每周时间段限制

import json
from datetime import time

def is_within_allowed_hours(short_code: str) -> bool:
    """检查当前时间是否在允许的每周时间段内"""
    
    result = db.query("""
        SELECT allowed_hours, timezone
        FROM urls
        WHERE short_code = ?
    """, (short_code,))
    
    if not result:
        return True  # 没有限制则允许
    
    url_info = result[0]
    
    if not url_info['allowed_hours']:
        return True
    
    # 获取当前时间
    tz = pytz.timezone(url_info['timezone'] or 'UTC')
    now = datetime.now(tz)
    
    # 解析允许的时间段
    allowed_hours = json.loads(url_info['allowed_hours'])
    
    # 获取当前是周几(0=周一,6=周日)
    weekday = now.weekday()
    
    # 获取当前时间
    current_time = now.time()
    
    # 检查是否有定义的时间段
    weekday_name = ['monday', 'tuesday', 'wednesday', 'thursday', 
                    'friday', 'saturday', 'sunday'][weekday]
    
    if weekday_name not in allowed_hours:
        return False  # 当天不允许访问
    
    # 检查当前时间是否在任意一个允许的时间段内
    for period in allowed_hours[weekday_name]:
        start_time = time.fromisoformat(period['start'])
        end_time = time.fromisoformat(period['end'])
        
        if start_time <= current_time <= end_time:
            return True
    
    return False

创建时间限制的短链接

from datetime import datetime, timedelta

def create_short_url_with_time_limit(
    long_url: str,
    valid_from: datetime = None,
    valid_until: datetime = None,
    allowed_hours: dict = None,
    timezone: str = 'Asia/Shanghai'
) -> str:
    """
    创建带时间限制的短链接
    
    Args:
        long_url: 原始长链接
        valid_from: 生效时间
        valid_until: 失效时间
        allowed_hours: 每周允许的时间段
        timezone: 时区
    
    Returns:
        str: 短码
    """
    short_code = generate_short_code()
    
    # 将时间段转为JSON
    allowed_hours_json = json.dumps(allowed_hours) if allowed_hours else None
    
    db.execute("""
        INSERT INTO urls (
            short_code, long_url, 
            valid_from, valid_until,
            allowed_hours, timezone,
            created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (short_code, long_url, valid_from, valid_until, 
          allowed_hours_json, timezone, datetime.now()))
    
    return short_code

# 示例1:限时优惠券(2024年1月1日到1月7日)
short_code = create_short_url_with_time_limit(
    long_url="https://shop.com/new-year-sale",
    valid_from=datetime(2024, 1, 1, 0, 0, 0),
    valid_until=datetime(2024, 1, 7, 23, 59, 59),
    timezone='Asia/Shanghai'
)

# 示例2:只在工作时间访问
short_code = create_short_url_with_time_limit(
    long_url="https://internal.company.com/dashboard",
    allowed_hours={
        'monday': [{'start': '09:00', 'end': '18:00'}],
        'tuesday': [{'start': '09:00', 'end': '18:00'}],
        'wednesday': [{'start': '09:00', 'end': '18:00'}],
        'thursday': [{'start': '09:00', 'end': '18:00'}],
        'friday': [{'start': '09:00', 'end': '18:00'}]
    },
    timezone='Asia/Shanghai'
)

完整的时间检查流程

@app.route('/<short_code>', methods=['GET'])
def redirect_with_time_check(short_code: str):
    """带时间检查的短链接跳转"""
    
    # 检查时间范围
    if not is_within_time_range(short_code):
        return render_template('invalid_time.html', reason="不在有效时间范围内")
    
    # 检查每周时间段
    if not is_within_allowed_hours(short_code):
        return render_template('invalid_time.html', reason="当前时间不允许访问")
    
    # ... 其他检查 ...
    
    # 跳转
    return redirect(url_info['long_url'], code=302)

访问控制方案对比 📊

我已经实现了四种访问控制方案,每种都有其适用场景:

方案实现难度安全性性能影响适用场景
密码保护⭐⭐ 中等⭐⭐⭐ 中等⭐⭐ 低内部文档、私密分享
IP白名单⭐⭐ 中等⭐⭐⭐⭐ 高⭐⭐ 低企业内网、地域限制
访问次数限制⭐ 简单⭐⭐ 低⭐ 低限量抢购、试用版
时间窗口限制⭐⭐⭐ 较难⭐⭐ 低⭐ 低限时活动、工作时间

组合使用

这些方案可以组合使用:

# 示例:企业内部链接
# - 只允许公司IP访问
# - 只在工作时间
# - 最多访问1000次
short_code = create_short_url(
    long_url="https://internal.company.com/reports",
    allowed_ips=["192.168.1.0/24"],
    allowed_hours={"monday": [{"start": "09:00", "end": "18:00"}], ...},
    max_access_count=1000
)

性能考虑

访问控制会增加响应延迟,我做了优化:

  1. 数据库索引:在 short_code 上建索引,快速查询
  2. 缓存Session:密码验证后缓存,避免重复查询
  3. IP匹配优化:使用高效的CIDR匹配算法
  4. 异步日志:访问日志异步写入,不影响主流程

下一步:批量管理 ⏭️

单个链接的访问控制已经实现了。但企业客户又提出新问题:

“我们有1000个内部链接,能不能批量设置IP白名单?” “每个月都要更新密码,能不能批量操作?”

单个链接管理已经不够了,我需要实现批量管理功能。

下一节,我会实现:

  • 批量设置访问控制
  • 批量更新密码
  • 批量导入导出
  • 权限组管理

(继续下一节:批量操作)