RabbitMQ Core Concepts and Five Messaging Patterns

Synchronous vs. Asynchronous Communication

Synchronous communication involves direct, immediate interaction, similar to a video call. Asynchronous communication involves delayed interaction, allowing a sender to communicate with multiple receivers without waiting, akin to texting.

Pros and Cons of Synchronous Calls

  • Pros: Strong timeliness; the caller gets an immediate response.
  • Cons: High code coupling due to direct inter-service calls, making modifications cumbersome. Sequential service calls can lead to resource waste and performance degradation during wait times. Failure of one service may cause cascading failures and system avalanches.

Pros and Cons of Asynchronous Calls

  • Pros:
    • Low Coupling: Uses an event-driven model via a broker, eliminating direct service dependencies.
    • Increased Throughput: Services process tasks independently without waiting for upstream completions.
    • Fault Isolation: Failures in one service do not directly impact others.
    • Traffic Shaping: A broker can buffer incoming requests and release them based on downstream processing capacity, preventing overload.
  • Cons:
    • High Broker Dependency: System reliability hinges on the broker's stability, security, and performance.
    • Obfuscated Call Chains: The loose coupling can make business process flows and debugging more complex.

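The choice between synchronous and asynchronous models depends on the use case. Synchronous calls remain the more common default; asynchronous messaging pays off where decoupling, throughput, or load buffering matters more than an immediate response.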
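
The traffic-shaping benefit listed above can be illustrated without a real broker. The following is a minimal in-process sketch, assuming a bounded java.util.concurrent queue stands in for the broker; the names BufferingBrokerSketch, publish, drain, and backlog are hypothetical and belong to no library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a broker: a bounded buffer between producer and consumer.
final class BufferingBrokerSketch {
    private final BlockingQueue<String> buffer;

    BufferingBrokerSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // The producer hands the message over and returns immediately;
    // false means the buffer is full and the message was not accepted.
    boolean publish(String message) {
        return buffer.offer(message);
    }

    // The consumer pulls at most maxBatch messages, in arrival order.
    List<String> drain(int maxBatch) {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch, maxBatch);
        return batch;
    }

    // Number of messages still waiting for a consumer.
    int backlog() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferingBrokerSketch broker = new BufferingBrokerSketch(10);
        for (int i = 1; i <= 5; i++) {
            broker.publish("order-" + i); // a burst of five requests
        }
        System.out.println(broker.drain(2));  // the consumer takes two at its own pace
        System.out.println(broker.backlog()); // the rest wait in the buffer
    }
}
```

The producer never waits for the consumer: a burst is absorbed by the buffer and released in batches sized to what the downstream side can handle. A real broker adds persistence, acknowledgements, and network transport on top of this idea.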

What is MQ?

MQ (Message Queue) is a broker that stores and forwards messages in an event-driven architecture.

Common MQ solutions include RabbitMQ, ActiveMQ, RocketMQ, and Kafka. This article uses RabbitMQ for its robustness and rich feature set.

RabbitMQ Installation

RabbitMQ, built on Erlang, is installed here using Docker for simplicity.

  1. Pull the RabbitMQ image.
docker pull rabbitmq:3-management
  2. Run the RabbitMQ container.
docker run \
 -e RABBITMQ_DEFAULT_USER=<your_username> \
 -e RABBITMQ_DEFAULT_PASS=<your_password> \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
  3. Access the management console at http://<your_ip>:15672 using the credentials set above.

Key Management Console Entities:

  • Channel: A lightweight virtual connection multiplexed over a single TCP connection; clients publish and consume through channels.
  • Exchange: Routes messages to queues.
  • Queue: Buffers messages.
  • Virtual Host: A logical grouping for exchanges and queues.

The messaging flow: Publisher -> Exchange -> Queue -> Consumer.

Core Messaging Models

RabbitMQ models fall into two categories:

  1. Basic models: Simple Queue, Work Queue.
  2. Publish/Subscribe models (via exchanges): Fanout, Direct, Topic.

Getting Started with SpringAMQP

SpringAMQP provides a higher-level API over the AMQP protocol. Add the dependency to your pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configure connection properties in application.yml.

spring:
  rabbitmq:
    host: 192.168.121.10
    port: 5672
    virtual-host: /
    username: <your_username>
    password: <your_password>

Sending a Message

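Use RabbitTemplate to send a message. The target queue (basic.queue here) must already exist, for example created in the management console; a message sent to a nonexistent queue is silently dropped by default.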

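import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MessageSenderTest {
    // Auto-configured from the spring.rabbitmq.* properties
    @Autowired
    private RabbitTemplate messageSender;

    @Test
    void sendBasicMessage() {
        String targetQueue = "basic.queue";
        String payload = "Hello, SpringAMQP!";
        // No exchange is named, so the default exchange is used and the queue name acts as the routing key
        messageSender.convertAndSend(targetQueue, payload);
    }
}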

Receiving a Message

Use @RabbitListener to listen on a queue.

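import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    // Invoked once for each message delivered from basic.queue
    @RabbitListener(queues = "basic.queue")
    public void handleBasicMessage(String receivedPayload) {
        System.out.println("Received: " + receivedPayload);
    }
}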

Work Queue Pattern

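Multiple consumers can subscribe to the same queue, and each message is delivered to only one of them. By default, RabbitMQ dispatches messages round-robin, and a consumer may be handed messages in advance regardless of how quickly it processes them. To distribute work according to each consumer's actual capacity, limit prefetch so that a consumer receives a new message only after acknowledging the previous one.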

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # At most one unacknowledged message per consumer at a time
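
The effect of the prefetch limit can be approximated with a small simulation. This is a simplified model, not RabbitMQ's actual scheduler: it assumes each consumer has a fixed per-message cost and that, with prefetch 1, the next message goes to whichever consumer becomes free first. DispatchSketch and its methods are hypothetical names.

```java
import java.util.Arrays;

// Simplified model of two dispatch strategies; not RabbitMQ's real implementation.
final class DispatchSketch {

    // Round-robin: message i goes to consumer i % n, however busy that consumer is.
    static int[] roundRobin(int messages, int consumers) {
        int[] counts = new int[consumers];
        for (int i = 0; i < messages; i++) {
            counts[i % consumers]++;
        }
        return counts;
    }

    // Prefetch-1 style: each message goes to the consumer that becomes free earliest.
    // costPerMessage[c] is the assumed processing time of consumer c.
    static int[] fairDispatch(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        int[] counts = new int[n];
        long[] freeAt = new long[n]; // time at which each consumer finishes its current work
        for (int i = 0; i < messages; i++) {
            int pick = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[pick]) {
                    pick = c; // ties go to the lower index
                }
            }
            freeAt[pick] += costPerMessage[pick];
            counts[pick]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundRobin(10, 2)));
        System.out.println(Arrays.toString(fairDispatch(10, new int[] {1, 4})));
    }
}
```

With ten messages, a fast consumer (cost 1) and a slow one (cost 4), the round-robin model assigns five messages to each, while the prefetch-limited model assigns eight to the fast consumer and two to the slow one.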

Publish/Subscribe Patterns

Fanout Exchange

A Fanout Exchange broadcasts messages to all bound queues.

Define Queues, Exchange, and Bindings

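import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BroadcastConfig {
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("broadcast.exchange");
    }
    @Bean
    public Queue broadcastQueueA() {
        return new Queue("broadcast.queue.a");
    }
    @Bean
    public Queue broadcastQueueB() {
        return new Queue("broadcast.queue.b");
    }
    // Bean parameters are resolved by name, so each binding receives the matching queue
    @Bean
    public Binding bindQueueA(Queue broadcastQueueA, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(broadcastQueueA).to(broadcastExchange);
    }
    @Bean
    public Binding bindQueueB(Queue broadcastQueueB, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(broadcastQueueB).to(broadcastExchange);
    }
}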

Listeners for Each Queue

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class BroadcastListener {
    @RabbitListener(queues = "broadcast.queue.a")
    public void handleBroadcastA(String msg) {
        System.out.println("Listener A: " + msg);
    }
    @RabbitListener(queues = "broadcast.queue.b")
    public void handleBroadcastB(String msg) {
        System.out.println("Listener B: " + msg);
    }
}

Publisher Sends to Exchange

@Test
void sendBroadcastMessage() {
    String exchangeName = "broadcast.exchange";
    String payload = "Broadcast Message!!";
    // Routing key is ignored for Fanout exchanges
    messageSender.convertAndSend(exchangeName, "", payload);
}
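
The broadcast behaviour above can be modelled in a few lines of plain Java. This is an in-memory sketch of the fanout idea only, with hypothetical names (FanoutExchangeSketch, bind, publish, messagesIn); it does not talk to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange: every bound queue gets its own copy.
final class FanoutExchangeSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binding a queue to a fanout exchange needs no binding key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but deliberately ignored.
    void publish(String routingKey, String payload) {
        for (List<String> queue : queues.values()) {
            queue.add(payload);
        }
    }

    // Messages currently held by the named queue; empty if the queue is not bound.
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("broadcast.queue.a");
        exchange.bind("broadcast.queue.b");
        exchange.publish("", "Broadcast Message!!");
        System.out.println(exchange.messagesIn("broadcast.queue.a"));
        System.out.println(exchange.messagesIn("broadcast.queue.b"));
    }
}
```

Each bound queue ends up with its own copy of the message, which is why both listeners in the example receive it.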

Direct Exchange

A Direct Exchange routes messages to queues based on an exact match between the message's routing key and the queue's binding key. A queue can be bound with several keys, and several queues can share the same key.

Define Listeners with Binding Keys

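import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RoutingListener {
    // The annotation declares the queue, the exchange, and one binding per key
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("direct.queue.x"),
        exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void handleDirectX(String msg) {
        System.out.println("Queue X (red/blue): " + msg);
    }
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("direct.queue.y"),
        exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void handleDirectY(String msg) {
        System.out.println("Queue Y (red/yellow): " + msg);
    }
}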

Publisher Specifies Routing Key

@Test
void sendDirectMessage() {
    String exchangeName = "direct.exchange";
    String payload = "Direct Message!!";
    String routingKey = "blue";
    // Message goes to queues bound with key "blue"
    messageSender.convertAndSend(exchangeName, routingKey, payload);
}
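
To see which queues receive a given routing key in the example above, the exact-match rule can be modelled as a lookup table. This is an in-memory sketch with hypothetical names (DirectExchangeSketch, bind, route), not the RabbitMQ client API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// In-memory model of a direct exchange: binding key -> queues bound with that key.
final class DirectExchangeSketch {
    private final Map<String, Set<String>> queuesByKey = new HashMap<>();

    // A queue may be bound with several keys.
    void bind(String queueName, String... bindingKeys) {
        for (String key : bindingKeys) {
            queuesByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(queueName);
        }
    }

    // Only queues whose binding key equals the routing key exactly are returned.
    Set<String> route(String routingKey) {
        return queuesByKey.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("direct.queue.x", "red", "blue");
        exchange.bind("direct.queue.y", "red", "yellow");
        System.out.println(exchange.route("blue"));  // only queue x is bound with "blue"
        System.out.println(exchange.route("red"));   // both queues are bound with "red"
        System.out.println(exchange.route("green")); // no binding, so no queue
    }
}
```

A key shared by every queue, such as "red" here, makes a direct exchange behave like a fanout for that key, while an unbound key routes the message nowhere.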

Topic Exchange

A Topic Exchange routes messages using a pattern match between a routing key (a dot-separated list of words) and a binding key (which can include * for a single word and # for zero or more words).
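
The wildcard rules above can be made concrete with a small matcher. This is a sketch of the matching semantics only, not RabbitMQ's actual implementation; TopicMatcherSketch and matches are hypothetical names.

```java
// Sketch of topic matching: '*' stands for exactly one word, '#' for zero or more words.
final class TopicMatcherSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return match(split(bindingKey), 0, split(routingKey), 0);
    }

    // Words are separated by dots; an empty key has no words at all.
    private static String[] split(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // the pattern is used up, so the words must be too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible length
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // the pattern still needs a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("china.#", "china.news"));       // '#' absorbs "news"
        System.out.println(matches("#.news", "china.weather"));     // last word is not "news"
        System.out.println(matches("china.*", "china.news.today")); // '*' covers one word only
    }
}
```

Because # also matches zero words, a binding key of china.# accepts the bare routing key china, whereas china.* requires exactly one word after it.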

Define Listeners with Pattern Binding Keys

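import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class PatternListener {
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue.1"),
        exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
        key = "china.#" // Matches 'china.news', 'china.weather', etc.
    ))
    public void handleTopicOne(String msg) {
        System.out.println("Queue 1 (china.#): " + msg);
    }
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue.2"),
        exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
        key = "#.news" // Matches 'china.news', 'world.news', etc.
    ))
    public void handleTopicTwo(String msg) {
        System.out.println("Queue 2 (#.news): " + msg);
    }
}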

Publisher Sends with a Routing Key

@Test
void sendTopicMessage() {
    String exchangeName = "topic.exchange";
    String payload = "Topic Message!!";
    String routingKey = "china.news";
    // Message goes to queues whose binding pattern matches 'china.news'
    messageSender.convertAndSend(exchangeName, routingKey, payload);
}
