DataX 简介
DataX 是阿里开源的一个数据同步框架,通过 Reader/Writer 插件,能够实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
源码和文档我们都可以在 Github 上获得。
DataX 安装部署
前置需求
下载
DataX 的安装有两种方式,可以下载源码自己编译,也可以直接下载官方编译好的 DataX 工具包。方便起见,我们选择后者。
1
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz -P ~/dl
解压
解压 DataX 工具包到 /usr/local/
目录下。
1
$ sudo tar zxvf ~/dl/datax.tar.gz -D /usr/local
测试
解压后用自检脚本测试一下:
1
2
$ cd /usr/local/datax
$ python bin/datax.py job/job.json
显示如下日志,表明安装成功。
1
2
3
4
5
6
7
8
2020-03-06 16:46:39.875 [job-0] INFO JobContainer -
任务启动时刻 : 2020-03-06 16:46:29
任务结束时刻 : 2020-03-06 16:46:39
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
自定义任务
DataX 使用 json 作为配置文件,配置文件可以在本地也可以在远程的 http 服务器上面。
配置项简介
配置文件最外层是一个 job,job 包含 setting 和 content 两部分,其中 setting 用于对整个 job 进行配置,包括全局 channel 配置,脏数据配置,限速配置等。content 配置的是数据的 reader 和 writer。
详细介绍见 Github 中相应 reader/writer 的 doc 目录。
从本地文件导入到 Hive
我们可以将数据从本地文件系统/home/hadoop/datax/db.csv导入到Hive数据库当中,本质上是将数据解析,然后写入到HDFS文件系统。 Hive数据库可以自动去加载对应的文件,所以我们需要用到txtfilereader和hdfswriter。这里具体的数据格式大家可以根据自己的表结构进行定义。
假如我们要将下面的 csv 格式的数据导入到 hive 中。 测试数据: db.csv
1
2
3
4
5
6
1,创建用户,1554099545,hdfs,创建用户 test
2,更新用户,1554099546,yarn,更新用户 test1
3,删除用户,1554099547,hdfs,删除用户 test2
4,更新用户,1554189515,yarn,更新用户 test3
5,删除用户,1554199525,hdfs,删除用户 test4
6,创建用户,1554299345,yarn,创建用户 test5
首先要在 hive 中创建对应的库和表。
1
2
3
4
5
6
7
8
9
10
hiev> create database db01;
hive> create table log_dev2(
> id int,
> name string,
> create_time int,
> creator string,
> info string
> )
> row format delimited fields terminated by ','
> stored as orcfile;
接下来从编辑任务的配置文件。 配置文件:file2hive.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
{
"job":{
"setting":{
"speed":{
"channel":2
}
},
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"path":[
"/home/hadoop/datax/db.csv"
],
"encoding":"UTF-8",
"column":[
{
"index":0,
"type":"long"
},
{
"index":1,
"type":"string"
},
{
"index":2,
"type":"long"
},
{
"index":3,
"type":"string"
},
{
"index":4,
"type":"string"
}
],
"fieldDelimiter":","
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"defaultFS":"hdfs://127.0.0.1:9000",
"fileType":"orc",
"path":"/hive/warehouse/db01.db/log_dev2",
"fileName":"log_dev2.csv",
"column":[
{
"name":"id",
"type":"int"
},
{
"name":"name",
"type":"string"
},
{
"name":"create_time",
"type":"INT"
},
{
"name":"creator",
"type":"string"
},
{
"name":"info",
"type":"string"
}
],
"writeMode":"append",
"fieldDelimiter":",",
"compress":"NONE"
}
}
}
]
}
}
配置完成之后,通过执行 python datax.py file2hive.json
即可。传输完成之后大家可以通过 hdfs 以及 hive 命令验证一下数据是否导入完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ hadoop fs -find /hive/warehouse/db01.db
/hive/warehouse/db01.db
/hive/warehouse/db01.db/log_dev2
/hive/warehouse/db01.db/log_dev2/log_dev2.csv__07332c07_89ba_4b43_a755_94936fa1243e
$ hive
hive> use db01;
OK
Time taken: 3.618 seconds
hive> select * from log_dev2;
OK
1 创建用户 1554099545 hdfs 创建用户 test0
2 更新用户 1554099546 yarn 更新用户 test1
3 删除用户 1554099547 hdfs 删除用户 test2
4 更新用户 1554189515 yarn 更新用户 test3
5 删除用户 1554199525 hdfs 删除用户 test4
6 创建用户 1554299345 yarn 创建用户 test5
Time taken: 1.465 seconds, Fetched: 6 row(s)
可见数据已经成功导入。
从 Mysql 导入到 Hive
我们可以将数据从 Mysql 数据库导入到 Hive 数据库当中,本质上是通过 JDBC 链接 mysql 查询数据,然后写入到 HDFS 文件系统。 Hive 数据库可以自动去加载对应的文件,所以我们需要用到 mysqlreader 和 hdfswriter。这里具体的数据格式大家可以根据自己的表结构进行定义。
配置文件:mysql2hive.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
{
"job":{
"setting":{
"speed":{
"channel":3
},
"errorLimit":{
"record":0,
"percentage":0.02
}
},
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"username":"root",
"password":"yourpassword",
"column":[
"id",
"name",
"create_time",
"creator",
"info"
],
"where":"creator='${creator}' and create_time>${create_time}",
"connection":[
{
"table":[
"dev_log"
],
"jdbcUrl":[
"jdbc:mysql://127.0.0.1:3306/imooc_dev"
]
}
]
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"defaultFS":"hdfs://127.0.0.1:9000",
"fileType":"text",
"path":"/hive/warehouse/db01.db/log_dev1",
"fileName":"log_dev3.csv",
"column":[
{
"name":"id",
"type":"int"
},
{
"name":"name",
"type":"string"
},
{
"name":"create_time",
"type":"INT"
},
{
"name":"creator",
"type":"string"
},
{
"name":"info",
"type":"string"
}
],
"writeMode":"append",
"fieldDelimiter":",",
"compress":"GZIP"
}
}
}
]
}
}
值得注意的是,配置文件中我们通过 ${creator}
的方式可以使用变量进行占位,随后通过在命令行中使用 -Dparam=value
的形式传递变量。
1
python datax.py mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547"
通过这种变量的方式我们可以实现增量同步。