
Integrating Kafka and Storm with Spring Boot


Kafka and Storm Overview

If you are familiar with Kafka and Storm, you can skip this section. If not, you can refer to my previous blog posts.

Environment Setup for Kafka and Storm

Link: http://www.panchengming.com/2018/01/26/pancm70/

Usage of Kafka

Links: http://www.panchengming.com/2018/01/28/pancm71/ and http://www.panchengming.com/2018/02/08/pancm72/

Usage of Storm

Link: http://www.panchengming.com/2018/03/16/pancm75/

Spring Boot Integration with Kafka and Storm

Why Use Spring Boot with Kafka and Storm

Typically, integrating Kafka with Storm directly already meets most requirements, but scalability can be limited. The current mainstream microservices framework, Spring Cloud, is based on Spring Boot, so using Spring Boot to integrate Kafka and Storm gives us unified configuration and better scalability.

What Can Be Achieved Using Spring Boot with Kafka and Storm

Generally, Kafka and Storm integration involves using Kafka for data transmission and Storm for real-time processing of data in Kafka.

With Spring Boot, we perform the same tasks but with unified management of Kafka and Storm.

For a clearer understanding, consider the following simple use case:

There is a large amount of user data in the database, some of which is unnecessary (dirty data). We need to clean this user data and write it back to the database, with real-time processing, low latency, and easy management.

In this scenario, we can use Spring Boot + Kafka + Storm for development.

Development Preparation

Before writing code, we need to clarify what to develop. In the above use case, we need a large amount of data, but here we only create a simple demo to achieve the basic functionality. The conditions to meet are as follows:

  1. Provide an interface to write user data into Kafka;
  2. Use Storm's spout to retrieve data from Kafka and send it to the bolt;
  3. Remove users under 10 years old in the bolt and write the data into MySQL;

Based on the above requirements, we proceed with the integration of Spring Boot, Kafka, and Storm.

First, we need the relevant JAR packages, so the Maven dependencies are as follows:


<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<java.version>1.8</java.version>
	<springboot.version>1.5.9.RELEASE</springboot.version>
	<mybatis-spring-boot>1.2.0</mybatis-spring-boot>
	<mysql-connector>5.1.44</mysql-connector>
	<slf4j.version>1.7.25</slf4j.version>
	<logback.version>1.2.3</logback.version>
	<kafka.version>1.0.0</kafka.version>
	<storm.version>1.2.1</storm.version>
	<fastjson.version>1.2.41</fastjson.version>
	<druid>1.1.8</druid>
</properties>


<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
		<version>${springboot.version}</version>
	</dependency>

	<!-- Spring Boot Mybatis dependency -->
	<dependency>
		<groupId>org.mybatis.spring.boot</groupId>
		<artifactId>mybatis-spring-boot-starter</artifactId>
		<version>${mybatis-spring-boot}</version>
	</dependency>

	<!-- MySQL connection driver dependency -->
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>${mysql-connector}</version>
	</dependency>


	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-api</artifactId>
		<version>${slf4j.version}</version>
	</dependency>


	<dependency>
		<groupId>ch.qos.logback</groupId>
		<artifactId>logback-classic</artifactId>
		<version>${logback.version}</version>
	</dependency>

	<dependency>
		<groupId>ch.qos.logback</groupId>
		<artifactId>logback-core</artifactId>
		<version>${logback.version}</version>
	</dependency>


	<!-- kafka -->
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.12</artifactId>
		<version>${kafka.version}</version>
		<exclusions>
			<exclusion>
				<groupId>org.apache.zookeeper</groupId>
				<artifactId>zookeeper</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
		<scope>provided</scope>
	</dependency>


	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>${kafka.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-streams</artifactId>
		<version>${kafka.version}</version>
	</dependency>


	<!-- storm related jars -->
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>${storm.version}</version>
		<!-- exclude dependencies -->
		<exclusions>
			<exclusion>
				<groupId>org.apache.logging.log4j</groupId>
				<artifactId>log4j-slf4j-impl</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.logging.log4j</groupId>
				<artifactId>log4j-1.2-api</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.logging.log4j</groupId>
				<artifactId>log4j-web</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
			<exclusion>
				<artifactId>ring-cors</artifactId>
				<groupId>ring-cors</groupId>
			</exclusion>
		</exclusions>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-kafka</artifactId>
		<version>${storm.version}</version>
	</dependency>


	<!-- fastjson related jars -->
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>${fastjson.version}</version>
	</dependency>

	<!-- Druid connection pool dependency -->
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>druid</artifactId>
		<version>${druid}</version>
	</dependency>
</dependencies>

After adding the dependencies, we add the corresponding configurations in application.properties:


# log
logging.config=classpath:logback.xml

## mysql
spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driverClassName=com.mysql.jdbc.Driver


## kafka 
kafka.servers = 192.169.0.23\:9092,192.169.0.24\:9092,192.169.0.25\:9092  
kafka.topicName = USER_TOPIC
kafka.autoCommit = false
kafka.maxPollRecords = 100
kafka.groupId = groupA
kafka.commitRule = earliest

Note: This configuration is only partial; the full configuration can be found in my GitHub repository.

Database script:

-- script for springBoot2 database

CREATE TABLE `t_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-incrementing ID',
  `name` varchar(10) DEFAULT NULL COMMENT 'Name',
  `age` int(2) DEFAULT NULL COMMENT 'Age',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;

Note: For simplicity, we only create a single table for this demonstration.
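
The bolt shown later persists the cleaned users through userService.insertBatch, which ultimately writes to this table. The DAO layer is not reproduced in this post, so here is a minimal MyBatis mapper sketch, assuming annotation-based mapping (the actual project may use XML mappers, and the method signature is only illustrative):

import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;

// Hypothetical sketch of the UserDao mapper; it only needs to produce the
// "insert into t_user (name,age) values (?,?), (?,?)" statement seen in the logs later.
@Mapper
public interface UserDao {

	@Insert("<script>"
			+ "insert into t_user (name, age) values "
			+ "<foreach collection='list' item='u' separator=','>(#{u.name}, #{u.age})</foreach>"
			+ "</script>")
	int insertBatch(List<User> list);
}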

Code Writing

Note: I will explain only a few key classes; the complete project link can be found at the bottom of the blog.

Before integrating Kafka and Storm with Spring Boot, we can first write the code for Kafka and Storm separately, then proceed with the integration.
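
The data flowing through Kafka and Storm is a plain user object matching the t_user table. Below is a minimal sketch of the User class assumed by the rest of the code (the real class in the repository may carry additional fields or annotations):

// Simple entity sketch matching t_user; used by JSON.parseObject(msg, User.class) in the spout
// and by the age filter in the bolt.
public class User {

	private Integer id;   // auto-increment primary key
	private String name;  // user name
	private Integer age;  // user age, checked against the "under 10" rule

	public Integer getId() { return id; }
	public void setId(Integer id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
	public Integer getAge() { return age; }
	public void setAge(Integer age) { this.age = age; }
}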

First, we get the data source, i.e., the spout in Storm retrieves data from Kafka.

In previous Storm tutorials, we discussed the Storm execution flow. The spout is the component in Storm that pulls in data; we mainly implement its nextTuple method, where we put the code that retrieves data from Kafka. Once the topology starts, Storm calls nextTuple continuously, so the data is pulled from Kafka in real time.

The main code for the spout class is as follows:

@Override
public void nextTuple() {
	for (;;) {
		try {
			msgList = consumer.poll(100);
			if (null != msgList && !msgList.isEmpty()) {
				String msg = "";
				List<User> list = new ArrayList<User>();
				for (ConsumerRecord<String, String> record : msgList) {
					// Original data
					msg = record.value();
					if (null == msg || "".equals(msg.trim())) {
						continue;
					}
					try {
						list.add(JSON.parseObject(msg, User.class));
					} catch (Exception e) {
						logger.error("Data format mismatch! Data: {}", msg);
						continue;
					}
				}
				logger.info("Spout emits data:" + list);
				// Send to bolt
				this.collector.emit(new Values(JSON.toJSONString(list)));
				consumer.commitAsync();
			} else {
				TimeUnit.SECONDS.sleep(3);
				logger.info("No data retrieved...");
			}
		} catch (Exception e) {
			logger.error("Message queue processing exception!", e);
			try {
				TimeUnit.SECONDS.sleep(10);
			} catch (InterruptedException e1) {
				logger.error("Pause failed!", e1);
			}
		}
	}
}

Note: If the spout fails to retrieve or emit data, it pauses briefly and retries on the next loop iteration!
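
The consumer and collector used in nextTuple are prepared in the spout's open method, and the single output field read later by the bolt is declared in declareOutputFields. A minimal sketch, with the kafka.* values from application.properties hard-coded here for brevity (the real spout obtains them from the Spring-managed configuration, so treat the wiring as illustrative):

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
	this.collector = collector;
	// Build the Kafka consumer from the settings shown in application.properties above.
	Properties props = new Properties();
	props.put("bootstrap.servers", "192.169.0.23:9092,192.169.0.24:9092,192.169.0.25:9092");
	props.put("group.id", "groupA");
	props.put("enable.auto.commit", "false");
	props.put("max.poll.records", "100");
	props.put("auto.offset.reset", "earliest");
	props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	this.consumer = new KafkaConsumer<String, String>(props);
	this.consumer.subscribe(Collections.singletonList("USER_TOPIC"));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// The single field the bolt reads via tuple.getStringByField(Constants.FIELD)
	declarer.declare(new Fields(Constants.FIELD));
}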

The spout class mainly sends data retrieved from Kafka to the bolt, which processes the data. After successful processing, the data is written to the database, and an acknowledgment is sent to the spout to avoid retransmission.

The bolt's main method is execute, where we implement the processing logic. Since this is the only bolt and nothing is forwarded further, there is no need to declare an output field (see the short sketch after the execute method below).

The implementation code is as follows:

@Override
public void execute(Tuple tuple) {
	String msg = tuple.getStringByField(Constants.FIELD);
	try {
		List<User> listUser = JSON.parseArray(msg, User.class);
		// Remove users under 10 years old
		if (listUser != null && listUser.size() > 0) {
			Iterator<User> iterator = listUser.iterator();
			while (iterator.hasNext()) {
				User user = iterator.next();
				if (user.getAge() < 10) {
					logger.warn("Bolt removes data: {}", user);
					iterator.remove();
				}
			}
			if (listUser != null && listUser.size() > 0) {
				userService.insertBatch(listUser);
			}
		}
	} catch (Exception e) {
		logger.error("Bolt data processing failed! Data: {}", msg, e);
	}
}
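
Because this bolt is the last step in the topology and emits nothing downstream, its declareOutputFields can simply be left empty; a minimal sketch:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// Last bolt in the topology: nothing is emitted further, so no fields are declared.
}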

After writing the spout and bolt, we proceed to write the Storm main class.

The main class of Storm is responsible for submitting the Topology; when submitting it, we wire up the spout and bolt accordingly. The Topology can run in two modes:

  1. Local mode, which uses the local Storm JAR to simulate the environment.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf, builder.createTopology());

  2. Remote mode, which runs on the Storm cluster.
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

For convenience, both modes are implemented and selected via the args parameter of the main method.

The configuration details for the Topology are well explained in the code comments, so I won't elaborate further here. The code is as follows:

public void runStorm(String[] args) {
	// Define a topology
	TopologyBuilder builder = new TopologyBuilder();
	// Set 1 executor (thread), default is 1
	builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
	// shuffleGrouping: random grouping
	// Set 1 executor (thread) and 1 task
	builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
	Config conf = new Config();
	// Set an acker
	conf.setNumAckers(1);
	// Set a worker
	conf.setNumWorkers(1);
	try {
		// If there are parameters, submit to the cluster and use the first parameter as the topology name
		// If there are no parameters, submit locally
		if (args != null && args.length > 0) {
			logger.info("Running remote mode");
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {
			// Start local mode
			logger.info("Running local mode");
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("TopologyApp", conf, builder.createTopology());
		}
	} catch (Exception e) {
		logger.error("Storm startup failed! Program exiting!", e);
		System.exit(1);
	}
	logger.info("Storm started successfully...");
}

After writing the Kafka and Storm-related code, we now proceed with the integration with Spring Boot!

Before integrating with Spring Boot, we need to solve the following issues.

  1. How do we submit the Storm Topology from within the Spring Boot application?

Submitting a Storm Topology is normally done in a main method, but Spring Boot also starts from its own main method. So how do we reconcile the two?

  • Solution: Submit the Storm Topology from the Spring Boot main class, so that it starts together with Spring Boot.
  • Result: They start together, as expected. However, the next issue arises: the bolt and spout classes cannot use Spring annotations.
  2. How do we enable Spring annotations for the bolt and spout classes?
  • Solution: Spout and bolt instances are created on the Nimbus node, serialized, sent to the supervisors, and deserialized there, so annotation-based injection does not survive. Instead, we retrieve the Spring beans dynamically from the running context.
  • Result: After implementing dynamic bean retrieval, Storm starts successfully.
  3. Why does the application sometimes start normally and sometimes not, with dynamic bean retrieval failing as well?
  • Solution: Even after resolving issues 1 and 2, problem 3 occasionally appeared. After extensive troubleshooting, it turned out to be caused by Spring Boot's hot-deployment support; removing it resolved the issue.

These were the three issues I encountered during the integration. The solutions seem feasible. Perhaps other reasons could cause these issues, but after integrating this way, no further problems have occurred. If there are any inaccuracies in the above solutions, please feel free to provide feedback!

After solving the above issues, we return to the code.

After integration, the program's entry point, the main class, is as follows:

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		// Start embedded Tomcat and initialize Spring environment and its components
		ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
		GetSpringBean springBean = new GetSpringBean();
		springBean.setApplicationContext(context);
		TopologyApp app = context.getBean(TopologyApp.class);
		app.runStorm(args);
	}

}

Dynamic bean retrieval code:

public class GetSpringBean implements ApplicationContextAware {

	private static ApplicationContext context;

	public static Object getBean(String name) {
		return context.getBean(name);
	}

	public static <T> T getBean(Class<T> c) {
		return context.getBean(c);
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		if (applicationContext != null) {
			context = applicationContext;
		}
	}

}
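
With this helper in place, the bolt can fetch its Spring beans when Storm prepares it on the worker, instead of relying on annotation injection that is lost during serialization. A minimal sketch, assuming UserService is a registered bean (the exact place where the bean is fetched in the repository may differ, e.g. lazily inside execute):

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	// The bolt instance was serialized by Nimbus and deserialized on the supervisor,
	// so any injected fields are gone; fetch the bean from the running Spring context instead.
	this.userService = GetSpringBean.getBean(UserService.class);
}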

That concludes the explanation of the main code; the remaining parts are similar to what has been shown before.

Testing Results

After successfully starting the program, we first call the interface to add a few data entries to Kafka.

Add Request:

POST http://localhost:8087/api/user

{"name":"Zhang San","age":20}
{"name":"Li Si","age":10}
{"name":"Wang Wu","age":5}

After adding the data successfully, we can use Xshell to check the data in the Kafka cluster.

Run: kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning

The console consumer then prints the records we just added, which confirms that the data has been written to Kafka successfully.

Since the data is retrieved from Kafka in real time, we can also watch the log statements printed in the application console.

Console Output:

 INFO  com.pancm.storm.spout.KafkaInsertDataSpout - Spout emits data:[{"age":5,"name":"Wang Wu"}, {"age":10,"name":"Li Si"}, {"age":20,"name":"Zhang San"}]
 WARN  com.pancm.storm.bolt.InsertBolt - Bolt removes data:{"age":5,"name":"Wang Wu"}
 INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} initialized
 DEBUG com.pancm.dao.UserDao.insertBatch - ==> Preparing: insert into t_user (name,age) values (?,?), (?,?) 
 DEBUG com.pancm.dao.UserDao.insertBatch - ==> Parameters: Li Si(String), 10(Integer), Zhang San(String), 20(Integer)
 DEBUG com.pancm.dao.UserDao.insertBatch - <==    Updates: 2
 INFO  com.pancm.service.impl.UserServiceImpl - Batch added 2 records successfully!

The console output shows both the processing steps and the result.

Then, we can also query all the data in the database through the interface.

Query Request:

GET http://localhost:8087/api/user

Return Result:

[{
	"id":1,
	"name":"Li Si",
	"age":10
},{
	"id":2,
	"name":"Zhang San",
	"age":20
}]

The returned results match our expectations: the under-10 record was filtered out, and the other two users were stored.
