Building a Distributed Image Processing System with Java, Python, and RabbitMQ
Problem Statement
A web-based image upload feature needs to verify the numbers that appear in uploaded images. Java web services handle the HTTP requests and communicate through a message broker, while a Python worker performs the optical character recognition (OCR).
Technology Stack
The solution combines the following technologies:
- Spring Boot — Backend REST API framework
- Python — OCR processing and image analysis
- RabbitMQ — Asynchronous message broker for inter-service communication
- Docker — Containerization for RabbitMQ deployment
System Architecture
The solution follows a distributed microservices pattern with bidirectional messaging:
Web Client --upload--> Java API (Producer) --> RabbitMQ --> Python Worker (Consumer)
Python Worker (Producer) --> RabbitMQ --> Java Consumer --> Response
The system uses two communication channels: the java2python queue carries image data from Java to Python, and the python2java queue returns the processed results to the Java application.
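In this design every result comes back on the single shared python2java queue with nothing linking it to the upload that produced it, so with concurrent uploads the Java side cannot tell which result belongs to which image. A common remedy is a correlation id that travels with the request and is echoed in the reply. The sketch below is an in-memory simulation only: Python's queue.Queue stands in for the two RabbitMQ queues, and the function names are hypothetical. Over a real broker the id would travel in the AMQP correlation_id property (pika.BasicProperties(correlation_id=...) in Python, MessageProperties.getCorrelationId() in Spring AMQP).

```python
import queue
import uuid


def java_submit(outbound: queue.Queue, image_bytes: bytes) -> str:
    """Producer side: tag the upload with a fresh correlation id and enqueue it."""
    correlation_id = str(uuid.uuid4())
    outbound.put((correlation_id, image_bytes))
    return correlation_id


def python_worker_step(inbound: queue.Queue, outbound: queue.Queue, ocr) -> None:
    """Worker side: take one image, run OCR, and reply with the same correlation id."""
    correlation_id, image_bytes = inbound.get_nowait()
    outbound.put((correlation_id, ocr(image_bytes)))


def java_collect(inbound: queue.Queue) -> dict:
    """Result side: drain pending replies into a dict keyed by correlation id."""
    results = {}
    while not inbound.empty():
        correlation_id, text = inbound.get_nowait()
        results[correlation_id] = text
    return results


if __name__ == "__main__":
    java2python, python2java = queue.Queue(), queue.Queue()

    def fake_ocr(data: bytes) -> str:
        return data.decode("ascii")  # stand-in for Tesseract

    first = java_submit(java2python, b"4711")
    second = java_submit(java2python, b"2024")
    python_worker_step(java2python, python2java, fake_ocr)
    python_worker_step(java2python, python2java, fake_ocr)
    results = java_collect(python2java)
    print(results[first], results[second])  # prints: 4711 2024
```

Because each reply carries the id of its request, results can be matched to uploads even if the worker processes them out of order.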
Infrastructure Setup
Docker Installation (CentOS)
yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum install -y docker-ce docker-ce-cli containerd.io
systemctl start docker
systemctl enable docker
RabbitMQ Deployment
Pull and launch the RabbitMQ management image:
docker pull rabbitmq:3.9-management
docker run -d --hostname rabbitmq-host --name message-broker \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3.9-management
Port 5672 carries AMQP traffic between the services and the broker; port 15672 serves the management console at http://your-server-ip:15672. The default credentials are guest/guest.
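Before starting the Java and Python services, it can help to confirm that both published ports are reachable from the machines that will connect. The following is a minimal standard-library sketch; the function name port_open and the BROKER_HOST environment variable are assumptions for illustration. If the check fails from another machine while succeeding on the server itself, a host firewall such as firewalld is a common cause.

```python
import os
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names
        return False


if __name__ == "__main__":
    # BROKER_HOST is a hypothetical variable name; set it to your server address
    broker = os.environ.get("BROKER_HOST", "localhost")
    for port, purpose in ((5672, "AMQP"), (15672, "management console")):
        state = "reachable" if port_open(broker, port) else "NOT reachable"
        print(f"{broker}:{port} ({purpose}) is {state}")
```

This only proves that a TCP listener answers; it does not check credentials or virtual-host permissions.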
Nginx Configuration
Nginx serves the static upload page and acts as a reverse proxy in front of the Spring Boot API, which lets the page call the relative path /api/image/process. Refer to the official Nginx documentation for installation on Windows or Linux.
Frontend Implementation
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Image Upload Service</title>
  <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
</head>
<body>
  <div>
    <input type="file" id="imageFile" accept="image/*">
    <button id="uploadBtn">Submit Image</button>
  </div>
  <script>
    document.getElementById('uploadBtn').addEventListener('click', function () {
      const fileInput = document.getElementById('imageFile');
      const selectedFile = fileInput.files[0];
      if (!selectedFile) {
        alert('Please select an image first');
        return;
      }
      const formData = new FormData();
      // The field name must match @RequestParam("imageData") in the controller
      formData.append('imageData', selectedFile);
      fetch('/api/image/process', {
        method: 'POST',
        body: formData
      })
        // The controller replies with plain text ("Processing started"), not JSON
        .then(response => {
          if (!response.ok) {
            throw new Error('HTTP ' + response.status);
          }
          return response.text();
        })
        .then(text => console.log('Processing initiated:', text))
        .catch(error => console.error('Error:', error));
    });
  </script>
</body>
</html>
Java Backend Services
Create a multi-module Maven project with two distinct modules: one for publishing tasks and another for consuming results.
Common Dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Shared Configuration
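spring:
  application:
    name: distributed-image-processor
  rabbitmq:
    host: 192.168.1.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /
  servlet:
    multipart:
      # Spring Boot rejects uploaded files larger than 1MB by default
      max-file-size: 10MB
      max-request-size: 10MB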
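The Python worker hard-codes the same broker address that appears in this YAML, so the two sides can drift apart. One option, sketched below under the assumption that both services run with the same environment, is to read the settings from environment variables named after Spring Boot's relaxed binding (SPRING_RABBITMQ_HOST overrides spring.rabbitmq.host, SPRING_RABBITMQ_VIRTUALHOST overrides spring.rabbitmq.virtual-host), so one variable configures both services. The BrokerSettings class and load_settings function are hypothetical; the defaults mirror the YAML values.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class BrokerSettings:
    host: str
    port: int
    username: str
    password: str
    virtual_host: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> BrokerSettings:
    """Read broker settings from environment variables, falling back to the YAML values."""
    env = os.environ if env is None else env
    return BrokerSettings(
        host=env.get("SPRING_RABBITMQ_HOST", "192.168.1.100"),
        port=int(env.get("SPRING_RABBITMQ_PORT", "5672")),
        username=env.get("SPRING_RABBITMQ_USERNAME", "guest"),
        password=env.get("SPRING_RABBITMQ_PASSWORD", "guest"),
        virtual_host=env.get("SPRING_RABBITMQ_VIRTUALHOST", "/"),
    )
```

In the worker, the result could replace the hard-coded BROKER_HOST, for example pika.ConnectionParameters(host=s.host, port=s.port, virtual_host=s.virtual_host, credentials=pika.PlainCredentials(s.username, s.password)).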
Producer Module — Task Dispatcher
REST Controller:
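import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

@RestController
@RequestMapping("/api/image")
public class ImageUploadController {

    private static final Logger logger = LoggerFactory.getLogger(ImageUploadController.class);

    private final MessageDispatcher dispatcher;

    @Autowired
    public ImageUploadController(MessageDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    @PostMapping("/process")
    public ResponseEntity<String> handleImageUpload(@RequestParam("imageData") MultipartFile imageFile) {
        try {
            byte[] imageBytes = imageFile.getBytes();
            // The routing key is the name of the queue the Python worker consumes
            dispatcher.submitTask("java2python", imageBytes);
            logger.info("Image submitted for processing");
            // 202 Accepted: OCR runs asynchronously, the result arrives on python2java
            return ResponseEntity.accepted().body("Processing started");
        } catch (IOException e) {
            logger.error("Failed to process upload", e);
            return ResponseEntity.internalServerError().body("Upload failed");
        }
    }
}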
Service Interface and Implementation:
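// MessageDispatcher.java
public interface MessageDispatcher {
    void submitTask(String routingKey, byte[] payload);
}

// RabbitDispatcher.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitDispatcher implements MessageDispatcher {

    private final RabbitTemplate template;

    @Autowired
    public RabbitDispatcher(RabbitTemplate template) {
        this.template = template;
    }

    @Override
    public void submitTask(String routingKey, byte[] payload) {
        // With no exchange configured on the template, this publishes to the
        // default exchange, which routes to the queue named by routingKey
        template.convertAndSend(routingKey, payload);
    }
}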
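Because no exchange is named, RabbitTemplate.convertAndSend(routingKey, payload) publishes to RabbitMQ's default exchange, which delivers a message to the queue whose name equals the routing key. If no such queue exists, the broker discards the message (the publisher is only told if it sets the mandatory flag). The toy model below illustrates that rule in plain Python; it is a simulation for explanation, not a client library, and the class name is hypothetical.

```python
class DefaultExchangeModel:
    """Toy model of RabbitMQ's default exchange, where routing key == queue name."""

    def __init__(self) -> None:
        self.queues = {}   # queue name -> list of delivered message bodies
        self.dropped = []  # bodies that matched no queue

    def queue_declare(self, name: str) -> None:
        # Declaring is idempotent: an existing queue and its messages are kept
        self.queues.setdefault(name, [])

    def publish(self, routing_key: str, body: bytes) -> bool:
        target = self.queues.get(routing_key)
        if target is None:
            # No queue with that name exists, so the message is discarded
            self.dropped.append(body)
            return False
        target.append(body)
        return True


if __name__ == "__main__":
    broker = DefaultExchangeModel()
    print(broker.publish("java2python", b"image-1"))  # False: queue not declared yet
    broker.queue_declare("java2python")               # e.g. the Python worker starts
    print(broker.publish("java2python", b"image-2"))  # True: delivered
```

One way to remove this dependency on startup order is to declare the queue from Java as well: with spring-boot-starter-amqp, a Queue bean such as new Queue("java2python", true) is declared on the broker by the auto-configured RabbitAdmin when the application connects.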
CORS Configuration:
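import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CrossOriginConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // Only needed when the page is served from a different origin than the API
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
                .allowedHeaders("*")
                .maxAge(3600);
    }
}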
Consumer Module — Result Handler
Listen for OCR results on the python2java queue:
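import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ResultListener {

    private static final Logger logger = LoggerFactory.getLogger(ResultListener.class);

    // queuesToDeclare creates python2java on startup if it does not exist yet
    @RabbitListener(queuesToDeclare = @Queue(
            value = "python2java",
            durable = "true",
            autoDelete = "false"
    ))
    public void handleResult(byte[] messageBody) {
        // The Python worker encodes its text as UTF-8
        String result = new String(messageBody, StandardCharsets.UTF_8);
        logger.info("Received OCR result: {}", result);
        processRecognitionOutput(result);
    }

    private void processRecognitionOutput(String content) {
        logger.info("Processing recognized text: {}", content.trim());
    }
}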
Python Processing Service
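Install the required Python packages (pytesseract only wraps the Tesseract OCR engine, which must be installed separately through the system package manager):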
python -m pip install pika pillow pytesseract --upgrade
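pytesseract runs the tesseract executable, so a missing engine only shows up at the first OCR call, as a pytesseract.TesseractNotFoundError. A startup check such as the sketch below fails fast instead. It uses only the standard library and the function name is hypothetical; pytesseract.get_tesseract_version() is a ready-made alternative once pytesseract is installed.

```python
import shutil
import subprocess
from typing import Optional


def tesseract_version(binary: str = "tesseract") -> Optional[str]:
    """Return the first line of `<binary> --version`, or None if it is not on the PATH."""
    path = shutil.which(binary)
    if path is None:
        return None
    completed = subprocess.run(
        [path, "--version"], capture_output=True, text=True, check=False
    )
    # Depending on the release, the banner may be written to stdout or stderr
    output = (completed.stdout or completed.stderr).strip()
    return output.splitlines()[0] if output else None


if __name__ == "__main__":
    version = tesseract_version()
    print(version or "tesseract not found on PATH; install the OCR engine first")
```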
Message Consumer — Image Receiver
#!/usr/bin/env python
import os

import pika
import pytesseract
from PIL import Image

BROKER_HOST = '192.168.1.100'
INPUT_QUEUE = 'java2python'
OUTPUT_QUEUE = 'python2java'
TEMP_IMAGE_PATH = '/tmp/processed_image.png'


class ImageProcessor:
    def __init__(self, image_path):
        self.image_path = image_path

    def extract_text(self) -> str:
        # Pillow detects the real format from the file content, not the .png suffix
        with Image.open(self.image_path) as image:
            return pytesseract.image_to_string(image, lang='eng')


def persist_image(image_path: str, content: bytes) -> None:
    os.makedirs(os.path.dirname(image_path), exist_ok=True)
    with open(image_path, 'wb') as f:
        f.write(content)


def send_response(channel, text: str) -> None:
    # Reuse the consumer's channel instead of opening a new connection per message
    channel.basic_publish(
        exchange='',
        routing_key=OUTPUT_QUEUE,
        body=text.encode('utf-8')
    )


def process_and_respond(channel, method, properties, body):
    try:
        persist_image(TEMP_IMAGE_PATH, body)
        extracted_text = ImageProcessor(TEMP_IMAGE_PATH).extract_text()
    except Exception as exc:
        # With auto_ack=True the message is already acknowledged; log it and keep consuming
        print(f"OCR failed: {exc}")
        return
    print(f"OCR Result: {extracted_text}")
    send_response(channel, extracted_text.strip())


def start_consuming() -> None:
    connection = pika.BlockingConnection(pika.ConnectionParameters(BROKER_HOST))
    channel = connection.channel()
    # Declare both queues (durable, matching the Java listener) before using them
    channel.queue_declare(queue=INPUT_QUEUE, durable=True)
    channel.queue_declare(queue=OUTPUT_QUEUE, durable=True)
    channel.basic_consume(INPUT_QUEUE, on_message_callback=process_and_respond, auto_ack=True)
    print(f"Waiting for images on queue: {INPUT_QUEUE}")
    channel.start_consuming()


if __name__ == '__main__':
    start_consuming()
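The worker forwards Tesseract's raw text, while the stated goal is verifying numbers. A small post-processing step could isolate the digits before the result is sent back. The sketch below is one possible approach; the function names and the "complete digit run" matching rule are assumptions. Restricting Tesseract itself may also help, for example passing config='--psm 7 -c tessedit_char_whitelist=0123456789' to pytesseract.image_to_string for single-line numeric images, although whitelist support has varied between Tesseract versions.

```python
import re
from typing import List

# ASCII digits only: a bare \d would also match other Unicode digit characters
DIGIT_RUN = re.compile(r"[0-9]+")


def extract_numbers(ocr_text: str) -> List[str]:
    """Return every run of digits in the OCR output, in reading order."""
    return DIGIT_RUN.findall(ocr_text)


def contains_expected_number(ocr_text: str, expected: str) -> bool:
    """True if `expected` appears as a complete digit run, not merely inside a longer one."""
    return expected in extract_numbers(ocr_text)


if __name__ == "__main__":
    sample = "Invoice 4711\nTotal: 1 250\n"
    print(extract_numbers(sample))                   # ['4711', '1', '250']
    print(contains_expected_number(sample, "4711"))  # True
```

Note that a number printed with spaces ("1 250") comes back as separate runs; whether to join them depends on what the uploaded images are expected to contain.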
Message Producer — Result Publisher
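#!/usr/bin/env python
import pika


class ResultPublisher:
    """Publishes a single message; usable on its own, e.g. to test the Java consumer."""

    def __init__(self, host: str, queue_name: str, message_body: bytes):
        self.host = host
        self.queue = queue_name
        self.body = message_body

    def publish(self) -> None:
        connection = pika.BlockingConnection(pika.ConnectionParameters(self.host))
        try:
            channel = connection.channel()
            channel.queue_declare(queue=self.queue, durable=True)
            channel.basic_publish(
                exchange='',
                routing_key=self.queue,
                body=self.body
            )
        finally:
            # Close the connection even if declaring or publishing fails
            connection.close()
        print(f"Published result to {self.queue}")


if __name__ == '__main__':
    # Send a sample result so the Java consumer can be checked without the OCR worker
    ResultPublisher('192.168.1.100', 'python2java', '12345'.encode('utf-8')).publish()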
Testing the Integration
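Start the components in the following order. The Python worker must be running before the first upload, because it declares the java2python queue, and a message published to a queue that does not exist yet is discarded: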
- Start the RabbitMQ container
- Initialize the Java producer service
- Launch the Python consumer process
- Start the Java consumer module
- Upload an image through the web interface
The complete flow transfers the uploaded image through the message broker to Python for OCR processing, then returns the extracted text back through a separate queue for the Java application to consume.
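For repeatable tests without a browser, the upload can also be scripted. The sketch below builds the same multipart/form-data request the web page sends, with the file in the imageData field, using only the standard library. The function names are hypothetical, and the example URL assumes the Spring Boot service listens on its default port 8080 (use the Nginx address instead if you go through the proxy).

```python
import os
import urllib.request
import uuid
from typing import Tuple


def build_multipart(field: str, filename: str, content: bytes,
                    content_type: str = "application/octet-stream") -> Tuple[bytes, str]:
    """Encode one file as a multipart/form-data body; returns (body, Content-Type header)."""
    boundary = uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; filename="{filename}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    return head + content + tail, f"multipart/form-data; boundary={boundary}"


def upload_image(url: str, path: str) -> Tuple[int, str]:
    """POST the file at `path` in the `imageData` field, as the web page does."""
    with open(path, "rb") as f:
        body, content_type = build_multipart("imageData", os.path.basename(path), f.read())
    request = urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": content_type}
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.status, response.read().decode("utf-8")
```

For example, upload_image("http://localhost:8080/api/image/process", "sample.png") should return status 202 with the body "Processing started" from the controller as written, after which the OCR result appears in the Java consumer's log.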