使用 DataX 进行数据导入导出

大数据平台建设系列

Posted by 王灿辉 on 2020-03-06

DataX 简介

DataX 是阿里开源的一个数据同步框架,通过 Reader/Writer 插件,能够实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

源码和文档我们都可以在 Github 上获得。

DataX 安装部署

前置需求

下载

DataX 的安装有两种方式,可以下载源码自己编译,也可以直接下载官方编译好的 DataX 工具包。方便起见,我们选择后者。

1
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz -P ~/dl

解压

解压 DataX 工具包到 /usr/local/ 目录下。

1
$ sudo tar zxvf ~/dl/datax.tar.gz -D /usr/local

测试

解压后用自检脚本测试一下:

1
2
$ cd /usr/local/datax
$ python bin/datax.py job/job.json

显示如下日志,表明安装成功。

1
2
3
4
5
6
7
8
2020-03-06 16:46:39.875 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-03-06 16:46:29
任务结束时刻                    : 2020-03-06 16:46:39
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

自定义任务

DataX 使用 json 作为配置文件,配置文件可以在本地也可以在远程的 http 服务器上面。

配置项简介

配置文件最外层是一个 job,job 包含 setting 和 content 两部分,其中 setting 用于对整个 job 进行配置,包括全局 channel 配置,脏数据配置,限速配置等。content 配置的是数据的 reader 和 writer。

详细介绍见 Github 中相应 reader/writer 的 doc 目录。

从本地文件导入到 Hive

我们可以将数据从本地文件系统/home/hadoop/datax/db.csv导入到Hive数据库当中,本质上是将数据解析,然后写入到HDFS文件系统。 Hive数据库可以自动去加载对应的文件,所以我们需要用到txtfilereader和hdfswriter。这里具体的数据格式大家可以根据自己的表结构进行定义。

假如我们要将下面的 csv 格式的数据导入到 hive 中。 测试数据: db.csv

1
2
3
4
5
6
1,创建用户,1554099545,hdfs,创建用户 test
2,更新用户,1554099546,yarn,更新用户 test1
3,删除用户,1554099547,hdfs,删除用户 test2
4,更新用户,1554189515,yarn,更新用户 test3
5,删除用户,1554199525,hdfs,删除用户 test4
6,创建用户,1554299345,yarn,创建用户 test5

首先要在 hive 中创建对应的库和表。

1
2
3
4
5
6
7
8
9
10
hiev> create database db01;
hive> create table log_dev2(
>     id int,
>     name string,
>     create_time int,
>     creator string,
>     info string
> )
> row format delimited fields terminated by ','
> stored as orcfile;

接下来从编辑任务的配置文件。 配置文件:file2hive.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
{
    "job":{
        "setting":{
            "speed":{
                "channel":2
            }
        },
        "content":[
            {
                "reader":{
                    "name":"txtfilereader",
                    "parameter":{
                        "path":[
                            "/home/hadoop/datax/db.csv"
                        ],
                        "encoding":"UTF-8",
                        "column":[
                            {
                                "index":0,
                                "type":"long"
                            },
                            {
                                "index":1,
                                "type":"string"
                            },
                            {
                                "index":2,
                                "type":"long"
                            },
                            {
                                "index":3,
                                "type":"string"
                            },
                            {
                                "index":4,
                                "type":"string"
                            }
                        ],
                        "fieldDelimiter":","
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://127.0.0.1:9000",
                        "fileType":"orc",
                        "path":"/hive/warehouse/db01.db/log_dev2",
                        "fileName":"log_dev2.csv",
                        "column":[
                            {
                                "name":"id",
                                "type":"int"
                            },
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"create_time",
                                "type":"INT"
                            },
                            {
                                "name":"creator",
                                "type":"string"
                            },
                            {
                                "name":"info",
                                "type":"string"
                            }
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":",",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

配置完成之后,通过执行 python datax.py file2hive.json 即可。传输完成之后大家可以通过 hdfs 以及 hive 命令验证一下数据是否导入完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ hadoop fs -find /hive/warehouse/db01.db
/hive/warehouse/db01.db
/hive/warehouse/db01.db/log_dev2
/hive/warehouse/db01.db/log_dev2/log_dev2.csv__07332c07_89ba_4b43_a755_94936fa1243e
$ hive
hive> use db01;
OK
Time taken: 3.618 seconds
hive> select * from log_dev2;
OK
1       创建用户        1554099545      hdfs    创建用户 test0
2       更新用户        1554099546      yarn    更新用户 test1
3       删除用户        1554099547      hdfs    删除用户 test2
4       更新用户        1554189515      yarn    更新用户 test3
5       删除用户        1554199525      hdfs    删除用户 test4
6       创建用户        1554299345      yarn    创建用户 test5
Time taken: 1.465 seconds, Fetched: 6 row(s)

可见数据已经成功导入。

从 Mysql 导入到 Hive

我们可以将数据从 Mysql 数据库导入到 Hive 数据库当中,本质上是通过 JDBC 链接 mysql 查询数据,然后写入到 HDFS 文件系统。 Hive 数据库可以自动去加载对应的文件,所以我们需要用到 mysqlreader 和 hdfswriter。这里具体的数据格式大家可以根据自己的表结构进行定义。

配置文件:mysql2hive.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
{
    "job":{
        "setting":{
            "speed":{
                "channel":3
            },
            "errorLimit":{
                "record":0,
                "percentage":0.02
            }
        },
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "username":"root",
                        "password":"yourpassword",
                        "column":[
                            "id",
                            "name",
                            "create_time",
                            "creator",
                            "info"
                        ],
                        "where":"creator='${creator}' and create_time>${create_time}",
                        "connection":[
                            {
                                "table":[
                                    "dev_log"
                                ],
                                "jdbcUrl":[
                                    "jdbc:mysql://127.0.0.1:3306/imooc_dev"
                                ]
                            }
                        ]
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://127.0.0.1:9000",
                        "fileType":"text",
                        "path":"/hive/warehouse/db01.db/log_dev1",
                        "fileName":"log_dev3.csv",
                        "column":[
                            {
                                "name":"id",
                                "type":"int"
                            },
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"create_time",
                                "type":"INT"
                            },
                            {
                                "name":"creator",
                                "type":"string"
                            },
                            {
                                "name":"info",
                                "type":"string"
                            }
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":",",
                        "compress":"GZIP"
                    }
                }
            }
        ]
    }
}

值得注意的是,配置文件中我们通过 ${creator} 的方式可以使用变量进行占位,随后通过在命令行中使用 -Dparam=value 的形式传递变量。

1
python datax.py mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547"

通过这种变量的方式我们可以实现增量同步。



赞赏支持
微信赞赏
微信赞赏
支付宝
支付宝