导航菜单

容灾备份

场景

最坏的情况发生了:整个机房断电。

事件:
- 数据中心电力故障
- UPS 只能维持 30 分钟
- 发电机启动失败
- 整个机房离线

影响:
- 北京地域完全不可用
- 用户请求失败
- 数据可能丢失

解决方案:多活容灾

1. 多地域架构

之前架构:
北京(主) → 上海(从)
北京(主) → 广州(从)

问题:北京故障,无法写入

多活架构:
北京(主写入)
   ⇅
上海(主写入)
   ⇅
广州(主写入)

每个地域都可以:
- 处理读写请求
- 同步数据到其他地域
- 独立运行

2. 数据同步策略

class MultiMasterReplication:
    """Manage multi-master replication across regions.

    Writes land in the local database first, then fan out asynchronously
    to every peer region through the message queue.
    """

    def __init__(self):
        # All participating regions; list order fixes each region's numeric ID.
        self.regions = ['beijing', 'shanghai', 'guangzhou']
        self.local_region = get_local_region()

    def write_with_replication(self, table, data):
        """Write a record locally, then replicate it to peer regions.

        Note: mutates *data* in place, stamping id/region/timestamp.
        """
        data['id'] = self.generate_global_id()
        data['region'] = self.local_region
        data['timestamp'] = time.time()

        # Local write first; replication to peers is asynchronous.
        self.write_to_local_db(table, data)
        self.replicate_to_other_regions(table, data)

    def generate_global_id(self):
        """Build a snowflake-style globally unique ID.

        Bit layout: [millisecond timestamp << 22 | region index << 12 | sequence].
        """
        millis = int(time.time() * 1000)
        region_index = self.regions.index(self.local_region)
        seq = self.get_sequence()
        return (millis << 22) | (region_index << 12) | seq

    def replicate_to_other_regions(self, table, data):
        """Publish one replication message per peer region (self excluded)."""
        peers = (r for r in self.regions if r != self.local_region)
        for peer in peers:
            message_queue.publish({
                'type': 'replication',
                'target_region': peer,
                'table': table,
                'data': data
            })

    def consume_replications(self):
        """Endless consumer loop: apply replication messages addressed to us."""
        while True:
            try:
                msg = message_queue.consume(timeout=1)
                if not msg or msg['type'] != 'replication':
                    continue
                if msg['target_region'] == self.local_region:
                    # Apply the remote write to the local database.
                    self.write_to_local_db(msg['table'], msg['data'])
            except Exception as e:
                # Best-effort loop: log, back off briefly, keep consuming.
                logging.error(f'Replication error: {e}')
                time.sleep(1)

multi_master = MultiMasterReplication()

3. 冲突解决策略

class ConflictResolver:
    """Resolve write conflicts between records replicated from different regions.

    The original implementation listed three strategies, but strategies 2
    (region priority) and 3 (business logic) sat after an unconditional
    return and were unreachable dead code. They are now selectable via the
    *strategy* parameter; the default preserves the original behavior
    (last-writer-wins on timestamp).
    """

    # Higher value wins when resolving by region priority.
    REGION_PRIORITY = {
        'beijing': 3,
        'shanghai': 2,
        'guangzhou': 1
    }

    def resolve_conflict(self, record1, record2, strategy='timestamp'):
        """Return the winning record according to *strategy*.

        Args:
            record1, record2: conflicting record dicts; must carry a
                'timestamp' key (and 'region' for the 'region' strategy).
            strategy: 'timestamp' (default, last-writer-wins),
                'region' (higher REGION_PRIORITY wins), or
                'business' (field-by-field merge).

        Raises:
            ValueError: for an unknown strategy name.
        """
        if strategy == 'timestamp':
            # Ties go to record2, matching the original comparison.
            if record1['timestamp'] > record2['timestamp']:
                return record1
            return record2

        if strategy == 'region':
            p1 = self.REGION_PRIORITY[record1['region']]
            p2 = self.REGION_PRIORITY[record2['region']]
            return record1 if p1 > p2 else record2

        if strategy == 'business':
            return self.resolve_by_business_logic(record1, record2)

        raise ValueError(f'unknown conflict strategy: {strategy}')

    def resolve_by_business_logic(self, record1, record2):
        """Merge two records field by field.

        Rules:
            - 'updated_at': keep the newest value.
            - 'name'/'email': keep the value from the record with the
              newer 'timestamp'.
            - everything else: first non-falsy value (record1 preferred).

        Unlike the original, this iterates the union of both key sets, so
        fields present in only one record are preserved instead of being
        dropped (or raising KeyError).
        """
        resolved = {}
        record1_newer = record1['timestamp'] > record2['timestamp']

        for field in record1.keys() | record2.keys():
            if field not in record1:
                resolved[field] = record2[field]
            elif field not in record2:
                resolved[field] = record1[field]
            elif field == 'updated_at':
                # Keep the most recent update time.
                resolved[field] = max(record1[field], record2[field])
            elif field in ('name', 'email'):
                # Important fields: newer timestamp wins.
                resolved[field] = record1[field] if record1_newer else record2[field]
            else:
                # Other fields: merge non-empty values, record1 preferred.
                resolved[field] = record1[field] or record2[field]

        return resolved

conflict_resolver = ConflictResolver()

4. DNS 故障转移

class DNSFailover:
    """Shift DNS traffic away from regions that fail their health check."""

    def __init__(self):
        # region name -> bool (True when the /health endpoint returned 200)
        self.health_status = {}
        self.dns_provider = DNSProvider()

    def check_region_health(self):
        """Probe every region's health endpoint and record the outcome."""
        for region in ['beijing', 'shanghai', 'guangzhou']:
            healthy = False
            try:
                resp = requests.get(
                    f'https://{region}.kuaiyizhi.cn/health',
                    timeout=5
                )
                healthy = resp.status_code == 200
            except Exception:
                # Timeouts and connection errors count as unhealthy.
                healthy = False
            self.health_status[region] = healthy

    def update_dns_records(self):
        """Point the API record only at regions that passed the health check."""
        healthy_regions = [
            region for region, ok in self.health_status.items() if ok
        ]

        if not healthy_regions:
            # Nothing left to fail over to — page a human instead.
            send_alert('All regions are down!')
            return

        # Round-robin A records across every healthy region.
        self.dns_provider.update_records(
            name='api.kuaiyizhi.cn',
            records=[
                {'type': 'A', 'value': f'{r}.kuaiyizhi.cn'}
                for r in healthy_regions
            ]
        )

# 定时检查和更新
def monitor_and_update_dns():
    """Run the DNS failover loop: probe regions and refresh records every minute."""
    failover = DNSFailover()

    while True:
        failover.check_region_health()
        failover.update_dns_records()
        time.sleep(60)

5. 数据备份策略

class BackupManager:
    """Back up the database, Redis, and config files to external (S3) storage."""

    def backup_to_external_storage(self):
        """Run all backup steps in order: MySQL, Redis, then config files.

        NOTE(review): backup_configs is not defined in this class as shown
        here — confirm it exists elsewhere or this call raises AttributeError.
        """
        self.backup_database()
        self.backup_redis()
        self.backup_configs()

    def backup_database(self):
        """Dump all MySQL databases, gzip the dump, and upload it to S3.

        Raises:
            subprocess.CalledProcessError: if mysqldump or gzip exits
                non-zero — failing loudly beats silently uploading a
                truncated backup (the original ignored exit codes).
        """
        dump_file = f'/tmp/mysql_backup_{int(time.time())}.sql'

        # check=True so a failed dump aborts the backup instead of
        # uploading an empty/partial file.
        subprocess.run([
            'mysqldump',
            '--all-databases',
            '--single-transaction',   # consistent InnoDB snapshot without table locks
            '--master-data=2',        # record binlog position as a comment for PITR
            f'--result-file={dump_file}'
        ], check=True)

        # Compress in place; gzip replaces dump_file with dump_file + '.gz'.
        compressed_file = dump_file + '.gz'
        subprocess.run(['gzip', dump_file], check=True)

        try:
            s3_client.upload_file(
                compressed_file,
                'api-backups',
                f'mysql/{os.path.basename(compressed_file)}'
            )
        finally:
            # Always remove the local artifact, even if the upload fails.
            os.remove(compressed_file)

    def backup_redis(self):
        """Snapshot Redis (RDB) and upload the dump file to S3."""
        # Remember when the snapshot was requested so we only accept a save
        # at least that recent. (The original waited for *any* save within
        # the last hour, which could accept a stale dump.)
        requested_at = time.time()

        # Trigger a Redis save.
        redis_client.save()

        # NOTE(review): assumes lastsave() returns a Unix timestamp
        # comparable to time.time() — confirm against the Redis client API.
        while redis_client.lastsave() < requested_at:
            time.sleep(1)

        # Copy the RDB file aside before uploading.
        rdb_file = '/var/lib/redis/dump.rdb'
        backup_file = f'/tmp/redis_backup_{int(time.time())}.rdb'
        shutil.copy2(rdb_file, backup_file)

        try:
            s3_client.upload_file(
                backup_file,
                'api-backups',
                f'redis/{os.path.basename(backup_file)}'
            )
        finally:
            # Clean up the temp copy regardless of upload success.
            os.remove(backup_file)

# 定时备份
def scheduled_backup():
    """Perform one full backup run; intended to be fired by the scheduler."""
    manager = BackupManager()
    manager.backup_to_external_storage()
    logging.info('Backup completed')

# Run the full backup every day at 03:00.
scheduler.add_job(
    scheduled_backup,
    CronTrigger(hour=3, minute=0),
    id='daily_backup'
)

灾难演练

def disaster_recovery_drill():
    """Rehearse a full region outage and verify automatic failover."""
    logging.info('Starting disaster recovery drill')

    # Step 1: take the Beijing region offline (simulated failure).
    simulate_region_failure('beijing')

    # Step 2: the public API must still report healthy via DNS failover.
    health = check_api_health()
    assert health['status'] == 'healthy', 'DNS failover failed'

    # Step 3: the surviving regions must answer their health checks.
    for region in ['shanghai', 'guangzhou']:
        response = requests.get(
            f'https://{region}.kuaiyizhi.cn/health'
        )
        assert response.status_code == 200, f'{region} is down'

    # Step 4: bring Beijing back online.
    restore_region('beijing')

    logging.info('Disaster recovery drill completed successfully')

效果验证

优化前

机房故障:
- 北京地域完全不可用
- 无法处理新请求
- 无法写入数据
- 持续时间:数小时

优化后

机房故障:
- DNS 自动切换到其他地域
- 其他地域继续服务
- 数据最终一致
- 用户基本无感知

本节小结

✅ 完成的工作:

  • 实现了多主架构
  • 实现了冲突解决
  • 实现了 DNS 故障转移
  • 实现了数据备份

✅ 效果:

  • 机房故障不影响整体
  • 实现了真正的多活
  • 数据安全有保障

🎯 完成!我已经学会了构建高可用系统

搜索