Securing Apache Kafka with SASL/PLAIN Authentication and Spring Boot Client Integration
Enabling SASL/PLAIN Authentication in Kafka 2.4.0
To protect Kafka clusters exposed to untrusted networks, SASL/PLAIN authentication, paired with TLS encryption, secures both client and inter-broker communication.
Broker-Side JAAS Configuration
Create a JAAS configuration file (e.g., kafka_server_jaas.conf) defining user credentials and broker identity:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_consumer="consumer-pass"
    user_producer="producer-pass";
};
This file must be referenced via JVM argument when starting each broker:
-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
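Set this parameter in the environment read by the broker startup script (kafka-server-start.sh). KAFKA_OPTS is the conventional variable for extra JVM flags; appending the flag to KAFKA_HEAP_OPTS also works, but it mixes security settings with heap sizing:
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"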
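To make the role of the user_<name> entries in the KafkaServer JAAS entry concrete, here is a minimal sketch of the lookup the broker performs for PLAIN: the presented username is prefixed with user_ and the stored value is compared with the presented password. The class PlainCredentialCheck and its methods are hypothetical stand-ins for illustration, not Kafka's own classes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

/** Hypothetical illustration of the user_<name> lookup; not Kafka's actual implementation. */
final class PlainCredentialCheck {

    // Mirrors the options of the KafkaServer JAAS entry shown above.
    static final Map<String, String> JAAS_OPTIONS = Map.of(
            "username", "admin",
            "password", "admin-secret",
            "user_admin", "admin-secret",
            "user_consumer", "consumer-pass",
            "user_producer", "producer-pass");

    /** Returns true only if a user_<username> option exists and matches the password. */
    static boolean authenticate(Map<String, String> options, String username, String password) {
        if (username == null || password == null) {
            return false;
        }
        String expected = options.get("user_" + username);
        if (expected == null) {
            return false; // no user_<username> entry, so the user is unknown
        }
        // MessageDigest.isEqual avoids leaking match position through timing.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(authenticate(JAAS_OPTIONS, "producer", "producer-pass")); // true
        System.out.println(authenticate(JAAS_OPTIONS, "producer", "wrong"));         // false
        // "username"/"password" are the broker's own identity, not a user_ entry.
        System.out.println(authenticate(JAAS_OPTIONS, "username", "admin"));         // false
    }
}
```

The plain username and password options are what the broker itself presents when connecting to other brokers, which is why admin also needs a matching user_admin entry.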
Broker Security Properties
In server.properties, configure SASL over SSL:
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://kafka.example.com:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
ssl.keystore.location=/opt/kafka/config/kafka.server.keystore.jks
ssl.keystore.password=keystore-pass
ssl.key.password=key-pass
ssl.truststore.location=/opt/kafka/config/kafka.server.truststore.jks
ssl.truststore.password=truststore-pass
Note: SSL configuration is mandatory here. SASL/PLAIN sends the username and password as-is, so without TLS the credentials would cross the network in plaintext.
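Because a single mismatched property can leave a listener unauthenticated or stop brokers from talking to each other, a small pre-flight check can be useful. The sketch below is an assumption-laden illustration: the class BrokerSaslConfigCheck is hypothetical, it assumes listener names equal their security protocol (no custom listener.security.protocol.map), and it only inspects the keys used in this article.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the SASL_SSL broker settings described above. */
final class BrokerSaslConfigCheck {

    /** Returns a list of human-readable problems; an empty list means no issue was found. */
    static List<String> problems(Properties p) {
        List<String> found = new ArrayList<>();

        String listeners = p.getProperty("listeners", "");
        if (!listeners.contains("SASL_SSL://")) {
            found.add("no SASL_SSL listener configured");
        }
        if (listeners.contains("SASL_PLAINTEXT://")) {
            found.add("SASL_PLAINTEXT listener would expose PLAIN credentials");
        }

        // Kafka defaults this setting to PLAINTEXT when it is absent.
        String interBroker = p.getProperty("security.inter.broker.protocol", "PLAINTEXT").trim();
        if (!"SASL_SSL".equals(interBroker)) {
            found.add("inter-broker protocol is " + interBroker + ", expected SASL_SSL");
        }

        // Both mechanism settings default to GSSAPI when absent.
        List<String> enabled = Arrays.asList(
                p.getProperty("sasl.enabled.mechanisms", "GSSAPI").trim().split("\\s*,\\s*"));
        String mechanism = p.getProperty("sasl.mechanism.inter.broker.protocol", "GSSAPI").trim();
        if (!enabled.contains(mechanism)) {
            found.add("inter-broker mechanism " + mechanism + " is not in sasl.enabled.mechanisms");
        }

        for (String key : new String[] {"ssl.keystore.location", "ssl.truststore.location"}) {
            if (p.getProperty(key, "").trim().isEmpty()) {
                found.add(key + " is not set");
            }
        }
        return found;
    }

    public static void main(String[] args) throws Exception {
        String serverProperties =
                "listeners=SASL_SSL://0.0.0.0:9093\n"
              + "security.inter.broker.protocol=SASL_SSL\n"
              + "sasl.mechanism.inter.broker.protocol=PLAIN\n"
              + "sasl.enabled.mechanisms=PLAIN\n"
              + "ssl.keystore.location=/opt/kafka/config/kafka.server.keystore.jks\n"
              + "ssl.truststore.location=/opt/kafka/config/kafka.server.truststore.jks\n";
        Properties p = new Properties();
        p.load(new StringReader(serverProperties));
        System.out.println(problems(p)); // prints [] for the configuration above
    }
}
```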
Client-Side Authentication Setup
For Java clients—including Spring Boot applications—configure SASL parameters programmatically or via properties.
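For the properties route, the settings involved are security.protocol, sasl.mechanism, and sasl.jaas.config. The following sketch builds them with plain java.util.Properties so the keys are visible without any framework. The class SaslClientProps and its helper names are hypothetical, and the escaping step assumes the JAAS string parser honours backslash escapes inside quoted values.

```java
import java.util.Properties;

/** Hypothetical helper that assembles SASL/PLAIN client settings as plain properties. */
final class SaslClientProps {

    /** Escapes backslashes and double quotes so the value survives inside a quoted JAAS option. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the sasl.jaas.config value for PlainLoginModule. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + escape(username) + "\" "
                + "password=\"" + escape(password) + "\";";
    }

    /** Returns the client properties needed to authenticate over SASL_SSL with PLAIN. */
    static Properties saslSslProperties(String bootstrapServers, String username, String password) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "PLAIN");
        p.setProperty("sasl.jaas.config", plainJaasConfig(username, password));
        return p;
    }

    public static void main(String[] args) {
        Properties p = saslSslProperties("kafka.example.com:9093", "producer", "producer-pass");
        // Print only non-secret settings; the JAAS string contains the password.
        System.out.println(p.getProperty("security.protocol") + " / " + p.getProperty("sasl.mechanism"));
    }
}
```

The resulting Properties object can be passed to a KafkaProducer or KafkaConsumer constructor together with the usual serializer settings. If the broker certificate is not trusted by the JVM's default truststore, ssl.truststore.location and ssl.truststore.password are needed as well.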
Spring Boot Kafka Configuration Class
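import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class SecureKafkaConfig {

    @Bean
    public ProducerFactory<String, String> secureProducerFactory(KafkaProperties props) {
        // Start from the spring.kafka.* settings, then layer the SASL options on top.
        Map<String, Object> configs = props.buildProducerProperties();
        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        configs.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"producer\" password=\"producer-pass\";");
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> factory) {
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public ConsumerFactory<String, String> secureConsumerFactory(KafkaProperties props) {
        Map<String, Object> configs = props.buildConsumerProperties();
        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        configs.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"consumer\" password=\"consumer-pass\";");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
            kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // three listener threads per container
        factory.getContainerProperties().setPollTimeout(2000); // poll timeout in milliseconds
        return factory;
    }
}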
Application Properties (application.yml)
spring:
  kafka:
    bootstrap-servers: kafka.example.com:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: secure-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Optional: External Credential Handling
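To avoid hardcoding secrets, implement custom callback handler classes (Kafka's AuthenticateCallbackHandler interface) and register them via:
- sasl.server.callback.handler.class (broker-side; on brokers the key must be prefixed with the listener name and the lowercase mechanism, e.g. listener.name.sasl_ssl.plain.sasl.server.callback.handler.class)
- sasl.client.callback.handler.class (client-side)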
These handlers can fetch credentials dynamically from vaults, environment variables, or external auth services.
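As a rough illustration of the client side, the sketch below answers the standard JAAS NameCallback and PasswordCallback from a key/value source such as the process environment. It uses only the JDK's javax.security.auth.callback types so that it runs on its own. The class name EnvCredentialCallbackHandler and the variable names KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD are assumptions made for this example.

```java
import java.io.IOException;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

/** Hypothetical handler that supplies SASL/PLAIN credentials from a key/value source. */
final class EnvCredentialCallbackHandler implements CallbackHandler {

    // Assumed variable names; pick whatever your deployment uses.
    static final String USER_KEY = "KAFKA_SASL_USERNAME";
    static final String PASSWORD_KEY = "KAFKA_SASL_PASSWORD";

    private final Map<String, String> source;

    EnvCredentialCallbackHandler(Map<String, String> source) {
        this.source = source;
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                ((NameCallback) callback).setName(require(USER_KEY));
            } else if (callback instanceof PasswordCallback) {
                ((PasswordCallback) callback).setPassword(require(PASSWORD_KEY).toCharArray());
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    /** Fails loudly instead of authenticating with an empty credential. */
    private String require(String key) throws IOException {
        String value = source.get(key);
        if (value == null || value.isEmpty()) {
            throw new IOException("Missing credential: " + key);
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        // A real deployment would pass System.getenv(); a fixed map keeps the demo deterministic.
        CallbackHandler handler = new EnvCredentialCallbackHandler(
                Map.of(USER_KEY, "producer", PASSWORD_KEY, "producer-pass"));
        NameCallback name = new NameCallback("username: ");
        PasswordCallback password = new PasswordCallback("password: ", false);
        handler.handle(new Callback[] {name, password});
        System.out.println(name.getName()); // prints producer
    }
}
```

A handler registered through sasl.client.callback.handler.class would instead implement org.apache.kafka.common.security.auth.AuthenticateCallbackHandler, which extends CallbackHandler with configure and close methods, and would need a public no-argument constructor because Kafka instantiates it by class name. It may also receive callback types beyond the two handled here.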
Validation Steps
- Start brokers with the updated configuration and verify that no SaslException appears in the logs.
- Use kafka-console-producer.sh with SASL options to test connectivity (on Kafka 2.4.x the console producer expects --broker-list; --bootstrap-server is accepted from 2.5 onward):
  bin/kafka-console-producer.sh \
    --bootstrap-server kafka.example.com:9093 \
    --topic test-topic \
    --producer-property security.protocol=SASL_SSL \
    --producer-property sasl.mechanism=PLAIN \
    --producer-property sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='producer' password='producer-pass';"
- Confirm that authentication is enforced by attempting connections with invalid credentials, which the broker should reject.