数据同步方案设计
问题
如何设计一套可靠的数据同步系统,实现 MySQL 到 Elasticsearch / Redis / 数据仓库等异构数据源的实时同步?
答案
同步场景
| 场景 | 说明 |
|---|---|
| DB → ES | 搜索引擎索引同步 |
| DB → Redis | 缓存预热与更新 |
| DB → DB | 分库分表数据迁移 |
| DB → 数据仓库 | 离线/实时分析 |
| DB → MQ | 事件驱动解耦 |
方案对比
| 方案 | 实时性 | 侵入性 | 可靠性 | 复杂度 |
|---|---|---|---|---|
| 双写 | 实时 | 高(业务耦合) | 低(易不一致) | 低 |
| MQ 异步 | 秒级 | 中 | 中 | 中 |
| Binlog 订阅 (CDC) | 秒级 | 无侵入 | 高 | 中高 |
| 定时全量/增量同步 | 分钟~小时级 | 低 | 高 | 低 |
推荐方案
生产环境首选 Binlog CDC,对业务代码零侵入。Canal(阿里开源)和 Debezium 是最主流的两个工具。
Binlog CDC 原理
Canal 核心原理:
- 伪装为 MySQL Slave,向 Master 发送 dump 协议
- Master 推送 Binlog 事件流
- Canal 解析 Binlog(ROW 模式),提取变更数据
- 将变更事件投递到 MQ / 直接写入目标
Canal 使用示例
Canal 客户端消费 Binlog 事件
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("canal-server", 11111), "example", "", "");
connector.connect();
connector.subscribe("mydb\\.order.*"); // 订阅 mydb 的 order 相关表
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId != -1) {
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.INSERT) {
syncToES(rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE) {
updateES(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
deleteFromES(rowData.getBeforeColumnsList());
}
}
}
}
connector.ack(batchId); // 确认消费
}
}
Debezium 方案
Debezium MySQL Connector 配置
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1001",
"topic.prefix": "dbserver1",
"database.include.list": "mydb",
"table.include.list": "mydb.orders,mydb.users",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.mydb"
}
}
Canal vs Debezium
| 对比 | Canal | Debezium |
|---|---|---|
| 厂商 | 阿里巴巴 | Red Hat |
| 支持数据库 | MySQL | MySQL/PostgreSQL/MongoDB/Oracle |
| 传输 | 自带 Client / MQ | Kafka Connect |
| 国内生态 | 成熟 | 较少 |
| Schema 追踪 | 不支持 | 支持 |
数据一致性保障
| 策略 | 说明 |
|---|---|
| ACK 机制 | 消费成功后才确认,失败重试 |
| 幂等处理 | 基于主键做 upsert,重复消费不产生副作用 |
| 定时对账 | 定期全量/增量对比,发现不一致则修复 |
| 监控告警 | 同步延迟超阈值报警 |
常见面试问题
Q1: 为什么不用双写?
答案:
双写(业务代码同时写 DB 和 ES)存在严重问题:
- 一致性难保证:DB 成功但 ES 失败,数据不一致
- 业务耦合高:每个写操作都要加同步逻辑
- 性能下降:写操作链路变长,RT 增加
Q2: Binlog 同步延迟怎么处理?
答案:
- 正常延迟在毫秒~秒级
- 延迟过高时检查:从库同步延迟、Canal 消费积压、目标写入瓶颈
- 降级策略:同步延迟时查主库兜底
Q3: 同步过程中目标库宕机怎么办?
答案:
- MQ 缓冲:Canal → MQ → 目标,MQ 持久化保障不丢
- 位点管理:Canal 记录消费位点(Binlog position / GTID),恢复后从断点续传
- 死信队列:多次重试失败进入死信,人工排查后重新投递
Q4: 如何做数据迁移的全量 + 增量?
答案:
- 记录当前 Binlog 位点
- 全量导出存量数据(mysqldump / SELECT 分批)写入目标
- 从记录的位点开始增量订阅 Binlog
- 增量追上实时后,启动对账校验
- 校验通过,切换读流量到新库