No Data When Using MultipleOutputs in Hadoop

Posted by AlstonWilliams on February 17, 2019

We used MultipleOutputs in the following scenario.

After computing some statistics, we wanted to write the results to different directories according to their key.

When we used MultipleOutputs to do this, we found that the directories were created correctly, but the files inside them were always empty.

I read through the source code of MultipleOutputs and couldn't find anything wrong with our usage.

Finally, I asked our team leader, and it turned out the problem was that two methods were never being called.

When we write through the Context, calling context.write(key, value) sends the result straight to the correct directory, but calling multipleOutputs.write(key, value, baseOutputPath) does not behave the same way.

It first stores whatever you write in a buffer, and the data is only flushed out to HDFS once you call multipleOutputs.close() and the Reducer's super.cleanup(context).
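
In isolation, the essential fix boils down to two calls at the end of the Reducer's cleanup(). A minimal sketch (the mos field is assumed to be the MultipleOutputs instance created in setup(), exactly as in the full code below):

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();            // flushes the buffered records to HDFS
        super.cleanup(context);
    }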

The full reducer code is as follows:

package com.company.reducer;

import com.company.datastructure.SuffixCompareText;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class SuffixWordCountReducerWithOutputToCustomizedLocation extends Reducer<SuffixCompareText, IntWritable, Text, Text> {

    private int total;
    private int unique;
    private String suffixOfPreviousPartition = null;
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    private MultipleOutputs<Text, Text> mos;

    @Override
    public void setup(Context context) {
        // Create the MultipleOutputs instance once per task attempt.
        mos = new MultipleOutputs<>(context);
    }

    @Override
    public void reduce(SuffixCompareText key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        String keyString = key.toString();
        String suffixOfCurrentKey = keyString.substring(keyString.length() - 1);
        // Keys arrive in sorted order, so a change of suffix means the previous
        // suffix group is finished and its totals can be written out.
        if (!suffixOfCurrentKey.equals(suffixOfPreviousPartition)) {
            if (suffixOfPreviousPartition != null) {
                outputKey.set(suffixOfPreviousPartition);
                outputValue.set(total + " " + unique);
                mos.write(outputKey, outputValue, generateFileName(suffixOfPreviousPartition));
            }
            suffixOfPreviousPartition = suffixOfCurrentKey;
            total = 0;
            unique = 0;
        }

        // Accumulate the totals for the current suffix group: sum all counts
        // and count each distinct key once.
        for (IntWritable value : values) {
            total += value.get();
        }
        unique++;

    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Flush the last suffix group; no suffix change follows to trigger it in reduce().
        if (suffixOfPreviousPartition != null) {
            outputKey.set(suffixOfPreviousPartition);
            outputValue.set(total + " " + unique);
            mos.write(outputKey, outputValue, generateFileName(suffixOfPreviousPartition));
        }
        // These two calls are essential: write() only buffers the records, and
        // mos.close() is what actually flushes them to HDFS.
        mos.close();
        super.cleanup(context);
    }

    // Build a "<suffix>/<suffix>" path so every suffix gets its own directory.
    private String generateFileName(String suffix) {
        return suffix + "/" + suffix;
    }

}
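
For context, a driver for this job might look roughly like the sketch below. This is an assumption-laden sketch, not the original driver: the class name SuffixWordCountDriver, the job name, and the commented-out mapper and partitioner are all hypothetical. Two details are worth noting: the write(key, value, baseOutputPath) variant used above goes through the job's regular FileOutputFormat, so no addNamedOutput() call is needed, and wrapping the output format in LazyOutputFormat keeps Hadoop from creating empty part-r-* files in the main output directory when the reducer writes only through MultipleOutputs.

package com.company.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.company.datastructure.SuffixCompareText;
import com.company.reducer.SuffixWordCountReducerWithOutputToCustomizedLocation;

public class SuffixWordCountDriver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "suffix word count");
        job.setJarByClass(SuffixWordCountDriver.class);

        // Hypothetical mapper and partitioner, omitted here: the mapper is
        // assumed to emit <SuffixCompareText, IntWritable> pairs, and the
        // partitioner must route keys by suffix so that all keys sharing a
        // suffix reach the same reducer (the reducer's logic relies on this).
        // job.setMapperClass(SuffixWordCountMapper.class);
        // job.setPartitionerClass(SuffixPartitioner.class);
        job.setMapOutputKeyClass(SuffixCompareText.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(SuffixWordCountReducerWithOutputToCustomizedLocation.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // LazyOutputFormat creates the default output files lazily, so a job
        // that writes only through MultipleOutputs leaves no empty part files.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}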