Using Kafka and Flink for Real-Time Data Processing
Implementing a Kafka Producer
Below is a Java-based Kafka producer implementation that sends data to a Kafka topic on a scheduled basis:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class ScheduledDataProducer {

    public static final String BROKER_URL = "your_broker_ip:9092";
    public static final String TOPIC_NAME = "test";

    // @Scheduled is only invoked on public methods of Spring-managed beans,
    // and requires @EnableScheduling somewhere in the application context.
    @Scheduled(initialDelayString = "${kf.flink.init}", fixedDelayString = "${kf.flink.fixRate}")
    public void startDataProduction() throws InterruptedException {
        log.info("Initializing scheduled producer task");
        Properties config = createKafkaConfig();
        SimpleDateFormat timeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try (Producer<String, String> producer = new KafkaProducer<>(config)) {
            for (int i = 1; i <= Integer.MAX_VALUE; i++) {
                String currentTime = timeFormatter.format(new Date());
                String jsonData = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":\"" + currentTime + "\"}";
                String key = "record-" + i;
                Thread.sleep(300);      // throttle to roughly three records per second
                if (i % 10 == 0) {
                    Thread.sleep(1000); // extra pause after every tenth record
                }
                producer.send(new ProducerRecord<>(TOPIC_NAME, key, jsonData));
            }
        } // try-with-resources guarantees the producer is closed
    }

    private Properties createKafkaConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_URL);
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
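Each message the producer emits is a small flat JSON document. As a sanity check on the format, the string concatenation in the loop can be reproduced in isolation; the `buildRecord` helper below is illustrative, not part of the producer itself:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class RecordFormatCheck {

    // Hypothetical helper mirroring the concatenation in the producer loop.
    static String buildRecord(int i, String timestamp) {
        return "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":\"" + timestamp + "\"}";
    }

    public static void main(String[] args) {
        String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        // Prints e.g. {"id":1,"ip":"192.168.0.1","date":"2024-01-01 12:00:00"}
        System.out.println(buildRecord(1, timestamp));
    }
}
```

Note that because `i` is reused for the last octet of the IP address, values of `i` above 255 produce addresses that are not valid IPv4, which is harmless for demo data.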
Building the Flink Streaming Application
The following Flink application consumes messages from the Kafka topic and processes them in real time:
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RealTimeDataProcessor {

    private static final String KAFKA_TOPIC = "test";
    private static final String KAFKA_BROKER = "your_broker_ip:9092";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);   // checkpoint every second
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProps = createKafkaProperties();
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps);
        consumer.setStartFromEarliest();  // read the topic from the beginning

        DataStream<String> stream = env.addSource(consumer);

        stream.rebalance()
                .map((MapFunction<String, String>) value -> {
                    System.out.println("Received data: " + value);
                    return value;
                })
                .print();

        env.execute("Kafka Real-Time Processing Job");
    }

    private static Properties createKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER);
        // Note: zookeeper.connect is only required by the legacy 0.8 connector;
        // the universal FlinkKafkaConsumer needs only the bootstrap servers.
        props.put("group.id", "flink_consumer_group");
        // With checkpointing enabled, Flink commits offsets on checkpoint
        // completion; the auto-commit settings below serve only as a fallback.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
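The map step above only echoes each record. In a real job you would parse the JSON payload, typically with a library such as Jackson; as a dependency-free sketch of what that parsing step does, the hypothetical `extractField` helper below pulls individual fields out of the flat records produced earlier:

```java
public class RecordParser {

    // Hypothetical helper for the flat JSON records produced earlier,
    // e.g. {"id":1,"ip":"192.168.0.1","date":"2024-01-01 00:00:00"}.
    // A production job should use a proper JSON library instead.
    static String extractField(String json, String field) {
        String needle = "\"" + field + "\":";
        int start = json.indexOf(needle);
        if (start < 0) {
            return null;               // field not present
        }
        start += needle.length();
        if (json.charAt(start) == '"') {
            start++;                   // quoted string value
            return json.substring(start, json.indexOf('"', start));
        }
        int end = start;               // bare numeric value
        while (end < json.length() && json.charAt(end) != ',' && json.charAt(end) != '}') {
            end++;
        }
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        String record = "{\"id\":7,\"ip\":\"192.168.0.7\",\"date\":\"2024-01-01 00:00:00\"}";
        System.out.println(extractField(record, "id"));  // 7
        System.out.println(extractField(record, "ip"));  // 192.168.0.7
    }
}
```

Inside the Flink pipeline, this logic would sit in the `map` function in place of the `println`, turning each raw string into whatever typed record the downstream operators need.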
Deploying and Running the Flink Job
Once the application is built into a JAR file, you can archive it in HDFS and submit it with the Flink CLI. Note that the standard `flink run` command expects a local path to the JAR:
# Copy the JAR into HDFS for safekeeping (optional)
hdfs dfs -put your-application.jar /user/flink/your-application.jar
# Navigate to Flink's bin directory
cd /path/to/flink/bin
# Submit the job from a local copy of the JAR
./flink run --class com.example.RealTimeDataProcessor your-application.jar
Monitoring the Job
After starting the job, you can monitor its progress through the Flink web dashboard, which listens on port 8081 by default.