## 一、概览
[Github主页](https://github.com/alibaba/DataX)
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

- 设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
- 当前使用现状
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
## 二、DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
## 三、支持
经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
| 类型 | 数据源 | Reader(读) | Writer(写) |
| :-------------: |:-------------:| :-----:| :-----:|
| RDBMS 关系型数据库 | MySQL| √| √|
| | Oracle | √| √|
| | SQLServer| √| √|
| | PostgreSQL| √| √|
| | DRDS | √| √|
| | 通用RDBMS(支持所有关系型数据库)| √| √|
| 阿里云数仓数据存储 | ODPS| √| √|
| | ADS | | √|
| | OSS| √| √|
| | OCS| √| √|
| NoSQL数据存储| OTS| √| √|
| | Hbase0.94| √| √|
| | Hbase1.1| √| √|
| | Phoenix4.x| √| √|
| | Phoenix5.x| √| √|
| | MongoDB| √| √|
| | Hive| √| √|
| | Cassandra| √| √|
| 无结构化数据存储| TxtFile| √| √|
| | FTP| √| √|
| | HDFS| √ | √|
| | Elasticsearch| | √|
| 时间序列数据库| OpenTSDB| √| |
| | TSDB| √| √|
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
## 四、DataX3.0核心架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

### 核心模块介绍:
1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—Channel—>Writer的线程来完成任务同步工作。
5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。
### DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
1. DataXJob根据分库分表切分成了100个Task。
2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
## 五、DataX 3.0六大核心优势
- 可靠的数据质量监控
- 完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
- 提供作业全链路的流量、数据量运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
- 提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
- 丰富的数据转换功能
DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
- 精准的速度控制
还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
```
"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}
```
- 强劲的同步性能
ataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。
- 健壮的容错机制
DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
- 线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
- 线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
- 极简的使用体验
- 易用
下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。
- 详细
DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
- 传输过程中打印传输速度、进度等

- 传输过程中会打印进程相关的CPU、JVM等

- 在任务结束之后,打印总体运行情况

## 六、快速开始
### 准备工作
- JDK 1.8或更高版本
- Python(推荐Python2.6.X)
#### 工具部署
- 直接下载DataX工具包:[DataX下载地址](http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz)
1. 下载并上传到服务器上,路径```/opt/dataX```,解压```tar -zxvf datax.tar.gz```,进入解压后的```dataX```。
2. 执行自检脚本,```python ./bin/datax.py ./job/job.json```,

出现这个界面表示安装成功。
#### mysql to mysql
- 创建测试表
- datax1.user
```
CREATE TABLE `datax1`.`user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`first_name` varchar(50) NULL,
`last_name` varchar(50) NULL,
`create_time` datetime NULL,
`update_time` datetime NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
);
INSERT INTO `datax1`.`user`(`id`, `first_name`, `last_name`, `create_time`, `update_time`) VALUES (1, '张', '三', '2020-09-03 10:54:57', '2020-09-03 10:54:57');
INSERT INTO `datax1`.`user`(`id`, `first_name`, `last_name`, `create_time`, `update_time`) VALUES (2, '王', '五', '2020-09-03 10:55:21', '2020-09-03 10:55:21');
```
- datax2.user
```
CREATE TABLE `datax2`.`user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`first_name` varchar(50) NULL,
`last_name` varchar(50) NULL,
`create_time` datetime NULL,
`update_time` datetime NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
);
```
- mysql2mysql的模板文件
查看模板
```python ./bin/datax.py -r mysqlreader -w mysqlwriter```
```
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
], #所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如['*']。
"splitPk": "db_id", #MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
], #描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。
"table": [
"table"
], #所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
"querySql": [
"select db_id,on_line_flag from db_info where db_id < 10;"
] #在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id 当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。
}
],
"password": "", #数据源指定用户名的密码
"username": "", #数据源的用户名
"where": "" #筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
], #目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]。
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk", #目的数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true
"table": [
"test"
] #目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。注意:table 和 jdbcUrl 必须包含在 connection 配置单元中
}
],
"password": "", #目的数据库的密码
"preSql": [
"delete from test"
], #写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
"session": [
"set session sql_mode='ANSI'"
], #DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
"username": "", #目的数据库的用户名
"writeMode": "insert" #控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句,所有选项:insert/replace/update,默认insert
}
}
}
],
"setting": {
"speed": {
"channel": "" #指定channel数
}
}
}
}
```
- 创建自定义的配置文件
- 创建配置文件目录 ```mkdir json``` 并进入
- 新建mysql2mysql文件,```vim mysql2mysql.json```
```
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/datax1"
],
"table": [
"user"
]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax2?useUnicode=true&characterEncoding=gbk",
"table": [
"user"
]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
```
- 执行 ```python ../bin/datax.py ./mysql2mysql.json```

DataX3.0的使用