Using TensorFlow Queues and Threads for Asynchronous Data Loading and Preprocessing
Queues for Managing Data
Queues buffer training samples between loading and consumption, and let you control whether elements are delivered in order or shuffled.
tf.FIFOQueue implements a first-in, first-out queue, maintaining the order of enqueued elements. tf.RandomShuffleQueue dequeues elements in a random order.
Using tf.FIFOQueue
tf.FIFOQueue(capacity, dtypes, name='fifo_queue') creates a queue with a fixed capacity and specified data types. The dtypes argument defines the structure of each element. Key operations include:
- dequeue(name=None): Removes and returns an element.
- enqueue(vals, name=None): Adds an element to the queue.
- enqueue_many(vals, name=None): Enqueues multiple elements from a list or tuple.
- size(name=None): Returns the current queue size as a tensor.
The following example demonstrates synchronous operations within a single thread, performing dequeue, increment, and re-enqueue.
import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
def process_with_fifo():
    data_queue = tf.FIFOQueue(capacity=3, dtypes=tf.float32)
    enqueue_initial = data_queue.enqueue_many([[0.1, 0.2, 0.3]])
    dequeue_op = data_queue.dequeue()
    increment_op = dequeue_op + 1.0
    enqueue_incremented = data_queue.enqueue(increment_op)
    with tf.Session() as sess:
        sess.run(enqueue_initial)
        # Each run dequeues one element, adds 1.0, and re-enqueues it.
        for _ in range(3):
            sess.run(enqueue_incremented)
        for _ in range(data_queue.size().eval()):
            result = sess.run(dequeue_op)
            print(result)

if __name__ == '__main__':
    process_with_fifo()
Synchronous processing can become inefficient with large datasets, as the main thread blocks on I/O operations. Asynchronous execution with multiple threads solves this by allowing the main thread to train the model while separate threads load data.
QueueRunners and Coordinator for Asynchronous Operations
tf.train.QueueRunner(queue, enqueue_ops=None) manages multiple threads for enqueue operations. The create_threads(sess, coord=None, start=False) method creates and optionally starts these threads.
An uncoordinated asynchronous setup can leave threads running after the main task completes. The tf.train.Coordinator() class provides a mechanism for graceful thread termination.
- request_stop(): Signals all managed threads to stop.
- join(threads=None, stop_grace_period_secs=120): Waits for the specified threads to terminate.
The example below showcases asynchronous incrementation and enqueuing with proper thread coordination.
def async_queue_example():
    main_queue = tf.FIFOQueue(1000, tf.float32)
    counter = tf.Variable(0.0)
    increment = tf.assign_add(counter, tf.constant(1.0))
    enqueue_increment = main_queue.enqueue(increment)
    dequeue_op = main_queue.dequeue()
    # Two threads run the increment-and-enqueue op concurrently.
    queue_runner = tf.train.QueueRunner(queue=main_queue, enqueue_ops=[enqueue_increment] * 2)
    init = tf.global_variables_initializer()
    with tf.Session() as sess:
        sess.run(init)
        coordinator = tf.train.Coordinator()
        worker_threads = queue_runner.create_threads(sess=sess, coord=coordinator, start=True)
        for i in range(1000):
            value = sess.run(dequeue_op)
            print(value)
        # Signal the worker threads to stop and wait for them to finish.
        coordinator.request_stop()
        coordinator.join(threads=worker_threads)

if __name__ == '__main__':
    async_queue_example()
File and Image Reading Pipeline
TensorFlow's data reading pipeline typically follows three steps: constructing a file queue, reading with a suitable reader, and decoding the content.
1. Constructing a File Queue
tf.train.string_input_producer(string_tensor, shuffle=True) creates a queue of filenames from a tensor.
2. Choosing a File Reader
Select a reader based on the file format:
- tf.TextLineReader(): For text files like CSV.
- tf.FixedLengthRecordReader(record_bytes): For binary files with fixed-length records.
- tf.TFRecordReader(): For TFRecord files.
- tf.WholeFileReader(): Reads the entire content of a file, such as an image.
The read(file_queue) method returns a tuple (key, value).
3. Decoding File Content
Decoders convert the raw string bytes into tensors.
- tf.decode_csv(records, record_defaults, field_delim=','): Parses CSV lines.
- tf.decode_raw(bytes, out_type): Converts bytes to numeric vectors (e.g., for binary data).
- tf.image.decode_jpeg(contents) / tf.image.decode_png(contents): Decodes image data.
Batch Processing with Queues
For efficient training, data is often batched using:
- tf.train.batch(tensors, batch_size, capacity=32): Creates batches in order.
- tf.train.shuffle_batch(tensors, batch_size, capacity, min_after_dequeue): Creates randomly shuffled batches.
To start all queue-related threads, call tf.train.start_queue_runners(sess=sess, coord=coord).
Example: Reading CSV Files
import tensorflow as tf
import os
def read_csv_data():
    data_files = ['./data/file1.csv', './data/file2.csv']
    file_queue = tf.train.string_input_producer(data_files)
    reader = tf.TextLineReader()
    _, line_content = reader.read(file_queue)
    # The default values also fix each column's type (here: strings).
    col_a, col_b = tf.decode_csv(line_content, record_defaults=[[''], ['']], field_delim=',')
    # capacity must be at least batch_size, or dequeuing a batch deadlocks.
    batched_col_a, batched_col_b = tf.train.batch(
        [col_a, col_b], batch_size=100, num_threads=2, capacity=200)
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        batch_a, batch_b = sess.run([batched_col_a, batched_col_b])
        print(batch_a, batch_b)
        coord.request_stop()
        coord.join(threads)
Example: Reading and Processing Images
Images often require resizing for uniformity. Use tf.image.resize_images(images, size).
def load_and_process_images():
    image_paths = ['./images/dog1.jpg', './images/dog2.jpg']
    path_queue = tf.train.string_input_producer(image_paths)
    reader = tf.WholeFileReader()
    _, file_content = reader.read(path_queue)
    image_data = tf.image.decode_jpeg(file_content)
    resized_image = tf.image.resize_images(image_data, size=[200, 200])
    resized_image.set_shape([200, 200, 3])  # Set static shape for batching
    image_batch = tf.train.batch([resized_image], batch_size=10, capacity=100)
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        batch_result = sess.run(image_batch)
        print(batch_result.shape)
        coord.request_stop()
        coord.join(threads)
TFRecords: TensorFlow's Native Format
TFRecords is a binary file format optimized for TensorFlow, efficiently storing both data and labels.
Writing to TFRecords
- Create a writer: writer = tf.python_io.TFRecordWriter(path).
- For each sample, construct a tf.train.Example protocol buffer.
- Serialize and write the example: writer.write(example.SerializeToString()).
def create_tfrecord_example(feature_dict):
    features = tf.train.Features(feature=feature_dict)
    return tf.train.Example(features=features)

def bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
Reading from TFRecords
- Create a tf.TFRecordReader().
- Read and parse each serialized example using tf.parse_single_example(serialized, features).
- Define the expected features using tf.FixedLenFeature([], dtype).
- Decode and reshape the data as needed.
def parse_tfrecord_function(example_proto):
    feature_description = {
        'image': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64),
    }
    parsed_features = tf.parse_single_example(example_proto, feature_description)
    image = tf.decode_raw(parsed_features['image'], tf.uint8)
    image = tf.reshape(image, [32, 32, 3])
    label = tf.cast(parsed_features['label'], tf.int32)
    return image, label

def read_tfrecords():
    file_queue = tf.train.string_input_producer(['data.tfrecords'])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(file_queue)
    image, label = parse_tfrecord_function(serialized_example)
    image_batch, label_batch = tf.train.batch([image, label], batch_size=64)
    return image_batch, label_batch
Storing data as uint8 saves space, but converting to float32 before computation improves precision. Use tf.cast(tensor, tf.float32) for conversion.