Fading Coder

Implementing Asynchronous Patterns and WebSocket Communication in Tornado

Tech May 8 3

Understanding Synchronous and Asynchronous Execution

Synchronous Execution

Synchronous execution follows a sequential, blocking model where each operation must complete before the next begins. Consider two client request handlers:

import time

def handle_request_a():
    print('Processing request A')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    print('Request B completed')

def process_requests():
    handle_request_a()
    handle_request_b()

if __name__ == '__main__':
    process_requests()

Output:

Processing request A
Request A completed
Processing request B
Request B completed

When long-running operations like I/O are introduced, synchronous execution becomes problematic:

def simulate_io_operation():
    print('Starting I/O operation')
    time.sleep(3)
    print('I/O operation completed')
    return 'io_response'

def handle_request_a():
    print('Processing request A')
    result = simulate_io_operation()
    print(f'Result: {result}')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    print('Request B completed')

process_requests()

Output:

Processing request A
Starting I/O operation
I/O operation completed
Result: io_response
Request A completed
Processing request B
Request B completed

Request B cannot proceed until request A completes its I/O operation.

Asynchronous Execution

Asynchronous execution delegates time-consuming operations to separate execution contexts, allowing the main program to continue processing other tasks.

Callback-Based Implementation

import time
import threading

def async_io_operation(callback):
    def execute_task():
        print('Beginning I/O operation')
        time.sleep(3)
        print('I/O finished, invoking callback')
        callback('operation_result')
    threading.Thread(target=execute_task).start()

def completion_handler(result):
    print('Callback execution started')
    print(f'Received: {result}')
    print('Callback execution ended')

def handle_request_a():
    print('Processing request A')
    async_io_operation(completion_handler)
    print('Request A handler exiting')

def handle_request_b():
    print('Processing request B')
    time.sleep(1)
    print('Request B completed')

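handle_request_a()
handle_request_b()
# The worker thread is non-daemon, so the interpreter waits for it to finish
# before exiting; a busy-wait loop is not needed to keep the program alive.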

Output (the position of the 'Beginning I/O operation' line can vary with thread scheduling):

Processing request A
Request A handler exiting
Processing request B
Beginning I/O operation
Request B completed
I/O finished, invoking callback
Callback execution started
Received: operation_result
Callback execution ended
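
One detail of the callback pattern above is easy to miss: the callback runs on the worker thread, not on the thread that registered it. The following is a minimal sketch of that behaviour, with the sleep removed and a hypothetical thread name ('io-worker') added so the result can be inspected.

```python
import threading

def async_io_operation(callback):
    """Run a stand-in I/O task on a worker thread, then invoke callback."""
    def execute_task():
        # Real code would block on I/O here before calling back.
        callback('operation_result')
    worker = threading.Thread(target=execute_task, name='io-worker')
    worker.start()
    return worker

def run_demo():
    seen = {}

    def completion_handler(result):
        # Record which thread actually executes the callback.
        seen['result'] = result
        seen['thread'] = threading.current_thread().name

    worker = async_io_operation(completion_handler)
    worker.join()  # wait for the callback to finish
    return seen

if __name__ == '__main__':
    info = run_demo()
    print(info['result'], 'handled on', info['thread'])
```

Because the callback executes on another thread, any state it shares with the main thread needs the usual synchronization.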

Coroutine-Based Implementation Using Yield

Generators with yield provide a way to write asynchronous code that resembles synchronous patterns. The handler suspends at the yield expression and is resumed with send() once the result is ready.

import time
import threading

generator_instance = None

def long_running_task():
    def task_runner():
        global generator_instance
        print('Starting async task')
        time.sleep(3)
        try:
            print('Task complete, resuming generator')
            generator_instance.send('task_output')
        except StopIteration:
            pass
    threading.Thread(target=task_runner).start()

def handle_request_a():
    print('Processing request A')
    output = yield long_running_task()
    print(f'Output: {output}')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    time.sleep(1)
    print('Request B completed')

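def run_app():
    global generator_instance
    generator_instance = handle_request_a()
    # Advance to the first yield; Python 3 uses next(gen), not gen.next()
    next(generator_instance)
    handle_request_b()
    # The non-daemon worker thread keeps the process alive until it resumes
    # the generator, so no busy-wait loop is needed here.

run_app()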

The example above still relies on a worker thread to resume the generator. Tornado instead drives coroutines from a single-threaded event loop (epoll on Linux), so all handler code runs in one thread and control changes hands only at yield points.
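
As a rough illustration of the single-threaded model, the following sketch uses the standard library's asyncio rather than Tornado itself (Tornado 5.0 and later run their IOLoop on top of asyncio). The handler names and delays are made up for the demonstration. Both handlers run on the same thread, and the event loop switches between them at each await.

```python
import asyncio
import threading

async def handle_request(name, delay, log):
    log.append((f'{name} start', threading.get_ident()))
    # Non-blocking wait: control returns to the event loop here.
    await asyncio.sleep(delay)
    log.append((f'{name} end', threading.get_ident()))

async def main():
    log = []
    # Schedule both handlers on the same event loop.
    await asyncio.gather(
        handle_request('A', 0.2, log),
        handle_request('B', 0.05, log),
    )
    return log

def run_demo():
    return asyncio.run(main())

if __name__ == '__main__':
    for event, thread_id in run_demo():
        print(event, thread_id)
```

Request B finishes before request A even though A started first, and every log entry carries the same thread identifier.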

Tornado Asynchronous Programming

AsyncHTTPClient

Tornado provides an asynchronous HTTP client for non-blocking web requests.

import tornado.web
import tornado.httpclient
import json

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        client.fetch(
            'http://api.example.com/data.json',
            callback=self.handle_response
        )
    
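    def handle_response(self, response):
        if response.error:
            # send_error() finishes the request itself, so return here
            # instead of falling through to a second finish() call
            self.send_error(500)
            return
        data = json.loads(response.body)
        self.write(f'Data: {data}')
        self.finish()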

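The @tornado.web.asynchronous decorator tells Tornado not to finish the request when get() returns. The connection remains open until self.finish() is called. Both this decorator and the callback argument to fetch() belong to Tornado 5.x and earlier; they were removed in Tornado 6.0, where coroutines are the supported style.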

Coroutine-Based Approach

Tornado's gen.coroutine decorator enables cleaner asynchronous code using yield.

import tornado.web
import tornado.httpclient
import tornado.gen
import json

class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
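        client = tornado.httpclient.AsyncHTTPClient()
        # raise_error=False returns non-2xx responses instead of raising
        # HTTPError, so response.error can be checked below
        response = yield client.fetch(
            'http://api.example.com/data.json', raise_error=False
        )
        if response.error:
            self.send_error(500)
            return
        data = json.loads(response.body)
        self.write(f'Data: {data}')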

Parallel Coroutine Execution

Multiple asynchronous operations can execute concurrently:

class MultiRequestHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        urls = [
            'http://api.example.com/data1.json',
            'http://api.example.com/data2.json'
        ]
        http_client = tornado.httpclient.AsyncHTTPClient()
        
        # Yielding a list starts both requests and waits for both to finish
        resp1, resp2 = yield [
            http_client.fetch(urls[0]),
            http_client.fetch(urls[1])
        ]
        
        results = {
            'first': json.loads(resp1.body),
            'second': json.loads(resp2.body)
        }
        # Passing a dict makes Tornado serialize the response as JSON
        self.write(results)
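
When several requests run concurrently, one failure should not necessarily discard the others. The sketch below shows one way to handle that, using the standard library's asyncio.gather with return_exceptions=True rather than Tornado's list-yield syntax. fake_fetch is a hypothetical stand-in for an HTTP fetch and performs no network access.

```python
import asyncio

async def fake_fetch(url, delay, fail=False):
    """Hypothetical stand-in for an HTTP fetch; no network access."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f'fetch failed: {url}')
    return {'url': url}

async def fetch_all():
    loop = asyncio.get_running_loop()
    started = loop.time()
    # return_exceptions=True places exceptions in the result list
    # instead of raising the first one.
    results = await asyncio.gather(
        fake_fetch('data1.json', 0.2),
        fake_fetch('data2.json', 0.2, fail=True),
        return_exceptions=True,
    )
    elapsed = loop.time() - started
    return results, elapsed

def run_demo():
    return asyncio.run(fetch_all())

if __name__ == '__main__':
    results, elapsed = run_demo()
    print(results, f'{elapsed:.2f}s')
```

The two waits overlap, so the total time is close to the longer delay rather than the sum of both, and the caller can inspect each result to decide which parts of the response to return.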

Database Considerations

Tornado runs handlers on a single thread, so a synchronous database query blocks the event loop, and with it every other request, until the query returns. For slow queries, consider:

  • Optimizing database performance
  • Implementing external HTTP APIs for database access
  • Using Tornado's AsyncHTTPClient to call database APIs asynchronously
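
Another option, not covered in the list above, is to move the blocking call onto a thread pool so the event loop stays responsive. The following sketch uses asyncio's run_in_executor from the standard library (Tornado 5.0 and later expose a similar IOLoop.run_in_executor). slow_query is a hypothetical blocking database call simulated with time.sleep.

```python
import asyncio
import time

def slow_query(user_id):
    """Hypothetical blocking database call, simulated with time.sleep."""
    time.sleep(0.2)
    return {'id': user_id, 'name': f'user-{user_id}'}

async def heartbeat(ticks):
    """Stand-in for other requests that the event loop must keep serving."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # None selects the loop's default ThreadPoolExecutor.
    query = loop.run_in_executor(None, slow_query, 42)
    # The heartbeat runs on the event loop while the query runs in a worker thread.
    row, _ = await asyncio.gather(query, heartbeat(ticks))
    return row, len(ticks)

def run_demo():
    return asyncio.run(main())

if __name__ == '__main__':
    print(run_demo())
```

This keeps the loop free, but the thread pool has a finite size, so it complements query optimization rather than replacing it.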

WebSocket Implementation

Server-Side WebSocket Handler

import tornado.web
import tornado.websocket
import datetime

class ChatSocketHandler(tornado.websocket.WebSocketHandler):
    active_clients = set()
    
    def open(self):
        self.active_clients.add(self)
        for client in self.active_clients:
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            client.write_message(f'[{self.request.remote_ip}] joined at {timestamp}')
    
    def on_message(self, message):
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for client in self.active_clients:
            client.write_message(f'[{self.request.remote_ip}] at {timestamp}: {message}')
    
    def on_close(self):
        # discard() avoids a KeyError if the connection was never registered
        self.active_clients.discard(self)
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for client in self.active_clients:
            client.write_message(f'[{self.request.remote_ip}] left at {timestamp}')
    
    def check_origin(self, origin):
        # Accepts every origin; restrict this to trusted hosts in production
        return True

Client-Side JavaScript

const socket = new WebSocket('ws://localhost:8888/chat');

socket.onopen = function() {
    socket.send('Connection established');
};

socket.onmessage = function(event) {
    console.log('Message received:', event.data);
};

socket.onclose = function() {
    console.log('Connection closed');
};

function sendMessage(text) {
    socket.send(text);
}

Application Setup

import tornado.web
import tornado.ioloop

app = tornado.web.Application([
    (r'/chat', ChatSocketHandler),
])

if __name__ == '__main__':
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()
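
The setup above listens on a fixed port, 8888. Running several instances side by side, as in the Supervisor configuration in the deployment section, requires each process to take its port from the command line. Here is a minimal sketch using the standard library's argparse; parse_port is a hypothetical helper name, and Tornado's own tornado.options module would be an alternative.

```python
import argparse

def parse_port(argv=None, default=8888):
    """Return the port given as --port=N, falling back to default."""
    parser = argparse.ArgumentParser(description='Tornado server options')
    parser.add_argument('--port', type=int, default=default)
    args = parser.parse_args(argv)
    return args.port

# In server.py, the hard-coded port would then become:
#     app.listen(parse_port())
```

Called with no arguments, parse_port() reads sys.argv, so `python server.py --port=8000` would listen on port 8000.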

Deployment Configuration

Multi-Process Deployment with Supervisor

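Create a Supervisor configuration with one program entry per Tornado instance. Each entry passes a different --port value, so server.py must read its port from the command line rather than listening on a hard-coded 8888: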

[program:tornado-8000]
command=/path/to/python /app/server.py --port=8000
directory=/app
user=appuser
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/tornado-8000.log

[program:tornado-8001]
command=/path/to/python /app/server.py --port=8001
directory=/app
user=appuser
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/tornado-8001.log

[group:tornado]
programs=tornado-8000,tornado-8001

Nginx Configuration for Load Balancing

upstream tornado_backend {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
}

upstream websocket_backend {
    server 127.0.0.1:8000;
}

server {
    listen 80;
    
    location /chat {
        proxy_pass http://websocket_backend/chat;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
    }
    
    location / {
        proxy_pass http://tornado_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

Routing /chat to a single backend keeps all chat clients in one process. ChatSocketHandler stores active_clients in process memory, so clients connected to different Tornado processes would not receive each other's messages.