Using Kafka and Flink for Real-Time Data Processing


Implementing a Kafka Producer

Below is a Java-based Kafka producer implementation that sends data to a Kafka topic on a scheduled basis:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import lombok.extern.slf4j.Slf4j;

// Runs standalone via main() or as a Spring scheduled task when picked up by the application context.
@Configuration
@Slf4j
public class ScheduledDataProducer extends Thread {

    public static final String BROKER_URL = "your_broker_ip:9092";
    public static final String TOPIC_NAME = "test";

    public static void main(String[] args) {
        ScheduledDataProducer producer = new ScheduledDataProducer();
        producer.start();
    }

    @Override
    public void run() {
        try {
            startDataProduction();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Scheduled(initialDelayString = "${kf.flink.init}", fixedDelayString = "${kf.flink.fixRate}")
    private void startDataProduction() throws InterruptedException {
        log.info("Initializing scheduled producer task");
        Properties config = createKafkaConfig();
        Producer<String, String> producer = new KafkaProducer<>(config);
        SimpleDateFormat timeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Produce records indefinitely; each record carries its own timestamp.
        for (int i = 1; i <= Integer.MAX_VALUE; i++) {
            String currentTime = timeFormatter.format(new Date());
            String jsonData = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":\"" + currentTime + "\"}";
            String key = "record-" + i;
            Thread.sleep(300);
            if (i % 10 == 0) {
                Thread.sleep(1000);
            }
            producer.send(new ProducerRecord<>(TOPIC_NAME, key, jsonData));
        }

        producer.close();
    }

    private Properties createKafkaConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_URL);
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
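
To confirm that records are actually reaching the topic before wiring up Flink, a plain Kafka consumer can be pointed at the same broker and topic. The following is a minimal sketch, not part of the original setup; the class name and group id are illustrative, and the broker address is assumed to match the producer's BROKER_URL:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical helper for verifying the producer output; not part of the original article.
public class ProducerSmokeTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "your_broker_ip:9092"); // same broker as the producer
        props.put("group.id", "smoke-test-group");             // illustrative group id
        props.put("auto.offset.reset", "earliest");            // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // Poll a few batches and print whatever the producer has published so far.
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}

Running it while the producer is active should print the JSON payloads along with their record-N keys.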

Building the Flink Streaming Application

The following Flink application consumes messages from the Kafka topic and processes them in real time:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RealTimeDataProcessor {
    private static final String KAFKA_TOPIC = "test";
    private static final String KAFKA_BROKER = "your_broker_ip:9092";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProps = createKafkaProperties();
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps)
                .setStartFromEarliest()
        );

        stream.rebalance()
            .map((MapFunction<String, String>) value -> {
                System.out.println("Received data: " + value);
                return value;
            })
            .print();

        env.execute("Kafka Real-Time Processing Job");
    }

    private static Properties createKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER);
        props.put("zookeeper.connect", "192.168.47.130:2182");
        props.put("group.id", "flink_consumer_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
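
The job above simply echoes each message. Because the producer emits JSON, a natural next step is to parse the payload inside the pipeline. The sketch below assumes Jackson (com.fasterxml.jackson) is on the classpath; ParseJson is a hypothetical helper, not part of the original job:

import org.apache.flink.api.common.functions.MapFunction;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Turns the producer's JSON payload ({"id":..,"ip":"..","date":".."}) into a readable summary.
public class ParseJson implements MapFunction<String, String> {

    // ObjectMapper is not serializable, so it is created lazily inside the task instance.
    private transient ObjectMapper mapper;

    @Override
    public String map(String value) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode node = mapper.readTree(value);
        return "id=" + node.get("id").asLong()
                + ", ip=" + node.get("ip").asText()
                + ", date=" + node.get("date").asText();
    }
}

It would plug into the existing pipeline in place of the pass-through map, e.g. stream.rebalance().map(new ParseJson()).print().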

Deploying and Running the Flink Job

Once the application is packaged as a JAR, upload it to HDFS and submit it with the Flink CLI:

hdfs dfs -put your-application.jar /user/flink/your-application.jar

# Navigate to Flink's bin directory
cd /path/to/flink/bin
./flink run --class com.example.RealTimeDataProcessor /user/flink/your-application.jar

Monitoring the Job

After starting the job, you can monitor its progress through the Flink web dashboard, which listens on port 8081 by default.
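
The dashboard is backed by the JobManager's REST API, so the same information can be pulled programmatically. The snippet below is a minimal sketch that lists the submitted jobs and their statuses; the host name is a placeholder for wherever your JobManager runs:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper: queries the JobManager REST API that also backs the web dashboard.
public class JobStatusCheck {

    public static void main(String[] args) throws Exception {
        // Adjust the host to your JobManager; 8081 is the default REST/dashboard port.
        URL url = new URL("http://your_jobmanager_host:8081/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            // The response is a small JSON document listing job ids and their current status.
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}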
