A Quick Look at MapReduce's Reduce Phase

This is the blog's third post on big data. Over the National Day holiday I kept digging through the MapReduce source code, and this post is a quick summary of the reduce phase.

Overview

As with the map phase, the entry point of the reduce phase is ReduceTask's run method:

@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // Initialize the codec
  codec = initCodec();
  RawKeyValueIterator rIter = null;
  ShuffleConsumerPlugin shuffleConsumerPlugin = null;

  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector =
      (null != combinerClass) ?
      new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

  Class<? extends ShuffleConsumerPlugin> clazz =
      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

  shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

  ShuffleConsumerPlugin.Context shuffleContext =
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
          super.lDirAlloc, reporter, codec,
          combinerClass, combineCollector,
          spilledRecordsCounter, reduceCombineInputCounter,
          shuffledMapsCounter,
          reduceShuffleBytes, failedShuffleCounter,
          mergedMapOutputsCounter,
          taskStatus, copyPhase, sortPhase, this,
          mapOutputFile, localMapFiles);
  shuffleConsumerPlugin.init(shuffleContext);

  rIter = shuffleConsumerPlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();

  sortPhase.complete(); // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

Right at the top of run we can clearly see the three things the reduce phase does:

  1. copy
  2. sort
  3. reduce
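
One more detail in run() worth flagging before we move on: the whole copy/sort machinery sits behind the ShuffleConsumerPlugin interface, looked up via job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ...), so it can be swapped out. A minimal sketch, assuming MRConfig.SHUFFLE_CONSUMER_PLUGIN maps to the property "mapreduce.job.reduce.shuffle.consumer.plugin.class" (and where MyShufflePlugin is a hypothetical implementation):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;

// In the job driver: swap the built-in Shuffle for a custom plugin.
// MyShufflePlugin is hypothetical; it would implement ShuffleConsumerPlugin.
JobConf job = new JobConf();
job.setClass("mapreduce.job.reduce.shuffle.consumer.plugin.class",
             MyShufflePlugin.class, ShuffleConsumerPlugin.class);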

With that aside out of the way, let's walk through the three phases in order.

The copy phase

Let's start again from ReduceTask's run method:

@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }

  .....

  shuffleConsumerPlugin.init(shuffleContext);

  rIter = shuffleConsumerPlugin.run();

  .....
}

Step one: shuffleConsumerPlugin's init method is called to do some initialization:

@Override
public void init(ShuffleConsumerPlugin.Context context) {
  this.context = context;

  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
  this.copyPhase = context.getCopyPhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();
  this.localMapFiles = context.getLocalMapFiles();

  scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
      this, copyPhase, context.getShuffledMapsCounter(),
      context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
  merger = createMergeManager(context);
}

We can see that two fairly important objects are created here: scheduler and merger. The scheduler is responsible for scheduling the copy work, while the merger is what later drives the second stage, the sort.

Back in ReduceTask's run method, let's look at shuffleConsumerPlugin's run:

@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {

  .....

  // Start the map-completion events fetcher thread
  final EventFetcher<K,V> eventFetcher =
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
          maxEventsToFetch);
  eventFetcher.start();

  // Start the map-output fetcher threads
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i = 0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
          reporter, metrics, this,
          reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }

  .....

  return kvIter;
}

This method is fairly long and covers both the copy and sort phases mentioned above. Let's look at the copy part first. It creates two kinds of threads: EventFetcher and Fetcher, and the fetcher comes in two flavors, LocalFetcher or the regular Fetcher, depending on whether the job runs in Uber (local) mode. First, the EventFetcher:

@Override
public void run() {
  int failures = 0;
  LOG.info(reduce + " Thread started: " + getName());

  try {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        int numNewMaps = getMapCompletionEvents();
        failures = 0;
        if (numNewMaps > 0) {
          LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
        }
        LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
        if (!Thread.currentThread().isInterrupted()) {
          Thread.sleep(SLEEP_TIME);
        }
      } catch (InterruptedException e) {
        LOG.info("EventFetcher is interrupted.. Returning");
        return;
      } catch (IOException ie) {
        LOG.info("Exception in getting events", ie);
        // check to see whether to abort
        if (++failures >= MAX_RETRIES) {
          throw new IOException("too many failures downloading events", ie);
        }
        // sleep for a bit
        if (!Thread.currentThread().isInterrupted()) {
          Thread.sleep(RETRY_PERIOD);
        }
      }
    }
  } catch (InterruptedException e) {
    return;
  } catch (Throwable t) {
    exceptionReporter.reportException(t);
    return;
  }
}

From its run method we can see that the EventFetcher's job is to poll, over the umbilical RPC (getMapCompletionEvents), for newly completed map tasks, registering the locations of their outputs with the scheduler so that the fetchers know where to copy from.

Now let's look at the Fetcher:

public void run() {
  try {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      MapHost host = null;
      try {
        // If merge is on, block
        merger.waitForResource();

        // Get a host to shuffle from
        host = scheduler.getHost();
        metrics.threadBusy();

        // Shuffle
        copyFromHost(host);
      } finally {
        if (host != null) {
          scheduler.freeHost(host);
          metrics.threadFree();
        }
      }
    }
  } catch (InterruptedException ie) {
    return;
  } catch (Throwable t) {
    exceptionReporter.reportException(t);
  }
}

It first checks whether a merge is in progress and blocks if so; otherwise it takes a host from the scheduler and, via copyFromHost, issues an HTTP request to fetch the map output data stored on that host.
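
As an aside, the number of these fetcher threads is just the config value we saw in run(): jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5). A hedged one-liner, assuming that constant maps to the property "mapreduce.reduce.shuffle.parallelcopies":

import org.apache.hadoop.mapred.JobConf;

// In the job driver: run 10 Fetcher threads instead of the default 5,
// i.e. more parallel HTTP copies during the shuffle.
JobConf job = new JobConf();
job.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);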

Let's take a deeper look at the copyFromHost method:

@VisibleForTesting
protected void copyFromHost(MapHost host) throws IOException {
  // reset retryStartTime for a new host
  retryStartTime = 0;
  // Get completed maps on 'host'
  List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

  // Sanity check to catch hosts with only 'OBSOLETE' maps,
  // especially at the tail of large jobs
  if (maps.size() == 0) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
        + maps);
  }

  // List of maps to be fetched yet
  Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

  // Construct the url and connect
  URL url = getMapOutputURL(host, maps);
  DataInputStream input = openShuffleUrl(host, remaining, url);
  if (input == null) {
    return;
  }

  try {
    // Loop through available map-outputs and fetch them
    // On any error, faildTasks is not null and we exit
    // after putting back the remaining maps to the
    // yet_to_be_fetched list and marking the failed tasks.
    TaskAttemptID[] failedTasks = null;
    while (!remaining.isEmpty() && failedTasks == null) {
      try {
        failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
      } catch (IOException e) {
        //
        // Setup connection again if disconnected by NM
        connection.disconnect();
        // Get map output from remaining tasks only.
        url = getMapOutputURL(host, remaining);
        input = openShuffleUrl(host, remaining, url);
        if (input == null) {
          return;
        }
      }
    }

    if (failedTasks != null && failedTasks.length > 0) {
      LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
      scheduler.hostFailed(host.getHostName());
      for (TaskAttemptID left : failedTasks) {
        scheduler.copyFailed(left, host, true, false);
      }
    }

    // Sanity check
    if (failedTasks == null && !remaining.isEmpty()) {
      throw new IOException("server didn't return all expected map outputs: "
          + remaining.size() + " left.");
    }
    input.close();
    input = null;
  } finally {
    if (input != null) {
      IOUtils.cleanup(LOG, input);
      input = null;
    }
    for (TaskAttemptID left : remaining) {
      scheduler.putBackKnownMapOutput(host, left);
    }
  }
}

The most important part here is the call to copyMapOutput:

private TaskAttemptID[] copyMapOutput(MapHost host,
                                      DataInputStream input,
                                      Set<TaskAttemptID> remaining,
                                      boolean canRetry) throws IOException {
  MapOutput<K,V> mapOutput = null;
  TaskAttemptID mapId = null;
  long decompressedLength = -1;
  long compressedLength = -1;

  try {
    long startTime = Time.monotonicNow();
    int forReduce = -1;
    // Read the shuffle header
    try {
      ShuffleHeader header = new ShuffleHeader();
      header.readFields(input);
      mapId = TaskAttemptID.forName(header.mapId);
      compressedLength = header.compressedLength;
      decompressedLength = header.uncompressedLength;
      forReduce = header.forReduce;
    } catch (IllegalArgumentException e) {
      badIdErrs.increment(1);
      LOG.warn("Invalid map id ", e);
      // Don't know which one was bad, so consider all of them as bad
      return remaining.toArray(new TaskAttemptID[remaining.size()]);
    }

    InputStream is = input;
    is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
    compressedLength -= CryptoUtils.cryptoPadding(jobConf);
    decompressedLength -= CryptoUtils.cryptoPadding(jobConf);

    // Do some basic sanity verification
    if (!verifySanity(compressedLength, decompressedLength, forReduce,
        remaining, mapId)) {
      return new TaskAttemptID[] {mapId};
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("header: " + mapId + ", len: " + compressedLength +
          ", decomp len: " + decompressedLength);
    }

    // Get the location for the map output - either in-memory or on-disk
    try {
      mapOutput = merger.reserve(mapId, decompressedLength, id);
    } catch (IOException ioe) {
      // kill this reduce attempt
      ioErrs.increment(1);
      scheduler.reportLocalError(ioe);
      return EMPTY_ATTEMPT_ID_ARRAY;
    }

    // Check if we can shuffle *now* ...
    if (mapOutput == null) {
      LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
      // Not an error but wait to process data.
      return EMPTY_ATTEMPT_ID_ARRAY;
    }

    // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
    // on decompression failures. Catching and re-throwing as IOException
    // to allow fetch failure logic to be processed
    try {
      // Go!
      LOG.info("fetcher#" + id + " about to shuffle output of map "
          + mapOutput.getMapId() + " decomp: " + decompressedLength
          + " len: " + compressedLength + " to " + mapOutput.getDescription());
      mapOutput.shuffle(host, is, compressedLength, decompressedLength,
          metrics, reporter);
    } catch (java.lang.InternalError e) {
      LOG.warn("Failed to shuffle for fetcher#" + id, e);
      throw new IOException(e);
    }

    // Inform the shuffle scheduler
    long endTime = Time.monotonicNow();
    // Reset retryStartTime as map task make progress if retried before.
    retryStartTime = 0;

    scheduler.copySucceeded(mapId, host, compressedLength,
        startTime, endTime, mapOutput);
    // Note successful shuffle
    remaining.remove(mapId);
    metrics.successFetch();
    return null;
  } catch (IOException ioe) {

    if (canRetry) {
      checkTimeoutOrRetry(host, ioe);
    }

    ioErrs.increment(1);
    if (mapId == null || mapOutput == null) {
      LOG.warn("fetcher#" + id + " failed to read map header" +
          mapId + " decomp: " +
          decompressedLength + ", " + compressedLength, ioe);
      if (mapId == null) {
        return remaining.toArray(new TaskAttemptID[remaining.size()]);
      } else {
        return new TaskAttemptID[] {mapId};
      }
    }

    LOG.warn("Failed to shuffle output of " + mapId +
        " from " + host.getHostName(), ioe);

    // Inform the shuffle-scheduler
    mapOutput.abort();
    metrics.failedFetch();
    return new TaskAttemptID[] {mapId};
  }
}

This method is fairly complicated, so let's focus on the key step:

mapOutput = merger.reserve(mapId, decompressedLength, id);

This step uses the reserve method of the MergeManagerImpl created earlier in init to obtain a MapOutput:

@Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                           long requestedSize,
                                           int fetcher
                                           ) throws IOException {
  if (!canShuffleToMemory(requestedSize)) {
    LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
        " is greater than maxSingleShuffleLimit (" +
        maxSingleShuffleLimit + ")");
    return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
        jobConf, mapOutputFile, fetcher, true);
  }

  if (usedMemory > memoryLimit) {
    LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
        + ") is greater than memoryLimit (" + memoryLimit + ")." +
        " CommitMemory is (" + commitMemory + ")");
    return null;
  }

  // Allow the in-memory shuffle to progress
  LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
      + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
      + "CommitMemory is (" + commitMemory + ")");
  return unconditionalReserve(mapId, requestedSize, true);
}

Note the check at the top:

private boolean canShuffleToMemory(long requestedSize) {
  return (requestedSize < maxSingleShuffleLimit);
}

It asks whether this map output is small enough: if it is below the threshold it gets copied into memory, otherwise it goes straight to a file on disk. The threshold maxSingleShuffleLimit is derived from configuration, so we can tune it to fit our own workload.

From this method we learn that the copy phase has two modes, copying into memory and copying to files, distinguished by the threshold we configure. The benefit is that small outputs land in memory and can be sorted there directly in the upcoming sort phase, cutting down on file I/O.
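
For reference, a sketch of the knobs behind this threshold, under the assumption (from my reading of MergeManagerImpl) that memoryLimit is roughly the reduce task's heap times mapreduce.reduce.shuffle.input.buffer.percent, and maxSingleShuffleLimit = memoryLimit * mapreduce.reduce.shuffle.memory.limit.percent:

import org.apache.hadoop.mapred.JobConf;

// In the job driver: tune the in-memory shuffle budget.
JobConf job = new JobConf();
// Fraction of the reduce task's heap usable for holding shuffled map outputs
// (0.70 by default in mapred-default.xml, if I recall correctly).
job.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
// Max fraction of that buffer one map output may occupy before it is shuffled
// to disk instead (default 0.25); this is what drives maxSingleShuffleLimit.
job.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.25f);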

Once we have the MapOutput, its shuffle method is called to perform the actual data copy. I won't walk through that code in detail; it boils down to writing the InputStream either into memory or to a file.
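
To make the two modes concrete, here is a toy sketch (emphatically not Hadoop's actual InMemoryMapOutput / OnDiskMapOutput code) of what each shuffle target boils down to:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class CopySketch {
  // In-memory mode: the whole map output is read into one byte array.
  static byte[] shuffleToMemory(InputStream in, int length) throws IOException {
    byte[] buf = new byte[length];
    new DataInputStream(in).readFully(buf);
    return buf;
  }

  // On-disk mode: the stream is copied straight to a local file.
  static void shuffleToDisk(InputStream in, File out, long length) throws IOException {
    try (OutputStream os = new FileOutputStream(out)) {
      byte[] buf = new byte[64 * 1024];
      long remaining = length;
      while (remaining > 0) {
        int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
        if (n < 0) throw new EOFException("map output truncated");
        os.write(buf, 0, n);
        remaining -= n;
      }
    }
  }
}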

After a copy finishes, the scheduler's copySucceeded method is called:

public synchronized void copySucceeded(TaskAttemptID mapId,
                                       MapHost host,
                                       long bytes,
                                       long startMillis,
                                       long endMillis,
                                       MapOutput<K,V> output
                                       ) throws IOException {

  .....

  if (!finishedMaps[mapIndex]) {
    output.commit();
    finishedMaps[mapIndex] = true;
    shuffledMapsCounter.increment(1);
    if (--remainingMaps == 0) {
      notifyAll();
    }

    .....
  }
}

The two most crucial steps here update two variables: finishedMaps and remainingMaps. Note also the notifyAll() once remainingMaps hits zero; we will meet its counterpart in the sort phase.

From the analysis above, the copy phase's main job is to fetch the data that finished map tasks left in their output files, deciding by the rule we saw whether each output is copied into memory or onto disk.

The sort phase

Back in shuffleConsumerPlugin's run method, let's see what happens afterwards:

@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {

  .....

  // Wait for shuffle to complete successfully
  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
    reporter.progress();

    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
            throwable);
      }
    }
  }

  // Stop the event-fetcher thread
  eventFetcher.shutDown();

  // Stop the map-output fetcher threads
  for (Fetcher<K,V> fetcher : fetchers) {
    fetcher.shutDown();
  }

  // stop the scheduler
  scheduler.close();

  copyPhase.complete(); // copy is already complete
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    throw new ShuffleError("Error while doing final merge ", e);
  }

  .....

  return kvIter;
}

Notice the while loop, whose condition is:

scheduler.waitUntilDone(PROGRESS_FREQUENCY)

Let's look at the concrete implementation:

@Override
public synchronized boolean waitUntilDone(int millis
                                          ) throws InterruptedException {
  if (remainingMaps > 0) {
    wait(millis);
    return remainingMaps == 0;
  }
  return true;
}

Reading it literally, it checks whether any maps remain unprocessed; once they are all done (remainingMaps == 0) the subsequent steps can proceed. And from the copy-phase analysis we already know that every finished copy calls the scheduler's copySucceeded method, which decrements remainingMaps and calls notifyAll().
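
Putting the two halves side by side, the handshake is just the classic wait/notifyAll latch. A stripped-down sketch (the names mirror ShuffleSchedulerImpl, but this is not the real class):

// Each successful copy decrements the count and wakes the waiter;
// the shuffle's run() loops on waitUntilDone until nothing remains.
class CompletionLatch {
  private int remainingMaps;

  CompletionLatch(int totalMaps) { this.remainingMaps = totalMaps; }

  synchronized void copySucceeded() {
    if (--remainingMaps == 0) {
      notifyAll();               // wake up waitUntilDone()
    }
  }

  synchronized boolean waitUntilDone(int millis) throws InterruptedException {
    if (remainingMaps > 0) {
      wait(millis);              // the caller re-checks in its while loop
      return remainingMaps == 0;
    }
    return true;
  }
}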

The real sorting happens in the merger's close method:

kvIter = merger.close();

This method is quite complex, so we won't step through it. The main logic: first, the in-memory data produced during the copy phase is sorted and written out to files; then those files, together with the on-disk files from the copy phase, go through a heap-based merge sort to produce the final ordered stream.
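
That heap-based merge is a standard k-way merge of sorted runs. A minimal, self-contained sketch of the idea (Hadoop's Merger works over serialized key/value segments rather than Lists, but the shape is the same):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class MergeSketch {
  // Merge k sorted runs by always emitting the smallest current head.
  static <T extends Comparable<T>> List<T> kWayMerge(List<Iterator<T>> runs) {
    Comparator<Map.Entry<T, Iterator<T>>> byKey =
        Comparator.comparing(Map.Entry::getKey);
    PriorityQueue<Map.Entry<T, Iterator<T>>> heap = new PriorityQueue<>(byKey);
    for (Iterator<T> run : runs) {
      if (run.hasNext()) {
        heap.add(new AbstractMap.SimpleEntry<>(run.next(), run));
      }
    }
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Map.Entry<T, Iterator<T>> e = heap.poll();
      out.add(e.getKey());                  // the globally smallest record
      if (e.getValue().hasNext()) {         // refill the heap from the same run
        heap.add(new AbstractMap.SimpleEntry<>(e.getValue().next(), e.getValue()));
      }
    }
    return out;
  }
}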

The reduce phase

With copy and sort complete, let's return to ReduceTask's run method:

@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  .....

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

The code here is almost identical to the map phase: runNewReducer reads the merged data and invokes our own Reducer class to process it, so we won't go through it in detail.
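
To close the loop, here is the user-side code that runNewReducer ultimately drives: a plain Reducer on the new (org.apache.hadoop.mapreduce) API, shown as the classic word-count example. The framework hands it each key together with the iterable of values that the sort/merge we just traced grouped under that key:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();          // add up the counts grouped under this word
    }
    result.set(sum);
    context.write(key, result); // one output record per distinct key
  }
}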