Implementing Vehicle and Pedestrian Detection with TensorFlow-based YOLOv3-Tiny
Network Architecture Overview
The implementation utilizes a TensorFlow adaptation of YOLOv3 for real-time object detection of vehicles and pedestrians; the full model uses a Darknet53 backbone, while a lighter YOLOv3-Tiny variant is used for deployment (see the configuration section at the end).
Code Structure
tf_yolov3
├── extract_voc.py # Generate training data format from raw files
├── make_voc_tfrecord.sh # Create TFRecord format
├── convert_tfrecord.py # Convert raw data to TFRecord format
├── dataset.py # Data augmentation and loading
├── utils.py # NMS, bounding box drawing, PB file operations
├── common.py # Convolution layers with padding (Slim)
├── yolov3.py # Define Darknet53 and YOLOv3 architecture
├── convert_weight.py # Convert Darknet weights to TensorFlow PB format
├── yolov3_train.py # Load parameters, data, and training loop
├── evaluate.py # Model performance evaluation
├── inference.py # Model inference using trained weights
├── show_input_image.py # Display input images and bounding boxes
├── kmeans.py # Calculate anchors from input data
└── test_video.py # Video testing of model performance
Data Preparation Pipeline
VOC Data Extraction
# extract_voc.py — flatten Pascal-VOC XML annotations into plain-text lists,
# one "<image_path> xmin ymin xmax ymax class_id ..." line per image.
import os
import argparse
import xml.etree.ElementTree as ET
# (year, split) pairs to extract; each pair produces one "<year>_<split>.txt" file.
sets = [('2012', 'train'), ('2012', 'val')]
# The 20 standard Pascal-VOC classes; a class's position in this list is its numeric id.
class_names = ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
parser = argparse.ArgumentParser()
parser.add_argument("--voc_directory", default="/home/user/datasets/VOCdevkit/training/")
parser.add_argument("--output_directory", default="./")
# NOTE: arguments are parsed at import time — this file is meant to be run as a script.
arguments = parser.parse_args()
def process_annotations(year, image_identifier, output_file):
    """Append the bounding boxes for one image to *output_file*.

    Reads the Pascal-VOC XML annotation of ``image_identifier`` and writes one
    " xmin ymin xmax ymax class_id" group per kept object onto the current
    line of *output_file* (no trailing newline; the caller terminates lines).
    Objects whose class is not in ``class_names``, or that are flagged
    difficult, are skipped.
    """
    annotation_file = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/Annotations/{image_identifier}.xml')
    with open(annotation_file) as xml_file:
        tree = ET.parse(xml_file)
    root = tree.getroot()
    for obj in root.iter('object'):
        class_name = obj.find('name').text
        # <difficult> is optional in some VOC dumps; the original code crashed
        # with AttributeError on a missing tag. Treat absent as "not difficult".
        difficult_node = obj.find('difficult')
        difficulty = int(difficult_node.text) if difficult_node is not None else 0
        if class_name not in class_names or difficulty == 1:
            continue
        class_id = class_names.index(class_name)
        bbox = obj.find('bndbox')
        # int(float(...)) tolerates fractional coordinates, which appear in
        # some VOC-style annotation tools; plain int("123.4") would raise.
        coordinates = (int(float(bbox.find('xmin').text)), int(float(bbox.find('ymin').text)),
                       int(float(bbox.find('xmax').text)), int(float(bbox.find('ymax').text)))
        output_file.write(" " + " ".join([str(coord) for coord in coordinates]) + " " + str(class_id))
# Main extraction loop: for each (year, split) pair, emit one
# "<image_path> [boxes...]" line per image into "<year>_<split>.txt".
for year, image_set in sets:
    text_file_path = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/ImageSets/Main/{image_set}.txt')
    if not os.path.exists(text_file_path):
        # Skip splits absent from this VOC checkout.
        continue
    with open(text_file_path) as split_file:
        image_identifiers = split_file.read().strip().split()
    output_file_path = os.path.join(arguments.output_directory, f'{year}_{image_set}.txt')
    # "with" guarantees the output file is flushed and closed even if a
    # malformed annotation makes process_annotations raise mid-loop; the
    # original left both files unclosed on error.
    with open(output_file_path, 'w') as output_file:
        for image_id in image_identifiers:
            image_location = os.path.join(arguments.voc_directory, f'VOCdevkit/VOC{year}/JPEGImages/{image_id}.jpg')
            print("=>", image_location)
            output_file.write(image_location)
            process_annotations(year, image_id, output_file)
            output_file.write('\n')
TFRecord Conversion Script
# Step 1: flatten VOC XML annotations into one-line-per-image text files.
# extract_voc.py is configured with sets = [('2012', 'train'), ('2012', 'val')],
# so its outputs are named 2012_<split>.txt (the original commands concatenated
# non-existent 2007_*.txt files).
python scripts/extract_voc.py --voc_directory /home/user/datasets/VOC/training/ --output_directory ./
cat ./2012_train.txt ./2012_val.txt > voc_training.txt
# NOTE(review): producing a test list requires adding ('2012', 'test') to
# `sets` in extract_voc.py — as written the script only emits train/val.
python scripts/extract_voc.py --voc_directory /home/user/datasets/VOC/testing/ --output_directory ./
cat ./2012_test.txt > voc_testing.txt
# Step 2: pack the flattened lists into TFRecord files.
python core/convert_tfrecord.py --dataset_file ./voc_training.txt --tfrecord_prefix /home/user/datasets/VOC/training/voc_training
python core/convert_tfrecord.py --dataset_file ./voc_testing.txt --tfrecord_prefix /home/user/datasets/VOC/testing/voc_testing
TFRecord Format Converter
import sys
import argparse
import numpy as np
import tensorflow as tf
def main(argv):
    """Convert a flattened "<image_path> xmin ymin xmax ymax cls ..." dataset
    file into a single .tfrecords file.

    Each example stores the raw JPEG bytes under 'image' and the serialized
    [N, 5] float32 box array under 'boxes'.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_file", default='./data/dataset.txt')
    parser.add_argument("--tfrecord_prefix", default='./data/tfrecords/dataset')
    flags = parser.parse_args()
    dataset_map = {}
    with open(flags.dataset_file, 'r') as file_handle:
        for line in file_handle:
            # split() (not split(' ')) tolerates the trailing newline and any
            # run of whitespace between fields; with split(' ') a trailing
            # space shifted the box-count arithmetic off by one.
            elements = line.split()
            if not elements:
                continue  # ignore blank lines
            image_path = elements[0]
            box_count = len(elements[1:]) // 5
            bounding_boxes = np.zeros([box_count, 5], dtype=np.float32)
            for idx in range(box_count):
                bounding_boxes[idx] = elements[1+idx*5:6+idx*5]
            dataset_map[image_path] = bounding_boxes
    image_paths = list(dataset_map.keys())
    total_images = len(image_paths)
    print(f">> Processing {total_images} images")
    tfrecord_filename = flags.tfrecord_prefix + ".tfrecords"
    with tf.python_io.TFRecordWriter(tfrecord_filename) as writer:
        for image_path in image_paths:
            # Close each image file promptly instead of relying on GC.
            with tf.gfile.FastGFile(image_path, 'rb') as image_file:
                image_data = image_file.read()
            # tobytes() is the non-deprecated spelling of tostring().
            boxes_data = dataset_map[image_path].tobytes()
            example_record = tf.train.Example(features=tf.train.Features(
                feature={
                    'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_data])),
                    'boxes': tf.train.Feature(bytes_list=tf.train.BytesList(value=[boxes_data])),
                }
            ))
            writer.write(example_record.SerializeToString())
    print(f">> Saved {total_images} images in {tfrecord_filename}")
Dataset Handling and Augmentation
import cv2
import numpy as np
from core import utils
import tensorflow as tf
class DataParser(object):
    """Parses TFRecord examples and applies data augmentation for YOLOv3 training.

    Produces a network input image plus three per-scale ground-truth tensors
    (strides 32/16/8). Coordinates handled here are pixel-space corner boxes
    [x_min, y_min, x_max, y_max, class_id] until process_true_boxes converts
    them to center/size form.
    """

    def __init__(self, img_height, img_width, anchor_boxes, class_count, debug_mode=False):
        # anchor_boxes: per process_true_boxes usage (`/ 2.`, `[:, 0]`,
        # len() // 3), an array-like of shape [num_anchors, 2] in pixels,
        # with 9 anchors for full YOLOv3 or 6 for the tiny variant.
        self.anchor_boxes = anchor_boxes
        self.class_count = class_count
        self.img_height = img_height
        self.img_width = img_width
        # When True, preprocessing_pipeline returns raw boxes for visualization
        # instead of the encoded training targets.
        self.debug_mode = debug_mode

    def horizontal_flip(self, image_data, ground_truth):
        """Mirror the image left-right and re-map box x-coordinates accordingly."""
        width_val = tf.cast(tf.shape(image_data)[1], tf.float32)
        image_data = tf.image.flip_left_right(image_data)
        x_min, y_min, x_max, y_max, label = tf.unstack(ground_truth, axis=1)
        # A flipped box's new x_min is width - old x_max (and vice versa);
        # y-coordinates are unchanged.
        x_min, y_min, x_max, y_max = width_val - x_max, y_min, width_val - x_min, y_max
        ground_truth = tf.stack([x_min, y_min, x_max, y_max, label], axis=1)
        return image_data, ground_truth

    def color_distortion(self, image_data, ground_truth):
        """Apply random brightness/saturation/hue/contrast jitter; boxes pass through."""
        image_data = tf.image.random_brightness(image_data, max_delta=32./255.)
        image_data = tf.image.random_saturation(image_data, lower=0.8, upper=1.2)
        image_data = tf.image.random_hue(image_data, max_delta=0.2)
        image_data = tf.image.random_contrast(image_data, lower=0.8, upper=1.2)
        return image_data, ground_truth

    def gaussian_blur(self, image_data, ground_truth):
        """Blur the image with a fixed 5x5 Gaussian kernel via OpenCV; boxes pass through."""
        def apply_blur(img):
            return cv2.GaussianBlur(img, (5, 5), 0)
        height, width = image_data.shape.as_list()[:2]
        # py_func drops static shape information, so restore it explicitly.
        # NOTE(review): requires a statically known image shape at this point —
        # height/width come from shape.as_list(), which yields None for
        # dynamic dimensions; confirm callers run this after resizing.
        image_data = tf.py_func(apply_blur, [image_data], tf.uint8)
        image_data.set_shape([height, width, 3])
        return image_data, ground_truth

    def random_crop_operation(self, image_data, ground_truth, min_coverage=0.8,
                              aspect_range=[0.8, 1.2], area_range=[0.5, 1.0]):
        """Randomly crop a sub-region keeping at least *min_coverage* of some box,
        then clip and translate all boxes into the crop's coordinate frame.
        """
        height, width = tf.cast(tf.shape(image_data)[0], tf.float32), tf.cast(tf.shape(image_data)[1], tf.float32)
        x_min, y_min, x_max, y_max, label = tf.unstack(ground_truth, axis=1)
        # sample_distorted_bounding_box expects normalized [y_min, x_min, y_max, x_max].
        bbox_coords = tf.stack([y_min/height, x_min/width, y_max/height, x_max/width], axis=1)
        bbox_coords = tf.clip_by_value(bbox_coords, 0, 1)
        start_point, crop_size, distorted_boxes = tf.image.sample_distorted_bounding_box(
            tf.shape(image_data),
            bounding_boxes=tf.expand_dims(bbox_coords, axis=0),
            min_object_covered=min_coverage,
            aspect_ratio_range=aspect_range,
            area_range=area_range)
        # Crop window back in pixel coordinates: [x1, y1, x2, y2].
        cropped_region = [distorted_boxes[0,0,1]*width, distorted_boxes[0,0,0]*height,
                          distorted_boxes[0,0,3]*width, distorted_boxes[0,0,2]*height]
        # Clip each box to the crop window, then shift so the window's
        # top-left corner becomes the new origin.
        cropped_x_min = tf.clip_by_value(x_min, cropped_region[0], cropped_region[2]) - cropped_region[0]
        cropped_y_min = tf.clip_by_value(y_min, cropped_region[1], cropped_region[3]) - cropped_region[1]
        cropped_x_max = tf.clip_by_value(x_max, cropped_region[0], cropped_region[2]) - cropped_region[0]
        cropped_y_max = tf.clip_by_value(y_max, cropped_region[1], cropped_region[3]) - cropped_region[1]
        image_data = tf.slice(image_data, start_point, crop_size)
        # NOTE(review): boxes fully outside the crop collapse to zero-area
        # boxes rather than being removed; process_true_boxes later masks
        # zero-width boxes out of anchor matching.
        ground_truth = tf.stack([cropped_x_min, cropped_y_min, cropped_x_max, cropped_y_max, label], axis=1)
        return image_data, ground_truth

    def preprocessing_pipeline(self, image_data, ground_truth):
        """Resize image+boxes to the network size and encode training targets.

        Returns (image, boxes) in debug mode, otherwise
        (image, y_true_13, y_true_26, y_true_52) with the image scaled to [0, 1].
        """
        image_data, ground_truth = utils.resize_image_correct_bbox(image_data, ground_truth,
                                                                   self.img_height, self.img_width)
        if self.debug_mode:
            return image_data, ground_truth
        # Target encoding is plain numpy, so it runs inside the graph via py_func.
        true_13, true_26, true_52 = tf.py_func(self.process_true_boxes, inp=[ground_truth],
                                               Tout=[tf.float32, tf.float32, tf.float32])
        image_data = image_data / 255.
        return image_data, true_13, true_26, true_52

    def process_true_boxes(self, gt_boxes):
        """Encode corner-format boxes into per-scale YOLO target tensors (numpy).

        For each ground-truth box, the anchor with the highest shape-IoU is
        found, and the box is written into the grid cell containing its center
        at the scale owning that anchor. Returns three arrays of shape
        [grid_h, grid_w, 3, 5 + class_count] for strides 32, 16 and 8.
        """
        layer_count = len(self.anchor_boxes) // 3
        # Which anchor indices belong to which output scale (coarse -> fine);
        # the 2-layer case matches the darknet yolov3-tiny.cfg masks.
        anchor_masks = [[6,7,8], [3,4,5], [0,1,2]] if layer_count == 3 else [[3,4,5], [1,2,3]]
        grid_dimensions = [[self.img_height//x, self.img_width//x] for x in (32, 16, 8)]
        # Convert corner boxes to (center, size) — note gt_boxes is mutated
        # in place, which is safe here because py_func hands us a copy.
        box_centers = (gt_boxes[:, 0:2] + gt_boxes[:, 2:4]) / 2
        box_dimensions = gt_boxes[:, 2:4] - gt_boxes[:, 0:2]
        gt_boxes[:, 0:2] = box_centers
        gt_boxes[:, 2:4] = box_dimensions
        output_13 = np.zeros(shape=[grid_dimensions[0][0], grid_dimensions[0][1], 3, 5+self.class_count], dtype=np.float32)
        output_26 = np.zeros(shape=[grid_dimensions[1][0], grid_dimensions[1][1], 3, 5+self.class_count], dtype=np.float32)
        output_52 = np.zeros(shape=[grid_dimensions[2][0], grid_dimensions[2][1], 3, 5+self.class_count], dtype=np.float32)
        outputs = [output_13, output_26, output_52]
        # Shape-only IoU: both anchors and boxes are centered at the origin,
        # so only widths/heights matter for the matching below.
        anchor_max = self.anchor_boxes / 2.
        anchor_min = -anchor_max
        # Drop degenerate (zero-width) boxes, e.g. those clipped away by cropping.
        valid_mask = box_dimensions[:, 0] > 0
        box_sizes = box_dimensions[valid_mask]
        box_sizes = np.expand_dims(box_sizes, -2)
        boxes_max = box_sizes / 2.
        boxes_min = -boxes_max
        intersect_mins = np.maximum(boxes_min, anchor_min)
        intersect_maxs = np.minimum(boxes_max, anchor_max)
        intersect_wh = np.maximum(intersect_maxs - intersect_mins, 0.)
        intersect_area = intersect_wh[..., 0] * intersect_wh[..., 1]
        box_area = box_sizes[..., 0] * box_sizes[..., 1]
        anchor_area = self.anchor_boxes[:, 0] * self.anchor_boxes[:, 1]
        iou_values = intersect_area / (box_area + anchor_area - intersect_area)
        best_anchors = np.argmax(iou_values, axis=-1)
        # NOTE(review): best_anchors is indexed over the *filtered* boxes while
        # gt_boxes below is indexed over *all* boxes — if valid_mask removed
        # any box, the indices misalign. Harmless only when every box is valid;
        # worth confirming/fixing upstream.
        for idx, anchor_idx in enumerate(best_anchors):
            for layer_idx in range(layer_count):
                if anchor_idx not in anchor_masks[layer_idx]:
                    continue
                # Grid cell containing the box center at this scale.
                grid_x = np.floor(gt_boxes[idx,0]/self.img_width*grid_dimensions[layer_idx][1]).astype('int32')
                grid_y = np.floor(gt_boxes[idx,1]/self.img_height*grid_dimensions[layer_idx][0]).astype('int32')
                anchor_position = anchor_masks[layer_idx].index(anchor_idx)
                class_id = gt_boxes[idx, 4].astype('int32')
                # Layout per anchor slot: [cx, cy, w, h, objectness, one-hot classes].
                outputs[layer_idx][grid_y, grid_x, anchor_position, 0:4] = gt_boxes[idx, 0:4]
                outputs[layer_idx][grid_y, grid_x, anchor_position, 4] = 1.
                outputs[layer_idx][grid_y, grid_x, anchor_position, 5+class_id] = 1.
        return output_13, output_26, output_52

    def parse_example(self, serialized_data):
        """Decode one serialized TFRecord example into training tensors.

        Expects the 'image' (JPEG bytes) and 'boxes' (raw float32 [N, 5])
        features written by convert_tfrecord.py, then feeds them through
        preprocessing_pipeline.
        """
        features = tf.parse_single_example(
            serialized_data,
            features={
                'image': tf.FixedLenFeature([], dtype=tf.string),
                'boxes': tf.FixedLenFeature([], dtype=tf.string),
            }
        )
        image_data = tf.image.decode_jpeg(features['image'], channels=3)
        image_data = tf.image.convert_image_dtype(image_data, tf.uint8)
        # decode_raw must use the same dtype the boxes were serialized with (float32).
        bounding_boxes = tf.decode_raw(features['boxes'], tf.float32)
        bounding_boxes = tf.reshape(bounding_boxes, shape=[-1, 5])
        return self.preprocessing_pipeline(image_data, bounding_boxes)
class DatasetHandler(object):
    """Wraps TFRecord files in a tf.data pipeline: parse, repeat, shuffle,
    batch and prefetch.

    Parameters
    ----------
    parser : DataParser
        Provides ``parse_example(serialized_example)``.
    tfrecords_location : str
        Glob pattern matching one or more ``.tfrecords`` files.
    batch_size : int
        Number of examples per batch (also used as the prefetch depth).
    shuffle_buffer : int or None
        Shuffle buffer size; ``None`` disables shuffling (e.g. evaluation).
    repeat_dataset : bool
        Repeat indefinitely when True (training); single pass when False.
    """

    def __init__(self, parser, tfrecords_location, batch_size, shuffle_buffer=None, repeat_dataset=True):
        self.parser = parser
        self.file_names = tf.gfile.Glob(tfrecords_location)
        self.batch_size = batch_size
        self.shuffle_buffer = shuffle_buffer
        self.repeat_dataset = repeat_dataset
        self._initialize()

    def _initialize(self):
        """Build the dataset graph and a one-shot iterator over it."""
        # Fail fast with a clear message when the glob matched nothing. The
        # original wrapped TFRecordDataset in a bare "except:", which also
        # swallowed unrelated TensorFlow errors and re-raised them all as
        # "No TFRecords found!". The exception type is kept for callers.
        if not self.file_names:
            raise NotImplementedError("No TFRecords found!")
        self.dataset = tf.data.TFRecordDataset(self.file_names)
        self.dataset = self.dataset.map(map_func=self.parser.parse_example, num_parallel_calls=10)
        if self.repeat_dataset:
            self.dataset = self.dataset.repeat()
        if self.shuffle_buffer is not None:
            self.dataset = self.dataset.shuffle(self.shuffle_buffer)
        self.dataset = self.dataset.batch(self.batch_size).prefetch(self.batch_size)
        self.iterator = self.dataset.make_one_shot_iterator()

    def get_next_batch(self):
        """Return the next-batch tensors from the one-shot iterator."""
        return self.iterator.get_next()
YOLOv3-Tiny Configuration
For deployment on ARM chips, YOLOv3-Tiny is used instead of the full YOLOv3 model due to its reduced computational requirements.
Configuration modifications required:
- Edit the `yolov3-tiny.cfg` file, focusing on the `[yolo]` sections and the `[convolutional]` layers that precede them.
- Modify the `classes` parameter in every `[yolo]` section to match the target object count (e.g., `classes=4` for 4 object types).
- Update the `filters` parameter in each convolutional layer immediately before a `[yolo]` section using the formula `filters = 3 * (classes + 5)`; for 4 classes, set `filters=27`.
- Update `car_classes.txt` in the `model_data` directory with the target object labels, one per line.