
Chain Mapper Example

How to use the ChainMapper class in Hadoop to call multiple mappers in sequence, with a working example and key points about configuration and type compatibility.

16 February 2013 · 3 min read

Here is an example of how to use the ChainMapper class in Hadoop to run several mappers in sequence within a single map task. The comments in the code explain each step.

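import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

public class ChainDriver {
    private static final Logger log = Logger.getLogger(ChainDriver.class);

    /**
     * @param args args[0] = input path, args[1] = output path,
     *             args[2] = value passed to MyMapperA as "myParameter",
     *             args[3] = fully qualified class name of the second mapper
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // Start the main chain job and declare its conf and job
        Configuration chainConf = new Configuration();
        Job chainJob = Job.getInstance(chainConf, "chain mapper example");

        // Variable names kept like conf1 etc. to make the code less cluttered

        // First mapper: MyMapperA, (LongWritable, Text) -> (Text, Text)
        Configuration conf1 = new Configuration(false);
        // Example of passing arguments to a mapper
        conf1.set("myParameter", args[2]);
        ChainMapper.addMapper(chainJob, MyMapperA.class,
                LongWritable.class, Text.class, Text.class, Text.class, conf1);

        // Second mapper: (Text, Text) -> (NullWritable, Text)
        Configuration conf2 = new Configuration(false);
        // Take the class name from an argument to make the chain dynamic :)
        // (MapperC OR MapperD); asSubclass() fails fast if the class is not a Mapper
        ChainMapper.addMapper(chainJob,
                Class.forName(args[3]).asSubclass(Mapper.class), Text.class,
                Text.class, NullWritable.class, Text.class, conf2);

        // Set the parameters for the main chain job
        chainJob.setJarByClass(ChainDriver.class);
        // Output types of the last mapper; the default (identity) reducer passes them through
        chainJob.setOutputKeyClass(NullWritable.class);
        chainJob.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(chainJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(chainJob, new Path(args[1]));
        System.exit(chainJob.waitForCompletion(true) ? 0 : 1);
    }
}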

Now a few important points in detail:

1) Sub-mapper configuration

Configuration conf1 = new Configuration(false);

The sub-mappers’ configuration objects are created with loadDefaults set to false. See the Configuration constructor:

public Configuration(boolean loadDefaults)

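loadDefaults specifies whether to load the default resources (such as core-default.xml and core-site.xml). The ChainMapper Javadoc recommends passing false, so that each sub-mapper's configuration carries only its own settings.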
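Why does false matter? Roughly speaking, at run time the chain builds each mapper's configuration by copying the job configuration and then setting every property found in that mapper's own Configuration on top of it. The sketch below models that overlay with plain java.util.Map objects instead of Hadoop's Configuration, an assumption made so it runs without Hadoop on the classpath; the property "my.job.setting" is hypothetical. It shows that a mapper configuration loaded with defaults would silently reset a job-level setting to its default value.

```java
import java.util.HashMap;
import java.util.Map;

public class ChainConfOverlay {

    // Simplified model (not Hadoop code): start from a copy of the job
    // configuration, then apply every property of the mapper's own
    // configuration on top of it. Mapper values win on conflicts.
    static Map<String, String> effectiveConf(Map<String, String> jobConf,
                                             Map<String, String> mapperConf) {
        Map<String, String> result = new HashMap<>(jobConf);
        result.putAll(mapperConf);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical property; "default" stands for a value from a *-default.xml file
        Map<String, String> defaults = new HashMap<>();
        defaults.put("my.job.setting", "default");

        // Job configuration: defaults plus a value the driver overrode
        Map<String, String> jobConf = new HashMap<>(defaults);
        jobConf.put("my.job.setting", "tuned");

        // Like new Configuration(false): only the mapper-specific property
        Map<String, String> bareMapperConf = new HashMap<>();
        bareMapperConf.put("myParameter", "abc");

        // Like new Configuration(true): defaults are loaded as well
        Map<String, String> loadedMapperConf = new HashMap<>(defaults);
        loadedMapperConf.put("myParameter", "abc");

        System.out.println(effectiveConf(jobConf, bareMapperConf).get("my.job.setting"));   // tuned
        System.out.println(effectiveConf(jobConf, loadedMapperConf).get("my.job.setting")); // default
    }
}
```

With the bare configuration the driver's "tuned" value survives; with defaults loaded it is overwritten, which is the kind of surprise loadDefaults = false avoids.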

2) Passing arguments

conf1.set("myParameter", args[2]);

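Set parameters on the mapper's own Configuration exactly as you would in any driver class. Inside the mapper, read them back in setup() with context.getConfiguration().get("myParameter").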
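Because every mapper in the chain gets its own Configuration, the same key can carry a different value at each position, so one reusable mapper class can appear twice with different parameters. The sketch below is a plain-Java model, not Hadoop code: PrefixMapper is a hypothetical stand-in for a Mapper that reads "myParameter" in setup(), and java.util.Map stands in for Configuration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerMapperParameters {

    // Hypothetical stand-in for a Mapper whose setup() reads "myParameter"
    // from its own configuration (context.getConfiguration() in Hadoop).
    static class PrefixMapper {
        private String prefix;

        void setup(Map<String, String> conf) {
            prefix = conf.getOrDefault("myParameter", "none");
        }

        String map(String value) {
            return prefix + ":" + value;
        }
    }

    // Builds a per-mapper configuration holding only "myParameter".
    static Map<String, String> confWith(String value) {
        Map<String, String> conf = new HashMap<>();
        conf.put("myParameter", value);
        return conf;
    }

    // Runs one PrefixMapper per configuration, feeding each output
    // straight into the next mapper, as a chain does within one task.
    static String runChain(String input, List<Map<String, String>> stageConfs) {
        String value = input;
        for (Map<String, String> conf : stageConfs) {
            PrefixMapper mapper = new PrefixMapper(); // one instance per chain position
            mapper.setup(conf);
            value = mapper.map(value);
        }
        return value;
    }

    public static void main(String[] args) {
        List<Map<String, String>> confs = Arrays.asList(confWith("A"), confWith("B"));
        System.out.println(runChain("x", confs)); // B:A:x
    }
}
```

In a real driver this would correspond to calling ChainMapper.addMapper twice with the same (hypothetical) mapper class, once with each Configuration.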

3) Adding a mapper to the chain

ChainMapper.addMapper(chainJob, MyMapperA.class,
        LongWritable.class, Text.class, Text.class, Text.class, conf1);

The method signature is:

public static void addMapper(Job job,
                             Class<? extends Mapper> klass,
                             Class<?> inputKeyClass,
                             Class<?> inputValueClass,
                             Class<?> outputKeyClass,
                             Class<?> outputValueClass,
                             Configuration mapperConf)
                      throws IOException

The Job argument is the Job object of the main driver (chainJob). Next comes the mapper class to add, followed by its input key and value classes and its output key and value classes. The last argument is the configuration of the mapper being added.

4) Type compatibility between chained mappers

You can chain as many mappers as you like (plus a single reducer, using ChainReducer), but keep in mind that the output of each mapper (or of the reducer) must be directly consumable by the next element in the chain.

For example, if Map 1 and Map 2 are called in chain:

  • If Map 1 emits Text as key and LongWritable as value
  • Then Map 2 must take Text as its input key and LongWritable as its input value

The framework will not do any conversions for you.
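Since nothing converts between types, it pays to check the declared classes when the chain is assembled. The sketch below is a hypothetical plain-Java check, not Hadoop's own validation: Stage and validate are illustrative names, and String and Long stand in for Text and LongWritable so it runs without Hadoop. It uses isAssignableFrom, so a stage that declares a supertype as its input also accepts a subtype.

```java
import java.util.Arrays;
import java.util.List;

public class ChainTypeCheck {

    // Declared key/value classes of one chain element, mirroring the
    // inputKeyClass ... outputValueClass arguments passed to addMapper().
    static final class Stage {
        final String name;
        final Class<?> inKey, inValue, outKey, outValue;

        Stage(String name, Class<?> inKey, Class<?> inValue,
              Class<?> outKey, Class<?> outValue) {
            this.name = name;
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    // Throws if any stage cannot consume what the previous stage emits.
    // No conversion is attempted: the types must simply fit.
    static void validate(List<Stage> chain) {
        for (int i = 1; i < chain.size(); i++) {
            Stage prev = chain.get(i - 1);
            Stage next = chain.get(i);
            if (!next.inKey.isAssignableFrom(prev.outKey)
                    || !next.inValue.isAssignableFrom(prev.outValue)) {
                throw new IllegalArgumentException(next.name + " expects ("
                        + next.inKey.getSimpleName() + ", " + next.inValue.getSimpleName()
                        + ") but " + prev.name + " emits ("
                        + prev.outKey.getSimpleName() + ", " + prev.outValue.getSimpleName() + ")");
            }
        }
    }

    public static void main(String[] args) {
        // String and Long stand in for Text and LongWritable
        Stage map1 = new Stage("Map1", Long.class, String.class, String.class, Long.class);
        Stage map2 = new Stage("Map2", String.class, Long.class, String.class, String.class);
        Stage bad = new Stage("Bad", String.class, String.class, String.class, String.class);

        validate(Arrays.asList(map1, map2)); // passes: Map2 consumes (String, Long)
        try {
            validate(Arrays.asList(map1, bad));
        } catch (IllegalArgumentException e) {
            // Bad expects (String, String) but Map1 emits (String, Long)
            System.out.println(e.getMessage());
        }
    }
}
```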

5) ChainMapper overview

From the ChainMapper Javadocs:

The ChainMapper class allows to use multiple Mapper classes within a single Map task.

The key functionality of this feature is that the Mappers in the chain do not need to be aware that they are executed in a chain. This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.

Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed all Mappers and the Reduce in the chain use matching output and input key and value classes as no conversion is done by the chaining code.

Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. And immediate benefit of this pattern is a dramatic reduction in disk IO.
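To make the [MAP+ / REDUCE MAP*] shape concrete, here is a plain-Java model of a word count, a hypothetical example that is not from the original job: two map steps, a reduce, then one more map step after the reduce. In a real job the first two steps would be added with ChainMapper.addMapper, the reducer with ChainReducer.setReducer and the trailing step with ChainReducer.addMapper; here everything runs in memory so the sketch has no Hadoop dependency. Records flow from each map step straight into the next, and only the shuffle separates the map side from the reduce side.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapReduceMapChain {

    // MAP 1: split a line into words
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.split("\\s+")) {
            if (!w.isEmpty()) words.add(w);
        }
        return words;
    }

    // MAP 2: normalise each word; consumes MAP 1's output directly
    static String lowercase(String word) {
        return word.toLowerCase();
    }

    // REDUCE: sum the counts for one key
    static int sum(List<Integer> counts) {
        int total = 0;
        for (int c : counts) total += c;
        return total;
    }

    // MAP 3 (after the reduce): format each (word, count) pair as a line
    static String format(String word, int count) {
        return word + "\t" + count;
    }

    static List<String> run(List<String> lines) {
        // Map side: MAP 1 feeds MAP 2 record by record; each word becomes (word, 1)
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                mapOutput.add(new AbstractMap.SimpleEntry<>(lowercase(word), 1));
            }
        }
        // Shuffle: group values by key; TreeMap keeps keys sorted as the sort phase would
        Map<String, List<Integer>> shuffled = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        // Reduce side: REDUCE output goes straight into MAP 3
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : shuffled.entrySet()) {
            out.add(format(group.getKey(), sum(group.getValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hadoop chain", "chain MAPPER chain");
        // Prints "chain\t3", "hadoop\t1", "mapper\t1", one per line
        run(lines).forEach(System.out::println);
    }
}
```

Without chaining, the normalisation and formatting steps would typically be separate jobs, each writing its full output to HDFS and reading it back; that round trip is the disk IO the pattern saves.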

I found it a pretty good tool for building multiple processing pipelines: I write reusable classes for individual tasks and call them in a chain.

Update April 6: In my experience since then, ChainMapper has made processing slower, so use it only when you really cannot avoid it.

Do you have any tips for improving ChainMapper performance? Please share them below.

Happy Chaining :)