Parquet split handling in MapReduce

While reading the Parquet chapters of Hadoop: The Definitive Guide, I was reminded of the MapReduce split code I had looked at before. At the time I only went through the basic FileInputFormat, so this is a good opportunity to see how Parquet handles splitting.

File format

I won't go over the Parquet file format in detail here; there are plenty of good articles online to refer to.

There is a quick-start article that covers the basics; after reading it, my summary is:

A Parquet file consists of three main parts:

  1. Header
  2. Block
  3. Footer

All of the file-level metadata lives in the Footer, while each Block is a Row Group made up of a set of Column Chunks.
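
If you want to poke at this structure yourself, here is a minimal sketch of my own (not from the book or the Parquet sources) that uses the ParquetFileReader.readFooter helper from parquet-hadoop to print a file's schema and per-Row-Group metadata. It assumes the org.apache.parquet package names; the snippets quoted later in this post come from an older release that still uses the parquet.* packages.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class FooterDump {
  public static void main(String[] args) throws Exception {
    // Only the Footer is read here; the Row Groups (Blocks) themselves are never touched.
    ParquetMetadata footer = ParquetFileReader.readFooter(new Configuration(), new Path(args[0]));
    System.out.println(footer.getFileMetaData().getSchema());
    for (BlockMetaData block : footer.getBlocks()) {
      System.out.println("row group @ " + block.getStartingPos()
          + ", rows = " + block.getRowCount()
          + ", compressed bytes = " + block.getCompressedSize());
    }
  }
}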

Source code analysis

With a rough idea of the Parquet file format, let's look at the actual source code:

/**
 * {@inheritDoc}
 */
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
  Configuration configuration = ContextUtil.getConfiguration(jobContext);
  List<InputSplit> splits = new ArrayList<InputSplit>();

  if (isTaskSideMetaData(configuration)) {
    // Although not required by the API, some clients may depend on always
    // receiving ParquetInputSplit. Translation is required at some point.
    for (InputSplit split : super.getSplits(jobContext)) {
      Preconditions.checkArgument(split instanceof FileSplit,
          "Cannot wrap non-FileSplit: " + split);
      splits.add(ParquetInputSplit.from((FileSplit) split));
    }
    return splits;

  } else {
    splits.addAll(getSplits(configuration, getFooters(jobContext)));
  }

  return splits;
}

public static boolean isTaskSideMetaData(Configuration configuration) {
  return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
}

The code first checks TASK_SIDE_METADATA, i.e. the parquet.task.side.metadata property, to decide which split logic to use. With the default value of true it falls back to the parent FileInputFormat logic, which cuts splits by the computed split size (by default the HDFS block size, typically 128 MB). If the value is false, it takes a different splitting path, and that is what we want to look at today.
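
As a quick illustration of the knob itself, here is a minimal sketch (the class name and job setup are hypothetical) that forces the client-side, footer-driven split logic. It assumes the TASK_SIDE_METADATA constant exposed by ParquetInputFormat, which is simply the string "parquet.task.side.metadata".

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class ClientSideSplitsJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default is true (task-side metadata); false switches to the footer-driven split logic below.
    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, false);

    Job job = Job.getInstance(conf, "client-side-parquet-splits");
    job.setInputFormatClass(ParquetInputFormat.class);
    // ... read support, mapper, reducer and input/output paths omitted
  }
}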

Let's start with the getFooters method:

public List<Footer> getFooters(JobContext jobContext) throws IOException {
  List<FileStatus> statuses = listStatus(jobContext);
  if (statuses.isEmpty()) {
    return Collections.emptyList();
  }
  Configuration config = ContextUtil.getConfiguration(jobContext);
  List<Footer> footers = new ArrayList<Footer>(statuses.size());
  Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
  Map<Path, FileStatusWrapper> missingStatusesMap =
      new HashMap<Path, FileStatusWrapper>(missingStatuses.size());

  if (footersCache == null) {
    footersCache =
        new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
  }
  for (FileStatus status : statuses) {
    FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
    FootersCacheValue cacheEntry =
        footersCache.getCurrentValue(statusWrapper);
    if (Log.DEBUG) {
      LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
          + " found for '" + status.getPath() + "'");
    }
    if (cacheEntry != null) {
      footers.add(cacheEntry.getFooter());
    } else {
      missingStatuses.add(status);
      missingStatusesMap.put(status.getPath(), statusWrapper);
    }
  }
  if (Log.DEBUG) {
    LOG.debug("found " + footers.size() + " footers in cache and adding up "
        + "to " + missingStatuses.size() + " missing footers to the cache");
  }

  if (missingStatuses.isEmpty()) {
    return footers;
  }

  List<Footer> newFooters = getFooters(config, missingStatuses);
  for (Footer newFooter : newFooters) {
    // Use the original file status objects to make sure we store a
    // conservative (older) modification time (i.e. in case the files and
    // footers were modified and it's not clear which version of the footers
    // we have)
    FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
    footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
  }

  footers.addAll(newFooters);
  return footers;
}

As the name suggests, this method fetches the footers of the Parquet files. From the file-format overview above we know the footer holds the file's metadata, most importantly the schema and the metadata for each block:

private final FileMetaData fileMetaData;
private final List<BlockMetaData> blocks;

public final class FileMetaData implements Serializable {
  private static final long serialVersionUID = 1L;

  private final MessageType schema;

  private final Map<String, String> keyValueMetaData;

  private final String createdBy;
}

Once the file footers have been obtained, getSplits is called:

public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
  boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
  final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
  final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
  if (maxSplitSize < 0 || minSplitSize < 0) {
    throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
  }
  GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
  ReadContext readContext = getReadSupport(configuration).init(new InitContext(
      configuration,
      globalMetaData.getKeyValueMetaData(),
      globalMetaData.getSchema()));

  return new ClientSideMetadataSplitStrategy().getSplits(
      configuration, footers, maxSplitSize, minSplitSize, readContext);
}
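
Note that this path reads the legacy mapred.max.split.size / mapred.min.split.size properties; they are the bounds that later decide when generateSplitInfo closes one split and starts the next. A minimal sketch of tuning them (the 256 MB / 64 MB values are arbitrary examples):

import org.apache.hadoop.conf.Configuration;

public class SplitSizeConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong("mapred.max.split.size", 256L * 1024 * 1024); // never let a split grow past 256 MB
    conf.setLong("mapred.min.split.size", 64L * 1024 * 1024);  // don't close a split before it reaches 64 MB
    System.out.println(conf.getLong("mapred.max.split.size", Long.MAX_VALUE));
  }
}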

Now let's look at the final getSplits, the one on ClientSideMetadataSplitStrategy:

List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
    long maxSplitSize, long minSplitSize, ReadContext readContext) throws IOException {

  List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
  Filter filter = ParquetInputFormat.getFilter(configuration);

  long rowGroupsDropped = 0;
  long totalRowGroups = 0;

  for (Footer footer : footers) {
    final Path file = footer.getFile();
    LOG.debug(file);
    FileSystem fs = file.getFileSystem(configuration);
    FileStatus fileStatus = fs.getFileStatus(file);
    ParquetMetadata parquetMetaData = footer.getParquetMetadata();
    List<BlockMetaData> blocks = parquetMetaData.getBlocks();

    List<BlockMetaData> filteredBlocks;

    totalRowGroups += blocks.size();
    filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
    rowGroupsDropped += blocks.size() - filteredBlocks.size();

    if (filteredBlocks.isEmpty()) {
      continue;
    }

    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    splits.addAll(
        generateSplits(
            filteredBlocks,
            fileBlockLocations,
            fileStatus,
            readContext.getRequestedSchema().toString(),
            readContext.getReadSupportMetadata(),
            minSplitSize,
            maxSplitSize)
        );
  }

  if (rowGroupsDropped > 0 && totalRowGroups > 0) {
    int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
    LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
  } else {
    LOG.info("There were no row groups that could be dropped due to filter predicates");
  }
  return splits;
}

For each file, the method asks Hadoop's FileSystem for the file's HDFS block locations, exactly as FileInputFormat does. It also runs the row groups through RowGroupFilter, dropping any that a configured filter predicate rules out before splits are generated.
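
For completeness, here is a minimal sketch of how such a filter predicate can be configured (the column name id and the value 42 are made up). It assumes the filter2 API's FilterApi and ParquetInputFormat.setFilterPredicate; RowGroupFilter can then use the row-group statistics to drop groups that cannot possibly match.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class RowGroupFilterSetup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keep only row groups whose statistics say they may contain id == 42.
    FilterPredicate predicate = FilterApi.eq(FilterApi.longColumn("id"), 42L);
    ParquetInputFormat.setFilterPredicate(conf, predicate);
  }
}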

It then calls generateSplits:

static <T> List<ParquetInputSplit> generateSplits(
    List<BlockMetaData> rowGroupBlocks,
    BlockLocation[] hdfsBlocksArray,
    FileStatus fileStatus,
    String requestedSchema,
    Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {

  List<SplitInfo> splitRowGroups =
      generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);

  // generate splits from rowGroups of each split
  List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
  for (SplitInfo splitInfo : splitRowGroups) {
    ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
    resultSplits.add(split);
  }
  return resultSplits;
}

That method in turn calls generateSplitInfo:

static List<SplitInfo> generateSplitInfo(
    List<BlockMetaData> rowGroupBlocks,
    BlockLocation[] hdfsBlocksArray,
    long minSplitSize, long maxSplitSize) {

  List<SplitInfo> splitRowGroups;

  if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
    throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
  }
  HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
  hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
  SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());

  // assign rowGroups to splits
  splitRowGroups = new ArrayList<SplitInfo>();
  checkSorted(rowGroupBlocks); // assert row groups are sorted
  for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
    if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
        && currentSplit.getCompressedByteSize() >= minSplitSize
        && currentSplit.getCompressedByteSize() > 0)
        || currentSplit.getCompressedByteSize() >= maxSplitSize) {
      // create a new split
      splitRowGroups.add(currentSplit); // finish previous split
      currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
    }
    currentSplit.addRowGroup(rowGroupMetadata);
  }

  if (currentSplit.getRowGroupCount() > 0) {
    splitRowGroups.add(currentSplit);
  }

  return splitRowGroups;
}

This is the core method. Roughly speaking, it relies on checkBelongingToANewHDFSBlock to decide whether a given block of the Parquet file has crossed into a new HDFS block:

private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
  boolean isNewHdfsBlock = false;
  long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);

  // if mid point is not in the current HDFS block any more, return true
  while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
    isNewHdfsBlock = true;
    currentMidPointHDFSBlockIndex++;
    if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
      throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
          + rowGroupMidPoint
          + ", the end of the hdfs block is "
          + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
  }

  while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
    currentStartHdfsBlockIndex++;
    if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
      throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
          + rowGroupMetadata.getStartingPos()
          + " but the end of hdfs blocks of file is "
          + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
  }
  return isNewHdfsBlock;
}

As you can see, checkBelongingToANewHDFSBlock works purely on file offsets: a row group is assigned to whichever HDFS block contains the midpoint of its compressed data.
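
To make the midpoint rule concrete, here is a small self-contained illustration with made-up numbers (not code from parquet-mr): a row group that starts at 120 MB with 40 MB of compressed data straddles the 128 MB block boundary, but its midpoint at 140 MB lands in the second HDFS block, so that is where the whole row group goes.

public class MidpointRuleDemo {
  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // Two hypothetical 128 MB HDFS blocks: [0, 128 MB) and [128 MB, 256 MB).
    long[] hdfsBlockEnds = {128 * mb, 256 * mb};

    // A row group starting at 120 MB with 40 MB of compressed data.
    long rowGroupStart = 120 * mb;
    long rowGroupCompressedSize = 40 * mb;
    long midPoint = rowGroupStart + rowGroupCompressedSize / 2; // 140 MB

    // Same rule as checkBelongingToANewHDFSBlock: advance until the midpoint fits.
    int blockIndex = 0;
    while (midPoint > hdfsBlockEnds[blockIndex]) {
      blockIndex++;
    }
    System.out.println("row group assigned to HDFS block #" + blockIndex); // prints 1
  }
}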

From the above, generateSplitInfo essentially builds a mapping between Parquet blocks and HDFS blocks, where one HDFS block can correspond to several Parquet blocks. In other words, a single SplitInfo in the code can hold multiple Parquet blocks:

static class SplitInfo {
  List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
  BlockLocation hdfsBlock;
  long compressedByteSize = 0L;
}

Finally, let's look at SplitInfo's getParquetInputSplit method:

public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
  MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
  long length = 0;

  for (BlockMetaData block : this.getRowGroups()) {
    List<ColumnChunkMetaData> columns = block.getColumns();
    for (ColumnChunkMetaData column : columns) {
      if (requested.containsPath(column.getPath().toArray())) {
        length += column.getTotalSize();
      }
    }
  }

  BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
  long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();

  long[] rowGroupOffsets = new long[this.getRowGroupCount()];
  for (int i = 0; i < rowGroupOffsets.length; i++) {
    rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
  }

  return new ParquetInputSplit(
      fileStatus.getPath(),
      hdfsBlock.getOffset(),
      end,
      length,
      hdfsBlock.getHosts(),
      rowGroupOffsets);
  }
}

It is straightforward: it builds the actual split (a FileSplit), whose length sums only the column chunks that belong to the requested schema, across all of the split's Row Groups.
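
A quick worked example with made-up sizes shows why length can be much smaller than the bytes the split covers: with three columns a, b, c of which only a and b are requested, only their chunks are counted.

public class SplitLengthExample {
  public static void main(String[] args) {
    // Hypothetical split: two row groups, three columns (a, b, c), sizes in MB.
    long[][] columnChunkSizes = {
        {30, 20, 50},  // row group 1
        {25, 15, 40}   // row group 2
    };
    boolean[] requested = {true, true, false}; // the requested schema projects a and b only

    long length = 0;
    for (long[] rowGroup : columnChunkSizes) {
      for (int c = 0; c < rowGroup.length; c++) {
        if (requested[c]) {
          length += rowGroup[c];
        }
      }
    }
    System.out.println(length + " MB"); // 90 MB, although the row groups hold 180 MB in total
  }
}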

Summary

From the walkthrough above we can see that ParquetInputFormat has two split strategies:

  1. The strategy inherited from its parent class FileInputFormat, which cuts splits by split size.
  2. A strategy built on Parquet's own file layout, which cuts splits by block, i.e. by Row Group.