Step-by-Step Guide to Writing a MapReduce Program – Hadoop Tutorial

8/23/2025


The MapReduce programming model is the heart of Hadoop’s batch processing. This practical, step‑by‑step guide walks you through building, packaging, and running a complete MapReduce job—from setting up your project to reading the final output on HDFS. Perfect for beginners and a handy checklist for pros.



Prerequisites

  • Hadoop 2.x/3.x cluster (pseudo‑distributed or distributed) with HDFS and YARN running.

  • Java 8+ (for classic MapReduce examples) and Maven/Gradle.

  • Basic shell commands and access to hdfs and yarn CLIs.

Tip: Verify your setup:

hadoop version
hdfs dfs -ls /
yarn node -list -all

Step 1: Understand Your Data & Choose Key/Value Types

  • Input: Decide how the input is formatted (text lines, CSV, JSON). Default is TextInputFormat (one line → one record).

  • Mapper output: Pick mapper key/value types.

  • Reducer output: Pick final key/value types.

Example (Word Count):

  • Mapper emits: <Text word, IntWritable 1>

  • Reducer emits: <Text word, IntWritable total>
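
These choices appear verbatim as the generic type parameters on the Mapper and Reducer classes you will write in Steps 3 and 4. A minimal skeleton (empty bodies, placeholder class names, just to show how the types line up) might read:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Generic parameters read as <KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
// Mapper: byte offset + line of text in, <word, 1> out.
class WordCountMapperSkeleton extends Mapper<LongWritable, Text, Text, IntWritable> { }

// Reducer: <word, [1, 1, ...]> in, <word, total> out.
class WordCountReducerSkeleton extends Reducer<Text, IntWritable, Text, IntWritable> { }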


Step 2: Create a Maven Project (Recommended)

pom.xml essentials:

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

Define ${hadoop.version} in a <properties> block and set it to the exact Hadoop version running on your cluster.


Step 3: Implement the Mapper

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken().replaceAll("[^A-Za-z0-9]", "").toLowerCase());
      if (word.getLength() > 0) context.write(word, ONE);
    }
  }
}

Step 4: Implement the Reducer

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) sum += v.get();
    context.write(key, new IntWritable(sum));
  }
}

Step 5: (Optional but Powerful) Combiner

A Combiner runs on the map side, after the mapper, to pre‑aggregate output and shrink the amount of data shuffled to reducers. Hadoop may invoke it zero, one, or several times per map task, so it must not change the final result.

// Reuse reducer as combiner for sum semantics
job.setCombinerClass(IntSumReducer.class);
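
To confirm the combiner is actually shrinking the shuffle, you can compare Hadoop's built‑in combine counters once the job completes. A small sketch, assuming it runs in the driver right after waitForCompletion:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;

// After job.waitForCompletion(true) in the driver:
Counters counters = job.getCounters();
long combineIn  = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
// A high in-to-out ratio means many <word, 1> pairs were collapsed before the shuffle.
System.out.printf("Combiner: %d records in, %d records out%n", combineIn, combineOut);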

Step 6: Write the Driver (Job) Class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordcount <input> <output>");
      System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);

    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // optional
    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Step 7: Package the Job JAR

mvn -DskipTests package
# result: target/your-artifact-name.jar

The Hadoop dependencies are marked provided because the cluster supplies them at runtime. If your job pulls in third‑party libraries that are not on the cluster classpath, use the Maven Shade plugin to build a fat/uber JAR that bundles them.


Step 8: Load Sample Data into HDFS

echo -e "Hadoop MapReduce\nHadoop Hadoop" > input.txt
hdfs dfs -mkdir -p /user/$USER/mr/input
hdfs dfs -put -f input.txt /user/$USER/mr/input/
hdfs dfs -ls /user/$USER/mr/input

Step 9: Run the Job on YARN

hadoop jar target/your-artifact-name.jar WordCountDriver \
  /user/$USER/mr/input /user/$USER/mr/output-$(date +%s)

If the output directory already exists, the job fails at submission (FileOutputFormat refuses to overwrite it). Always write to a fresh path.

Monitor progress:

yarn application -list
yarn application -status <application_id>

Step 10: View the Output

hdfs dfs -ls /user/$USER/mr/
hdfs dfs -cat /user/$USER/mr/output-*/part-r-00000 | head -20

Expected (example):

hadoop 3
mapreduce 1

Step 11: Add Production‑Ready Features

a) Counters – track custom metrics

context.getCounter("QC","BLANK_TOKENS").increment(1);
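
In the Word Count mapper, for instance, this counter could track tokens that normalize to an empty string, and the driver can read the value back after the job finishes. A sketch (QC/BLANK_TOKENS are just the example group and counter names above):

// Inside TokenizerMapper.map(), replacing the plain emit:
String token = itr.nextToken().replaceAll("[^A-Za-z0-9]", "").toLowerCase();
if (token.isEmpty()) {
  context.getCounter("QC", "BLANK_TOKENS").increment(1); // track discarded tokens
} else {
  word.set(token);
  context.write(word, ONE);
}

// In the driver, after job.waitForCompletion(true):
long blanks = job.getCounters().findCounter("QC", "BLANK_TOKENS").getValue();
System.out.println("Blank tokens skipped: " + blanks);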

b) Partitioner – control key → reducer mapping

job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
job.setNumReduceTasks(2);
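
HashPartitioner is already the default, so setting it explicitly mainly documents intent. If you need custom routing, subclass Partitioner yourself; the sketch below uses a purely illustrative rule that sends words starting with a digit to reducer 0:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: reducer 0 receives words that start with a digit,
// everything else is hash-distributed over the remaining reducers.
public class DigitFirstPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String k = key.toString();
    if (numPartitions <= 1) return 0;
    if (!k.isEmpty() && Character.isDigit(k.charAt(0))) return 0;
    return 1 + (k.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
  }
}

// In the driver:
// job.setPartitionerClass(DigitFirstPartitioner.class);
// job.setNumReduceTasks(2);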

c) Compression – reduce shuffle/storage size

# map output compression
yarn jar ... -Dmapreduce.map.output.compress=true \
  -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
# final output compression
-Dmapreduce.output.fileoutputformat.compress=true \
-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
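
The same properties can be set programmatically on the driver's Configuration before the Job is created; note that the -D form above only takes effect if the driver parses generic options (see the FAQ at the end). A sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

Configuration conf = new Configuration();
// Compress intermediate map output to shrink the shuffle.
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
// Compress the final job output.
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", GzipCodec.class, CompressionCodec.class);
// Then create the job from this configuration, as in Step 6:
// Job job = Job.getInstance(conf, "word count");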

d) Input/Output Formats – e.g., KeyValueTextInputFormat, SequenceFileInputFormat, Parquet (via ecosystem).
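
For example, if the input is already tab‑separated key/value text, or you want a binary, splittable output for downstream jobs, you can swap formats in the driver. A sketch using classes that ship with Hadoop (the mapper's input types must change to match):

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Input: each line is split at the first tab into <Text key, Text value>,
// so the mapper signature becomes Mapper<Text, Text, ...> instead of <LongWritable, Text, ...>.
job.setInputFormatClass(KeyValueTextInputFormat.class);

// Output: binary, compressible, splittable SequenceFiles instead of plain text.
job.setOutputFormatClass(SequenceFileOutputFormat.class);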


Step 12: Test Locally & Debug

  • Local (single‑JVM) mode (a code sketch follows this list):

mapred-site.xml: mapreduce.framework.name=local

  • Logs:

yarn logs -applicationId <id> | less

  • Common issues:

    • Output directory exists → remove or use a new path.

    • ClassNotFound → build a fat JAR or match Hadoop versions.

    • Permission denied → check HDFS ACLs/permissions.
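
Instead of editing mapred-site.xml, you can also force local mode from code for a quick test of the same mapper and reducer. A minimal sketch; the rest of the job setup is identical to Step 6, with paths pointing at local directories:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local"); // run map and reduce tasks in this single JVM
conf.set("fs.defaultFS", "file:///");          // read input and write output on the local filesystem
// ...then build the Job from this conf exactly as in Step 6.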


Alternative: Hadoop Streaming (Python Example)

If you prefer Python/other languages, use Hadoop Streaming.

mapper.py

#!/usr/bin/env python3
# Emit <token, 1> for every alphanumeric token, lower-cased to match the Java mapper.
import sys, re

for line in sys.stdin:
    for token in re.findall(r"[A-Za-z0-9]+", line.lower()):
        print(f"{token}\t1")

reducer.py

#!/usr/bin/env python3
# Streaming reducers receive lines already sorted by key, so a simple
# "current key" accumulator is enough to sum the counts per word.
import sys

current = None
acc = 0
for line in sys.stdin:
    key, val = line.rstrip('\n').split('\t')
    if key != current and current is not None:
        print(f"{current}\t{acc}")  # flush the finished key
        acc = 0
    current = key
    acc += int(val)
if current is not None:
    print(f"{current}\t{acc}")      # flush the last key

Run:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
 -files mapper.py,reducer.py \
 -mapper mapper.py -reducer reducer.py \
 -input /user/$USER/mr/input -output /user/$USER/mr/out-stream-$(date +%s)

Performance Tips

  • Use Combiners for associative/commutative aggregates.

  • Increase map/reduce parallelism wisely: setNumReduceTasks(n).

  • Leverage data locality: keep input on HDFS so YARN can schedule map tasks on the nodes that hold the blocks.

  • Enable Snappy/LZO compression for shuffle.

  • Prefer SequenceFiles/Parquet over plain text for large pipelines.


FAQ

Q1. When should I use a Combiner?
When your reduce function is associative and commutative (e.g., sum, min, max) to reduce network I/O.

Q2. How do I run MapReduce without YARN?
Set mapreduce.framework.name=local for local mode (good for dev/testing).

Q3. How big are HDFS blocks?
Typically 128 MB (configurable via dfs.blocksize). With TextInputFormat, input splits usually align with block boundaries, so block size largely determines how many map tasks the job gets.

Q4. How do I pass custom configs at runtime?
Use -Dkey=value (e.g., -Dmapreduce.job.reduces=4). Access via context.getConfiguration().
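
Note that -D generic options are only parsed if the driver hands its arguments to GenericOptionsParser, which ToolRunner does automatically. A sketch of the Word Count driver adapted to ToolRunner, reusing the mapper and reducer from Steps 3 and 4 (assumed to be in the same package):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already contains any -Dkey=value options passed on the command line.
    Job job = Job.getInstance(getConf(), "word count");
    job.setJarByClass(WordCountTool.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner parses -D options into the Configuration and passes the rest to run().
    System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
  }
}

Launch it the same way as before, e.g. hadoop jar target/your-artifact-name.jar WordCountTool -Dmapreduce.job.reduces=4 <input> <output>.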


Conclusion

You’ve built a full MapReduce job end‑to‑end: model your data, implement Mapper/Reducer, configure the job, package a JAR, run on YARN, and validate the results. With counters, combiners, partitioners, compression, and streaming, you can scale to production‑grade pipelines.