Implementing RabbitMQ Message Sending on Android
RabbitMQ is a message broker that facilitates communication between applications using message queues. It enables decoupled interactions by allowing producers to send messages to queues, which consumers can then retrieve asynchronously.
Key Components in RabbitMQ
- Exchange: Receives messages from producers and routes them to queues according to its type (direct, topic, fanout, or headers) and its bindings.
- Queue: Holds messages until they are consumed.
- Channel: A virtual connection within a TCP connection for message operations.
- Binding: Links a queue to an exchange with specific routing keys.
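To make the relationship between exchanges, queues, and bindings concrete, here is a minimal in-memory sketch of how a direct exchange routes by exact routing key. It is a toy model only, not the RabbitMQ client API: the class DirectExchangeSketch and its bind and publish methods are hypothetical names chosen for illustration, and the routing keys are made up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; this is NOT the RabbitMQ client API.
class DirectExchangeSketch {
    // Binding table: routing key -> queues bound with that key.
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Binding: link a queue to this exchange under a routing key.
    public void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Exchange: copy the message into every queue bound with exactly this key.
    // Returns how many queues received it; 0 means the message was unroutable.
    public int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> orders = new ArrayDeque<>();
        Queue<String> audit = new ArrayDeque<>();
        exchange.bind(orders, "order.created");
        exchange.bind(audit, "order.created");
        exchange.bind(audit, "order.cancelled");

        exchange.publish("order.created", "order 1");
        exchange.publish("order.cancelled", "order 2");
        exchange.publish("order.unknown", "order 3"); // no binding, so no queue receives it

        System.out.println("orders: " + orders);
        System.out.println("audit: " + audit);
    }
}
```

In this model the orders queue ends up with only "order 1", while audit holds "order 1" and "order 2"; the message published with an unbound key reaches no queue at all.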
Initializing RabbitMQ Connection
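import com.rabbitmq.client.ConnectionFactory;

// Field rather than a local variable, so the producer thread can reuse the configured factory
private final ConnectionFactory connectionFactory = new ConnectionFactory();

public void setupConnectionFactory() {
    connectionFactory.setHost("192.168.1.100"); // broker address; replace with your own
    connectionFactory.setPort(5672);            // default AMQP port
    connectionFactory.setUsername("user");
    connectionFactory.setPassword("pass");
    // Client-side automatic recovery is off; the producer thread reconnects by itself
    connectionFactory.setAutomaticRecoveryEnabled(false);
}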
Creating a Message Producer
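import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;

public void startMessageProducer(final Handler callbackHandler) {
    Thread producerThread = new Thread(() -> {
        int retryCount = 1; // reconnection attempts in the current retry cycle
        while (true) {
            // try-with-resources closes the channel and connection before every reconnect
            try (Connection conn = connectionFactory.newConnection();
                 Channel msgChannel = conn.createChannel()) {
                msgChannel.confirmSelect(); // enable publisher confirms on this channel
                retryCount = 1;             // connected again, so start counting afresh
                while (true) {
                    String msgContent = messageBuffer.takeFirst(); // blocks until a message is queued
                    try {
                        msgChannel.basicPublish("DirectExchange", "routingKey", null,
                                msgContent.getBytes(StandardCharsets.UTF_8));
                        msgChannel.waitForConfirmsOrDie(); // wait for the broker to acknowledge
                        Log.i("Producer", "Sent: " + msgContent);
                    } catch (Exception ex) {
                        Log.e("Producer", "Failed: " + msgContent);
                        messageBuffer.putFirst(msgContent); // requeue at the head to keep ordering
                        throw ex;
                    }
                }
            } catch (InterruptedException ie) {
                break; // interrupting the thread stops the producer
            } catch (Exception e) {
                Log.w("Producer", "Connection issue: " + e.getClass().getName());
                if (retryCount > 5) {
                    sendStatus(callbackHandler, "Reconnection attempts exhausted", true);
                    retryCount = 1; // start a new retry cycle
                }
                sendStatus(callbackHandler, "Reconnecting attempt " + retryCount, false);
                retryCount++;
                try {
                    Thread.sleep(5000); // pause before the next connection attempt
                } catch (InterruptedException ie) {
                    break;
                }
            }
        }
    });
    producerThread.start();
}

// Each status update gets its own Message; a Message that was already sent cannot be sent again
private void sendStatus(Handler handler, String error, boolean terminate) {
    Bundle data = new Bundle();
    data.putString("error", error);
    data.putBoolean("terminate", terminate);
    Message msg = handler.obtainMessage(500);
    msg.setData(data);
    handler.sendMessage(msg);
}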
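The producer's "take from the head, put back at the head on failure" handling can be tried out without a broker. The sketch below is a simplified illustration under stated assumptions: RequeueSketch, Sender, and drain are hypothetical names, the Sender interface stands in for the publish call, and pollFirst is used instead of takeFirst so the demo terminates when the buffer is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class RequeueSketch {
    // Hypothetical stand-in for the broker publish call.
    interface Sender {
        void send(String message) throws Exception;
    }

    // Drains the buffer in order. A message whose send fails goes back to the head
    // and the method returns false, so the caller can reconnect and call it again.
    static boolean drain(BlockingDeque<String> buffer, Sender sender) {
        String next;
        while ((next = buffer.pollFirst()) != null) {
            try {
                sender.send(next);
            } catch (Exception ex) {
                buffer.addFirst(next); // back to the head, ahead of newer messages
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BlockingDeque<String> buffer = new LinkedBlockingDeque<>();
        buffer.add("a");
        buffer.add("b");
        buffer.add("c");

        List<String> delivered = new ArrayList<>();
        boolean[] failOnce = {true};
        // Simulated sender that fails the first time it sees "b".
        Sender flaky = message -> {
            if (message.equals("b") && failOnce[0]) {
                failOnce[0] = false;
                throw new Exception("simulated connection loss");
            }
            delivered.add(message);
        };

        while (!drain(buffer, flaky)) {
            System.out.println("send failed, retrying; buffer = " + buffer);
        }
        System.out.println("delivered = " + delivered);
    }
}
```

Because the failed message returns to the head of the deque, it is retried before anything queued after it, so delivery order matches submission order even across a failure.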
Sending Immediate Messages
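import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Unbounded deque shared between callers and the producer thread
private final BlockingDeque<String> messageBuffer = new LinkedBlockingDeque<>();

public void sendImmediateMessage(String msg) {
    try {
        messageBuffer.putLast(msg);
        Log.d("Sender", "Queued: " + msg);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so the caller can still see the interruption
        Thread.currentThread().interrupt();
        Log.w("Sender", "Interrupted before queueing: " + msg);
    }
}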
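The handoff between the calling thread and the producer thread can also be sketched in plain Java, with no Android or RabbitMQ classes involved. This is an illustration only: BufferHandoffSketch and handOff are hypothetical names, and the worker thread simply collects messages where the real producer would publish them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;

class BufferHandoffSketch {
    // Queues every message from the calling thread, lets a worker thread take them,
    // then stops the worker with an interrupt. Returns what the worker received.
    static List<String> handOff(List<String> messages) {
        BlockingDeque<String> buffer = new LinkedBlockingDeque<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allTaken = new CountDownLatch(messages.size());

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    received.add(buffer.takeFirst()); // blocks while the buffer is empty
                    allTaken.countDown();
                }
            } catch (InterruptedException ie) {
                // the interrupt is the stop signal; leave the loop
            }
        });
        worker.start();

        try {
            for (String message : messages) {
                buffer.putLast(message); // same call sendImmediateMessage makes
            }
            allTaken.await();  // wait until the worker has taken everything
            worker.interrupt();
            worker.join();
        } catch (InterruptedException ie) {
            worker.interrupt();
            Thread.currentThread().interrupt(); // keep the flag for the caller
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(Arrays.asList("one", "two", "three")));
    }
}
```

Callers never block on the network here: putLast only appends to the in-memory deque, and the worker thread does the slow work in the background, in the order the messages were queued.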