Step-by-Step Guide to Writing a MapReduce Program
The MapReduce programming model is the heart of Hadoop’s batch processing. This practical, step‑by‑step guide walks you through building, packaging, and running a complete MapReduce job—from setting up your project to reading the final output on HDFS. Perfect for beginners and a handy checklist for pros.
Hadoop 2.x/3.x cluster (pseudo‑distributed or distributed) with HDFS and YARN running.
Java 8+ (for classic MapReduce examples) and Maven/Gradle.
Basic shell commands and access to the hdfs and yarn CLIs.
Tip: Verify your setup:
hadoop version
hdfs dfs -ls /
yarn node -list -all
Input: Decide how the input is formatted (text lines, CSV, JSON). The default is TextInputFormat (one line → one record).
Mapper output: Pick mapper key/value types.
Reducer output: Pick final key/value types.
Example (Word Count):
Mapper emits: <Text word, IntWritable 1>
Reducer emits: <Text word, IntWritable total>
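Note: In Word Count the mapper and the reducer happen to emit the same types, so the driver below only sets the final output classes. If your mapper's output types differ from the final output types, you must declare the intermediate types explicitly in the driver; a small sketch:
// Only required when the map output types differ from the job's final output types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);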
pom.xml essentials:
<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
Use your cluster’s Hadoop version for ${hadoop.version}.
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken().replaceAll("[^A-Za-z0-9]", "").toLowerCase());
            if (word.getLength() > 0) context.write(word, ONE);
        }
    }
}
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) sum += v.get();
        context.write(key, new IntWritable(sum));
    }
}
A Combiner runs on the map side (on the same node as the mapper) to pre‑aggregate data and shrink the shuffle. Hadoop may invoke it zero or more times, so it must not change the final result.
// Reuse reducer as combiner for sum semantics
job.setCombinerClass(IntSumReducer.class);
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: wordcount <input> <output>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // optional
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
mvn -DskipTests package
# result: target/your-artifact-name.jar
If your cluster doesn’t have all dependencies, use the Maven Shade plugin to create a fat/uber JAR.
echo -e "Hadoop MapReduce\nHadoop Hadoop" > input.txt
hdfs dfs -mkdir -p /user/$USER/mr/input
hdfs dfs -put -f input.txt /user/$USER/mr/input/
hdfs dfs -ls /user/$USER/mr/input
hadoop jar target/your-artifact-name.jar WordCountDriver \
/user/$USER/mr/input /user/$USER/mr/output-$(date +%s)
If the output path already exists, the job fails at submission. Always write to a fresh directory.
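If you would rather overwrite in place, a common convenience is to delete a stale output directory from the driver before submission. This is a sketch, not part of the driver above, and it permanently removes the old results:
// Inside the driver's main(), in place of the existing setOutputPath line.
// Requires: import org.apache.hadoop.fs.FileSystem;
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
    fs.delete(out, true);   // true = recursive delete of the old output directory
}
FileOutputFormat.setOutputPath(job, out);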
Monitor progress:
yarn application -list
yarn application -status <application_id>
hdfs dfs -ls /user/$USER/mr/
hdfs dfs -cat /user/$USER/mr/output-*/part-r-00000 | head -20
Expected (example):
hadoop 3
mapreduce 1
a) Counters – track custom metrics
context.getCounter("QC","BLANK_TOKENS").increment(1);
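Counter values are aggregated across all tasks and can be read back in the driver once waitForCompletion(true) has returned (call it before System.exit rather than inline). A sketch; the "QC"/"BLANK_TOKENS" group and name are just illustrative strings:
// In the driver, after job.waitForCompletion(true) has returned:
long blanks = job.getCounters()
                 .findCounter("QC", "BLANK_TOKENS")
                 .getValue();
System.out.println("Blank tokens skipped: " + blanks);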
b) Partitioner – control key → reducer mapping
job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
job.setNumReduceTasks(2);
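HashPartitioner is already the default, so the real use case is a custom Partitioner. Purely as an illustration (the class name and the a–m rule are hypothetical), this routes words starting with a–m to reducer 0 and everything else to reducer 1:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner: assumes job.setNumReduceTasks(2).
public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions < 2) return 0;                   // single reducer: everything goes to 0
        String k = key.toString();
        char first = k.isEmpty() ? 'a' : Character.toLowerCase(k.charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;     // a-m -> reducer 0, rest -> reducer 1
    }
}
Register it with job.setPartitionerClass(AlphabetPartitioner.class) in place of the HashPartitioner line above.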
c) Compression – reduce shuffle/storage size
# map output compression
yarn jar ... -Dmapreduce.map.output.compress=true \
-Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
# final output compression
-Dmapreduce.output.fileoutputformat.compress=true \
-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
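These -D flags are generic options and are only picked up when the driver runs through ToolRunner/GenericOptionsParser (see the FAQ below). With the plain driver shown earlier you can set the same properties programmatically; a sketch, with the Snappy/Gzip codec choices as assumptions:
// In the driver, before submitting the job: compress intermediate map output with Snappy.
job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
job.getConfiguration().setClass("mapreduce.map.output.compress.codec",
        org.apache.hadoop.io.compress.SnappyCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);

// Gzip-compress the final job output.
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.GzipCodec.class);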
d) Input/Output Formats – e.g., KeyValueTextInputFormat, SequenceFileInputFormat, Parquet (via the ecosystem).
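For example, switching the job to KeyValueTextInputFormat makes Hadoop split each line on the first tab and hand the mapper Text keys instead of byte offsets (a sketch; your mapper's input types must change accordingly):
// Driver change: each input line "key<TAB>value" arrives as (Text key, Text value).
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);
// The mapper signature then becomes Mapper<Text, Text, ...> instead of Mapper<LongWritable, Text, ...>.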
Local (single‑JVM) mode:
mapred-site.xml: mapreduce.framework.name=local
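For quick experiments you can also force local mode from the driver itself, without touching mapred-site.xml (a sketch; paths then refer to the local filesystem):
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");   // run map and reduce tasks in a single JVM
conf.set("fs.defaultFS", "file:///");            // read and write the local filesystem instead of HDFS
Job job = Job.getInstance(conf, "word count (local)");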
Logs:
yarn logs -applicationId <id> | less
Common issues:
Output directory exists → remove or use a new path.
ClassNotFound → build a fat JAR or match Hadoop versions.
Permission denied → check HDFS ACLs/permissions.
If you prefer Python/other languages, use Hadoop Streaming.
mapper.py
#!/usr/bin/env python3
import sys, re

for line in sys.stdin:
    for token in re.findall(r"[A-Za-z0-9]+", line.lower()):
        print(f"{token}\t1")
reducer.py
#!/usr/bin/env python3
import sys

current = None
acc = 0
for line in sys.stdin:
    key, val = line.rstrip('\n').split('\t')
    if key != current and current is not None:
        print(f"{current}\t{acc}")
        acc = 0
    current = key
    acc += int(val)
if current is not None:
    print(f"{current}\t{acc}")
Run (make both scripts executable first, e.g. chmod +x mapper.py reducer.py):
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-mapper mapper.py -reducer reducer.py \
-input /user/$USER/mr/input -output /user/$USER/mr/out-stream-$(date +%s)
Use Combiners for associative/commutative aggregates.
Increase map/reduce parallelism wisely: setNumReduceTasks(n).
Leverage data locality—store input on HDFS close to compute.
Enable Snappy/LZO compression for shuffle.
Prefer SequenceFiles/Parquet over plain text for large pipelines.
Q1. When should I use a Combiner?
When your reduce function is associative and commutative (e.g., sum, min, max) to reduce network I/O.
Q2. How do I run MapReduce without YARN?
Set mapreduce.framework.name=local for local mode (good for dev/testing).
Q3. How big are HDFS blocks?
Typically 128 MB (configurable). Blocks define map input splits under TextInputFormat.
Q4. How do I pass custom configs at runtime?
Use -Dkey=value (e.g., -Dmapreduce.job.reduces=4). Note that generic options such as -D are only parsed when the driver runs through ToolRunner/GenericOptionsParser. Inside tasks, read the values via context.getConfiguration().
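The driver shown earlier does not parse generic options itself. A ToolRunner-based variant (a sketch reusing the same mapper and reducer) is the usual way to make -D, -files, and -libjars work on the command line:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: ToolRunner parses generic options (-D, -files, -libjars) and passes
// only the remaining arguments (input, output) to run().
public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}
Run it the same way as the earlier driver, placing any -D options before the input and output arguments.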
You’ve built a full MapReduce job end‑to‑end: model your data, implement Mapper/Reducer, configure the job, package a JAR, run on YARN, and validate the results. With counters, combiners, partitioners, compression, and streaming, you can scale to production‑grade pipelines.