Fading Coder

One Final Commit for the Last Sprint


Implementing Vehicle and Pedestrian Detection with TensorFlow-based YOLOv3-Tiny

Tech · May 17

Network Architecture Overview

The implementation utilizes a TensorFlow adaptation of YOLOv3 with a Darknet53 backbone for real-time object detection of vehicles and pedestrians.
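As a quick orientation (a sketch, not the repository's code): YOLOv3-Tiny predicts at two scales, and the shape of each detection head follows directly from the input size, stride, and class count. The 416×416 input and 4-class setup below are assumptions for illustration.

```python
# Sketch: detection-head output shapes for YOLOv3-Tiny (illustrative only).
# Assumes a 416x416 input and 4 target classes; adjust for your setup.

def head_shape(input_size, stride, anchors_per_scale, num_classes):
    """Each grid cell predicts anchors_per_scale boxes, each encoded as
    4 coordinates + 1 objectness score + num_classes class scores."""
    grid = input_size // stride
    return (grid, grid, anchors_per_scale * (5 + num_classes))

# YOLOv3-Tiny detects at strides 32 and 16 (two scales, 3 anchors each).
print([head_shape(416, s, 3, 4) for s in (32, 16)])
# [(13, 13, 27), (26, 26, 27)]
```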

Code Structure

tf_yolov3
├── extract_voc.py          # Generate training data format from raw files
├── make_voc_tfrecord.sh    # Create TFRecord format
├── convert_tfrecord.py     # Convert raw data to TFRecord format
├── dataset.py             # Data augmentation and loading
├── utils.py               # NMS, bounding box drawing, PB file operations
├── common.py              # Convolution layers with padding (Slim)
├── yolov3.py              # Define Darknet53 and YOLOv3 architecture
├── convert_weight.py      # Convert Darknet weights to TensorFlow PB format
├── yolov3_train.py        # Load parameters, data, and training loop
├── evaluate.py            # Model performance evaluation
├── inference.py           # Model inference using trained weights
├── show_input_image.py    # Display input images and bounding boxes
├── kmeans.py              # Calculate anchors from input data
└── test_video.py          # Video testing of model performance

Data Preparation Pipeline

VOC Data Extraction

import os
import argparse
import xml.etree.ElementTree as ET

sets = [('2012', 'train'), ('2012', 'val')]
class_names = ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]

parser = argparse.ArgumentParser()
parser.add_argument("--voc_directory", default="/home/user/datasets/VOCdevkit/training/")
parser.add_argument("--output_directory", default="./")
arguments = parser.parse_args()

def process_annotations(year, image_identifier, output_file):
    annotation_file = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/Annotations/{image_identifier}.xml')
    with open(annotation_file) as xml_file:
        tree = ET.parse(xml_file)
        root = tree.getroot()
        
        for obj in root.iter('object'):
            difficulty = obj.find('difficult').text
            class_name = obj.find('name').text
            if class_name not in class_names or int(difficulty) == 1:
                continue
            class_id = class_names.index(class_name)
            bbox = obj.find('bndbox')
            coordinates = (int(bbox.find('xmin').text), int(bbox.find('ymin').text), 
                         int(bbox.find('xmax').text), int(bbox.find('ymax').text))
            output_file.write(" " + " ".join([str(coord) for coord in coordinates]) + " " + str(class_id))

for year, image_set in sets:
    text_file_path = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/ImageSets/Main/{image_set}.txt')
    if not os.path.exists(text_file_path): 
        continue
    image_identifiers = open(text_file_path).read().strip().split()
    output_file_path = os.path.join(arguments.output_directory, f'{year}_{image_set}.txt')
    output_file = open(output_file_path, 'w')
    
    for image_id in image_identifiers:
        image_location = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/JPEGImages/{image_id}.jpg')
        print("=>", image_location)
        output_file.write(image_location)
        process_annotations(year, image_id, output_file)
        output_file.write('\n')
    output_file.close()
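Each line the script writes starts with the image path, followed by one `xmin ymin xmax ymax class_id` group per object. A minimal sketch of parsing such a line back, mirroring what the TFRecord converter later does (the sample path and numbers are made up):

```python
# Hypothetical sample line in the format extract_voc.py emits:
line = "/data/VOC2012/JPEGImages/000001.jpg 48 240 195 371 11 8 12 352 498 14"

elements = line.strip().split()
image_path = elements[0]
box_count = len(elements[1:]) // 5  # five numbers per object
boxes = [
    [float(v) for v in elements[1 + i * 5 : 6 + i * 5]]
    for i in range(box_count)
]
print(image_path, box_count)  # /data/VOC2012/JPEGImages/000001.jpg 2
print(boxes[0])               # [48.0, 240.0, 195.0, 371.0, 11.0]
```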

TFRecord Conversion Script

python scripts/extract_voc.py --voc_directory /home/user/datasets/VOC/training/ --output_directory ./
cat ./2012_train.txt ./2012_val.txt > voc_training.txt
# For the test split, add ('2012', 'test') to `sets` in extract_voc.py first
python scripts/extract_voc.py --voc_directory /home/user/datasets/VOC/testing/ --output_directory ./
cat ./2012_test.txt > voc_testing.txt
python core/convert_tfrecord.py --dataset_file ./voc_training.txt --tfrecord_prefix /home/user/datasets/VOC/training/voc_training
python core/convert_tfrecord.py --dataset_file ./voc_testing.txt --tfrecord_prefix /home/user/datasets/VOC/testing/voc_testing

TFRecord Format Converter

import sys
import argparse
import numpy as np
import tensorflow as tf

def main(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_file", default='./data/dataset.txt')
    parser.add_argument("--tfrecord_prefix", default='./data/tfrecords/dataset')
    flags = parser.parse_args()

    dataset_map = {}
    
    with open(flags.dataset_file, 'r') as file_handle:
        for line in file_handle.readlines():
            elements = line.strip().split()  # image path, then 5 numbers per box
            image_path = elements[0]
            box_count = len(elements[1:]) // 5
            bounding_boxes = np.zeros([box_count, 5], dtype=np.float32)
            for idx in range(box_count):
                bounding_boxes[idx] = elements[1+idx*5:6+idx*5]
            dataset_map[image_path] = bounding_boxes

    image_paths = list(dataset_map.keys())
    total_images = len(image_paths)
    print(f">> Processing {total_images} images")

    tfrecord_filename = flags.tfrecord_prefix + ".tfrecords"
    with tf.python_io.TFRecordWriter(tfrecord_filename) as writer:
        for idx in range(total_images):
            image_data = tf.gfile.FastGFile(image_paths[idx], 'rb').read()
            boxes_data = dataset_map[image_paths[idx]]
            boxes_data = boxes_data.tobytes()  # tostring() is deprecated in NumPy

            example_record = tf.train.Example(features=tf.train.Features(
                feature={
                    'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_data])),
                    'boxes': tf.train.Feature(bytes_list=tf.train.BytesList(value=[boxes_data])),
                }
            ))

            writer.write(example_record.SerializeToString())
        print(f">> Saved {total_images} images in {tfrecord_filename}")

if __name__ == "__main__":
    main(sys.argv)

Dataset Handling and Augmentation

import cv2
import numpy as np
from core import utils
import tensorflow as tf

class DataParser(object):
    def __init__(self, img_height, img_width, anchor_boxes, class_count, debug_mode=False):
        self.anchor_boxes = anchor_boxes
        self.class_count = class_count
        self.img_height = img_height
        self.img_width = img_width
        self.debug_mode = debug_mode

    def horizontal_flip(self, image_data, ground_truth):
        width_val = tf.cast(tf.shape(image_data)[1], tf.float32)
        image_data = tf.image.flip_left_right(image_data)
        
        x_min, y_min, x_max, y_max, label = tf.unstack(ground_truth, axis=1)
        x_min, y_min, x_max, y_max = width_val - x_max, y_min, width_val - x_min, y_max
        ground_truth = tf.stack([x_min, y_min, x_max, y_max, label], axis=1)
        
        return image_data, ground_truth

    def color_distortion(self, image_data, ground_truth):
        image_data = tf.image.random_brightness(image_data, max_delta=32./255.)
        image_data = tf.image.random_saturation(image_data, lower=0.8, upper=1.2)
        image_data = tf.image.random_hue(image_data, max_delta=0.2)
        image_data = tf.image.random_contrast(image_data, lower=0.8, upper=1.2)
        return image_data, ground_truth

    def gaussian_blur(self, image_data, ground_truth):
        def apply_blur(img):
            return cv2.GaussianBlur(img, (5, 5), 0)
        height, width = image_data.shape.as_list()[:2]
        image_data = tf.py_func(apply_blur, [image_data], tf.uint8)
        image_data.set_shape([height, width, 3])
        return image_data, ground_truth

    def random_crop_operation(self, image_data, ground_truth, min_coverage=0.8, 
                             aspect_range=[0.8, 1.2], area_range=[0.5, 1.0]):
        
        height, width = tf.cast(tf.shape(image_data)[0], tf.float32), tf.cast(tf.shape(image_data)[1], tf.float32)
        x_min, y_min, x_max, y_max, label = tf.unstack(ground_truth, axis=1)
        bbox_coords = tf.stack([y_min/height, x_min/width, y_max/height, x_max/width], axis=1)
        bbox_coords = tf.clip_by_value(bbox_coords, 0, 1)
        
        start_point, crop_size, distorted_boxes = tf.image.sample_distorted_bounding_box(
            tf.shape(image_data),
            bounding_boxes=tf.expand_dims(bbox_coords, axis=0),
            min_object_covered=min_coverage,
            aspect_ratio_range=aspect_range,
            area_range=area_range)
        
        cropped_region = [distorted_boxes[0,0,1]*width, distorted_boxes[0,0,0]*height, 
                         distorted_boxes[0,0,3]*width, distorted_boxes[0,0,2]*height]

        cropped_x_min = tf.clip_by_value(x_min, cropped_region[0], cropped_region[2]) - cropped_region[0]
        cropped_y_min = tf.clip_by_value(y_min, cropped_region[1], cropped_region[3]) - cropped_region[1]
        cropped_x_max = tf.clip_by_value(x_max, cropped_region[0], cropped_region[2]) - cropped_region[0]
        cropped_y_max = tf.clip_by_value(y_max, cropped_region[1], cropped_region[3]) - cropped_region[1]

        image_data = tf.slice(image_data, start_point, crop_size)
        ground_truth = tf.stack([cropped_x_min, cropped_y_min, cropped_x_max, cropped_y_max, label], axis=1)

        return image_data, ground_truth

    def preprocessing_pipeline(self, image_data, ground_truth):
        image_data, ground_truth = utils.resize_image_correct_bbox(image_data, ground_truth, 
                                                                 self.img_height, self.img_width)

        if self.debug_mode: 
            return image_data, ground_truth

        true_13, true_26, true_52 = tf.py_func(self.process_true_boxes, inp=[ground_truth],
                            Tout=[tf.float32, tf.float32, tf.float32])
        image_data = image_data / 255.

        return image_data, true_13, true_26, true_52

    def process_true_boxes(self, gt_boxes):
        layer_count = len(self.anchor_boxes) // 3
        anchor_masks = [[6,7,8], [3,4,5], [0,1,2]] if layer_count == 3 else [[3,4,5], [1,2,3]]
        grid_dimensions = [[self.img_height//x, self.img_width//x] for x in (32, 16, 8)]

        box_centers = (gt_boxes[:, 0:2] + gt_boxes[:, 2:4]) / 2
        box_dimensions = gt_boxes[:, 2:4] - gt_boxes[:, 0:2]

        gt_boxes[:, 0:2] = box_centers
        gt_boxes[:, 2:4] = box_dimensions

        output_13 = np.zeros(shape=[grid_dimensions[0][0], grid_dimensions[0][1], 3, 5+self.class_count], dtype=np.float32)
        output_26 = np.zeros(shape=[grid_dimensions[1][0], grid_dimensions[1][1], 3, 5+self.class_count], dtype=np.float32)
        output_52 = np.zeros(shape=[grid_dimensions[2][0], grid_dimensions[2][1], 3, 5+self.class_count], dtype=np.float32)

        outputs = [output_13, output_26, output_52]
        anchor_max = self.anchor_boxes / 2.
        anchor_min = -anchor_max
        valid_mask = box_dimensions[:, 0] > 0

        box_sizes = box_dimensions[valid_mask]
        box_sizes = np.expand_dims(box_sizes, -2)
        boxes_max = box_sizes / 2.
        boxes_min = -boxes_max

        intersect_mins = np.maximum(boxes_min, anchor_min)
        intersect_maxs = np.minimum(boxes_max, anchor_max)
        intersect_wh = np.maximum(intersect_maxs - intersect_mins, 0.)
        intersect_area = intersect_wh[..., 0] * intersect_wh[..., 1]
        box_area = box_sizes[..., 0] * box_sizes[..., 1]

        anchor_area = self.anchor_boxes[:, 0] * self.anchor_boxes[:, 1]
        iou_values = intersect_area / (box_area + anchor_area - intersect_area)
        best_anchors = np.argmax(iou_values, axis=-1)

        for idx, anchor_idx in enumerate(best_anchors):
            for layer_idx in range(layer_count):
                if anchor_idx not in anchor_masks[layer_idx]: 
                    continue

                grid_x = np.floor(gt_boxes[idx,0]/self.img_width*grid_dimensions[layer_idx][1]).astype('int32')
                grid_y = np.floor(gt_boxes[idx,1]/self.img_height*grid_dimensions[layer_idx][0]).astype('int32')

                anchor_position = anchor_masks[layer_idx].index(anchor_idx)
                class_id = gt_boxes[idx, 4].astype('int32')

                outputs[layer_idx][grid_y, grid_x, anchor_position, 0:4] = gt_boxes[idx, 0:4]
                outputs[layer_idx][grid_y, grid_x, anchor_position, 4] = 1.
                outputs[layer_idx][grid_y, grid_x, anchor_position, 5+class_id] = 1.

        return output_13, output_26, output_52

    def parse_example(self, serialized_data):
        features = tf.parse_single_example(
            serialized_data,
            features={
                'image': tf.FixedLenFeature([], dtype=tf.string),
                'boxes': tf.FixedLenFeature([], dtype=tf.string),
            }
        )

        image_data = tf.image.decode_jpeg(features['image'], channels=3)
        image_data = tf.image.convert_image_dtype(image_data, tf.uint8)

        bounding_boxes = tf.decode_raw(features['boxes'], tf.float32)
        bounding_boxes = tf.reshape(bounding_boxes, shape=[-1, 5])

        return self.preprocessing_pipeline(image_data, bounding_boxes)

class DatasetHandler(object):
    def __init__(self, parser, tfrecords_location, batch_size, shuffle_buffer=None, repeat_dataset=True):
        self.parser = parser
        self.file_names = tf.gfile.Glob(tfrecords_location)
        self.batch_size = batch_size
        self.shuffle_buffer = shuffle_buffer
        self.repeat_dataset = repeat_dataset
        self._initialize()

    def _initialize(self):
        if not self.file_names:
            raise FileNotFoundError("No TFRecords found!")
        self.dataset = tf.data.TFRecordDataset(self.file_names)

        self.dataset = self.dataset.map(map_func=self.parser.parse_example, num_parallel_calls=10)
        self.dataset = self.dataset.repeat() if self.repeat_dataset else self.dataset

        if self.shuffle_buffer is not None:
            self.dataset = self.dataset.shuffle(self.shuffle_buffer)

        self.dataset = self.dataset.batch(self.batch_size).prefetch(self.batch_size)
        self.iterator = self.dataset.make_one_shot_iterator()

    def get_next_batch(self):
        return self.iterator.get_next()
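The geometry in `horizontal_flip`, `random_crop_operation`, and the anchor matching in `process_true_boxes` all reduce to simple arithmetic. A plain-Python sketch with illustrative values (not repository code):

```python
def flip_box(box, width):
    """Mirror an (xmin, ymin, xmax, ymax) box about the vertical center line,
    as horizontal_flip does with width - x_max and width - x_min."""
    x0, y0, x1, y1 = box
    return (width - x1, y0, width - x0, y1)

def clip_box_to_crop(box, crop):
    """Clip a box to a crop window and shift it into crop coordinates,
    mirroring the clip_by_value steps in random_crop_operation."""
    cx0, cy0, cx1, cy1 = crop
    clamp = lambda v, lo, hi: max(lo, min(v, hi))
    x0, y0, x1, y1 = box
    return (clamp(x0, cx0, cx1) - cx0, clamp(y0, cy0, cy1) - cy0,
            clamp(x1, cx0, cx1) - cx0, clamp(y1, cy0, cy1) - cy0)

def anchor_iou(box_wh, anchor_wh):
    """IoU of a ground-truth box and an anchor, both centered at the origin:
    only widths and heights matter, as in process_true_boxes."""
    bw, bh = box_wh
    aw, ah = anchor_wh
    inter = min(bw, aw) * min(bh, ah)
    return inter / (bw * bh + aw * ah - inter)

print(flip_box((0, 50, 100, 150), 416))  # (316, 50, 416, 150)
print(clip_box_to_crop((50, 150, 200, 250), (100, 100, 300, 300)))
# (0, 50, 100, 150)

# The best-matching anchor index decides which scale/slot gets the box.
anchors = [(10, 13), (33, 23), (62, 45)]  # hypothetical anchor sizes
print(max(range(3), key=lambda i: anchor_iou((60, 40), anchors[i])))  # 2
```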

YOLOv3-Tiny Configuration

For deployment on ARM chips, YOLOv3-Tiny is used instead of the full YOLOv3 model due to its reduced computational requirements.

Configuration modifications required:

  1. Edit yolov3-tiny.cfg file
  2. Focus on [yolo] sections and preceding [convolutional] layers
  3. Modify classes parameter in all [yolo] sections to match target object count (e.g., classes=4 for 4 object types)
  4. Update filters parameter in convolutional layers before [yolo] sections using formula: filters = 3 * (classes + 5)
  5. For 4 classes, set filters=27
  6. Update car_classes.txt in model_data directory with target object labels, one per line
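The filters rule from step 4 is easy to sanity-check: each of the 3 anchors per scale predicts 4 box coordinates, 1 objectness score, and one score per class.

```python
def yolo_filters(num_classes, anchors_per_scale=3):
    # 4 box coords + 1 objectness + num_classes class scores, per anchor
    return anchors_per_scale * (num_classes + 5)

print(yolo_filters(4))   # 27 -- matches step 5
print(yolo_filters(20))  # 75 -- the stock 20-class VOC configuration
```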
Tags: tensorflow
