Fading Coder

Dual Camera Acquisition with OpenCV-Python: Threading and Multiprocessing Approaches

Tech, May 19

Overview

Capturing video streams from multiple USB cameras simultaneously with OpenCV requires attention to blocking I/O, USB bandwidth limits, and resource management. This article walks through sequential, threaded, and multiprocess implementations, then covers bandwidth tuning and a grab/retrieve pattern for tighter synchronization.

Synchronous Dual Capture Method

The simplest approach reads frames sequentially from two devices. It is easy to implement, but each read() call blocks until a frame arrives, so a slow or stalled camera delays the other and the loop runs at the pace of the slower device.

import cv2
import numpy as np

def sync_dual_capture():
    primary_cam = cv2.VideoCapture(0)
    secondary_cam = cv2.VideoCapture(2)
    
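    if not (primary_cam.isOpened() and secondary_cam.isOpened()):
        print("Failed to initialize one or both cameras")
        # Release whichever device did open before bailing out
        primary_cam.release()
        secondary_cam.release()
        return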
    
    # Configure stream properties
    for cam in [primary_cam, secondary_cam]:
        cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    while True:
        ret1, img1 = primary_cam.read()
        ret2, img2 = secondary_cam.read()
        
        if not (ret1 and ret2):
            break
        
        # Display individual feeds
        cv2.imshow("Primary Device", img1)
        cv2.imshow("Secondary Device", img2)
        
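        # Create side-by-side composite (hstack needs equal heights, so
        # match img2 to img1 in case a driver ignored the requested size)
        if img2.shape[:2] != img1.shape[:2]:
            img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
        combined = np.hstack((img1, img2))
        cv2.imshow("Combined View", combined)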
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            # Upscale the frame to 1920x1080 and save it (interpolated; adds no real detail)
            hd_frame = cv2.resize(img1, (1920, 1080))
            cv2.imwrite('snapshot.jpg', hd_frame)
            break
    
    primary_cam.release()
    secondary_cam.release()
    cv2.destroyAllWindows()

sync_dual_capture()
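
To see how much each device contributes to loop latency, the two sequential reads can be timed individually. The following is a minimal sketch and not part of the original code: `timed_read` and `read_pair` are hypothetical helpers that accept any object with a `read()` method returning `(ok, frame)`, such as a `cv2.VideoCapture`.

```python
import time

def timed_read(cam):
    """Call cam.read() and return (ok, frame, elapsed_ms).

    `cam` is any object with a read() method returning (ok, frame),
    for example a cv2.VideoCapture. Hypothetical helper for illustration.
    """
    start = time.perf_counter()
    ok, frame = cam.read()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return ok, frame, elapsed_ms

def read_pair(cam_a, cam_b):
    """Read both cameras in sequence; the total wait is the sum of both reads."""
    ok_a, frame_a, ms_a = timed_read(cam_a)
    ok_b, frame_b, ms_b = timed_read(cam_b)
    return (ok_a and ok_b), frame_a, frame_b, ms_a + ms_b
```

If the combined time regularly exceeds the frame interval (about 33 ms at 30 fps), the sequential loop cannot keep up with both devices, which is the case the threaded and multiprocess variants are meant to address.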

Threaded Non-Blocking Solution

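Giving each camera its own thread keeps a slow read() on one device from stalling the other. Each thread in this version also renders its own window. Calling cv2.imshow() and cv2.waitKey() from worker threads is not reliable on every GUI backend (macOS, for example, expects GUI calls on the main thread), so treat this as a demonstration rather than a portable design.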

import cv2
import threading
import time

class CameraThread(threading.Thread):
    def __init__(self, device_id, window_name):
        threading.Thread.__init__(self)
        self.device_id = device_id
        self.window_name = window_name
        self.running = True
    
    def run(self):
        capture_dev = cv2.VideoCapture(self.device_id)
        
        if not capture_dev.isOpened():
            print(f"Cannot access camera {self.device_id}")
            # Clear the flag so the main loop does not wait on a dead thread
            self.running = False
            return
        
        capture_dev.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        capture_dev.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        
        while self.running:
            success, frame = capture_dev.read()
            if success:
                cv2.imshow(self.window_name, frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.running = False
        
        capture_dev.release()
    
    def stop(self):
        self.running = False

def threaded_capture():
    cam_thread1 = CameraThread(0, "Device A Stream")
    cam_thread2 = CameraThread(1, "Device B Stream")
    
    cam_thread1.start()
    cam_thread2.start()
    
    try:
        while cam_thread1.running and cam_thread2.running:
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    
    cam_thread1.stop()
    cam_thread2.stop()
    cam_thread1.join()
    cam_thread2.join()
    cv2.destroyAllWindows()

threaded_capture()
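
A common variation keeps only the capture in worker threads and leaves all window handling to the main thread. The sketch below is one way to do that under stated assumptions: `LatestFrameReader` is a hypothetical class, not from the original article, and it expects a capture object with `read()` and `release()` methods such as a `cv2.VideoCapture`.

```python
import threading

class LatestFrameReader:
    """Background reader that keeps only the most recent frame.

    `capture` is any object with read() -> (ok, frame) and release(),
    e.g. a cv2.VideoCapture. Hypothetical class for illustration.
    """

    def __init__(self, capture):
        self._capture = capture
        self._lock = threading.Lock()
        self._frame = None
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def _loop(self):
        while not self._stop.is_set():
            ok, frame = self._capture.read()
            if not ok:
                break  # device failed or the stream ended
            with self._lock:
                self._frame = frame
        self._capture.release()

    def latest(self):
        """Return the newest frame seen so far, or None if none yet."""
        with self._lock:
            return self._frame

    def stop(self):
        self._stop.set()
        self._thread.join()
```

The main thread would then poll `latest()` on each reader, call `cv2.imshow()` for any frame that is not None, and handle `cv2.waitKey()` in one place.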

Multiprocessing for Resource Isolation

When CPU-intensive processing is required, multiprocessing avoids GIL limitations and provides true parallelism. This is especially effective on multi-core systems.

import cv2
import multiprocessing as mp

def capture_process(camera_index):
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        print(f"Cannot access camera {camera_index}")
        return
    
    # Force MJPEG codec to reduce USB bandwidth usage
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    # Read back what the driver actually applied; set() requests may be ignored
    actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f"Camera {camera_index}: {actual_width}x{actual_height} @ {fps}fps")
    
    window_title = f"Camera {camera_index} Feed"
    
    while True:
        ret, image = cap.read()
        if not ret:
            print(f"Camera {camera_index} frame capture failed")
            break
        
        cv2.imshow(window_title, image)
        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    # Note: On Linux, a UVC camera often registers two /dev/video* nodes
    # (capture and metadata), so physical cameras may appear only at even
    # indices such as 0 and 2 (common on Raspberry Pi)
    process1 = mp.Process(target=capture_process, args=(0,))
    process2 = mp.Process(target=capture_process, args=(2,))
    
    process1.start()
    process2.start()
    
    process1.join()
    process2.join()
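
Each process above displays its own window. If the parent process needs the frames instead, for example to build a composite, one option is a bounded queue that always holds the newest frame. The helper below is a hedged sketch: `put_latest` is a hypothetical name, and it relies only on the non-blocking `put_nowait()` and `get_nowait()` calls that both `queue.Queue` and `multiprocessing.Queue` provide.

```python
import queue

def put_latest(frame_queue, item):
    """Put `item` on a bounded queue, discarding the oldest entry if full.

    Works with queue.Queue and multiprocessing.Queue, both of which raise
    queue.Full / queue.Empty from the non-blocking calls used here.
    Hypothetical helper for illustration.
    """
    try:
        frame_queue.put_nowait(item)
    except queue.Full:
        try:
            frame_queue.get_nowait()  # drop the stale frame
        except queue.Empty:
            pass  # the consumer emptied the queue in the meantime
        try:
            frame_queue.put_nowait(item)
        except queue.Full:
            pass  # another producer refilled it; skip this frame
```

Frames sent between processes are pickled and copied, so this costs CPU time per frame; keeping the queue small (`maxsize=1` or `2`) limits both memory use and display lag.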

Hardware Considerations and Bandwidth Optimization

USB bandwidth saturation commonly causes frame drops or initialization failures when multiple cameras share the same bus. MJPEG compression significantly reduces the data rate compared to uncompressed YUYV.
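
The scale of the problem can be estimated with simple arithmetic. This sketch assumes YUYV (YUV 4:2:2) at 2 bytes per pixel and ignores protocol overhead; `raw_stream_mbps` is a hypothetical helper name.

```python
def raw_stream_mbps(width, height, fps, bytes_per_pixel=2):
    """Approximate uncompressed video data rate in megabits per second.

    YUYV (YUV 4:2:2) stores 2 bytes per pixel on average.
    """
    bits_per_second = width * height * bytes_per_pixel * 8 * fps
    return bits_per_second / 1_000_000

print(raw_stream_mbps(640, 480, 30))    # 147.456
print(raw_stream_mbps(1280, 720, 30))   # 442.368
```

Two uncompressed 640x480 streams at 30 fps already need roughly 295 Mbit/s, a large share of the 480 Mbit/s signalling rate of USB 2.0, whose usable throughput is lower still.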

# Apply before capturing begins
camera_handle.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
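
Drivers are free to ignore a requested format, so it can help to read the property back and check what was actually applied. The helper below is a small sketch: `decode_fourcc` is a hypothetical name, and it assumes the value comes from `camera_handle.get(cv2.CAP_PROP_FOURCC)`, which returns the packed code as a float.

```python
def decode_fourcc(value):
    """Convert a numeric FOURCC code into its 4-character string.

    The code packs four ASCII bytes, lowest byte first, which is the same
    layout cv2.VideoWriter_fourcc produces.
    """
    code = int(value)
    return "".join(chr((code >> (8 * i)) & 0xFF) for i in range(4))
```

For example, `decode_fourcc(camera_handle.get(cv2.CAP_PROP_FOURCC))` would return "MJPG" if the request was honored and typically "YUYV" if the camera stayed in its uncompressed mode.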

Additional mitigation strategies include:

  • Connecting cameras to separate USB controllers
  • Lowering resolution and frame rate
  • Using powered USB hubs with independent channels
  • Implementing selective frame retrieval with grab() and retrieve() for synchronized capture

Grab-Retrieve Pattern for Synchronized Acquisition

grab() latches the next frame on a device with minimal work, and retrieve() decodes it afterward. Calling grab() on both cameras back to back keeps the two capture instants closer together than two full read() calls would.

import cv2

def synchronized_grab():
    dev1 = cv2.VideoCapture(3)
    dev2 = cv2.VideoCapture(2)
    
    if not (dev1.isOpened() and dev2.isOpened()):
        # Release whichever device did open before bailing out
        dev1.release()
        dev2.release()
        return
    
    while True:
        # Latch a frame on both devices back to back to minimize time skew
        grabbed1 = dev1.grab()
        grabbed2 = dev2.grab()
        if not (grabbed1 and grabbed2):
            break
        
        # Decode the latched frames only after both grabs have completed
        status1, img1 = dev1.retrieve()
        status2, img2 = dev2.retrieve()
        
        if status1 and status2:
            composite = cv2.hconcat([img1, img2])
            cv2.imshow("Synchronized Pair", composite)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    dev1.release()
    dev2.release()
    cv2.destroyAllWindows()
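
To get a rough idea of how closely two grabs land, the time between the two grab() calls returning can be recorded. This is a sketch under stated assumptions: `grab_pair` is a hypothetical helper that works with any objects exposing `grab()` and `retrieve()`, and the value it reports is host-side timing only, not the true difference between sensor exposures.

```python
import time

def grab_pair(dev_a, dev_b):
    """Grab from both devices back to back, then decode.

    Returns (ok, frame_a, frame_b, skew_ms), where skew_ms is the host-side
    time between the two grab() calls returning. Devices are any objects
    with grab() and retrieve(), e.g. cv2.VideoCapture. Hypothetical helper.
    """
    ok_a = dev_a.grab()
    t_a = time.perf_counter()
    ok_b = dev_b.grab()
    t_b = time.perf_counter()
    if not (ok_a and ok_b):
        return False, None, None, None
    # Decoding happens only after both frames are latched
    ok_a, frame_a = dev_a.retrieve()
    ok_b, frame_b = dev_b.retrieve()
    skew_ms = (t_b - t_a) * 1000.0
    return (ok_a and ok_b), frame_a, frame_b, skew_ms
```

A skew that stays well below the frame interval suggests the pair is about as aligned as software-only synchronization allows; tighter alignment generally needs hardware-triggered cameras.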
