Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Securing Apache Kafka with SASL/PLAIN Authentication and Spring Boot Client Integration

Tech 1

Enabling SASL/PLAIN Authentication in Kafka 2.4.0

To protect Kafka clusters exposed to untrusted networks, SASL/PLAIN authentication—paired with TLS ancryption—is implemented for secure client and inteer-broker communication.

Broker-Side JAAS Configuration

Create a JAAS configuration file (e.g., kafka_server_jaas.conf) defining user credentials and broker identity:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_consumer="consumer-pass"
    user_producer="producer-pass";
};

This file must be referenced via JVM argument when starting each broker:

-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf

Update the broker startup script (kafka-server-start.sh) to inject this parameter into KAFKA_HEAP_OPTS:

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"

Broker Security Properties

In server.properties, configure SASL over SSL:

listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://kafka.example.com:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
ssl.keystore.location=/opt/kafka/config/kafka.server.keystore.jks
ssl.keystore.password=keystore-pass
ssl.key.password=key-pass
ssl.truststore.location=/opt/kafka/config/kafka.server.truststore.jks
ssl.truststore.password=truststore-pass

Note: SSL configuration is mandatory to prevent plaintext credential exposure.

Client-Side Authentication Setup

For Java clients—including Spring Boot applications—configure SASL parameters programmatically or via properties.

Spring Boot Kafka Configuration Class
@Configuration
@EnableKafka
public class SecureKafkaConfig {

    @Bean
    public ProducerFactory<String, String> secureProducerFactory(KafkaProperties props) {
        Map<String, Object> configs = props.buildProducerProperties();
        configs.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configs.put(ProducerConfig.SASL_MECHANISM_CLASS_NAME, "PLAIN");
        configs.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"producer\" password=\"producer-pass\";");
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> factory) {
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public ConsumerFactory<String, String> secureConsumerFactory(KafkaProperties props) {
        Map<String, Object> configs = props.buildConsumerProperties();
        configs.put(ConsumerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configs.put(ConsumerConfig.SASL_MECHANISM_CLASS_NAME, "PLAIN");
        configs.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"consumer\" password=\"consumer-pass\";");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(2000);
        return factory;
    }
}
Application Properties (application.yml)
spring:
  kafka:
    bootstrap-servers: kafka.example.com:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: secure-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Optional: External Credential Handling

To avoid hardcoding secrets, implement custom CallbackHandler classes and set them via:

  • sasl.server.callback.handler.class (broker-side)
  • sasl.client.callback.handler.class (client-side)

These handlers can fetch credentials dynamically from vaults, environment variables, or external auth services.

Validation Steps

  1. Start brokers with updated configurations and verify no SaslException appears in logs.
  2. Use kafka-console-producer.sh with SASL options to test connectivity:
    bin/kafka-console-producer.sh \
      --bootstrap-server kafka.example.com:9093 \
      --topic test-topic \
      --producer-property security.protocol=SASL_SSL \
      --producer-property sasl.mechanism=PLAIN \
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='producer' password='producer-pass';"
    
  3. Confirm ACL enforcement by attempting connections with invalid credentials.
Tags: Kafkasasl

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.