Implementing Thread-Safe Collections in C# for Concurrent Programming
The System.Collections.Concurrent namespace provides data structures designed for multi-threaded scenarios, offering scalability and thread safety while minimizing locking. These collections are essential for efficient parallel programming.
ConcurrentQueue
ConcurrentQueue implements a thread-safe first-in-first-out (FIFO) collection using atomic compare-and-swap operations and spin waiting. Elements are added with Enqueue and removed with TryDequeue.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class WorkItemProcessor
{
    // A unit of work carrying a sequential identifier.
    class JobItem
    {
        public int Identifier { get; init; }
    }

    // Producer: enqueues 15 jobs, pausing 30 ms between arrivals.
    static async Task GenerateJobs(ConcurrentQueue<JobItem> queue)
    {
        for (int i = 0; i < 15; i++)
        {
            await Task.Delay(30);
            var job = new JobItem { Identifier = i };
            queue.Enqueue(job);
            Console.WriteLine($"Job {job.Identifier} added to queue.");
        }
    }

    // Consumer: polls the queue with random pauses until cancellation is
    // requested. TryDequeue never blocks; an empty queue simply means the
    // worker pauses and tries again.
    static async Task ProcessJobs(ConcurrentQueue<JobItem> queue, string workerName, CancellationToken token)
    {
        await RandomPause();
        do
        {
            if (queue.TryDequeue(out JobItem job))
            {
                Console.WriteLine($"Job {job.Identifier} completed by {workerName}.");
            }
            await RandomPause();
        }
        while (!token.IsCancellationRequested);
    }

    // Waits a random 50-399 ms. Random.Shared is thread-safe; the original
    // `new Random()` per call is unsafe to share across threads and can
    // produce identical sequences when instances are created in quick
    // succession on older runtimes.
    static Task RandomPause()
    {
        return Task.Delay(Random.Shared.Next(50, 400));
    }

    static async Task Main()
    {
        var jobQueue = new ConcurrentQueue<JobItem>();
        // CancellationTokenSource is IDisposable; dispose it when Main exits.
        using var cancelSource = new CancellationTokenSource();

        // Async methods start executing as soon as they are invoked and yield
        // at their first await; wrapping them in Task.Run only adds an extra
        // thread-pool hop.
        Task producer = GenerateJobs(jobQueue);

        var workers = new Task[3];
        for (int i = 0; i < 3; i++)
        {
            string workerId = $"Worker-{i}";  // per-iteration capture, not shared
            workers[i] = ProcessJobs(jobQueue, workerId, cancelSource.Token);
        }

        await producer;
        // Grace period so workers can drain the queue, then stop their polling.
        cancelSource.CancelAfter(1500);
        await Task.WhenAll(workers);
    }
}
ConcurrentStack
ConcurrentStack provides a last-in-first-out (LIFO) collection without locks, using compare-and-swap operations. Elements are added with Push and removed with TryPop.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class StackProcessor
{
    // A unit of work carrying a sequential id.
    class TaskUnit
    {
        public int TaskId { get; init; }
    }

    static async Task Main()
    {
        var taskStack = new ConcurrentStack<TaskUnit>();
        // CancellationTokenSource is IDisposable; dispose it when Main exits.
        using var cts = new CancellationTokenSource();

        // Async methods begin running when invoked and yield at the first
        // await; no Task.Run wrapper is needed.
        Task producer = ProduceTasks(taskStack);

        var handlers = new Task[2];
        for (int i = 0; i < 2; i++)
        {
            string handlerName = $"Handler{i}";  // per-iteration capture
            handlers[i] = HandleTasks(taskStack, handlerName, cts.Token);
        }

        await producer;
        // Grace period for handlers to drain the stack, then stop polling.
        cts.CancelAfter(1000);
        await Task.WhenAll(handlers);
    }

    // Producer: pushes 10 tasks, pausing 40 ms between each.
    static async Task ProduceTasks(ConcurrentStack<TaskUnit> stack)
    {
        for (int j = 0; j < 10; j++)
        {
            await Task.Delay(40);
            stack.Push(new TaskUnit { TaskId = j });
            Console.WriteLine($"Task {j} pushed onto stack.");
        }
    }

    // Consumer: polls with TryPop (non-blocking, LIFO order) until cancelled.
    // Random.Shared is safe for concurrent use by both handlers, unlike the
    // original `new Random()` constructed on every call.
    static async Task HandleTasks(ConcurrentStack<TaskUnit> stack, string name, CancellationToken token)
    {
        await Task.Delay(Random.Shared.Next(20, 200));
        do
        {
            if (stack.TryPop(out TaskUnit task))
            {
                Console.WriteLine($"Task {task.TaskId} handled by {name}.");
            }
            await Task.Delay(Random.Shared.Next(30, 300));
        }
        while (!token.IsCancellationRequested);
    }
}
ConcurrentBag
ConcurrentBag is an unordered collection optimized for scenarios where threads primarily add and remove their own items, with minimal inter-thread interaction.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
class WebCrawlerSimulation
{
    // Simulated link graph. Marked readonly and never mutated after
    // initialization, so concurrent reads by the crawlers are safe.
    static readonly Dictionary<string, string[]> _siteLinks = new Dictionary<string, string[]>()
    {
        ["site1.com"] = new[] { "site1.com/pageA", "site1.com/pageB" },
        ["site2.com"] = new[] { "site2.com/doc1", "site2.com/doc2" }
    };

    // A URL to crawl plus which crawler (or "initial") discovered it.
    class CrawlJob
    {
        public string TargetUrl { get; init; }
        public string Origin { get; init; }
    }

    static async Task Main()
    {
        var jobBag = new ConcurrentBag<CrawlJob>();
        string[] startUrls = { "site1.com", "site2.com", "site3.com", "site4.com" };
        foreach (var url in startUrls)
        {
            jobBag.Add(new CrawlJob { TargetUrl = url, Origin = "initial" });
        }

        var crawlers = new Task[4];
        for (int c = 0; c < 4; c++)
        {
            string crawlerId = $"Crawler{c}";  // per-iteration capture
            crawlers[c] = Task.Run(() => CrawlPages(jobBag, crawlerId));
        }
        await Task.WhenAll(crawlers);
    }

    // Drains the bag, adding newly discovered links back into it.
    // NOTE(review): a crawler exits as soon as the bag is momentarily empty,
    // even if a peer is mid-discovery and about to add more jobs. With four
    // initial URLs this rarely drops work, but a production crawler would
    // track in-flight jobs before declaring the crawl finished.
    static async Task CrawlPages(ConcurrentBag<CrawlJob> bag, string crawlerName)
    {
        while (bag.TryTake(out CrawlJob job))
        {
            var discovered = await DiscoverLinks(job.TargetUrl);
            if (discovered != null)
            {
                foreach (var link in discovered)
                    bag.Add(new CrawlJob { TargetUrl = link, Origin = crawlerName });
            }
            Console.WriteLine($"URL {job.TargetUrl} from {job.Origin} indexed by {crawlerName}.");
        }
    }

    // Simulates fetching a page; returns its outbound links, or null when the
    // URL has none in the simulated graph. Random.Shared is thread-safe for
    // the four concurrent crawlers (a new Random per call is not).
    static async Task<IEnumerable<string>> DiscoverLinks(string url)
    {
        await Task.Delay(Random.Shared.Next(100, 300));
        // TryGetValue avoids the ContainsKey + indexer double lookup.
        return _siteLinks.TryGetValue(url, out var links) ? links : null;
    }
}
ConcurrentDictionary
ConcurrentDictionary provides thread-safe key-value storage with fine-grained locking for writes and lock-free reads.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
class DictionaryPerformance
{
    // Value stored for every key in each benchmark pass.
    const string ValueTemplate = "SampleValue";
    // Number of write/read operations per pass.
    const int Operations = 500000;
    // Sink for read results so the read loops are not trivially dead code.
    static string CurrentValue;

    static void Main()
    {
        var concurrentDict = new ConcurrentDictionary<int, string>();
        var regularDict = new Dictionary<int, string>();

        // Pass 1: writes to a plain Dictionary guarded by an external lock.
        var stopwatch = Stopwatch.StartNew();
        for (int key = 0; key < Operations; key++)
        {
            lock (regularDict)
            {
                regularDict[key] = ValueTemplate;
            }
        }
        stopwatch.Stop();
        Console.WriteLine($"Locked dictionary write: {stopwatch.Elapsed}");

        // Pass 2: writes to ConcurrentDictionary (internal fine-grained locking).
        stopwatch.Restart();
        for (int key = 0; key < Operations; key++)
        {
            concurrentDict[key] = ValueTemplate;
        }
        stopwatch.Stop();
        Console.WriteLine($"Concurrent dictionary write: {stopwatch.Elapsed}");

        // Pass 3: locked reads from the plain Dictionary.
        stopwatch.Restart();
        for (int key = 0; key < Operations; key++)
        {
            lock (regularDict)
            {
                CurrentValue = regularDict[key];
            }
        }
        stopwatch.Stop();
        Console.WriteLine($"Locked dictionary read: {stopwatch.Elapsed}");

        // Pass 4: reads from ConcurrentDictionary, which take no locks.
        stopwatch.Restart();
        for (int key = 0; key < Operations; key++)
        {
            CurrentValue = concurrentDict[key];
        }
        stopwatch.Stop();
        Console.WriteLine($"Concurrent dictionary read: {stopwatch.Elapsed}");
    }
}
BlockingCollection
BlockingCollection wraps a producer-consumer collection, supporting blocking operations, bounded capacity, and cancellation.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class PipelineExample
{
    // Payload flowing through the pipeline.
    class DataItem
    {
        public int DataId { get; init; }
    }

    static async Task Main()
    {
        Console.WriteLine("Using default ConcurrentQueue backing:");
        await ExecutePipeline();
        Console.WriteLine("\nUsing ConcurrentStack backing:");
        await ExecutePipeline(new ConcurrentStack<DataItem>());
    }

    // Runs one producer and three consumers over a BlockingCollection.
    // The backing store defaults to ConcurrentQueue (FIFO); pass another
    // IProducerConsumerCollection (e.g. ConcurrentStack) to change ordering.
    static async Task ExecutePipeline(IProducerConsumerCollection<DataItem> backing = null)
    {
        // BlockingCollection owns synchronization primitives and is
        // IDisposable; the using declaration disposes it after producer and
        // consumers have all completed.
        using var collection = backing == null
            ? new BlockingCollection<DataItem>()
            : new BlockingCollection<DataItem>(backing);

        // ProduceData yields at its first await, so it can be started directly.
        Task producer = ProduceData(collection);

        // Consumers stay on Task.Run: GetConsumingEnumerable blocks the
        // calling thread synchronously while waiting for items.
        var consumers = new Task[3];
        for (int c = 0; c < 3; c++)
        {
            string consumerId = $"Consumer{c}";  // per-iteration capture
            consumers[c] = Task.Run(() => ConsumeData(collection, consumerId));
        }

        await producer;
        await Task.WhenAll(consumers);
    }

    // Producer: adds 8 items, then marks the collection complete so that
    // every consumer's GetConsumingEnumerable terminates once drained.
    static async Task ProduceData(BlockingCollection<DataItem> collection)
    {
        for (int i = 0; i < 8; i++)
        {
            await Task.Delay(80);
            collection.Add(new DataItem { DataId = i });
            Console.WriteLine($"Produced item {i}.");
        }
        collection.CompleteAdding();
    }

    // Consumer: blocks until items arrive and exits cleanly after
    // CompleteAdding. Random.Shared is safe across the three concurrent
    // consumers, unlike the original `new Random()` per iteration.
    static async Task ConsumeData(BlockingCollection<DataItem> collection, string name)
    {
        foreach (var item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine($"Item {item.DataId} processed by {name}.");
            await Task.Delay(Random.Shared.Next(50, 250));
        }
    }
}