MapReduce custom output

Introduction

MapReduce is a powerful computing framework, and we can use it to transform data and write the result to HDFS. Actually, we can use MapReduce to write data to any database.

Basic input and output

The key parts of a MapReduce job are the Mapper and the Reducer. The following snippet builds a mapper. Here, Mapper<Object, Text, Text, Text> lists the input key class, input value class, output key class, and output value class, in that order. Note that we cannot use String and int here: Hadoop needs key and value types that it can serialize, so we use Writable types such as Text and IntWritable instead. value is one line of input. We can split it into a key-value pair and write the pair to context, which collects the output of the Mapper.

 public static class YourMapper extends Mapper<Object, Text, Text, Text> {

    // Reused for every record to avoid allocating new objects per call
    private Text word = new Text();
    private Text val = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      ......
      // Emit one key-value pair as mapper output
      context.write(word, val);
    }
 }
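The body of map is elided above. As a minimal sketch of the "split the line into a key-value pair" step, assuming (hypothetically) that each input line looks like key<TAB>value, the parsing could be done as below. It is plain Java with no Hadoop dependency so it can be run on its own; the class name LineSplitSketch and the tab delimiter are assumptions, not part of the original code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class LineSplitSketch {

    // Splits "key<TAB>value" at the first tab. Returns null for lines without
    // a tab so the caller can skip malformed input instead of failing.
    static Map.Entry<String, String> splitLine(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return null;
        }
        return new SimpleEntry<>(line.substring(0, tab), line.substring(tab + 1));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv = splitLine("user42\tclicked");
        System.out.println(kv.getKey() + " -> " + kv.getValue());
    }
}
```

Inside a real mapper, the two parts would be copied into word and val with Text.set before calling context.write.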

To write a reducer, we extend the Reducer class, as in the following snippet. In the reduce function, Iterable<Text> values contains all the values that share the same key, so we can use our own function to combine them.

 public static class YourReducer
      extends Reducer<Text, Text, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      ....
      // Emit the combined result for this key
      result.set(sum);
      context.write(key, result);
    }
  }
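To make the "all values for the same key" idea concrete, here is a small sketch in plain Java, without Hadoop, that imitates the grouping the framework does before calling reduce. Counting the values per key is only an assumed example of a combining function, and the names ReduceSketch and groupByKey are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReduceSketch {

    // Stand-in for the shuffle phase: collect every value under its key.
    static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    // Stand-in for reduce(): called once per key with all of that key's values.
    // Here the values are simply counted.
    static int reduce(String key, Iterable<String> values) {
        int sum = 0;
        for (String ignored : values) {
            sum++;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String[]> mapperOutput = List.of(
            new String[] {"apple", "x"},
            new String[] {"pear", "y"},
            new String[] {"apple", "z"});
        groupByKey(mapperOutput).forEach(
            (key, values) -> System.out.println(key + "\t" + reduce(key, values)));
    }
}
```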

Customize your output

In the previous part, we could only write data to text files. To write to HBase, you can use the official API and extend TableReducer instead of Reducer.

So what if I want to write to my own output destination, such as writing to Redis directly? I think the following code can help. I referenced another blog post for it.

First, implement your Mapper and Reducer as before. Then write a customized output format.

The key class is the one that extends RecordWriter. Its <Text, Text> type parameters must match the output key and value types of the Reducer. In its write function you can perform operations on any database.

 public static class YourOutputFormat extends OutputFormat<Text, Text> {

    // Store some configuration in this function; you can call it from the main function
    public static void configure(...) {
      ...
    }

    // This method returns an instance of a RecordWriter for the task.  Note how
    // we are pulling the variables set by the static methods during
    // configuration
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
        throws IOException, InterruptedException {

      // Hosts stands for whatever configure(...) stored
      return new YourRecordWriter(Hosts);
    }

    // OutputFormat also requires this method. There is no output directory
    // to validate here, so it does nothing.
    @Override
    public void checkOutputSpecs(JobContext job)
        throws IOException, InterruptedException {
    }

    // The output committer is used on the back-end to, well, commit output.
    // Discussion of this class is out of scope.
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      // use a null output committer, since nothing needs to be committed to HDFS
      return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
    }

    // The key class is here. The <Text, Text> is the output of Reducer
    public static class YourRecordWriter extends RecordWriter<Text, Text> {

      // Redis client used by write() and released in close()
      private Jedis j;

      public YourRecordWriter(...) {
        ....
      }

      @Override
      public void write(Text key, Text value)
          throws IOException, InterruptedException {

        // Write the key/value pair

      }

      @Override
      public void close(TaskAttemptContext context)
          throws IOException, InterruptedException {
        // finish writing

      }
    }
  }
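The write and close bodies above are left empty. As one possible way to think about them, the sketch below mirrors the same life cycle in plain Java: write is called once per output pair, and close is called once at the end to push out anything still buffered. Everything here is an assumption for illustration: KeyValueStore is a hypothetical stand-in for a client such as Jedis, an in-memory map plays the role of Redis, and batching is just one design choice, not something the original code requires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RecordWriterSketch {

    // Hypothetical stand-in for a database client such as Jedis.
    interface KeyValueStore {
        void set(String key, String value);
    }

    // Mirrors the write()/close() life cycle of a RecordWriter.
    static class BufferedStoreWriter {
        private final KeyValueStore store;
        private final int batchSize;
        private final List<String[]> buffer = new ArrayList<>();

        BufferedStoreWriter(KeyValueStore store, int batchSize) {
            this.store = store;
            this.batchSize = batchSize;
        }

        // Called once per output pair; sends a batch when the buffer is full.
        void write(String key, String value) {
            buffer.add(new String[] {key, value});
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        // Called once at the end; pushes any pairs still in the buffer.
        void close() {
            flush();
        }

        private void flush() {
            for (String[] pair : buffer) {
                store.set(pair[0], pair[1]);
            }
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, String> fakeRedis = new HashMap<>();
        BufferedStoreWriter writer = new BufferedStoreWriter(fakeRedis::put, 2);
        writer.write("a", "1");
        writer.write("b", "2");
        writer.write("c", "3");
        writer.close();
        System.out.println(fakeRedis);
    }
}
```

In a real RecordWriter, close would also be the place to release the database connection.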

I hope this blog will be helpful. Thank you for reading!
