1. Reading data from a stream and printing it to the console
- Go to the DataX installation directory and run the following command to view the configuration template for stream reading:
```
[whybigdata@node01 datax]$ ./bin/datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
    https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
    https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
```
- Write a configuration file based on the template. Go to the job directory under the DataX installation directory:
```bash
[whybigdata@node01 job]$ vim stream2stream.json
```
Add the following content:
```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type": "long",
                                "value": "18"
                            },
                            {
                                "type": "string",
                                "value": "zhangsan"
                            }
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
```
column describes the fields to read. For each field you must specify its data type and the corresponding value. Even numeric values have to be wrapped in quotes; writing a bare number causes an error.
Run the job:
```bash
[whybigdata@node01 job]$ /opt/module/datax/bin/datax.py /opt/module/datax/job/stream2stream.json
```
Based on the sliceRecordCount value, the stream writes 10 records to the console.
2. Reading data from MySQL and storing it in HDFS
2.1 View the official template
Run the following command to view the configuration template for reading from MySQL and writing to HDFS:
```bash
[whybigdata@node01 datax]$ ./bin/datax.py -r mysqlreader -w hdfswriter
```
The template looks like this:
```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
```
2.1.1 mysqlreader parameters:
```
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "column": [],
        "connection": [
            {
                "jdbcUrl": [],
                "table": [],
                【"querySql": []】
            }
        ],
        "password": "",
        "username": "",
        【"where": ""】
        【"splitPk": ""】
    }
}
```
The parameters are explained below:
- name: the reader's name
- column: the set of columns to sync, written as a JSON array; * means all columns
- jdbcUrl: JDBC connection information for the database, written as a JSON array; multiple connection addresses are supported
- table: the table(s) to sync; multiple tables are supported
- querySql: a custom SQL statement; once configured, mysqlreader ignores table, column, and where (see the sketch below)
- password: the password for the database user
- username: the database user name
- where: a filter condition
- splitPk: the field used to split the data, usually the primary key; only integer types are supported

Note: parameters wrapped in 【】 are optional.
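For illustration only, a minimal reader block that uses querySql instead of table/column might look roughly like the sketch below; the query, host, database, and credentials are placeholders taken from the examples in this article, not an official sample.

```json
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "123456",
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://node01:3306/datax"],
                "querySql": ["select id, name from student where id >= 1002"]
            }
        ]
    }
}
```

With querySql in place, any table, column, or where settings would simply be ignored by mysqlreader.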
2.1.2 hdfswriter parameters:
```json
"writer": {
    "name": "hdfswriter",
    "parameter": {
        "column": [],
        "compress": "",
        "defaultFS": "",
        "fieldDelimiter": "",
        "fileName": "",
        "fileType": "",
        "path": "",
        "writeMode": ""
    }
}
```
The parameters are explained below:
- name: the writer's name
- column: the fields to write, where name specifies the field name and type specifies its type
- compress: the HDFS file compression type; leaving it empty (the default) means no compression (see the sketch after this list)
- defaultFS: the address of the HDFS NameNode, in the form hdfs://ip:port
- fieldDelimiter: the field delimiter
- fileName: the name of the file to write
- fileType: the file type; currently only text or orc may be configured
- path: the target path in the Hadoop HDFS file system
- writeMode: how hdfswriter cleans up existing data before writing:
  - append: no pre-processing; DataX hdfswriter writes directly using fileName and guarantees that file names do not conflict
  - nonConflict: if the directory already contains files with the fileName prefix, the job fails with an error
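As a rough illustration of fileType and compress, a writer parameter block targeting a compressed ORC output might look like the sketch below. The path, file name, and compression codec are assumptions for this example (the case in 2.3 below writes an uncompressed text file instead); check the hdfswriter documentation for the codecs supported by your version.

```json
"writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "hdfs://node01:9000",
        "fileType": "orc",
        "compress": "SNAPPY",
        "path": "/datax-out-orc",
        "fileName": "student",
        "fieldDelimiter": "\t",
        "writeMode": "append",
        "column": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"}
        ]
    }
}
```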
2.2 Prepare the data
```sql
mysql> create database datax;
mysql> use datax;
mysql> create table student(id int,name varchar(20));
mysql> insert into student values(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');
```
2.3 Write the configuration file
```
[whybigdata@node01 datax]$ vim /opt/module/datax/job/mysql2hdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://node01:3306/datax"
                                ],
                                "table": [
                                    "student"
                                ]
                            }
                        ],
                        "username": "root",
                        "password": "123456"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            }
                        ],
                        "defaultFS": "hdfs://node01:9000",
                        "fieldDelimiter": "\t",
                        "fileName": "student.txt",
                        "fileType": "text",
                        "path": "/datax-out",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```
2.4 Run the job
```bash
[whybigdata@node01 datax]$ bin/datax.py job/mysql2hdfs.json

2023-01-10 15:54:16.581 [job-0] INFO  JobContainer -
任务启动时刻 : 2023-01-10 15:54:04
任务结束时刻 : 2023-01-10 15:54:16
任务总计耗时 : 12s
任务平均流量 : 3B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
```
How DataX guarantees data consistency: ⭐⭐⭐⭐⭐
While DataX reads a MySQL table and writes it to HDFS, if the write runs with multiple concurrent tasks, the job counts as successful only once every task has written to HDFS successfully (each task first writes to a temporary file, and only after everything succeeds are the temporary files renamed and moved to the final path). If any single task fails, the whole job is declared failed, and the tasks that did succeed also delete their temporary paths.
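Conceptually, the behavior described above resembles the following shell sketch. DataX performs these steps internally through the HDFS API, not via shell commands, and the temporary directory and suffix names here are made up for illustration:

```bash
# 1. Each concurrent task first writes its slice into a temporary path
hadoop fs -put part-0 /datax-out/.tmp_job0/student.txt__suffix0
hadoop fs -put part-1 /datax-out/.tmp_job0/student.txt__suffix1

# 2a. Only if every task succeeded: move (rename) the temporary files to the final path
hadoop fs -mv "/datax-out/.tmp_job0/*" /datax-out/

# 2b. If any task failed: the whole job fails and the temporary path is removed,
#     including the files written by the tasks that did succeed
hadoop fs -rm -r /datax-out/.tmp_job0
```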
2.5 Check the result in HDFS
Note: when HdfsWriter actually runs, it appends a random suffix to the configured file name, and that suffixed name is the actual file each thread writes.
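To confirm, you can list the output directory configured under path in mysql2hdfs.json; the exact suffix will be different on every run:

```bash
[whybigdata@node01 datax]$ hadoop fs -ls /datax-out
```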
2.6 HA support
Add the following parameter to the writer's parameter section:
```json
"hadoopConfig": {
    "dfs.nameservices": "ns",
    "dfs.ha.namenodes.ns": "nn1,nn2",
    "dfs.namenode.rpc-address.ns.nn1": "hostname:port",
    "dfs.namenode.rpc-address.ns.nn2": "hostname:port",
    "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
```
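Put together, a writer section for an HA cluster might look roughly like the sketch below. The nameservice name ns, the host node02, and the ports are placeholders, and it is assumed that defaultFS then points at the nameservice rather than a single NameNode; the remaining fields are copied from the mysql2hdfs.json example above.

```json
"writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "hdfs://ns",
        "hadoopConfig": {
            "dfs.nameservices": "ns",
            "dfs.ha.namenodes.ns": "nn1,nn2",
            "dfs.namenode.rpc-address.ns.nn1": "node01:9000",
            "dfs.namenode.rpc-address.ns.nn2": "node02:9000",
            "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        },
        "fieldDelimiter": "\t",
        "fileName": "student.txt",
        "fileType": "text",
        "path": "/datax-out",
        "writeMode": "append",
        "column": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"}
        ]
    }
}
```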
2.7 Importing into Hive
Under the hood this is also done through HDFS.
Process: importing into Hive means writing an hdfswriter configuration file, and then either running load data to move the resulting file into the corresponding Hive table, or writing directly to the HDFS path that backs the Hive table. A sketch of the first route follows.
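This is a rough sketch only, assuming a hypothetical Hive table student_hive whose columns and field delimiter match the file written by hdfswriter in 2.3; the table name and paths are illustrative.

```sql
-- Create a Hive table whose layout matches the file written by hdfswriter
CREATE TABLE student_hive (id INT, name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- Move everything DataX wrote under /datax-out into the table
LOAD DATA INPATH '/datax-out' INTO TABLE student_hive;
```

For the second route, point the hdfswriter path at the table's storage location (typically something like /user/hive/warehouse/student_hive for a default-configured managed table), and no load step is needed.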
3. Reading data from HDFS and writing it to MySQL
3.1 Rename the file uploaded in the previous example
```bash
[whybigdata@node01 datax]$ hadoop fs -mv /student.txt* /student.txt
```
3.2 View the official template
```
[whybigdata@node01 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
```
3.3 Create the configuration file
```
[whybigdata@node01 datax]$ vim job/hdfs2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": ["*"],
                        "defaultFS": "hdfs://node01:9000",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "path": "/student.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://node01:3306/datax",
                                "table": ["student2"]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```
3.4 Create the student2 table in the datax database in MySQL
```sql
mysql> use datax;
mysql> create table student2(id int,name varchar(20));
```
3.5 Run the job
```bash
[whybigdata@node01 datax]$ bin/datax.py job/hdfs2mysql.json

2023-01-10 16:21:53.616 [job-0] INFO  JobContainer -
任务启动时刻 : 2023-01-10 16:21:41
任务结束时刻 : 2023-01-10 16:21:53
任务总计耗时 : 11s
任务平均流量 : 3B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
```
3.6 Check the student2 table
```sql
mysql> select * from student2;
+------+----------+
| id   | name     |
+------+----------+
| 1001 | zhangsan |
| 1002 | lisi     |
| 1003 | wangwu   |
+------+----------+
3 rows in set (0.00 sec)
```
Done!