2024/08/14 分享备份。
背景
db-archiver ( https://github.com/databendcloud/db-archiver ) 是我们自研的一个能够实现从 RDBMS 全量或增量 T+1 归档数据的工具,基于 db-archiver 目前可以实现 Mysql Or Tidb 的归档。
在此之前我们先来看一下目前在 databend 的生态中,我们提供实现数据同步的方案大概有这么几种,Flink CDC, kafka connect, debezium,Airbyte ,但是像 Flink CDC 等这几个方案,首先就需要用户已经有了 Flink 或者本身就搭建 kafka 等基础设施,而很多小公司都不会有人力去维护这么一套。或者用户只是想一次性迁移数据到 Databend,这种情况下就要求工具尽量简单,开箱即用,用完即走。能符合这个条件的,在这个图里只有 Datax 符合。
所以 Db-archiver 设计的初衷是为了平替 datax 。DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、databend 等各种异构数据源之间高效的数据同步功能。但是在实施过程中我们发现 Datax 有诸多的痛点,导致用户在数据迁移过程中困难重重, 体验很差。
db-archiver VS Datax
- 打包速度以及可执行文件大小
Datax 是java 写的,需要加载各种 jar 包,整个项目较为臃肿编译速度非常慢,且生成的可执行文件总共有 2 个多 G,使用起来非常灾难。
db-archiver 呢编译速度快,最终可执行文件只有 10M ,十分清爽。
2.开发以及 bug fix 的速度
Datax 的维护方是阿里,该项目目前基本处于一种半维护状态, issue 的解决以及 pr 的合并速度都极慢,所以我们每次 fix 问题之后,都是在 fork 的分支上打包给客户,鉴于第一条中提到的包大小有 2G 之多,整个过程对用户很不友好。
- 指标和日志
db-archiver 增加了非常多的指标和日志,对排查问题很有帮助,基本上哪里有问题,一眼就能定位出来。下面截取了一次同步过程中的部分日志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| INFO[0011] upload by presigned url cost: 6076 ms INFO[0012] upload by presigned url cost: 7262 ms INFO[0012] thread-7: copy into cost: 1400 ms ingest_databend=IngestData 2024/08/13 15:43:58 thread-7: ingest 39999 rows (4201.005115 rows/s), 23639409 bytes (2482794.023097 bytes/s) INFO[0012] thread-4: copy into cost: 1408 ms ingest_databend=IngestData 2024/08/13 15:43:58 thread-4: ingest 39999 rows (4125.188979 rows/s), 22799430 bytes (2437986.686345 bytes/s) 2024/08/13 15:43:58 Globla speed: total ingested 79998 rows (8315.510453 rows/s), 15279619 bytes (1588262.600387 bytes/s) INFO[0012] condition: (id >= 1439993 and id < 1479992) INFO[0012] thread-2: copy into cost: 1713 ms ingest_databend=IngestData 2024/08/13 15:43:58 thread-2: ingest 39999 rows (4095.170564 rows/s), 22799430 bytes (2420245.803072 bytes/s) 2024/08/13 15:43:58 Globla speed: total ingested 119997 rows (8162.421021 rows/s), 29719259 bytes (1559022.517010 bytes/s) INFO[0012] condition: (id >= 839996 and id < 879995) 2024/08/13 15:43:58 Globla speed: total ingested 119997 rows (8087.586352 rows/s), 44158899 bytes (1544729.094295 bytes/s) INFO[0012] condition: (id >= 439998 and id < 479997) INFO[0012] thread-0: copy into cost: 1824 ms ingest_databend=IngestData 2024/08/13 15:43:58 thread-0: ingest 39999 rows (4035.346363 rows/s), 21726145 bytes (2384889.700537 bytes/s) 2024/08/13 15:43:58 thread-7: extract 39999 rows (0.000000 rows/s) 2024/08/13 15:43:58 thread-4: extract 39999 rows (0.000000 rows/s) 2024/08/13 15:43:58 Globla speed: total ingested 159996 rows (7927.205481 rows/s), 57525254 bytes (1514096.345894 bytes/s) INFO[0013] condition: (id >= 40000 and id < 79999) 2024/08/13 15:43:58 thread-2: extract 39999 rows (0.000000 rows/s) 2024/08/13 15:43:58 thread-0: extract 39999 rows (0.000000 rows/s) INFO[0013] upload by presigned url cost: 8293 ms INFO[0013] thread-3: copy into cost: 1241 ms ingest_databend=IngestData 2024/08/13 15:43:59 thread-3: ingest 39999 rows (3748.537451 rows/s), 22799430 bytes (2215219.768763 bytes/s) 2024/08/13 15:43:59 Globla speed: total ingested 199995 rows (7375.987130 rows/s), 71964894 bytes (1408761.080845 bytes/s) INFO[0013] condition: (id >= 639997 and id < 679996) 2024/08/13 15:43:59 thread-3: extract 39999 rows (40001.375235 rows/s)
|
- 从 MySQL 中抽取 数据的速率
- 每次 presign 的效率
- upload stage
- copy into
- 每个线程的同步速率 (
2024/08/13 15:43:58 thread-2: ingest 39999 rows (4095.170564 rows/s), 22799430 bytes (2420245.803072 bytes/s
))
- 全局的同步速率 (
2024/08/13 15:43:59 Globla speed: total ingested 199995 rows (7375.987130 rows/s), 71964894 bytes (1408761.080845 bytes/s)
)
通过全局的同步速率我们可以确定当前的配置参数是不是最优的,比如 batchSize,thread 数量与机器的配置搭配是不是最优。
|
可执行文件大小 |
开发以及 bug fix 的速度 |
指标和日志 |
速率 |
db-archiver |
10M |
快 |
多 |
200w 数据 2min |
Datax |
2G |
慢 |
少 |
200w 数据 10min |
总之我们自研 db-archiver 之后,整个过程更加的可控,再比如我们刚刚对接的一个客户,他们与 databend Cloud 不在同一个 region,同步数据走公网非常慢的问题,这时候我们增加 userStage 参数让用户创建并指定 external stage,这在 datax 中是无法通过参数配置的,所以这样也能够给到用户最佳的体验。
下面来看下 db-archiver 提供的两种同步模式。
两种模式
第一种是
根据 sourceSplitKey
同步数据
如果源表有自增主键,可以设置 sourceSplitKey
为主键。db-archiver 将根据 sourceSplitKey
按照规则切分数据,并发同步数据到 Databend。这是性能最高的模式。后面会讲这个切分数据的规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| { "sourceHost": "127.0.0.1", "sourcePort": 3306, "sourceUser": "root", "sourcePass": "", "sourceDB": "mydb", "sourceTable": "test_table1", "sourceWhereCondition": "id > 0", "sourceSplitKey": "id", "databendDSN": "<https://cloudapp:ajynnyvfk7ue@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443>", "databendTable": "testSync.test_table1", "batchSize": 40000, "batchMaxInterval": 30, "userStage": "USER STAGE", "maxThread": 10 }
|
根据 sourceSplitTimeKey
同步数据
某些情况下用户的源表没有自增主键,但是有时间列,这个时候可以设置 sourceSplitTimeKey
同步数据。db-archiver 将根据 sourceSplitTimeKey
分割数据。
sourceSplitTimeKey
必须与 timeSplitUnit
一起设置。timeSplitUnit
是切片数据的时间颗粒度,可以是 minute
, hour
, day
。由于数据在时间上的密集度用户是最了解的,用户就可以自定义timeSplitUnit
按时间列分割数据,以达到最合理的同步效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| { "sourceHost": "127.0.0.1", "sourcePort": 3306, "sourceUser": "root", "sourcePass": "123456", "sourceDB": "mydb", "sourceTable": "my_table2", "sourceWhereCondition": "t1 >= '2024-06-01' and t1 < '2024-07-01'", "sourceSplitKey": "", "sourceSplitTimeKey": "t1", "timeSplitUnit": "minute", "databendDSN": "<https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443>", "databendTable": "testSync.my_table2", "batchSize": 2, "batchMaxInterval": 30, "userStage": "~", "deleteAfterSync": false, "maxThread": 10 }
|
压测的过程中按照 sourceSplitTimeKey
同步性能不如按照主键 id,且并发的话会对 MySQL 的性能造成影响,为了保障MySQL 不受影响,所以这个模式不支持并发。
数据切片算法
对于 Databend 的数据同步没有太多可以分享的,核心的链路就是 源表(MySQL) → NDJSON→ Stage→ Copy into→ databend table。
比较核心的是数据切片的算法,这关乎数据同步的准确性、同步的效率,下面就重点讲下这个。
db-archiver 会取源表的主键为 sourceSplitKey
。将根据 sourceSplitKey
分割数据,并行同步数据到 Databend。
sourceSplitTimeKey
用于按时间列分割数据。sourceSplitTimeKey
和 sourceSplitKey
至少需要设置一个。
Sync data according to the sourceSplitKey
下面详细讲一下 db-archiver 按照 id primary key 对数据切片的算法原理:
- 首先用户会传入一个 sourceWhereCondition 作为整体的数据范围圈定,比如 ‘id> 100 and id < 10000000’,这样就可以获取范围内的 min(id) 和 max(id)
- 根据上面拿到的 min, max id,再根据分配的线程数量计算出每个线程需要处理的数据范围:
1 2 3 4 5 6 7 8 9 10
| rangeSize := (maxSplitKey - minSplitKey) / s.cfg.MaxThread for i := 0; i < s.cfg.MaxThread; i++ { lowerBound := minSplitKey + rangeSize*i upperBound := lowerBound + rangeSize if i == s.cfg.MaxThread-1 { // Ensure the last condition includes maxSplitKey upperBound = maxSplitKey } conditions = append(conditions, []int{lowerBound, upperBound}) }
|
注意处理这里的边界条件,当达到最后一个线程的时候,该线程内的 upperBound 直接等于 maxSplitKey,这里线程内的范围是一个全闭区间。
- 将上面得到的每个线程内的数据范围数组,分别再按照 batchSize 切分
1 2 3 4 5 6
| if (minSplitKey + s.cfg.BatchSize - 1) >= allMax { conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s <= %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, allMax)) return conditions } conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s < %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, minSplitKey+s.cfg.BatchSize-1)) minSplitKey += s.cfg.BatchSize - 1
|
这里也要处理一些边界条件,否则会导致切片范围有误最终导致同步过去的数据量不准确。
对于最后一次 (minSplitKey + s.cfg.BatchSize - 1) >= maxSplitKey,的情况,如果最小的 key 都比整个范围最大的 maxKey 大了,就直接返回。
如果当前的 maxSplitKey 恰好与 maxKey 相等,那这就是最后一批,需要保证右侧区间是闭区间,这样才能保证整个数据是连续的。
其余的情况就是左闭右开的数据区间。最终的切分效果就是:[1,100), [100, 200), [200,300), [300, 400)……, [9900, 10000]
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| for{ ... if (minSplitKey + s.cfg.BatchSize - 1) >= maxSplitKey { if minSplitKey > allMax { return conditions // minkey > allMax return directly } if maxSplitKey == allMax { conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s <= %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, maxSplitKey)) // corner case, must <=x<= } else { conditions = append(conditions, fmt.Sprintf("(%s >= %d and %s < %d)", s.cfg.SourceSplitKey, minSplitKey, s.cfg.SourceSplitKey, maxSplitKey)) // other condition is [) } break } }
|
- 在每个线程中,对按照 batchSize 切分后的数据进行同步
1 2 3
| for _, condition := range conditions { // processing }
|
整个切片的流程可以参考下图:
Sync data according to the sourceSplitTimeKey
我们在给客户演示的时候得知,有些表是没有指定自增的 id,这种情况下上面按照 primary key id 进行数据同步的算法就失效了,所以我们需要支持按照时间字段切片同步。
- 跟 id 一样,也是先根据 where condition 确定 min, max time 的数据范围
- 第二步跟 id 有所区别,由于是按照时间字段切片,所以再使用 batchSize 来分割会让问题复杂并且容易出错, 并且数据在时间上的密集度用户是最了解的,所以这里我们引入新的字段
timeSplitUnit
,其取值有:
1 2 3 4 5 6 7 8 9 10 11 12
| switch StringToTimeSplitUnit[c.TimeSplitUnit] { caseMinute: return 10 * time.Minute caseQuarter: return 15 * time.Minute caseHour: return 2 * time.Hour caseDay: return 24 * time.Hour default: return 0 }
|
这样可以让用户根据不同的数据密集程度,来决定用什么时间范围来切分。
1 2 3 4 5 6 7 8 9 10
| if minTime.After(maxTime) { conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s <= '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, maxTime.Format("2006-01-02 15:04:05"))) break } if minTime.Equal(maxTime) { conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s <= '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, maxTime.Format("2006-01-02 15:04:05"))) break } conditions = append(conditions, fmt.Sprintf("(%s >= '%s' and %s < '%s')", s.cfg.SourceSplitTimeKey, minTime.Format("2006-01-02 15:04:05"), s.cfg.SourceSplitTimeKey, minTime.Add(s.cfg.GetTimeRangeBySplitUnit()).Format("2006-01-02 15:04:05"))) minTime = minTime.Add(s.cfg.GetTimeRangeBySplitUnit())
|
- 根据 batchSize 分批同步数据
这里取了个巧把 batchSize 切分放到了 SQL 里面,可以结合 offset 可以减小对源端的压力。
具体流程如下图:
演示
…
归档可视化((TODO))
为了进一步方便用户使用,简化数据进入 databend 的流程,我们计划将 db-archiver 可视化。
能够让用户通过在界面上配置同步任务,管理同步任务,点点点就能完成数据从外部迁移到 Databend。
配置页
配置的流程
任务管理页面
同步任务的界面如下:
也可以参考 dataworks 的任务界面:
可以查看目标任务的基本信息及运行情况。
- 任务名称:为您展示任务的名称。单击任务名称,即可进入目标任务的详情页面。
- 任务ID:为您展示任务的ID。
- 运行状态:任务的运行状态,包括Runing, Stop, Success, Fail..等状态。
- 开始运行:任务的开始运行时间。
- 结束运行:任务的结束运行时间。
- 运行时长:任务的运行时长,单位为秒。
- 任务类型:任务的类型。
- 运行速率: 数据导入的速率
- 同步进度:数据同步的进度百分比
同样可以操作任务,对任务进行开始,暂停,继续,结束等操作。
前端直接在 https://github.com/databendcloud/db-archiver 项目中开发,然后通过 go embed 将前端一同打包,这样用户直接使用一个二进制启动项目即可。
存在问题
由于 db-archiver 可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:根据splitKey 进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是完整的
、一致的
数据快照信息。
针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,我们提供几个解决思路给用户,用户可以自行选择:
- 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。
- 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。
不过 db-archiver 适用的是归档场景,基本能够保证当前为静态数据。
RoadMap
- 支持 RDBMS 的更多数据源,比如 pg, oracle 等
- 支持分库分表到一张表
- 可视化归档
最后,希望 db-archiver 能够取代 datax ,成为日后我们给客户推荐离线数据同步的最佳方案。
我的分享就到这里,感谢大家。