数据同步
场景
多地域部署后,遇到新问题。
用户反馈:
"为什么我在北京刚注册,上海还查不到我的账号?"
技术分析:
- 数据同步有延迟
- 主从复制不及时
- 跨地域网络不稳定
问题分析
数据流向:
北京(主库) → 上海(从库)
│
└→ 广州(从库)
问题:
1. 主从延迟:几秒到几分钟
2. 网络不稳定:偶发中断
3. 数据一致性:用户期望实时
解决方案
1. 优化主从复制
# MySQL primary (master) configuration
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_cache_size = 4M
max_binlog_cache_size = 512M
# Group commit: delay binlog fsync by up to 10 microseconds so more
# concurrent transactions share one flush (helps parallel replica apply).
# NOTE(review): the original used "binlog_group_commit = 10", which is not
# a valid MySQL system variable; the real knob is shown below.
binlog_group_commit_sync_delay = 10
# Semi-synchronous replication: a commit waits (up to the timeout below)
# for at least one replica to acknowledge receipt of the binlog event.
plugin_load = "rpl_semi_sync_master=semisync_master.so"
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 1000 # 1 second timeout, then fall back to async
# Replica (slave) configuration
[mysqld]
server-id = 2 # must be unique for every replica
relay-log = mysql-relay-bin
read-only = 1
# Parallel (multi-threaded) replication
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 4
# Semi-synchronous replica plugin
plugin_load = "rpl_semi_sync_slave=semisync_slave.so"
rpl_semi_sync_slave_enabled = 1
2. 应用层路由优化
class DatabaseRouter:
    """Smart primary/replica database router.

    Writes always go to the Beijing primary. Reads prefer the caller's
    region-local replica, but fall back to the primary when this router
    wrote recently (read-your-writes) or the replica reports too much lag.
    """

    def __init__(self):
        self.master_db = connect_to('beijing-master')
        self.slave_dbs = {
            'beijing': connect_to('beijing-slave'),
            'shanghai': connect_to('shanghai-slave'),
            'guangzhou': connect_to('guangzhou-slave')
        }
        # Epoch seconds of the most recent write routed through this router.
        # NOTE(review): process-global, not per-user/session — one write
        # forces ALL reads to the primary for the threshold window.
        self.last_write_time = 0
        self.replication_lag_threshold = 2  # seconds

    def get_connection(self, read_only=False):
        """Return a connection appropriate for the requested operation."""
        if not read_only:
            # Writes: always the primary; remember when we last wrote.
            self.last_write_time = time.time()
            return self.master_db
        # Reads: prefer the replica in the caller's region.
        local_region = get_user_region()
        local_slave = self.slave_dbs[local_region]
        # Read-your-writes: right after a write the replica may be stale.
        if time.time() - self.last_write_time < self.replication_lag_threshold:
            return self.master_db
        # Replica lagging too far behind? Use the primary instead.
        lag = self.check_replication_lag(local_slave)
        if lag > self.replication_lag_threshold:
            return self.master_db
        return local_slave

    def check_replication_lag(self, slave):
        """Return the replica's lag in seconds (``inf`` if it cannot be read)."""
        try:
            cursor = slave.cursor()
            try:
                cursor.execute('SHOW SLAVE STATUS')
                status = cursor.fetchone()
                if not status:
                    # fix: fetchone() returns None when replication is not
                    # configured; original crashed subscripting None.
                    return float('inf')
                lag = status['Seconds_Behind_Master']
                return lag if lag is not None else 0
            finally:
                cursor.close()  # fix: original leaked the cursor
        except Exception:
            # Any probe failure counts as "infinitely behind" so callers
            # route to the primary instead of a possibly-broken replica.
            return float('inf')
# Module-level singleton router shared by the helper functions below.
db_router = DatabaseRouter()
# 使用示例
def get_user(user_id):
    """Fetch one user row by id, preferring the region-local replica."""
    conn = db_router.get_connection(read_only=True)
    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))
        return cursor.fetchone()
    finally:
        cursor.close()  # fix: original never closed the cursor
def update_user(user_id, data):
"""更新用户(使用主库)"""
conn = db_router.get_connection(read_only=False)
cursor = conn.cursor()
cursor.execute('UPDATE users SET ... WHERE id = %s', (user_id,))
conn.commit()3. 最终一致性方案
class EventualConsistencyManager:
    """Eventual-consistency manager.

    A write goes to the primary, then an update event is published on
    Redis pub/sub; each region runs a subscriber that applies the event
    to its local replica, and a short-lived cache entry bridges the gap.
    """

    def __init__(self):
        # Updates published but not yet applied locally, keyed by update_id.
        # NOTE(review): mutated by both the writer and the subscriber thread
        # (see sync_to_local_slave) without a lock — confirm this is
        # acceptable for these simple dict operations.
        self.pending_updates = {}
        self.redis = redis.Redis()

    def write_with_eventual_consistency(self, user_id, data):
        """Write ``data`` for ``user_id`` with eventual consistency."""
        # 1. Write to the primary database.
        update_user_in_master(user_id, data)
        # 2. Record the update as pending until a subscriber applies it.
        update_id = generate_update_id()
        self.pending_updates[update_id] = {
            'user_id': user_id,
            'data': data,
            'timestamp': time.time()
        }
        # 3. Publish the update event for regional subscribers.
        self.redis.publish('data_updates', json.dumps({
            'update_id': update_id,
            'user_id': user_id,
            'data': data
        }))
        # 4. Short-term cache (60 s TTL) so reads see the value immediately.
        cache_key = f'user:{user_id}'
        self.redis.setex(cache_key, 60, json.dumps(data))

    def subscribe_to_updates(self):
        """Block forever, applying published updates to the local replica."""
        pubsub = self.redis.pubsub()
        pubsub.subscribe('data_updates')
        for message in pubsub.listen():
            if message['type'] == 'message':
                update = json.loads(message['data'])
                self.sync_to_local_slave(update)

    def sync_to_local_slave(self, update):
        """Apply one published update to this region's replica."""
        local_slave = db_router.slave_dbs[get_local_region()]
        # Push the change straight into the local replica instead of
        # waiting for MySQL replication to deliver it.
        update_user_in_slave(
            local_slave,
            update['user_id'],
            update['data']
        )
        # Clear the pending record now that the update has landed locally.
        if update['update_id'] in self.pending_updates:
            del self.pending_updates[update['update_id']]
# Module-level singleton for the eventual-consistency machinery.
consistency_manager = EventualConsistencyManager()
# Start the background subscriber that applies published updates to the
# region-local replica; daemon=True so it never blocks interpreter exit.
Thread(target=consistency_manager.subscribe_to_updates, daemon=True).start()
监控和告警
主从同步监控
def monitor_replication():
    """Probe every replica and alert on lag or stopped replication threads."""
    for region, slave in db_router.slave_dbs.items():
        try:
            # Query the replica's replication status.
            cursor = slave.cursor()
            try:
                cursor.execute('SHOW SLAVE STATUS')
                status = cursor.fetchone()
            finally:
                cursor.close()  # fix: original leaked the cursor
            if not status:
                # fix: fetchone() returns None when replication is not
                # configured; original raised TypeError into the generic
                # handler, producing a misleading alert message.
                send_alert(f'No replication status in {region}')
                continue
            lag = status['Seconds_Behind_Master']
            slave_status = status['Slave_IO_Running']
            sql_status = status['Slave_SQL_Running']
            # Alert when lag is unknown (NULL) or above 10 seconds.
            if lag is None or lag > 10:
                send_alert(
                    f'Replication lag in {region}: {lag}s'
                )
            if slave_status != 'Yes':
                send_alert(
                    f'Slave IO stopped in {region}'
                )
            if sql_status != 'Yes':
                send_alert(
                    f'Slave SQL stopped in {region}'
                )
        except Exception as e:
            # Best-effort monitor: never let one bad replica abort the loop.
            send_alert(
                f'Failed to check replication in {region}: {e}'
            )
# Run the replication monitor once a minute.
scheduler.add_job(
    monitor_replication,
    'interval',
    seconds=60,
    id='monitor_replication'
)
效果验证
优化前
主从延迟:
- 平均延迟:5 秒
- 最大延迟:30 秒
- 影响用户体验
优化后
主从延迟:
- 平均延迟:<1 秒
- 最大延迟:3 秒
- 用户体验良好
措施:
- 半同步复制
- 并行复制
- 智能路由
- 最终一致性方案
本节小结
✅ 完成的工作:
- 优化了主从复制配置
- 实现了智能数据库路由
- 实现了最终一致性方案
- 添加了同步监控
✅ 效果:
- 主从延迟降低 80%
- 数据一致性提升
- 用户体验改善
⚠️ 下一步:用户访问哪里的服务器?
🎯 下一步:如何实现就近路由?
