一、下载编译

官网坐标: https://gobblin.apache.org/

下载源码 坐标 https://github.com/apache/incubator-gobblin/releases

解压后查看目录下是否存在 gradle/wrapper/gradle-wrapper.jar 文件

若不存在 使用此坐标下载并丢过去 https://github.com/apache/incubator-gobblin/blob/0.12.0/gradle/wrapper/gradle-wrapper.jar

使用./gradlew build -x findbugsMain -x test -x rat -x checkstyleMain 编译项目(因网络因素可能有几次失败)

编译后文件 apache-gobblin-incubating-bin-0.14.0.tar.gz

二、配置编写

文档坐标:https://gobblin.readthedocs.io/en/latest/

启动命令(需指定配置文件目录,工作目录):

./bin/gobblin-standalone.sh start --conf /conf/gobblin  --workdir /data/gobblin

也可 vi ~/.bashrc

export GOBBLIN_JOB_CONFIG_DIR=/conf/gobblin

export GOBBLIN_WORK_DIR=/data/gobblin/

以mysql --> canal --> kafka --> gobblin --> mysql 线为例

task-job配置文件

#job 名称 需唯一必填
job.name=CanalKafkaToMysql
job.group=gobblin
job.description=canal kafka base data to mysql
#是否开启锁,需zk支持
job.lock.enabled=false
 
#kafka相关配置
kafka.brokers=10.174.89.47:9092
#数据源 此处按官网配置将异常
source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
extract.namespace=org.apache.gobblin.extract.kafka
topic.whitelist=dxh_base
#simple.writer.delimiter=\n
#反序列化类型
kafka.deserializer.type=CONFLUENT_JSON
mr.job.max.mappers=1
bootstrap.with.offset=earliest
 
#写入mysql
writer.destination.type=MYSQL
writer.builder.class=org.apache.gobblin.writer.JdbcWriterBuilder
#输出到nohup.out
#writer.builder.class=org.apache.gobblin.writer.ConsoleWriterBuilder
#临时写入目录
writer.staging.dir=/data/gobblin/write-staging
writer.output.dir=/data/gobblin/write-output
 
#转换器 多个转换器按顺序
converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
converter.classes=org.apache.gobblin.converter.jdbc.AvroToJdbcEntryConverter
#converter.avro.jdbc.entry_fields_pairs={\"src_fn\":\"firstname\",\"src_ln\":\"lastname\"}
 
 
#最终提交
data.publisher.type=org.apache.gobblin.publisher.JdbcPublisher
data.publisher.final.dir=/data/gobblin/job-out
data.publisher.replace.final.dir=true
 
#jdbc相关信息
jdbc.publisher.url=jdbc:mysql://10.174.89.47:3306
jdbc.publisher.driver=com.mysql.jdbc.Driver
jdbc.publisher.username=dxh
jdbc.publisher.password=Dxh2017$$
jdbc.publisher.database_name=dxh_base
jdbc.publisher.table_name=itemsku_tmp
writer.jdbc.batch_size=1000
 
 
metrics.reporting.file.enabled=true
metrics.log.dir=/data/gobblin/metrics
metrics.reporting.file.suffix=txt
 
state.store.dir=/data/gobblin/state-store
注:jdbc需依赖gobblin-sql-0.14.0.jar 文件,若lib目录下不存在 可在gobblin-modules 找到

流程线:

source  -->  converter  --> writer --> publisher

三、注意事项

1.每个表需要一个topic

2.每个表需要一个job

3.数据导入中会在目标库建临时表用于暂存数据,虽然可通过配置关闭,但有丢数据的可能。

4.若不扩展converter,与canal对接需写各种schema 定义数据,且对sql进行过滤较难实现

5.若扩展converter需注意传输格式及schema

文章目录