目录
一、背景
二、概念
三、特性
四、工作原理
五、快速开始
1.数据同步任务模版
kafka to kudu
mysql to hive
2.数据同步执行命令
flinkx老版本命令参数:
flinkx老版本执行命令:
chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)
六、dolphinscheduler集成chunjun
五、快速开始
1.数据同步任务模版
flinkx使用和datax配置差不多,配置好输入输出的json
kafka to kudu
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"name": "int_field",
"type": "int"
},
{
"name": "byte_field",
"type": "byte"
},
{
"name": "short_field",
"type": "smallint"
},
{
"name": "long_field",
"type": "bigint"
},
{
"name": "binary_field",
"type": "binary"
},
{
"name": "string_field",
"type": "string"
},
{
"name": "bool_field",
"type": "boolean"
},
{
"name": "float_field",
"type": "float"
},
{
"name": "double_field",
"type": "double"
}
],
"sliceRecordCount": [
100
]
}
},
"writer": {
"parameter": {
"kerberos": {
"keytab": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/hive3.keytab",
"principal": "hive/eng-cdh3@DTSTACK.COM",
"krb5Conf": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/krb5.conf"
},
"column": [
{
"name": "int_field",
"type": "int32"
},
{
"name": "byte_field",
"type": "int8"
},
{
"name": "short_field",
"type": "int16"
},
{
"name": "long_field",
"type": "int64"
},
{
"name": "binary_field",
"type": "binary"
},
{
"name": "string_field",
"type": "string"
},
{
"name": "bool_field",
"type": "bool"
},
{
"name": "float_field",
"type": "float"
},
{
"name": "double_field",
"type": "double"
}
],
"masters": "eng-cdh1:7051",
"table": "table_name",
"flushMode": "manual_flush",
"writeMode": "append",
"batchSizeBytes": 1048576
},
"name": "kuduwriter"
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
},
"errorLimit": {
"record": 10000,
"percentage": 100
},
"dirty": {
"path": "/tmp",
"hadoopConfig": {
"fs.default.name": "hdfs://ns1",
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
"dfs.ha.automatic-failover.enabled": "true",
"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"fs.hdfs.impl.disable.cache": "true"
}
},
"restore": {
"isRestore": false,
"isStream": false
}
}
}
}
mysql to hive
{
"job": {
"content": [
{
"reader": {
"parameter" : {
"username" : "username",
"password" : "password",
"cat" : "insert,delete,update",
"jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
"host" : "ip",
"port" : 3308,
"start" : {
},
"table" : [ "tudou.kudu" ],
"splitUpdate" : false,
"pavingData" : true
},
"name" : "binlogreader"
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
"username" : "",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"compress" : "",
"charsetName" : "UTF-8",
"maxFileSize" : 1073741824,
"analyticalRules" : "test_${schema}_${table}",
"schema" : "tudou",
"tablesColumn" : "{\"kudu\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_id\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"before_name\"},{\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"after_name\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_age\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_age\",\"part\":false}]}",
"partition" : "pt",
"partitionType" : "MINUTE",
"defaultFS" : "hdfs://ns",
"hadoopConfig" : {
"dfs.ha.namenodes.ns": "nn1,nn2",
"fs.defaultFS": "hdfs://ns",
"dfs.namenode.rpc-address.ns.nn2": "ip:9000",
"dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.namenode.rpc-address.ns.nn1": "ip:9000",
"dfs.nameservices": "ns",
"fs.hdfs.impl.disable.cache": "true",
"hadoop.user.name": "root",
"fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2.数据同步执行命令
flinkx老版本命令参数:
model
描述:执行模式,也就是flink集群的工作模式
local: 本地模式
standalone: 独立部署模式的flink集群
yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
必选:否
默认值:local
job
描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
必选:是
默认值:无
pluginRoot
描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
必选:是
默认值:无
flinkconf
描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
必选:否
默认值:无
yarnconf
描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
必选:否
默认值:无
flinkx老版本执行命令:
以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
chunjun新版本执行命令:(明显看出命令还是减少了很多的,更简便易用了)
以本地模式启动数据同步任务
进入到chunjun-dist 目录,执行命令
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
以standalone模式启动数据同步任务
1. 启动Flink Standalone环境
sh $FLINK_HOME/bin/start-cluster.sh
启动成功后默认端口为8081,我们可以访问当前机器的8081端口进入standalone的flink web ui
2. 提交任务
进入到本地chunjun-dist目录,执行命令
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
以yarn模式启动数据同步任务
1. 启动Yarn Session环境
Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME和
FLINK_HOME,我们需要使用yarn-session -t参数上传chunjun-dist
cd $FLINK_HOME/bin
./yarn-session -t $CHUNJUN_HOME -d
2. 提交任务
通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID,进入到本地chunjun-dist目录,执行命令
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
六、dolphinscheduler集成chunjun
dolphinscheduler工具栏集成chunjun,本来不支持的,7天前有位好心的大佬更新了相关代码🙏🙏🙏
好人一生平安!!!集成好之后可以自定义好json文件,直接拖拽chunjun组件配置实时数据同步任务啦~
下载地址:https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun
ps:我们开发小哥最近比较任性,rm -rf了什么东东,dolphinssheduler挂掉了暂时没法用,具体的安装文档修好之后再整理叭🌸
未经允许不得转载:木盒主机 » 纯钧(ChunJun,原名FlinkX)框架学习