Implementing Asynchronous Patterns and WebSocket Communication in Tornado
Understanding Synchronous and Asynchronous Execution
Synchronous Execution
Synchronous execution follows a sequential, blocking model where each operation must complete before the next begins. Consider two client request handlers:
import time

def handle_request_a():
    print('Processing request A')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    print('Request B completed')

def process_requests():
    handle_request_a()
    handle_request_b()

if __name__ == '__main__':
    process_requests()
Output:
Processing request A
Request A completed
Processing request B
Request B completed
When long-running operations like I/O are introduced, synchronous execution becomes problematic:
def simulate_io_operation():
    print('Starting I/O operation')
    time.sleep(3)  # stands in for a blocking I/O call
    print('I/O operation completed')
    return 'io_response'

def handle_request_a():
    print('Processing request A')
    result = simulate_io_operation()
    print(f'Result: {result}')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    print('Request B completed')

process_requests()
Output:
Processing request A
Starting I/O operation
I/O operation completed
Result: io_response
Request A completed
Processing request B
Request B completed
Request B cannot proceed until request A completes its I/O operation.
Asynchronous Execution
Asynchronous execution delegates time-consuming operations to separate execution contexts, allowing the main program to continue processing other tasks.
Callback-Based Implementation
import time
import threading

def async_io_operation(callback):
    def execute_task():
        print('Beginning I/O operation')
        time.sleep(3)
        print('I/O finished, invoking callback')
        callback('operation_result')
    threading.Thread(target=execute_task).start()

def completion_handler(result):
    print('Callback execution started')
    print(f'Received: {result}')
    print('Callback execution ended')

def handle_request_a():
    print('Processing request A')
    async_io_operation(completion_handler)
    print('Request A handler exiting')

def handle_request_b():
    print('Processing request B')
    time.sleep(1)
    print('Request B completed')

handle_request_a()
handle_request_b()

while True:  # keep the main thread alive so the worker thread can finish
    pass
Output:
Processing request A
Request A handler exiting
Processing request B
Beginning I/O operation
Request B completed
I/O finished, invoking callback
Callback execution started
Received: operation_result
Callback execution ended
Coroutine-Based Implementation Using Yield
Generators with yield provide a way to write asynchronous code that resembles synchronous patterns.
import time
import threading

generator_instance = None

def long_running_task():
    def task_runner():
        global generator_instance
        print('Starting async task')
        time.sleep(3)
        try:
            print('Task complete, resuming generator')
            generator_instance.send('task_output')
        except StopIteration:
            pass
    threading.Thread(target=task_runner).start()

def handle_request_a():
    print('Processing request A')
    output = yield long_running_task()
    print(f'Output: {output}')
    print('Request A completed')

def handle_request_b():
    print('Processing request B')
    time.sleep(1)
    print('Request B completed')

def run_app():
    global generator_instance
    generator_instance = handle_request_a()
    next(generator_instance)  # advance the generator to its first yield
    handle_request_b()
    while True:  # keep the main thread alive
        pass

run_app()
The threads in these examples only simulate background completion. Tornado itself drives coroutines with an epoll-based event loop rather than threads: all handlers execute in a single thread, and the loop resumes each coroutine when its I/O completes.
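The single-threaded idea can be sketched without Tornado. Below is a minimal trampoline, standard library only, that drives several generators in one thread: each generator yields the duration of its simulated I/O, and the scheduler resumes it with a result once that time has elapsed. The names (`run_coroutines`, `handle_request`) are illustrative, not Tornado API.

```python
import time

def handle_request(name):
    # Yield a simulated I/O delay; the scheduler resumes us with the result.
    result = yield 0.1
    print(f'{name} got {result}')
    return f'{name}_done'

def run_coroutines(coroutines):
    """Minimal single-threaded trampoline: no threads involved.

    Each generator yields its simulated I/O duration; once that
    duration has elapsed, the scheduler resumes it with a result.
    """
    pending = []
    for coro in coroutines:
        delay = next(coro)                           # run to the first yield
        pending.append((time.monotonic() + delay, coro))
    results = []
    while pending:
        now = time.monotonic()
        still_waiting = []
        for ready_at, coro in pending:
            if now >= ready_at:
                try:
                    coro.send('io_result')           # resume with the "result"
                except StopIteration as stop:
                    results.append(stop.value)       # generator's return value
            else:
                still_waiting.append((ready_at, coro))
        pending = still_waiting
    return results

print(run_coroutines([handle_request('A'), handle_request('B')]))
```

Both coroutines wait for their "I/O" concurrently, yet everything runs in one thread; a real event loop would sleep on epoll instead of polling the clock.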
Tornado Asynchronous Programming
AsyncHTTPClient
Tornado provides an asynchronous HTTP client for non-blocking web requests.
import tornado.web
import tornado.httpclient
import json

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        client.fetch(
            'http://api.example.com/data.json',
            callback=self.handle_response
        )

    def handle_response(self, response):
        if response.error:
            self.send_error(500)
        else:
            data = json.loads(response.body)
            self.write(f'Data: {data}')
        self.finish()
The @tornado.web.asynchronous decorator tells Tornado not to finish the request when the handler method returns; the connection remains open until self.finish() is called. Note that this decorator was deprecated in Tornado 5.1 and removed in Tornado 6, so the coroutine approach below is preferred in modern code.
Coroutine-Based Approach
Tornado's gen.coroutine decorator enables cleaner asynchronous code using yield.
import tornado.web
import tornado.httpclient
import tornado.gen
import json

class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        # Inside a coroutine, fetch raises HTTPError on failure unless
        # raise_error=False is passed; with it, response.error can be
        # checked explicitly as in the callback version.
        response = yield client.fetch('http://api.example.com/data.json',
                                      raise_error=False)
        if response.error:
            self.send_error(500)
        else:
            data = json.loads(response.body)
            self.write(f'Data: {data}')
Parallel Coroutine Execution
Multiple asynchronous operations can execute concurrently:
class MultiRequestHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        urls = [
            'http://api.example.com/data1.json',
            'http://api.example.com/data2.json'
        ]
        http_client = tornado.httpclient.AsyncHTTPClient()
        # Yielding a list runs both requests in parallel
        resp1, resp2 = yield [
            http_client.fetch(urls[0]),
            http_client.fetch(urls[1])
        ]
        results = {
            'first': json.loads(resp1.body),
            'second': json.loads(resp2.body)
        }
        self.write(str(results))
Database Considerations
Tornado's single-threaded nature means synchronous database queries can block the entire server. For slow queries, consider:
- Optimizing database performance
- Implementing external HTTP APIs for database access
- Using Tornado's AsyncHTTPClient to call database APIs asynchronously
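A further option, not listed above: since Tornado 5 the IOLoop runs on top of asyncio, so a blocking query can be offloaded to a thread pool with run_in_executor while the event loop keeps serving other requests. The sketch below uses only the standard library; blocking_query is a hypothetical stand-in for a real database call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a slow, blocking database call.
def blocking_query(user_id):
    time.sleep(0.1)  # simulates query latency
    return {'id': user_id, 'name': 'example'}

executor = ThreadPoolExecutor(max_workers=4)

async def handle_request(user_id):
    loop = asyncio.get_running_loop()
    # The event loop stays free while the query runs on a worker thread.
    row = await loop.run_in_executor(executor, blocking_query, user_id)
    return row

print(asyncio.run(handle_request(42)))
# → {'id': 42, 'name': 'example'}
```

Tornado exposes the same pattern directly as IOLoop.run_in_executor; the thread pool size caps how many queries can run concurrently.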
WebSocket Implementation
Server-Side WebSocket Handler
import tornado.web
import tornado.websocket
import datetime

class ChatSocketHandler(tornado.websocket.WebSocketHandler):
    active_clients = set()

    def open(self):
        self.active_clients.add(self)
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for client in self.active_clients:
            client.write_message(f'[{self.request.remote_ip}] joined at {timestamp}')

    def on_message(self, message):
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for client in self.active_clients:
            client.write_message(f'[{self.request.remote_ip}] at {timestamp}: {message}')

    def on_close(self):
        self.active_clients.remove(self)
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for client in self.active_clients:
            client.write_message(f'[{self.request.remote_ip}] left at {timestamp}')

    def check_origin(self, origin):
        return True  # allow cross-origin WebSocket connections
Client-Side JavaScript
const socket = new WebSocket('ws://localhost:8888/chat');

socket.onopen = function() {
    socket.send('Connection established');
};

socket.onmessage = function(event) {
    console.log('Message received:', event.data);
};

socket.onclose = function() {
    console.log('Connection closed');
};

function sendMessage(text) {
    socket.send(text);
}
Application Setup
import tornado.web
import tornado.ioloop

app = tornado.web.Application([
    (r'/chat', ChatSocketHandler),
])

if __name__ == '__main__':
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()
Deployment Configuration
Multi-Process Deployment with Supervisor
Create supervisor configuration for multiple Tornado instances:
[program:tornado-8000]
command=/path/to/python /app/server.py --port=8000
directory=/app
user=appuser
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/tornado-8000.log

[program:tornado-8001]
command=/path/to/python /app/server.py --port=8001
directory=/app
user=appuser
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/tornado-8001.log

[group:tornado]
programs=tornado-8000,tornado-8001

Each program writes to its own log file; pointing both at the same stdout_logfile would interleave output and break supervisor's log rotation.
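The supervisor commands above assume server.py accepts a --port flag. A minimal sketch of that flag handling, using argparse (Tornado's own tornado.options module offers the equivalent via define('port', ...)); the function name parse_port is illustrative:

```python
import argparse

def parse_port(argv):
    # Parse the --port flag the supervisor commands pass; a real
    # server.py would hand this value to app.listen(port).
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=8000,
                        help='port this Tornado instance listens on')
    return parser.parse_args(argv).port

print(parse_port(['--port=8001']))
# → 8001
```

Running one instance per port this way lets nginx balance across them, since a single Tornado process uses only one CPU core.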
Nginx Configuration for Load Balancing
upstream tornado_backend {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
}

upstream websocket_backend {
    server 127.0.0.1:8000;
}

server {
    listen 80;

    location /chat {
        proxy_pass http://websocket_backend/chat;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
    }

    location / {
        proxy_pass http://tornado_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}