Learning the ChunJun (formerly FlinkX) Framework

Table of Contents

I. Background
II. Concepts
III. Features
IV. How It Works
V. Quick Start
    1. Data sync job templates
        kafka to kudu
        mysql to hive
    2. Commands for running data sync jobs
        FlinkX legacy command-line parameters
        FlinkX legacy execution commands
        ChunJun new-version execution commands
VI. Integrating ChunJun into DolphinScheduler

V. Quick Start

1. Data sync job templates

FlinkX is configured in much the same way as DataX: each data sync job is described by a JSON file that defines the input (reader) and the output (writer).
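Regardless of the connector pair, every job file shares the same skeleton: a job.content array holding one reader/writer pair, plus a job.setting block for speed, error tolerance and so on. Below is a minimal sketch that writes generated test data straight to the console using the built-in stream connectors, so it has no external dependencies; the file path /tmp/stream_demo.json, the column list and the print switch are chosen purely for illustration (they follow the bundled stream example and may differ slightly between releases).

# Sketch: write a minimal reader -> writer job file using the built-in stream connectors
cat > /tmp/stream_demo.json <<'EOF'
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "name": "id", "type": "int" },
              { "name": "name", "type": "string" }
            ],
            "sliceRecordCount": [ 10 ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": { "print": true }
        }
      }
    ],
    "setting": {
      "speed": { "channel": 1 }
    }
  }
}
EOF

The commands for actually submitting a job file like this are covered in section 2 below.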

kafka to kudu (note: the sample below uses the built-in streamreader as a stand-in source; only the kuduwriter side is Kudu-specific)

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "name": "int_field",
                "type": "int"
              },
              {
                "name": "byte_field",
                "type": "byte"
              },
              {
                "name": "short_field",
                "type": "smallint"
              },
              {
                "name": "long_field",
                "type": "bigint"
              },
              {
                "name": "binary_field",
                "type": "binary"
              },
              {
                "name": "string_field",
                "type": "string"
              },
              {
                "name": "bool_field",
                "type": "boolean"
              },
              {
                "name": "float_field",
                "type": "float"
              },
              {
                "name": "double_field",
                "type": "double"
              }
            ],
            "sliceRecordCount": [
              100
            ]
          }
        },
        "writer": {
          "parameter": {
            "kerberos": {
              "keytab": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/hive3.keytab",
              "principal": "hive/eng-cdh3@DTSTACK.COM",
              "krb5Conf": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/krb5.conf"
            },
            "column": [
              {
                "name": "int_field",
                "type": "int32"
              },
              {
                "name": "byte_field",
                "type": "int8"
              },
              {
                "name": "short_field",
                "type": "int16"
              },
              {
                "name": "long_field",
                "type": "int64"
              },
              {
                "name": "binary_field",
                "type": "binary"
              },
              {
                "name": "string_field",
                "type": "string"
              },
              {
                "name": "bool_field",
                "type": "bool"
              },
              {
                "name": "float_field",
                "type": "float"
              },
              {
                "name": "double_field",
                "type": "double"
              }
            ],
            "masters": "eng-cdh1:7051",
            "table": "table_name",
            "flushMode": "manual_flush",
            "writeMode": "append",
            "batchSizeBytes": 1048576
          },
          "name": "kuduwriter"
        }
      }
    ],
    "setting": {
        "speed": {
               "channel": 1,
               "bytes": 0
                 },
        "errorLimit": {
               "record": 10000,
               "percentage": 100
                 },
        "dirty": {
                "path": "/tmp",
                "hadoopConfig": {
                    "fs.default.name": "hdfs://ns1",
                    "dfs.nameservices": "ns1",
                    "dfs.ha.namenodes.ns1": "nn1,nn2",
                    "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
                    "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
                    "dfs.ha.automatic-failover.enabled": "true",
                    "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "fs.hdfs.impl.disable.cache": "true"
                }
            },
        "restore": {
               "isRestore": false,
               "isStream": false
      }
    }
  }
}

mysql to hive

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter" : {
            "username" : "username",
            "password" : "password",
            "cat" : "insert,delete,update",
            "jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
            "host" : "ip",
            "port" : 3308,
            "start" : {
            },
            "table" : [ "tudou.kudu" ],
            "splitUpdate" : false,
            "pavingData" : true
          },
          "name" : "binlogreader"
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
            "username" : "",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "compress" : "",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "analyticalRules" : "test_${schema}_${table}",
            "schema" : "tudou",
            "tablesColumn" : "{\"kudu\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_id\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"before_name\"},{\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"after_name\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_age\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_age\",\"part\":false}]}",
            "partition" : "pt",
            "partitionType" : "MINUTE",
            "defaultFS" : "hdfs://ns",
            "hadoopConfig" : {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns",
              "dfs.namenode.rpc-address.ns.nn2": "ip:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "ip:9000",
              "dfs.nameservices": "ns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

2. Commands for running data sync jobs

FlinkX legacy command-line parameters:

mode
Description: execution mode, i.e. how the Flink cluster is run
local: run locally
standalone: a Flink cluster deployed in standalone mode
yarn: a Flink cluster on YARN; a Flink session must be started on YARN in advance, using the default name "Flink session cluster"
Required: no
Default: local

job
Description: path to the data sync job description file; the file holds the job definition as a JSON string.
Required: yes
Default: none

pluginRoot
Description: root directory of the plugins, i.e. the pluginRoot directory produced by the build.
Required: yes
Default: none

flinkconf
Description: directory containing the Flink configuration files (not needed in local mode), e.g. /hadoop/flink-1.4.0/conf
Required: no
Default: none

yarnconf
Description: directory containing the Hadoop configuration files (both HDFS and YARN; not needed in local mode), e.g. /hadoop/etc/hadoop
Required: no
Default: none
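Putting the parameters above together, the legacy launcher's general shape is roughly the following; this is only a sketch assembled from the parameter list, so the placeholder paths must be replaced and the optional flags are only needed for their corresponding modes.

# General shape of the legacy FlinkX launcher (sketch; replace the placeholders)
#   -mode        local | standalone | yarn (defaults to local)
#   -job         required, path to the sync job JSON
#   -pluginRoot  required, plugin root directory
#   -flinkconf   standalone/yarn only, Flink conf directory
#   -yarnconf    yarn only, Hadoop conf directory
bin/flinkx -mode local \
  -job /path/to/job.json \
  -pluginRoot /path/to/plugins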

FlinkX legacy execution commands:

Start a data sync job in local mode
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

Start a data sync job in standalone mode
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

Start a data sync job in yarn mode
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

ChunJun new-version execution commands (noticeably fewer arguments than before; simpler and easier to use):

Start a data sync job in local mode
Go to the chunjun-dist directory and run:
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
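If you created the /tmp/stream_demo.json sketch from section 1 (a hypothetical path used only in this article), it can be submitted the same way:

sh bin/chunjun-local.sh -job /tmp/stream_demo.json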


Start a data sync job in standalone mode
1. Start a Flink standalone cluster
sh $FLINK_HOME/bin/start-cluster.sh
Once it is up, the default port is 8081; open port 8081 on that machine to reach the standalone Flink web UI.
2. Submit the job
Go to the local chunjun-dist directory and run:
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
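Once submitted, the job should show up in the standalone cluster's web UI; if you prefer the terminal, the same information is available from Flink's REST API on that port (localhost and the default port 8081 are assumed here):

# List the jobs known to the standalone cluster via Flink's REST API
curl http://localhost:8081/jobs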

Start a data sync job in yarn session mode
1. Start a YARN session
YARN session mode depends on both a Flink and a Hadoop environment; HADOOP_HOME and FLINK_HOME must be set on the submitting machine in advance, and chunjun-dist has to be shipped to the session with the yarn-session -t option:
cd $FLINK_HOME/bin
./yarn-session.sh -t $CHUNJUN_HOME -d
2. Submit the job
Look up the application id of the session ($SESSION_APPLICATION_ID) in the YARN web UI, then go to the local chunjun-dist directory and run:
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
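If the YARN web UI is not handy, the session's application id can also be looked up with the standard YARN CLI; the grep pattern below assumes the session kept Flink's default application name.

# Find the running yarn-session's application id from the command line
yarn application -list -appStates RUNNING | grep -i "Flink session"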

VI. Integrating ChunJun into DolphinScheduler

DolphinScheduler did not originally ship a ChunJun task in its toolbox; about a week ago a kind contributor merged the relevant code 🙏🙏🙏

Many thanks to them! With the integration in place, you can prepare the JSON job file and then simply drag a ChunJun component onto the canvas to configure a real-time data sync task.

Download: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun

P.S. One of our developers got a bit reckless recently and rm -rf'ed something he shouldn't have, so our DolphinScheduler instance is down for the moment; I'll write up the installation steps once it is fixed 🌸
