Building a Distributed Hadoop Cluster for Inverted Index Implementation with MapReduce
Overview
This project details the construction of a fully distributed Hadoop cluster and the implementation of an inverted index using MapReduce. The inverted index serves as a fundamental data structure in search engines, enabling efficient retrieval of documents containing specific terms.
System Architecture
Core Components
HDFS (Hadoop Distributed File System)
HDFS provides scalable distributed storage essential for managing large-scale web crawl data. Its block-based storage mechanism with configurable replication factor ensures data reliability and fault tolerance, forming the foundation for subsequent processing tasks.
MapReduce
The MapReduce programming model enables parallel processing of large datasets by dividing computational tasks into independent subtasks. This framework proves particularly effective for search engine operations including index construction, query processing, and ranking calculations.
HBase
HBase serves as a distributed, scalable column-oriented database well-suited for storing intermediate data such as inverted indexes. Its random read/write capabilities support the high-throughput lookups critical for search functionality.
YARN
YARN (Yet Another Resource Negotiator) manages cluster resources, scheduling and allocating capacity across various applications running on the Hadoop cluster.
Cluster Deployment Topology
The cluster consists of three nodes with the following role distribution:
| Node | Roles |
|---|---|
| hadoop102 | NameNode, DataNode, NodeManager |
| hadoop103 | DataNode, ResourceManager, NodeManager |
| hadoop104 | SecondaryNameNode, DataNode, NodeManager |
Environment Setup
Virtual Machine Configuration
Three virtual machines running CentOS 7.5 were configured to simulate a production distributed environment. Each node was provisioned with appropriate network settings and host configurations.
Software Versions
- Java: Oracle JDK 8u212
- Hadoop: 3.1.3
- Zookeeper: 3.5.7
- HBase: 2.4.11
Installation Steps
JDK and Hadoop Installation
- Remove pre-installed OpenJDK
- Transfer installation packages to /opt/software
- Extract to /opt/module directory
- Configure environment variables in /etc/profile.d/my_env.sh
- Distribute to cluster nodes using scp
scp -r /opt/module/jdk1.8.0_212 user@hadoop103:/opt/module
scp -r /opt/module/jdk1.8.0_212 user@hadoop104:/opt/module
scp -r /opt/module/hadoop-3.1.3 user@hadoop103:/opt/module
scp -r /opt/module/hadoop-3.1.3 user@hadoop104:/opt/module
Hadoop Cluster Configuration
Core configuration files modified:
- core-site.xml: Default file system (fs.defaultFS) and common cluster settings
- hdfs-site.xml: Replication factor and SecondaryNameNode address
- yarn-site.xml: ResourceManager host and auxiliary shuffle service
- mapred-site.xml: Sets the MapReduce framework to run on YARN
The workers file was configured to list all DataNode hosts, and the NameNode was formatted (hdfs namenode -format) prior to initial startup.
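For illustration, the replication and SecondaryNameNode settings implied by the topology above might look as follows in hdfs-site.xml (the values shown are assumptions consistent with the three-node layout, not the project's exact file):

```xml
<configuration>
  <!-- Three DataNodes, so a replication factor of 3 is a natural choice -->
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <!-- SecondaryNameNode runs on hadoop104 per the topology table -->
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop104:9868</value>
  </property>
</configuration>
```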
Zookeeper Configuration
ZooKeeper requires manual ID assignment, with each node storing a unique identifier in its myid file:
- hadoop102: myid = 2
- hadoop103: myid = 3
- hadoop104: myid = 4
Ports 2888 (follower-to-leader communication) and 3888 (leader election) were configured on each node.
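These identifiers and ports correspond to server entries of the form server.&lt;myid&gt;=&lt;host&gt;:2888:3888 in zoo.cfg; for this cluster the entries would be:

```
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
```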
HBase Configuration
HBase integrates with the existing ZooKeeper ensemble. A conflict between the SLF4J binding bundled with HBase and the one shipped with Hadoop was resolved by renaming the duplicate JAR in the HBase lib directory.
Data Preparation
The dataset contained approximately 9.4 million sentences, occupying roughly 1.43 GB of storage. To keep individual inputs manageable, the data was split into chunks of 10,000 sentences each using the following Python script:
import os

input_file_path = r'D:\data\sentences.txt'
output_directory = r'D:\data\files'

# Ensure the output directory exists before writing
os.makedirs(output_directory, exist_ok=True)

with open(input_file_path, 'r', encoding='utf-8') as input_file:
    lines = input_file.readlines()

total_lines = len(lines)
num_files = (total_lines + 9999) // 10000  # ceiling division: 10,000 lines per file

for i in range(num_files):
    start = i * 10000
    end = min((i + 1) * 10000, total_lines)
    output_filename = os.path.join(output_directory, f'file{i + 1}.txt')
    with open(output_filename, 'w', encoding='utf-8') as output_file:
        output_file.writelines(lines[start:end])
This process generated 970 files, which were subsequently uploaded to HDFS for processing.
Inverted Index Implementation
Mapper Implementation
The Mapper processes input records and extracts term-document associations.
package com.search.index;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    // In-mapper combining: accumulate term -> (document -> count) across the whole split
    private Map<String, Map<String, Integer>> termFrequencies = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(" ");

        // The source document is the name of the file backing this input split
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();

        // Skip the first token (assumed to be a record identifier) and count the rest
        String[] terms = Arrays.copyOfRange(parts, 1, parts.length);
        for (String term : terms) {
            termFrequencies
                    .computeIfAbsent(term, t -> new HashMap<>())
                    .merge(fileName, 1, Integer::sum);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one (term, "document:count") pair per posting once the split is consumed
        for (Map.Entry<String, Map<String, Integer>> entry : termFrequencies.entrySet()) {
            for (Map.Entry<String, Integer> docEntry : entry.getValue().entrySet()) {
                outputKey.set(entry.getKey());
                outputValue.set(docEntry.getKey() + ":" + docEntry.getValue());
                context.write(outputKey, outputValue);
            }
        }
    }
}
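Because the Mapper's heavy lifting is just this per-split aggregation, the logic can be sanity-checked outside Hadoop with plain collections. A minimal sketch (the class name and sample data are illustrative, not part of the project):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapperLogicSketch {
    // Mirrors IndexMapper: count each term per document, skipping the first token of each line.
    static Map<String, Map<String, Integer>> aggregate(List<String> lines, String fileName) {
        Map<String, Map<String, Integer>> termFrequencies = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            for (int i = 1; i < parts.length; i++) {  // skip the leading record identifier
                termFrequencies
                        .computeIfAbsent(parts[i], t -> new HashMap<>())
                        .merge(fileName, 1, Integer::sum);
            }
        }
        return termFrequencies;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1 hadoop hdfs hadoop", "2 hdfs yarn");
        Map<String, Map<String, Integer>> tf = aggregate(lines, "file1.txt");
        System.out.println(tf.get("hadoop").get("file1.txt")); // 2
        System.out.println(tf.get("hdfs").get("file1.txt"));   // 2
        System.out.println(tf.get("yarn").get("file1.txt"));   // 1
    }
}
```

In the real job, cleanup would then flatten this map into (term, "document:count") pairs.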
Reducer Implementation
The Reducer aggregates term occurrences across documents and writes results to HBase.
package com.search.index;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class IndexReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all "document:count" postings for this term
        StringBuilder documentList = new StringBuilder();
        for (Text value : values) {
            documentList.append(value.toString()).append("; ");
        }
        result.set(documentList.toString());

        // Row key = term; column info:index holds the full posting list
        Put put = new Put(key.toString().getBytes());
        put.addColumn("info".getBytes(), "index".getBytes(), result.toString().getBytes());
        context.write(null, put);
    }
}
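The Reducer's formatting step can likewise be exercised without an HBase connection. This stand-alone sketch (an illustrative class, not part of the project) reproduces the posting-list string that ends up in the info:index column:

```java
import java.util.List;

public class ReducerLogicSketch {
    // Mirrors IndexReducer: join "document:count" postings with "; " separators.
    static String buildPostingList(List<String> values) {
        StringBuilder documentList = new StringBuilder();
        for (String value : values) {
            documentList.append(value).append("; ");
        }
        return documentList.toString();
    }

    public static void main(String[] args) {
        String row = buildPostingList(List.of("file1.txt:2", "file7.txt:1"));
        System.out.println(row); // file1.txt:2; file7.txt:1;  (note the trailing separator)
    }
}
```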
Driver Configuration
The Driver orchestrates the MapReduce job and configures HBase connectivity.
package com.search.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class IndexDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
        conf.set("yarn.resourcemanager.hostname", "hadoop103");
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");

        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexDriver.class);
        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Wires the reducer to write directly into the HBase table "InvertedIndex"
        TableMapReduceUtil.initTableReducerJob("InvertedIndex", IndexReducer.class, job);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
Execution
- Create input directory in HDFS:
hadoop fs -mkdir /input
- Upload data files:
hadoop fs -put files/* /input
- Create HBase table:
hbase shell
create 'InvertedIndex', 'info'
- Execute the MapReduce job, first adding the HBase dependencies reported by hbase mapredcp to the Hadoop classpath:
export HADOOP_CLASSPATH=$($HBASE_HOME/bin/hbase mapredcp)
hadoop jar test3.jar com.search.index.IndexDriver /input
Technical Considerations
Data Splitting Strategy
Splitting large datasets into smaller segments prevents memory overflow during processing and enables parallel execution across cluster nodes.
HBase Integration
Storing inverted indexes in HBase provides fast random access capabilities essential for query-time lookups. The columnar storage model efficiently handles sparse term-document relationships.
Cluster Resource Management
YARN manages application lifecycle and resource allocation, ensuring fair scheduling across concurrent MapReduce jobs.
Conclusion
This implementation demonstrates a complete pipeline for building distributed search infrastructure using Hadoop ecosystem components. The combination of HDFS for storage, MapReduce for processing, and HBase for serving queries provides a scalable foundation for large-scale search applications.