Exploring job.setNumReduceTasks(0) in MapReduce

This is the second big data post on this blog; this time we dig into the source code of an indispensable part of Hadoop: MapReduce.

Background

Why write this post? A few days ago I was reading an MR program written by a colleague and noticed an interesting entry in its git history: a commit titled "remove reduce". At the time my understanding of MapReduce went no further than the word count demo on the official site, so the commit naturally intrigued me: why remove the reduce phase, and how? Digging into the code, I found that the key line was the one in this post's title, job.setNumReduceTasks(0). Curiosity demanded that I understand what happens behind that single line, and so began this journey through the MapReduce source code.

Source Code Analysis

To analyze the MR source code, the natural place to start is the entry point, job.waitForCompletion:

public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

The most important call here is submit(). Note also that waitForCompletion contains a while loop that polls until the whole MR job has finished, and finally returns whether it succeeded.

public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

The second statement in submit, setUseNewAPI, does exactly what its name says: it flags the job to run with the new MapReduce API (as long as none of the old mapred.* classes are configured):

private void setUseNewAPI() throws IOException {
  int numReduces = conf.getNumReduceTasks();
  String oldMapperClass = "mapred.mapper.class";
  String oldReduceClass = "mapred.reducer.class";
  conf.setBooleanIfUnset("mapred.mapper.new-api",
                         conf.get(oldMapperClass) == null);
  if (conf.getUseNewMapper()) {
    String mode = "new map API";
    ensureNotSet("mapred.input.format.class", mode);
    ensureNotSet(oldMapperClass, mode);
    if (numReduces != 0) {
      ensureNotSet("mapred.partitioner.class", mode);
    } else {
      ensureNotSet("mapred.output.format.class", mode);
    }
  } else {
    String mode = "map compatability";
    ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
    ensureNotSet(MAP_CLASS_ATTR, mode);
    if (numReduces != 0) {
      ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
    } else {
      ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
    }
  }
  if (numReduces != 0) {
    conf.setBooleanIfUnset("mapred.reducer.new-api",
                           conf.get(oldReduceClass) == null);
    if (conf.getUseNewReducer()) {
      String mode = "new reduce API";
      ensureNotSet("mapred.output.format.class", mode);
      ensureNotSet(oldReduceClass, mode);
    } else {
      String mode = "reduce compatability";
      ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(REDUCE_CLASS_ATTR, mode);
    }
  }
}

Next, let's look at the core call inside submit, submitter.submitJobInternal. The submitJobInternal method is fairly involved; its two most important steps are writing the split files and actually submitting the job:

JobStatus submitJobInternal(Job job, Cluster cluster) 
    throws ClassNotFoundException, InterruptedException, IOException {

  ....

  int maps = writeSplits(job, submitJobDir);
  conf.setInt(MRJobConfig.NUM_MAPS, maps);
  LOG.info("number of splits:" + maps);

  ....

    // (the opening try { belongs to the elided code above)
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}

Let's look at the writeSplits method first:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

As you can see, the getUseNewMapper flag in the configuration decides which method generates the split files. From setUseNewAPI above we know that a job submitted via waitForCompletion takes the new-API path as long as none of the old mapred.* classes are configured.

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

The single most important line is:

List<InputSplit> splits = input.getSplits(job);

The splits are obtained through the InputFormat, which is whatever we configured when creating the MR job. Let's take the commonly used FileInputFormat as an example:

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}

Within getSplits, isSplitable determines whether a file can be split at all; for FileInputFormat the default is that it can:

protected boolean isSplitable(JobContext context, Path filename) {
  return true;
}

Now let's look at the two most commonly used subclasses, TextInputFormat and ParquetInputFormat:

protected boolean isSplitable(JobContext context, Path file) {
  final CompressionCodec codec =
    new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}

protected boolean isSplitable(JobContext context, Path filename) {
  return ContextUtil.getConfiguration(context).getBoolean(SPLIT_FILES, true);
}

So TextInputFormat has to check the compression codec, while ParquetInputFormat simply consults a configuration flag.
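
As a quick, self-contained illustration (my own sketch, not code from Hadoop), here is the same rule applied to a single file: a plain .gz file is not splittable because GzipCodec is not a SplittableCompressionCodec, while a .bz2 file is:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittableCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical input path; swap in one of your own files.
    Path input = new Path("/data/logs/access.log.gz");
    CompressionCodec codec =
        new CompressionCodecFactory(conf).getCodec(input);
    // Same decision TextInputFormat.isSplitable makes: uncompressed files are
    // splittable, compressed ones only for splittable codecs (e.g. bzip2).
    boolean splittable =
        codec == null || codec instanceof SplittableCompressionCodec;
    System.out.println("splittable = " + splittable); // false for .gz, true for .bz2
  }
}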

Continuing with getSplits: when isSplitable returns true, the file-splitting logic runs:

long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

In other words, the file is simply chopped into chunks of a computed split size: computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize)), which with the default settings is just the HDFS block size, and SPLIT_SLOP (1.1) prevents a tiny tail from becoming its own split.
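
A small worked example of that arithmetic (my own sketch, not Hadoop code):

public class SplitMathExample {
  public static void main(String[] args) {
    // With default minSize = 1 and maxSize = Long.MAX_VALUE, the split size
    // equals the block size, here assumed to be 128 MB.
    long minSize = 1L;
    long maxSize = Long.MAX_VALUE;
    long blockSize = 128L * 1024 * 1024;
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // 128 MB

    // A 300 MB file yields splits of 128 MB, 128 MB and 44 MB. A 140 MB file
    // would stay a single split: 140 / 128 < SPLIT_SLOP (1.1), so the loop
    // never cuts and the whole remainder becomes one split.
    final double SPLIT_SLOP = 1.1;
    long length = 300L * 1024 * 1024;
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.println("split at offset " + (length - bytesRemaining)
          + ", size " + splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      System.out.println("final split at offset " + (length - bytesRemaining)
          + ", size " + bytesRemaining);
    }
  }
}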

At this point we understand how the split files are generated; now let's go back to the job-submission part of submitJobInternal:

status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());

Here submitClient is in practice its YARN implementation, YARNRunner; as the name suggests, the job is ultimately submitted to Hadoop's other killer feature, YARN.

Our job is ultimately executed by MapTask and ReduceTask instances; let's look at MapTask's run method:

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    }
  }
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

The most important part for our question is this branch:

if (useNewApi) {
  runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
  runOldMapper(job, splitMetaInfo, umbilical, reporter);
}

From the analysis above, the runNewMapper path is taken here.

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                getTaskID(),
                                                                reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output = 
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
  mapContext = 
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
        input, output, 
        committer, 
        reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

In this method we finally find the code we came to investigate:

if (job.getNumReduceTasks() == 0) {
  output = 
    new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

If the number of reduce tasks is set to 0, the RecordWriter used is NewDirectOutputCollector; otherwise it is NewOutputCollector.

So what role does this RecordWriter play? As we know, a typical Mapper eventually emits its data by calling context.write; if you step through that call in a debugger, it ends up in the RecordWriter's write method. Let's look at what NewDirectOutputCollector and NewOutputCollector each do:

// in the NewDirectOutputCollector constructor:
out = outputFormat.getRecordWriter(taskContext);

// NewDirectOutputCollector.write:
public void write(K key, V value) 
    throws IOException, InterruptedException {
  reporter.progress();
  long bytesOutPrev = getOutputBytes(fsStats);
  out.write(key, value);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
  mapOutputRecordCounter.increment(1);
}

First, NewDirectOutputCollector: its write simply delegates to the RecordWriter obtained from the job's OutputFormat. And when the number of reduce tasks is set to 0, this is usually paired with NullOutputFormat:

public RecordWriter<K, V> 
       getRecordWriter(TaskAttemptContext context) {
  return new RecordWriter<K, V>(){
      public void write(K key, V value) { }
      public void close(TaskAttemptContext context) { }
    };
}

As you can see, it does nothing at all. In other words, with zero reduce tasks and NullOutputFormat, the job is effectively done as soon as the map phase finishes.
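
Putting it together, a map-only job driver typically looks like the minimal sketch below (my own illustration based on the analysis above; MyMapper and the paths are placeholders, not code from the original job):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MapOnlyDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "map-only-example");
    job.setJarByClass(MapOnlyDriver.class);
    job.setMapperClass(MyMapper.class);            // hypothetical mapper class
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // The two lines this post is about: no reducers, plus an OutputFormat
    // that discards output, so the job ends right after the map phase.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}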

Now let's look at NewOutputCollector:

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

Internally it calls MapOutputBuffer's collect method, whose implementation is far more involved: it spills to files, sorts, and so on. In MapReduce these operations have a dedicated name, the shuffle.
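
A side note on the partitioner argument seen in write above: unless you configure one yourself, the default is HashPartitioner (in org.apache.hadoop.mapreduce.lib.partition), whose implementation essentially takes the key's hash modulo the number of reducers:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}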

How the Mapper Runs

Having walked through the code above, let's continue with how the mapper itself runs, starting from the last part of MapTask's runNewMapper method:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {

  .....

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

You can see that the mapper is driven through its run method:

mapper.run(mapperContext);

Let's see what it actually does:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}

This should look very familiar: three methods,

  1. setup
  2. map
  3. cleanup

These are exactly the hooks we override when writing our own MR programs.
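
As a refresher on those hooks, here is a minimal word-count-style mapper of my own (not taken from the post's job) that overrides all three:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void setup(Context context) {
    // Runs once per task before the first record, e.g. to read configuration.
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      if (token.isEmpty()) continue;
      word.set(token);
      context.write(word, one);   // ends up in RecordWriter.write as analyzed above
    }
  }

  @Override
  protected void cleanup(Context context) {
    // Runs once per task after the last record, e.g. to flush or close resources.
  }
}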

Inside map, the actual data is fetched through the context, and from runNewMapper above we can see that this context is ultimately backed by a MapContextImpl:

@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
  return reader.getCurrentKey();
}

@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
  return reader.getCurrentValue();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return reader.nextKeyValue();
}

The code here is straightforward: it just delegates to the reader, which is constructed via the InputFormat's createRecordReader. Again taking TextInputFormat as the example:

@Override
public RecordReader<LongWritable, Text> 
  createRecordReader(InputSplit split,
                     TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  return new LineRecordReader(recordDelimiterBytes);
}

We won't dig any deeper here; the takeaway is that LineRecordReader reads the input record by record, one line at a time by default.
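
One practical consequence of the delimiter lookup above: you can override textinputformat.record.delimiter to make LineRecordReader split records on something other than a newline, for example blank lines (a small sketch of my own; the class and job name are arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CustomDelimiterJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Records separated by blank lines instead of a single '\n'; this is the
    // exact key read in TextInputFormat.createRecordReader above.
    conf.set("textinputformat.record.delimiter", "\n\n");
    Job job = Job.getInstance(conf, "custom-delimiter-example");
    // ... set mapper, input/output paths, etc., as usual ...
  }
}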

Summary

With the analysis above, I finally understand why one would set the number of reduce tasks to 0. For certain scenarios, such as reading Hive data and writing it to a Kafka topic, a reduce phase simply is not needed: the mapper can read each record and send it to Kafka directly. If you keep the map + reduce pattern and put that logic in the reducer instead, the data has to pass through the complex and time-consuming shuffle. In my understanding, the shuffle, with its heavy file I/O, is the main reason MR jobs run slowly, so skipping the reduce phase entirely is an effective way to speed up the whole program.
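
To make that scenario concrete, a "read records, push them to Kafka" mapper might look roughly like the sketch below. This is my own illustration, not the actual job from the post: the topic name, broker address and the use of the plain kafka-clients producer are assumptions, and the input is read as plain text rather than through Hive's real input format. It would be paired with setNumReduceTasks(0) and NullOutputFormat in the driver, as shown earlier.

import java.util.Properties;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSinkMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
  private KafkaProducer<String, String> producer;

  @Override
  protected void setup(Context context) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // hypothetical broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context) {
    // No context.write at all: with 0 reducers and NullOutputFormat, the side
    // effect of sending each record to Kafka is the whole point of the job.
    producer.send(new ProducerRecord<>("my-topic", value.toString()));
  }

  @Override
  protected void cleanup(Context context) {
    producer.flush();
    producer.close();
  }
}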