Implementing AI Behavior Recognition on Edge Devices with Video Stream Processing
The system operates through a sequential workflow: reading configuration files, initializing models and components, processing video frames, and managing output streams.
Acquiring Video Streams
Three primary methods are used to obtain video streams for processing:
Method 1: Simulated Video Stream Utilize MediaMTX and FFmpeg to generate a simulated RTSP stream. This can be deployed on a local network device like a Raspberry Pi.
MediaMTX: https://github.com/bluenviron/mediamtx/releases FFmpeg: https://ffmpeg.org/ffmpeg.html
nohup ./mediamtx mediamtx.yml > mediamtx.log 2>&1 &
nohup ffmpeg -re -stream_loop -1 -i /home/medias/sample.mp4 -vcodec copy -acodec copy -b:v 5M -f rtsp -rtsp_transport tcp rtsp://localhost:8554/stream.sdp > ffmpeg.log 2>&1 &
Stream URL: rtsp://192.168.124.31:8554/stream.sdp
Method 2: Network Camera Stream (Hikvision)
rtsp://admin:password@192.168.124.38:554/h265/ch1/main/av_stream
Method 3: Network Video Recorder Stream (Hikvision)
rtsp://admin:password@192.168.124.29:554/Streaming/Channels/101
Core Implementation Components
1. Thread Pool Initialization
// Initialize detection thread pool
// Owns a pool of inference workers; configure() loads the model once and
// fans inference requests out across `thread_count` workers.
// NOTE(review): raw owning pointer with no visible delete — prefer
// std::unique_ptr<DetectorPool> if the surrounding code allows adding <memory>.
DetectorPool* detector_pool = new DetectorPool();
// nms_thresh / conf_thresh come from the [EDGE_AI] config section
// (NMSThreshold / ConfidenceThreshold); label_file maps class ids to names.
detector_pool->configure(model_file, thread_count, nms_thresh, conf_thresh, label_file, class_count);
2. Video Decoder Setup
VideoDecoder* decoder = new VideoDecoder();
decoder->initialize(codec_type, frame_rate, &app_context);
decoder->setFrameCallback(frame_processing_callback);
app_context.decoder = decoder;
3. Frame Processing Pipeline
// Convert one decoded YUV420SP frame to RGB888 via the RGA hardware unit,
// then hand the RGB frame to the detector pool for inference.
// wrapbuffer_fd / wrapbuffer_virtualaddr only describe existing memory —
// no pixel data is copied until imcopy() runs.
rga_buffer_t src_buf = wrapbuffer_fd(fd, width, height, RK_FORMAT_YCbCr_420_SP, width_stride, height_stride);

// Destination Mat is heap-backed so RGA can write into its data pointer.
cv::Mat rgb_frame = cv::Mat::zeros(height, width, CV_8UC3);
rga_buffer_t dst_buf = wrapbuffer_virtualaddr((void*)rgb_frame.data, width, height, RK_FORMAT_RGB_888);

// Hardware-accelerated format conversion + copy (YUV -> RGB).
// NOTE(review): imcopy's status result is ignored here — consider checking it.
imcopy(src_buf, dst_buf);

// Submit frame for inference; frame_id is a running sequence counter.
detector_pool->processFrame(rgb_frame, frame_id++);
4. Detection Result Processing and Visualization
// Fetch the latest batch of detection results and draw them on the frame.
// A detection triggers an alert (thick red box) only when:
//   1. its class label is configured in context->label_map,
//   2. its box center lies inside the monitoring zone (if zone checking is on),
//   3. its confidence meets the per-label threshold from label_map.
// All other detections are drawn as thin magenta annotations.
std::vector<DetectionResult> detections;
auto status = detector_pool->retrieveResults(detections, result_id);
bool alert_triggered = false;
for (const auto& detection : detections) {
    auto label_entry = context->label_map.find(detection.class_label);
    bool within_zone = true;
    if (context->zone_enabled && context->monitoring_zone.size() > 0) {
        // Use the bounding-box center as the representative point for the
        // point-in-polygon zone test.
        Point center = {
            detection.bounding_box.x + detection.bounding_box.width / 2,
            detection.bounding_box.y + detection.bounding_box.height / 2
        };
        within_zone = pointInPolygon(context->monitoring_zone, center);
    }
    // BUG FIX: the original read label_entry->second before checking
    // label_entry != end(), which is undefined behavior for any detection
    // whose class label is not in label_map. Dereference only after the
    // end() check short-circuits.
    if (label_entry != context->label_map.end() && within_zone &&
        detection.confidence >= label_entry->second) {
        // Alert case: thick red box + large label.
        cv::rectangle(frame, detection.bounding_box, cv::Scalar(0, 0, 255), 2);
        std::string label_text = detection.class_label + " " + std::to_string(detection.confidence);
        cv::putText(frame, label_text, cv::Point(detection.bounding_box.x, detection.bounding_box.y - 5),
                    cv::FONT_HERSHEY_SIMPLEX, 0.7, cv::Scalar(0, 0, 255), 2);
        alert_triggered = true;
    } else {
        // Non-alert case: thin box in the detection's own color + small magenta label.
        cv::rectangle(frame, detection.bounding_box, detection.color, 1);
        std::string label_text = detection.class_label + " " + std::to_string(detection.confidence);
        cv::putText(frame, label_text, cv::Point(detection.bounding_box.x, detection.bounding_box.y - 5),
                    cv::FONT_HERSHEY_SIMPLEX, 0.5, cv::Scalar(255, 0, 255), 1);
    }
}
5. Stream Output Configuration
// Initialize streaming environment
char* config_path = get_executable_directory(context->config_file.c_str());
StreamConfig stream_config = {
.worker_threads = 10,
.log_level = 0,
.log_output = LOG_TO_CONSOLE,
.config_file = config_path
};
initialize_streaming(&stream_config);
free(config_path);
if (context->http_enabled)
start_http_server(context->http_port, 0);
if (context->rtsp_enabled)
start_rtsp_server(context->rtsp_port, 0);
if (context->rtmp_enabled)
start_rtmp_server(context->rtmp_port, 0);
context->stream_player = create_stream_player();
context->source_url = stream_url;
set_player_callbacks(context->stream_player, play_event_handler, stream_error_handler, context);
play_stream(context->stream_player, context->source_url);
// Push encoded video data
stream_media_data(context->media_handle, encoded_data, data_size, timestamp, timestamp);
Configuration Parameters
[EDGE_AI]
ProcessID = b6cf4e7e-b952-4bf5-be29-0225f71d7f57
ModelPath = /home/edge-ai/weights/detection_model.rknn
NMSThreshold = 0.65
ConfidenceThreshold = 0.4
LabelFile = /home/edge-ai/labels/coco_labels.txt
ClassCount = 80
StreamURL = rtsp://192.168.124.31:8554/stream.sdp
VideoCodec = 264
SourceFrameRate = 25
WorkerThreads = 20
EnableHTTP = 0
HTTPPort = 80
EnableRTSP = 1
RTSPPort = 554
EnableRTMP = 0
RTMPPort = 1935
MonitoringZones = {11,00,17,10}
DetectionLabels = {person,0.5}
AlertDuration = 3
AlertInterval = 60
RecordingLength = 5