Fading Coder

One Final Commit for the Last Sprint


Introduction to Real-Time Stream Processing with Apache Storm

Tech · May 9

Apache Storm is an open-source distributed computation system designed for processing real-time data streams. Often compared to Hadoop for batch processing, Storm excels in unbounded data scenarios where low latency is critical, such as real-time analytics, online machine learning, and continuous computation. A Storm cluster operates on a master-slave architecture where a master node (Nimbus) distributes tasks across worker nodes (Supervisors). The fundamental unit of work is the Topology, a graph of processing logic that runs indefinitely until terminated.

Core Abstractions

The architecture relies on several key concepts:

  • Topology: A directed acyclic graph (DAG) defining the data flow. It consists of Spouts (data sources) and Bolts (processing units).
  • Stream: An unbounded sequence of tuples that forms the data pipeline between components.
  • Spout: The entry point for data. Spouts connect to external sources (like message queues) and emit tuples into the topology.
  • Bolt: Components that process incoming streams. Bolts can filter, aggregate, join, or store data, emitting new streams for downstream processing.
  • Stream Groupings: Mechanisms defining how tuples are distributed between Bolt tasks (e.g., Shuffle Grouping for random distribution, Fields Grouping for consistent hashing).
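The routing idea behind the two most common groupings can be sketched in plain Java. This is a conceptual illustration only, not Storm's internal implementation: shuffle grouping picks a target task at random, while fields grouping hashes the grouping field so that equal values always land on the same task index.

```java
import java.util.concurrent.ThreadLocalRandom;

public class GroupingSketch {
    // Shuffle grouping: any task may receive the tuple.
    static int shuffle(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    // Fields grouping: hash the grouping field so that equal
    // values are always routed to the same task index.
    static int fieldsGroup(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4;
        // The same word is routed to the same task on every call.
        System.out.println(
            fieldsGroup("storm", tasks) == fieldsGroup("storm", tasks)); // true
    }
}
```

This deterministic routing is what makes per-key state (such as the word counts later in this article) correct: no two tasks ever hold partial counts for the same word.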

Project Configuration

To develop a Storm application, Maven manages the dependencies. The storm-core artifact provides the required libraries; the provided scope keeps it out of the packaged jar, since the cluster supplies Storm's classes at runtime. When running in local mode from an IDE, make sure provided-scope dependencies are included on the runtime classpath.

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

Implementing a Basic Topology

A minimal topology requires a Spout to generate data and a Bolt to consume it. This example demonstrates a simple pass-through mechanism.

Defining the Data Source (Spout)

The MessageProducerSpout class generates a sequence of messages. It extends BaseRichSpout and implements the core lifecycle methods.

public class MessageProducerSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] messages = {"Event A", "Event B", "Event C"};

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (index < messages.length) {
            collector.emit(new Values(messages[index]));
            index++;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
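The spout above emits unanchored tuples, so Storm gives no delivery guarantee for them. A reliable spout attaches a message ID to each emit and overrides ack and fail to retire or replay messages. The bookkeeping behind that pattern can be sketched in plain Java; this PendingBuffer class is a hypothetical helper for illustration, not part of Storm's API:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class PendingBuffer {
    private final Queue<String> ready = new ArrayDeque<>();
    private final Map<Long, String> inFlight = new HashMap<>();
    private long nextId = 0;

    public void offer(String msg) { ready.add(msg); }

    // Analogous to emitting with a message ID in nextTuple():
    // the message stays tracked until acked or failed.
    public Long emit() {
        String msg = ready.poll();
        if (msg == null) return null;
        long id = nextId++;
        inFlight.put(id, msg);
        return id;
    }

    // ack(): the tuple tree completed, so retire the message.
    public void ack(long id) { inFlight.remove(id); }

    // fail(): re-queue the message so it is replayed.
    public void fail(long id) {
        String msg = inFlight.remove(id);
        if (msg != null) ready.add(msg);
    }

    public int pending() { return inFlight.size(); }
}
```

In a real spout, the emit call would be `collector.emit(new Values(msg), id)` and Storm would invoke ack or fail with that same ID once the downstream tuple tree succeeds or times out.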

Defining the Processing Logic (Bolt)

The ConsolePrinterBolt receives messages and logs them to the console. Bolts handle the actual stream processing logic.

public class ConsolePrinterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String msg = tuple.getStringByField("message");
        System.out.println("Processing: " + msg);
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output for this bolt
    }
}

Topology Execution

The main class constructs the topology, linking the Spout to the Bolt via a shuffle grouping, and submits it to a local cluster for testing.

public class BasicTopologyRunner {
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout-producer", new MessageProducerSpout());
        builder.setBolt("bolt-printer", new ConsolePrinterBolt())
               .shuffleGrouping("spout-producer");

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("basic-demo", conf, builder.createTopology());
        Thread.sleep(5000);
        cluster.shutdown();
    }
}

Chaining Bolts for Complex Processing

Real-world scenarios often require multiple processing stages. This example implements a word frequency counter, demonstrating how to chain Bolts and use field groupings to ensure consistent routing.

Data Partitioning (Split Bolt)

The first bolt normalizes text and splits it into individual words.

public class SentenceSplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        String[] words = sentence.toLowerCase().split("\\s+");
        for (String word : words) {
            if (!word.isEmpty()) {
                // Anchor each emitted word to the input tuple so that
                // downstream failures propagate back to the spout.
                collector.emit(tuple, new Values(word));
            }
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Stateful Aggregation (Count Bolt)

The WordCounterBolt maintains an in-memory hash map of occurrence counts. Fields grouping ensures that all tuples carrying the same word are routed to the same task instance, so each task holds the complete count for its share of words. Note that cleanup() is only guaranteed to run in local mode; on a production cluster, worker processes may be killed without invoking it.

public class WordCounterBolt extends BaseRichBolt {
    private Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        counters.merge(word, 1, Integer::sum);
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        System.out.println("--- Final Word Counts ---");
        counters.forEach((k, v) -> System.out.println(k + ": " + v));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
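What the two bolts compute end to end can be verified outside Storm by applying the same split and merge logic directly. The following is an illustrative plain-Java simulation of the pipeline, with no Storm runtime involved:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSimulation {
    // Mirrors SentenceSplitBolt + WordCounterBolt: normalize to
    // lowercase, split on whitespace, and merge counts per word.
    static Map<String, Integer> count(String... sentences) {
        Map<String, Integer> counters = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counters.merge(word, 1, Integer::sum);
                }
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        // "the" appears twice; every other word once.
        System.out.println(count("the quick fox", "the lazy dog"));
    }
}
```

The topology produces exactly this result, except that the counts are partitioned across counter tasks by the fields grouping rather than held in a single map.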

Topology Assembly

The topology links a sentence spout to the splitter, and the splitter to the counter using a field grouping on the "word" field.

public class WordCountTopology {
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("text-source", new MessageProducerSpout());
        builder.setBolt("split-bolt", new SentenceSplitBolt())
               .shuffleGrouping("text-source");
        builder.setBolt("count-bolt", new WordCounterBolt())
               .fieldsGrouping("split-bolt", new Fields("word"));

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
