Dual Camera Acquisition with OpenCV-Python: Threading and Multiprocessing Approaches
Overview
Capturing video streams from multiple USB cameras simultaneously using OpenCV requires careful consideration of I/O blocking, USB bandwidth limitations, and resource management. This article presents three distinct implementation strategies with modified code structures and logic.
Synchronous Dual Capture Method
The simplest approach reads frames sequentially from two devices. While easy to implement, this method may suffer from latency if one camera blocks.
import cv2
import numpy as np
def sync_dual_capture():
    """Display two cameras by reading them one after the other.

    Opens devices 0 and 2, requests 640x480 from both, and shows each feed
    plus a side-by-side composite. The loop ends when either read fails or
    'q' is pressed; pressing 'q' also writes the latest primary frame,
    resized to 1920x1080, to ``snapshot.jpg``.

    Both capture handles are released on every exit path, including when
    only one device could be opened or an error escapes the loop.
    """
    primary_cam = cv2.VideoCapture(0)
    secondary_cam = cv2.VideoCapture(2)
    cameras = (primary_cam, secondary_cam)

    if not all(cam.isOpened() for cam in cameras):
        print("Failed to initialize one or both cameras")
        # One of the two may have opened successfully; do not leak it.
        for cam in cameras:
            cam.release()
        return

    try:
        # Configure stream properties
        for cam in cameras:
            cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            ret1, img1 = primary_cam.read()
            ret2, img2 = secondary_cam.read()
            if not (ret1 and ret2):
                break

            # Display individual feeds
            cv2.imshow("Primary Device", img1)
            cv2.imshow("Secondary Device", img2)

            # Create side-by-side composite. The size passed to set() is
            # only a request, so match the second frame to the first
            # before stacking (hstack needs equal heights).
            right = img2
            if img2.shape[:2] != img1.shape[:2]:
                right = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
            combined = np.hstack((img1, right))
            cv2.imshow("Combined View", combined)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                # Save a snapshot of the primary frame, resized to
                # 1920x1080 (interpolated, not native resolution).
                hd_frame = cv2.resize(img1, (1920, 1080))
                cv2.imwrite('snapshot.jpg', hd_frame)
                break
    finally:
        for cam in cameras:
            cam.release()
        cv2.destroyAllWindows()
# Run the sequential two-camera demo defined above.
sync_dual_capture()
Threaded Non-Blocking Solution
For independent camera operations, threading prevents I/O blocking. Each camera runs in its own thread with dedicated window rendering.
import cv2
import threading
import time
class CameraThread(threading.Thread):
    """Worker thread that captures one camera and shows it in its own window.

    Attributes:
        device_id: Index passed to ``cv2.VideoCapture``.
        window_name: Title of the window the frames are shown in.
        running: True while the capture loop should keep going. It is
            cleared by ``stop()``, by pressing 'q' in the window, and
            whenever ``run()`` exits for any reason, so callers can poll it
            to learn that the worker has finished.

    NOTE(review): calling ``cv2.imshow``/``cv2.waitKey`` from worker threads
    is platform-dependent -- confirm it works on the target OS.
    """

    def __init__(self, device_id, window_name):
        super().__init__()
        self.device_id = device_id
        self.window_name = window_name
        self.running = True

    def run(self):
        """Capture and display frames until stopped, 'q', or a read failure."""
        capture_dev = None
        try:
            capture_dev = cv2.VideoCapture(self.device_id)
            if not capture_dev.isOpened():
                print(f"Cannot access camera {self.device_id}")
                return

            capture_dev.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            capture_dev.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

            while self.running:
                success, frame = capture_dev.read()
                if not success:
                    # Without this the loop would spin at full speed and
                    # never reach waitKey, so 'q' could not end it.
                    print(f"Camera {self.device_id} frame capture failed")
                    break
                cv2.imshow(self.window_name, frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            if capture_dev is not None:
                capture_dev.release()
            # Signal completion on every exit path (open failure, read
            # failure, 'q', stop(), or an unexpected exception).
            self.running = False

    def stop(self):
        """Ask the capture loop to exit after the current iteration."""
        self.running = False
def threaded_capture():
    """Run two CameraThread workers (devices 0 and 1) until either one ends.

    The main thread polls the workers every 0.1 s. As soon as one of them
    stops (its ``running`` flag is cleared or its thread has exited), or
    Ctrl-C is pressed, both workers are stopped and joined and all OpenCV
    windows are closed.
    """
    workers = [
        CameraThread(0, "Device A Stream"),
        CameraThread(1, "Device B Stream"),
    ]
    for worker in workers:
        worker.start()

    try:
        # is_alive() is checked as well as the flag so that a worker which
        # exited without clearing `running` (e.g. its camera could not be
        # opened) cannot leave this loop waiting forever.
        while all(w.running and w.is_alive() for w in workers):
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to end the demo; fall through to cleanup.
        pass
    finally:
        for worker in workers:
            worker.stop()
        for worker in workers:
            worker.join()
        cv2.destroyAllWindows()
# Run the threaded two-camera demo defined above.
threaded_capture()
Multiprocessing for Resource Isolation
When CPU-intensive processing is required, multiprocessing avoids GIL limitations and provides true parallelism. This is especially effective on multi-core systems.
import cv2
import multiprocessing as mp
def capture_process(camera_index):
    """Capture and display one camera; intended as a process target.

    Requests MJPG at 640x480, prints the width, height and FPS the device
    reports back, then shows frames in a window until a read fails or 'q'
    is pressed.

    Args:
        camera_index: Device index passed to ``cv2.VideoCapture``.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        # Report the real cause instead of a misleading 0x0 format line
        # followed by a frame-capture failure.
        print(f"Cannot access camera {camera_index}")
        cap.release()
        return

    try:
        # Force MJPEG codec to reduce USB bandwidth usage
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        # Read the values back: the set() calls above are only requests.
        actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        fps = cap.get(cv2.CAP_PROP_FPS)
        print(f"Camera {camera_index}: {actual_width}x{actual_height} @ {fps}fps")

        window_title = f"Camera {camera_index} Feed"
        while True:
            ret, image = cap.read()
            if not ret:
                print(f"Camera {camera_index} frame capture failed")
                break
            cv2.imshow(window_title, image)
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
if __name__ == '__main__':
    # Note: On some embedded systems (e.g., Raspberry Pi),
    # even indices may correspond to physical USB ports
    camera_indices = (0, 2)

    # One process per camera; create them all, start them all, then wait.
    workers = [
        mp.Process(target=capture_process, args=(index,))
        for index in camera_indices
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Hardware Considerations and Bandwidth Optimization
USB bandwidth saturation commonly causes frame drops or initialization failures when multiple cameras share the same bus. The MJPEG codec significantly reduces data throughput compared to uncompressed YUYV.
# Request the MJPG pixel format; apply before capturing begins
mjpg_fourcc = cv2.VideoWriter_fourcc(*"MJPG")
camera_handle.set(cv2.CAP_PROP_FOURCC, mjpg_fourcc)
Additional mitigation strategies include:
- Connecting cameras to separate USB controllers
- Lowering resolution and frame rate
- Using powered USB hubs with independent channels
- Implementing selective frame retrieval with `grab()` and `retrieve()` for synchronized capture
Grab-Retrieve Pattern for Synchronized Acquisition
import cv2
def synchronized_grab(first_index=3, second_index=2):
    """Show a side-by-side view of two cameras using grab()/retrieve().

    Each iteration calls ``grab()`` on both devices back to back and only
    then ``retrieve()``s the two frames, so the expensive decode step does
    not sit between the two captures. Iterations where a grab or retrieve
    fails are skipped. Press 'q' to quit.

    Args:
        first_index: Device index of the left-hand camera.
        second_index: Device index of the right-hand camera.
    """
    dev1 = cv2.VideoCapture(first_index)
    dev2 = cv2.VideoCapture(second_index)

    if not (dev1.isOpened() and dev2.isOpened()):
        print("Failed to initialize one or both cameras")
        # One of the two may have opened successfully; do not leak it.
        dev1.release()
        dev2.release()
        return

    try:
        while True:
            # Latch a frame on both devices before decoding either one.
            grabbed1 = dev1.grab()
            grabbed2 = dev2.grab()

            if grabbed1 and grabbed2:
                status1, img1 = dev1.retrieve()
                status2, img2 = dev2.retrieve()
                if status1 and status2:
                    # No resolution is requested here, so the devices may
                    # deliver different sizes; hconcat needs equal heights.
                    if img2.shape[:2] != img1.shape[:2]:
                        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
                    composite = cv2.hconcat([img1, img2])
                    cv2.imshow("Synchronized Pair", composite)

            # Mask to the low byte, as the other snippets do, so the key
            # comparison does not depend on extra bits in the return value.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        dev1.release()
        dev2.release()
        cv2.destroyAllWindows()