Building a Distributed Hadoop Cluster for Inverted Index Implementation with MapReduce

Overview

This project details the construction of a fully distributed Hadoop cluster and the implementation of an inverted index using MapReduce. The inverted index serves as a fundamental data structure in search engines, enabling efficient retrieval of documents containing specific terms.
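
For a concrete picture of the target structure, each term maps to the documents containing it together with a per-document occurrence count. The job described below emits entries of the following shape (the file names and counts here are purely illustrative):

hadoop  -> file12.txt:3; file87.txt:1;
cluster -> file5.txt:2;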

System Architecture

Core Components

HDFS (Hadoop Distributed File System)

HDFS provides scalable distributed storage essential for managing large-scale web crawl data. Its block-based storage mechanism with configurable replication factor ensures data reliability and fault tolerance, forming the foundation for subsequent processing tasks.

MapReduce

The MapReduce programming model enables parallel processing of large datasets by dividing computational tasks into independent subtasks. This framework proves particularly effective for search engine operations including index construction, query processing, and ranking calculations.

HBase

HBase serves as a distributed, scalable columnar database well-suited for storing intermediate data such as inverted indexes. Its random read/write capabilities support high-throughput query operations critical for search functionality.

YARN

YARN (Yet Another Resource Negotiator) manages cluster resources, scheduling and allocating capacity across various applications running on the Hadoop cluster.

Cluster Deployment Topology

The cluster consists of three nodes with the following role distribution:

  • hadoop102: NameNode, DataNode, NodeManager
  • hadoop103: DataNode, ResourceManager, NodeManager
  • hadoop104: SecondaryNameNode, DataNode, NodeManager

Environment Setup

Virtual Machine Configuration

Three virtual machines running CentOS 7.5 were configured to simulate a production distributed environment. Each node was provisioned with appropriate network settings and host configurations.
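
For reference, each node's /etc/hosts maps the three hostnames used throughout this post; a minimal sketch looks like the following (the IP addresses are placeholders for this example):

192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104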

Software Versions

  • Java: Oracle JDK 8u212
  • Hadoop: 3.1.3
  • Zookeeper: 3.5.7
  • HBase: 2.4.11

Installation Steps

JDK and Hadoop Installation

  1. Remove pre-installed OpenJDK
  2. Transfer installation packages to /opt/software
  3. Extract to /opt/module directory
  4. Configure environment variables in /etc/profile.d/my_env.sh (a sketch of this file follows below)
  5. Distribute to cluster nodes using scp
scp -r /opt/module/jdk1.8.0_212 user@hadoop103:/opt/module
scp -r /opt/module/jdk1.8.0_212 user@hadoop104:/opt/module
scp -r /opt/module/hadoop-3.1.3 user@hadoop103:/opt/module
scp -r /opt/module/hadoop-3.1.3 user@hadoop104:/opt/module
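
A minimal /etc/profile.d/my_env.sh, assuming the install paths used above, might look like this sketch (adjust the paths if your layout differs):

# JDK (path taken from the scp commands above)
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

# Hadoop
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin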

Hadoop Cluster Configuration

Core configuration files modified:

  • core-site.xml: default filesystem (NameNode address) and other common settings (example below)
  • hdfs-site.xml: Replication parameters
  • yarn-site.xml: ResourceManager configuration
  • mapred-site.xml: MapReduce framework settings

The workers file was configured to include all DataNode hosts, and the NameNode was formatted with hdfs namenode -format prior to initial startup.
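
As a sketch of the key entries, core-site.xml points the cluster at the NameNode on hadoop102 and hdfs-site.xml sets replication and the SecondaryNameNode address (the replication value and the 9868 web port are assumptions based on Hadoop 3.x defaults):

<!-- core-site.xml -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop102:8020</value>
</property>

<!-- hdfs-site.xml -->
<property>
    <name>dfs.replication</name>
    <value>3</value>
</property>
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop104:9868</value>
</property>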

Zookeeper Configuration

Zookeeper requires manual ID assignment. Each node received a unique identifier:

  • hadoop102: myid = 2
  • hadoop103: myid = 3
  • hadoop104: myid = 4

Communication ports 2888 and 3888 were configured for leader-follower communication.
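
In zoo.cfg, these translate into one server entry per node; a sketch under the ID assignment above (the dataDir path is a placeholder):

# dataDir holds each node's myid file
dataDir=/opt/module/zookeeper-3.5.7/zkData
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888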

HBase Configuration

HBase integrates with the existing ZooKeeper ensemble. A classpath conflict between HBase's bundled SLF4J binding and the one shipped with Hadoop was resolved by renaming the conflicting HBase library file.
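
Assuming the NameNode address and ZooKeeper ensemble used elsewhere in this post, the essential hbase-site.xml entries would look roughly like this (the /hbase root directory is the conventional default, not something dictated by this setup):

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop102:8020/hbase</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
</property>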

Data Preparation

The dataset contained approximately 9.4 million sentences consuming roughly 1.43GB of storage. Given the significant volume, the data was split into manageable chunks of 10,000 sentences each using the following Python script:

import os

# Source file (~9.4 million sentences) and destination directory for the chunks
input_file_path = r'D:\data\sentences.txt'
output_directory = r'D:\data\files'

# Make sure the output directory exists before writing any chunks
os.makedirs(output_directory, exist_ok=True)

# Read the whole corpus into memory (~1.43GB fits on the workstation used here)
with open(input_file_path, 'r', encoding='utf-8') as input_file:
    lines = input_file.readlines()

total_lines = len(lines)
num_files = (total_lines + 9999) // 10000  # ceiling division: 10,000 sentences per file

# Write each 10,000-line slice to its own numbered file
for i in range(num_files):
    start = i * 10000
    end = min((i + 1) * 10000, total_lines)
    output_filename = os.path.join(output_directory, f'file{i + 1}.txt')
    
    with open(output_filename, 'w', encoding='utf-8') as output_file:
        output_file.writelines(lines[start:end])

This process generated 970 files, which were subsequently uploaded to HDFS for processing.

Inverted Index Implementation

Mapper Implementation

The Mapper processes input records and extracts term-document associations.

package com.search.index;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    // In-mapper aggregation: term -> (file name -> occurrence count), emitted once in cleanup()
    private Map<String, Map<String, Integer>> termFrequencies = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(" ");
        
        // The name of the input split's file identifies the document this line belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        
        // Skip the first token of each line and treat the remaining tokens as terms
        String[] terms = Arrays.copyOfRange(parts, 1, parts.length);
        
        for (String term : terms) {
            if (!termFrequencies.containsKey(term)) {
                termFrequencies.put(term, new HashMap<>());
            }
            
            Map<String, Integer> docCounts = termFrequencies.get(term);
            docCounts.put(fileName, docCounts.getOrDefault(fileName, 0) + 1);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one record per (term, document) pair in the form: term -> "fileName:count"
        for (Map.Entry<String, Map<String, Integer>> entry : termFrequencies.entrySet()) {
            String term = entry.getKey();
            Map<String, Integer> docCounts = entry.getValue();
            
            for (Map.Entry<String, Integer> docEntry : docCounts.entrySet()) {
                outputKey.set(term);
                outputValue.set(docEntry.getKey() + ":" + docEntry.getValue());
                context.write(outputKey, outputValue);
            }
        }
    }
}

Reducer Implementation

The Reducer aggregates term occurrences across documents and writes results to HBase.

package com.search.index;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class IndexReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
        // Concatenate all "fileName:count" entries for this term into one posting list
        StringBuilder documentList = new StringBuilder();
        
        for (Text value : values) {
            documentList.append(value.toString()).append("; ");
        }
        
        result.set(documentList.toString());

        // Row key = term; the posting list is stored in column info:index
        Put put = new Put(key.toString().getBytes());
        put.addColumn("info".getBytes(), "index".getBytes(), result.toString().getBytes());
        context.write(null, put);
    }
}

Driver Configuration

The Driver orchestrates the MapReduce job and configures HBase connectivity.

package com.search.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class IndexDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Cluster endpoints: NameNode on hadoop102, ResourceManager on hadoop103, ZooKeeper quorum for HBase
        conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
        conf.set("yarn.resourcemanager.hostname", "hadoop103");
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");

        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexDriver.class);
        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Input comes from HDFS; output goes to the HBase table via the configured table reducer
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        TableMapReduceUtil.initTableReducerJob("InvertedIndex", IndexReducer.class, job);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}

Execution

  1. Create the input directory in HDFS:
hadoop fs -mkdir /input
  2. Upload the data files:
hadoop fs -put input /input
  3. Create the HBase table:
hbase shell
create 'InvertedIndex', 'info'
  4. Add the HBase libraries to the Hadoop classpath and run the MapReduce job:
export HADOOP_CLASSPATH=$($HBASE_HOME/bin/hbase mapredcp)
hadoop jar test3.jar /input
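
To spot-check the output, the table can be scanned from the HBase shell (the row limit here is arbitrary):

hbase shell
scan 'InvertedIndex', {LIMIT => 5}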

Technical Considerations

Data Splitting Strategy

Splitting large datasets into smaller segments prevents memory overflow during processing and enables parallel execution across cluster nodes.

HBase Integration

Storing inverted indexes in HBase provides fast random access capabilities essential for query-time lookups. The columnar storage model efficiently handles sparse term-document relationships.
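
As a sketch of such a query-time lookup (this client snippet is not part of the MapReduce job, and the term "cluster" is just an example), the posting list for a term can be fetched with the HBase Java client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexLookup {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("InvertedIndex"))) {
            // Row key = term; the posting list lives in column info:index
            Get get = new Get(Bytes.toBytes("cluster"));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("index"));
            System.out.println(value == null ? "term not found" : Bytes.toString(value));
        }
    }
}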

Cluster Resource Management

YARN manages application lifecycle and resource allocation, ensuring fair scheduling across concurrent MapReduce jobs.

Conclusion

This implementation demonstrates a complete pipeline for building distributed search infrastructure using Hadoop ecosystem components. The combination of HDFS for storage, MapReduce for processing, and HBase for serving queries provides a scalable foundation for large-scale search applications.

Tags: Hadoop
