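A Simple Guide to Using DataX
DataX is an offline data synchronization tool developed by Alibaba. It supports offline synchronization between many data sources, including MySQL, Oracle, SQL Server, HDFS, and HBase.
Installing DataX
We can download a prebuilt package directly: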
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Alternatively, we can build DataX from source. Because the prebuilt package above is fairly old, building from source is recommended.
git clone https://github.com/alibaba/DataX.git
Because we only need a few specific databases, we can delete the submodules for the databases we do not use from pom.xml. The submodules I kept are as follows:
<!-- reader -->
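I also ran into a MySQL connection problem while using DataX, and after checking, it appears to be caused by the MySQL driver. So we also need to change the MySQL driver version in pom.xml, which is set by the property <mysql.driver.version>5.1.34</mysql.driver.version>. After these two changes, we can run the build command: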
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
The build output looks like this:
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:42 min
[INFO] Finished at: 2021-06-09T14:32:08+08:00
[INFO] ------------------------------------------------------------------------
The built package is located at target/datax.tar.gz. Unpack it:
tar -zxvf datax.tar.gz
cd datax
DataX is now installed. Its directory layout is as follows:
➜ datax tree -L 1
.
├── bin
├── conf
├── job
├── lib
├── plugin
├── script
└── tmp
7 directories, 0 files
Reading MySQL Data and Printing It
Next, we create a job that reads data from MySQL and prints it. Create the file job/mysql_print.json:
{
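Only the first line of the job file survives in this copy. As a rough sketch of what such a job could look like, the Python snippet below generates one. It assumes the reader mirrors the mysqlreader settings that DataX echoes in the log of the Elasticsearch section, and that the writer is DataX's streamwriter with print enabled. The function name, the channel count, and the password placeholder are illustrative assumptions, not the author's actual file.

```python
import json


def build_mysql_print_job(jdbc_url, table, columns, username, password):
    """Return a DataX job dict: mysqlreader -> streamwriter (prints rows)."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "username": username,
            "password": password,
            "column": list(columns),
            "connection": [{"jdbcUrl": [jdbc_url], "table": [table]}],
        },
    }
    # streamwriter with "print": true writes each record to stdout.
    writer = {"name": "streamwriter", "parameter": {"print": True}}
    return {
        "job": {
            # Assumption: same channel count as the Elasticsearch job's log.
            "setting": {"speed": {"channel": 3}},
            "content": [{"reader": reader, "writer": writer}],
        }
    }


if __name__ == "__main__":
    job = build_mysql_print_job(
        jdbc_url="jdbc:mysql://172.19.34.19:3306/bugatti",
        table="script_version",
        columns=["id", "name", "update_time", "message"],
        username="root",
        password="<password>",  # placeholder, replace with the real one
    )
    print(json.dumps(job, indent=2))
```

Redirecting the script's output to job/mysql_print.json would produce a file that can be passed to bin/datax.py.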
Once the job file is created, we can run the sync command:
➜ datax python bin/datax.py job/mysql_print.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
...
14 dev 2021-10-09 14:54:46 f9d428b45e882506ce45fb77dbb8e96611554129
15 master 2021-10-09 14:55:41 f9d428b45e882506ce45fb77dbb8e96611554129
20 revert-a8a85b0e 2021-08-03 09:40:09 04b81ff4fbf976328d59d105681b8c05fa4f04b5
22 默认 2021-06-09 10:48:33 04b81ff4fbf976328d59d105681b8c05fa4f04b5
...
2021-06-09 14:39:36.845 [job-0] INFO JobContainer -
任务启动时刻 : 2021-06-09 14:39:25
任务结束时刻 : 2021-06-09 14:39:36
任务总计耗时 : 10s
任务平均流量 : 22B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
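As shown above, we have successfully queried the 4 rows from MySQL and printed them.
Reading from MySQL and Writing to Elasticsearch
We now create another sync job similar to the one above, except that this time, instead of printing the data, we sync it to Elasticsearch. Create the file job/mysql_2_es.json:
{"job": {
The result is as follows: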
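The job file is cut off after its first line here, but DataX echoes the parsed configuration in the log below. As a hedged sketch, the writer section can be reconstructed from that echo. The helper name build_es_writer is hypothetical, and the credentials are placeholders because the log masks them.

```python
import json

# Column mapping as echoed by ESWriter in the log.
ES_COLUMNS = [
    {"name": "id", "type": "id"},
    {"name": "name", "type": "keyword"},
    {"name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
    {"name": "message", "type": "keyword"},
]


def build_es_writer(endpoint, index, columns, shards=3, replicas=1, batch_size=2):
    """Return the writer section of a DataX job targeting Elasticsearch."""
    return {
        "name": "elasticsearchwriter",
        "parameter": {
            "endpoint": endpoint,
            "accessId": "<access-id>",    # placeholder
            "accessKey": "<access-key>",  # placeholder, masked in the log
            "index": index,
            "type": "_doc",
            "batchSize": batch_size,
            "settings": {
                "index": {
                    "number_of_shards": shards,
                    "number_of_replicas": replicas,
                }
            },
            "column": list(columns),
        },
    }


if __name__ == "__main__":
    writer = build_es_writer(
        "http://172.19.40.171:9200", "script-version", ES_COLUMNS
    )
    print(json.dumps(writer, indent=2))
```

The reader section is the same mysqlreader block shown in the echoed configuration, so only the writer changes compared with the print job.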
➜ datax python bin/datax.py job/mysql_2_es.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2021-06-09 14:51:45.622 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-06-09 14:51:45.632 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.321-b07
jvmInfo: Mac OS X x86_64 10.15.7
cpu num: 4
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2021-06-09 14:51:45.658 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"id",
"name",
"update_time",
"message"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://172.19.34.19:3306/bugatti"
],
"table":[
"script_version"
]
}
],
"password":"****",
"username":"root"
}
},
"writer":{
"name":"elasticsearchwriter",
"parameter":{
"accessId":"1",
"accessKey":"*",
"batchSize":2,
"column":[
{
"name":"id",
"type":"id"
},
{
"name":"name",
"type":"keyword"
},
{
"format":"yyyy-MM-dd HH:mm:ss",
"name":"update_time",
"type":"date"
},
{
"name":"message",
"type":"keyword"
}
],
"endpoint":"http://172.19.40.171:9200",
"index":"script-version",
"settings":{
"index":{
"number_of_replicas":1,
"number_of_shards":3
}
},
"type":"_doc"
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"channel":3
}
}
}
2021-06-09 14:51:45.688 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-06-09 14:51:45.690 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-06-09 14:51:45.690 [main] INFO JobContainer - DataX jobContainer starts job.
2021-06-09 14:51:45.693 [main] INFO JobContainer - Set jobId = 0
2021-06-09 14:51:46.257 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2021-06-09 14:51:46.334 [job-0] INFO OriginalConfPretreatmentUtil - table:[script_version] has columns:[id,name,update_time,message].
2021-06-09 14:51:46.345 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2021-06-09 14:51:46.347 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2021-06-09 14:51:46.347 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] do prepare work .
2021-06-09 14:51:47.117 [job-0] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.119 [job-0] INFO JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.245 [job-0] INFO ESWriter$Job - [{"name":"id","type":"id"},{"name":"name","type":"keyword"},{"format":"yyyy-MM-dd HH:mm:ss","name":"update_time","type":"date"},{"name":"message","type":"keyword"}]
2021-06-09 14:51:47.255 [job-0] INFO ESWriter$Job - index:[script-version], type:[_doc], mappings:[{"_doc":{"properties":{"message":{"type":"keyword"},"name":{"type":"keyword"},"update_time":{"type":"date"}}}}]
2021-06-09 14:51:47.458 [job-0] INFO ESClient - create index script-version
2021-06-09 14:51:47.488 [job-0] INFO ESClient - index [script-version] already exists
2021-06-09 14:51:47.489 [job-0] INFO JobContainer - jobContainer starts to do split ...
2021-06-09 14:51:47.490 [job-0] INFO JobContainer - Job set Channel-Number to 3 channels.
2021-06-09 14:51:47.503 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2021-06-09 14:51:47.504 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] splits to [1] tasks.
2021-06-09 14:51:47.527 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2021-06-09 14:51:47.536 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-06-09 14:51:47.538 [job-0] INFO JobContainer - Running by standalone Mode.
2021-06-09 14:51:47.549 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-06-09 14:51:47.555 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-06-09 14:51:47.555 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-06-09 14:51:47.674 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-06-09 14:51:47.687 [0-0-0-writer] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.687 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,update_time,message from script_version
] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:47.687 [0-0-0-writer] INFO JestClientFactory - Using multi thread/connection supporting pooling connection manager
2021-06-09 14:51:47.709 [0-0-0-writer] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.710 [0-0-0-writer] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.710 [0-0-0-writer] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.711 [0-0-0-writer] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.775 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,update_time,message from script_version
] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:48.369 [0-0-0-writer] INFO ESWriter$Job - task end, write size :4
2021-06-09 14:51:48.388 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[714]ms
2021-06-09 14:51:48.388 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-06-09 14:51:57.722 [job-0] INFO StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.723 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-06-09 14:51:57.724 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] do post work.
2021-06-09 14:51:57.725 [job-0] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:57.725 [job-0] INFO JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:57.728 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2021-06-09 14:51:57.728 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-06-09 14:51:57.729 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /Users/hourui/Downloads/111111/datax/hook
2021-06-09 14:51:57.735 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 1 | 1 | 1 | 0.060s | 0.060s | 0.060s
PS Scavenge | 1 | 1 | 1 | 0.031s | 0.031s | 0.031s
2021-06-09 14:51:57.735 [job-0] INFO JobContainer - PerfTrace not enable!
2021-06-09 14:51:57.736 [job-0] INFO StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.737 [job-0] INFO JobContainer -
任务启动时刻 : 2021-06-09 14:51:45
任务结束时刻 : 2021-06-09 14:51:57
任务总计耗时 : 12s
任务平均流量 : 22B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
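The summary reports a write speed of 0rec/s even though 4 records were transferred. One reading that is consistent with the numbers in the log is that the totals are divided by the transfer time using integer division. The sketch below assumes a transfer window of roughly 10 seconds (the gap between the scheduler starting and finishing in the log); it is an illustration of the arithmetic, not a statement of DataX's exact formula.

```python
def per_second(total, seconds):
    """Integer-divide a total by elapsed seconds, discarding the remainder."""
    return total // seconds


total_bytes = 226      # "Total 4 records, 226 bytes" in the log
total_records = 4
transfer_seconds = 10  # assumption: approximate transfer window

print(per_second(total_bytes, transfer_seconds))    # 22
print(per_second(total_records, transfer_seconds))  # 0
```

With so few rows, the per-second figures say little about throughput; the record and failure totals are the meaningful numbers here.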
After the job above finishes, we can query the synced data in Elasticsearch:
curl http://172.19.40.171:9200/script-version/_search
The query result is as follows:
{"took":3,"timed_out":false,"_shards":{"total":2,"successful":2,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":1.0,"hits":[
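The same check can be scripted. The following is a minimal sketch using only the Python standard library; the function names are made up for illustration, and it assumes the cluster is reachable without authentication, as in the curl call above.

```python
import json
import urllib.request


def search_url(endpoint, index):
    """Build the _search URL for an index."""
    return "{}/{}/_search".format(endpoint.rstrip("/"), index)


def hit_count(response_text):
    """Extract the total hit count from a _search response body."""
    total = json.loads(response_text)["hits"]["total"]
    # Elasticsearch 7+ returns an object such as {"value": 4, "relation": "eq"};
    # older versions return a bare integer.
    return total["value"] if isinstance(total, dict) else total


def fetch_hit_count(endpoint, index):
    """Query the index over HTTP and return how many documents matched."""
    with urllib.request.urlopen(search_url(endpoint, index)) as resp:
        return hit_count(resp.read().decode("utf-8"))
```

Calling fetch_hit_count("http://172.19.40.171:9200", "script-version") against the cluster used in this post should return 4 if the sync succeeded.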
Permalink: https://www.nosuchfield.com/2021/06/09/Simple-use-of-DataX/
Copyright notice: All posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting.