DataX学习之HelloWorld

最近去公司的大数据部门轮岗,接的第一个任务就是异构数据的导出项目,趁着机会学习了一下DataX框架,开一个系列来记录一下。

项目背景

既然是异构数据的导出,那肯定少不了数据源和目标源。数据源有hive和kafka,目标源涵盖很多,像hbase,hive,kafka,es等等,之前公司的做法是通过mr任务去跑数据,但是配置很繁琐,而且基本都是祖传配置,需要人工拷贝,所以趁着这次重构,希望在前端展示上做一个产品化的平台来cover这些工作量,并且在后端核心的导出逻辑上使用一些其他方式,抛弃mr任务。

经过研讨之后项目组决定以hive为目标源的项目还是使用mr任务去跑,而以kafka为数据源的项目则使用DataX这个开源框架。

何为DataX

根据Github上的描述,DataX就是一个离线数据的同步工具,来解决异构数据源之间的高效同步。在围观了一下代码之后发现这个项目确实不错,对于我这样一个大数据的新手玩家来说,只需要编写简单的reader和writer插件就可以完成任务,颇有serverless的感觉。

不过DataX还有一个问题,就是貌似只支持单机而不支持集群,所以真正投入生产可能还得配合成熟的调度框架才行。

Hello Datax

作为程序员,当然还是要以代码说话,在研究了半天时间之后,我决定动手写一个DataX的kafkareader插件的小demo用来上手。

对于DataX插件的资料大家可以去看DataX插件开发宝典这篇文章,写的十分详细了。

既然要开发kafkareader,那肯定是要将DataX和kafka整合到一起了,话不多说,先看代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class KafkaReader extends Reader {

public static class Job extends Reader.Job {

private Configuration originalConfig = null;

@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}

@Override
public void destroy() {

}

@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<Configuration>();

for (int i = 0; i < adviceNumber; i++) {
configurations.add(this.originalConfig.clone());
}
return configurations;
}
}

public static class Task extends Reader.Task {

private List<String> topics = new ArrayList<String>();

@Override
public void init() {
topics.add("kafka_topic");
}

@Override
public void destroy() {

}

@Override
public void startRead(RecordSender recordSender) {
Properties props = new Properties();
props.put("bootstrap.servers", "clusters_ip");
props.put("group.id", "kzTestGroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);

Record oneRecord = null;

while(true) {
System.out.println("start polling kafka msg...");

ConsumerRecords<String,String> records = consumer.poll(100);
Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
if(!iterator.hasNext()) {
System.out.println("no data...");
//break;
}
System.out.println("msg arriving...");
if(iterator.hasNext()) {
ConsumerRecord<String,String> record = iterator.next();
oneRecord = buildOneRecord(recordSender, record.value());
if (oneRecord != null) {
recordSender.sendToWriter(oneRecord);
}
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private Record buildOneRecord(RecordSender recordSender, String value) {
System.out.println("msg value => " + value);
Record record = recordSender.createRecord();
record.addColumn(new StringColumn(value));
return record;
}
}

}

这个demo的代码十分简单,根据DataX插件开发宝典的指示,创建一个Reader,并有2个静态内部类Job和Task。

在Task的startRead方法中去创建一个kafka的consumer并且监听topic,将数据封装成DataX所需要的recode并通过recordSender传递出去就可以了。

当然,这个demo是非常简单的一个例子,对于要投入生产的代码,我们需要做的是参数化,比如kafka的集群地址,topic需要用参数的方式配置,还有就是和位点相关的逻辑,是否需要从头开始消费等等。

最后就是Job类的Task方法了,这里我直接使用了adviceNumber去创建多个Task,根据kafka的特别,我个人推荐是直接使用kafka的partition数量去创建Task,比较合理。

后记

这篇文章算是DataX框架学习使用的一个开篇,随着公司项目的进行,肯定会有更多的知识需要学习,文章也会继续更新~