Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing Thread-Safe Collections in C# for Concurrent Programming

Tech 2

The System.Collections.Concurrent namespace provides data structures designed for multi-threaded scenarios, offering scalability and thread safety while minimizing locking. These collections are essential for efficient parallel programming.

ConcurrentQueue

ConcurrentQueue implements a thread-safe first-in-first-out (FIFO) collection using atomic compare-and-swap operations and spin waiting. Elements are added with Enqueue and removed with TryDequeue.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Producer/consumer demo built on <see cref="ConcurrentQueue{T}"/>: one producer
/// enqueues jobs while three workers poll the queue until cancellation is requested.
/// </summary>
class WorkItemProcessor
{
    /// <summary>A unit of work carrying only its sequence number.</summary>
    class JobItem
    {
        public int Identifier { get; init; }
    }

    /// <summary>Enqueues 15 jobs, waiting 30 ms before each one.</summary>
    /// <param name="queue">Queue shared with the workers.</param>
    static async Task GenerateJobs(ConcurrentQueue<JobItem> queue)
    {
        for (int i = 0; i < 15; i++)
        {
            await Task.Delay(30);
            var job = new JobItem { Identifier = i };
            queue.Enqueue(job);
            Console.WriteLine($"Job {job.Identifier} added to queue.");
        }
    }

    /// <summary>
    /// Polls the queue, reporting each dequeued job, until <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="queue">Queue shared with the producer and the other workers.</param>
    /// <param name="workerName">Label printed with every completed job.</param>
    /// <param name="token">Checked after each pause; the loop body always runs at least once.</param>
    static async Task ProcessJobs(ConcurrentQueue<JobItem> queue, string workerName, CancellationToken token)
    {
        await RandomPause();
        do
        {
            // TryDequeue returns false on an empty queue, so the worker just pauses and retries.
            if (queue.TryDequeue(out var job))
            {
                Console.WriteLine($"Job {job.Identifier} completed by {workerName}.");
            }
            await RandomPause();
        }
        while (!token.IsCancellationRequested);
    }

    /// <summary>Returns a delay task lasting between 50 ms (inclusive) and 400 ms (exclusive).</summary>
    static Task RandomPause()
    {
        // Random.Shared is thread-safe and avoids allocating a new generator on every call.
        int pauseMs = Random.Shared.Next(50, 400);
        return Task.Delay(pauseMs);
    }

    /// <summary>
    /// Starts the producer and three workers, then cancels the workers 1500 ms after
    /// the producer has finished and waits for them to exit.
    /// </summary>
    static async Task Main()
    {
        var jobQueue = new ConcurrentQueue<JobItem>();

        // CancellationTokenSource is IDisposable; it also owns the timer created by CancelAfter.
        using var cancelSource = new CancellationTokenSource();
        CancellationToken token = cancelSource.Token;

        var producer = Task.Run(() => GenerateJobs(jobQueue));

        Task[] workers = new Task[3];
        for (int i = 0; i < workers.Length; i++)
        {
            string workerId = $"Worker-{i}";
            workers[i] = Task.Run(() => ProcessJobs(jobQueue, workerId, token));
        }

        await producer;
        cancelSource.CancelAfter(1500);
        await Task.WhenAll(workers);
    }
}

ConcurrentStack

ConcurrentStack provides a last-in-first-out (LIFO) collection without locks, using compare-and-swap operations. Elements are added with Push and removed with TryPop.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Producer/consumer demo built on <see cref="ConcurrentStack{T}"/>: one producer
/// pushes tasks while two handlers pop them until cancellation is requested.
/// </summary>
class StackProcessor
{
    /// <summary>A unit of work carrying only its sequence number.</summary>
    class TaskUnit
    {
        public int TaskId { get; init; }
    }

    /// <summary>
    /// Starts the producer and two handlers, then cancels the handlers 1000 ms after
    /// the producer has finished and waits for them to exit.
    /// </summary>
    static async Task Main()
    {
        var taskStack = new ConcurrentStack<TaskUnit>();

        // CancellationTokenSource is IDisposable; it also owns the timer created by CancelAfter.
        using var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        var producer = Task.Run(() => ProduceTasks(taskStack));

        Task[] handlers = new Task[2];
        for (int i = 0; i < handlers.Length; i++)
        {
            string handlerName = $"Handler{i}";
            handlers[i] = Task.Run(() => HandleTasks(taskStack, handlerName, token));
        }

        await producer;
        cts.CancelAfter(1000);
        await Task.WhenAll(handlers);
    }

    /// <summary>Pushes 10 tasks onto the stack, waiting 40 ms before each one.</summary>
    /// <param name="stack">Stack shared with the handlers.</param>
    static async Task ProduceTasks(ConcurrentStack<TaskUnit> stack)
    {
        for (int j = 0; j < 10; j++)
        {
            await Task.Delay(40);
            stack.Push(new TaskUnit { TaskId = j });
            Console.WriteLine($"Task {j} pushed onto stack.");
        }
    }

    /// <summary>
    /// Pops and reports tasks until <paramref name="token"/> is cancelled, pausing a random
    /// 30-299 ms between attempts.
    /// </summary>
    /// <param name="stack">Stack shared with the producer and the other handler.</param>
    /// <param name="name">Label printed with every handled task.</param>
    /// <param name="token">Checked after each pause; the loop body always runs at least once.</param>
    static async Task HandleTasks(ConcurrentStack<TaskUnit> stack, string name, CancellationToken token)
    {
        // Random.Shared is thread-safe and avoids allocating a new generator on every iteration.
        await Task.Delay(Random.Shared.Next(20, 200));
        do
        {
            // TryPop returns false on an empty stack, so the handler just pauses and retries.
            if (stack.TryPop(out var task))
            {
                Console.WriteLine($"Task {task.TaskId} handled by {name}.");
            }
            await Task.Delay(Random.Shared.Next(30, 300));
        }
        while (!token.IsCancellationRequested);
    }
}

ConcurrentBag

ConcurrentBag is an unordered collection optimized for scenarios where threads primarily add and remove their own items, with minimal inter-thread interaction.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Simulated web crawler built on <see cref="ConcurrentBag{T}"/>: four crawlers take jobs
/// from a shared bag and add the links they discover back into it.
/// </summary>
class WebCrawlerSimulation
{
    // Fixed lookup table of "pages found on a site". It is only read after initialization,
    // so the concurrent lookups performed by the crawlers are safe.
    static readonly Dictionary<string, string[]> _siteLinks = new Dictionary<string, string[]>()
    {
        ["site1.com"] = new[] { "site1.com/pageA", "site1.com/pageB" },
        ["site2.com"] = new[] { "site2.com/doc1", "site2.com/doc2" }
    };

    /// <summary>A URL to index together with the name of whoever queued it.</summary>
    class CrawlJob
    {
        public string TargetUrl { get; init; }
        public string Origin { get; init; }
    }

    /// <summary>Seeds the bag with four start URLs and runs four crawlers to completion.</summary>
    static async Task Main()
    {
        var jobBag = new ConcurrentBag<CrawlJob>();
        string[] startUrls = { "site1.com", "site2.com", "site3.com", "site4.com" };

        foreach (var url in startUrls)
        {
            jobBag.Add(new CrawlJob { TargetUrl = url, Origin = "initial" });
        }

        Task[] crawlers = new Task[4];
        for (int c = 0; c < crawlers.Length; c++)
        {
            string crawlerId = $"Crawler{c}";
            crawlers[c] = Task.Run(() => CrawlPages(jobBag, crawlerId));
        }

        await Task.WhenAll(crawlers);
    }

    /// <summary>
    /// Takes jobs from the bag until a take attempt fails, queueing every discovered link
    /// as a new job attributed to this crawler.
    /// </summary>
    /// <param name="bag">Bag shared by all crawlers.</param>
    /// <param name="crawlerName">Label printed with each indexed URL and recorded as the origin of new jobs.</param>
    static async Task CrawlPages(ConcurrentBag<CrawlJob> bag, string crawlerName)
    {
        // NOTE: a crawler exits the first time it finds the bag empty, even if another
        // crawler is still running and may add more jobs afterwards.
        while (bag.TryTake(out var job))
        {
            foreach (var link in await DiscoverLinks(job.TargetUrl))
            {
                bag.Add(new CrawlJob { TargetUrl = link, Origin = crawlerName });
            }
            Console.WriteLine($"URL {job.TargetUrl} from {job.Origin} indexed by {crawlerName}.");
        }
    }

    /// <summary>
    /// Simulates fetching a page (100-299 ms delay) and returns the links registered for it.
    /// </summary>
    /// <param name="url">URL to look up in the link table.</param>
    /// <returns>The known links for <paramref name="url"/>, or an empty sequence when there are none.</returns>
    static async Task<IEnumerable<string>> DiscoverLinks(string url)
    {
        // Random.Shared is thread-safe and avoids allocating a new generator on every call.
        await Task.Delay(Random.Shared.Next(100, 300));

        // Single lookup instead of ContainsKey + indexer; empty instead of null so callers need no null check.
        return _siteLinks.TryGetValue(url, out var links) ? links : Array.Empty<string>();
    }
}

ConcurrentDictionary

ConcurrentDictionary provides thread-safe key-value storage with fine-grained locking for writes and lock-free reads.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

/// <summary>
/// Single-threaded timing comparison of a lock-protected <see cref="Dictionary{TKey,TValue}"/>
/// against a <see cref="ConcurrentDictionary{TKey,TValue}"/> for sequential writes and reads.
/// </summary>
class DictionaryPerformance
{
    const string ValueTemplate = "SampleValue";
    const int Operations = 500000;

    // Sink for the read loops so every looked-up value is actually stored somewhere.
    static string CurrentValue;

    /// <summary>Runs the four measurements (locked write, concurrent write, locked read, concurrent read) in order.</summary>
    static void Main()
    {
        var concurrentDict = new ConcurrentDictionary<int, string>();
        var regularDict = new Dictionary<int, string>();

        // Dedicated lock target instead of locking on the dictionary instance itself.
        var gate = new object();

        Report("Locked dictionary write", () => WriteLocked(regularDict, gate));
        Report("Concurrent dictionary write", () => WriteConcurrent(concurrentDict));
        Report("Locked dictionary read", () => ReadLocked(regularDict, gate));
        Report("Concurrent dictionary read", () => ReadConcurrent(concurrentDict));
    }

    /// <summary>Times <paramref name="work"/> and prints "<paramref name="label"/>: elapsed".</summary>
    static void Report(string label, Action work)
    {
        var timer = Stopwatch.StartNew();
        work();
        timer.Stop();
        Console.WriteLine($"{label}: {timer.Elapsed}");
    }

    /// <summary>Writes keys 0..Operations-1 into the plain dictionary, taking the lock per write.</summary>
    static void WriteLocked(Dictionary<int, string> target, object gate)
    {
        for (int idx = 0; idx < Operations; idx++)
        {
            lock (gate)
            {
                target[idx] = ValueTemplate;
            }
        }
    }

    /// <summary>Writes keys 0..Operations-1 into the concurrent dictionary.</summary>
    static void WriteConcurrent(ConcurrentDictionary<int, string> target)
    {
        for (int idx = 0; idx < Operations; idx++)
        {
            target[idx] = ValueTemplate;
        }
    }

    /// <summary>Reads keys 0..Operations-1 from the plain dictionary, taking the lock per read.</summary>
    static void ReadLocked(Dictionary<int, string> source, object gate)
    {
        for (int idx = 0; idx < Operations; idx++)
        {
            lock (gate)
            {
                CurrentValue = source[idx];
            }
        }
    }

    /// <summary>Reads keys 0..Operations-1 from the concurrent dictionary.</summary>
    static void ReadConcurrent(ConcurrentDictionary<int, string> source)
    {
        for (int idx = 0; idx < Operations; idx++)
        {
            CurrentValue = source[idx];
        }
    }
}

BlockingCollection

BlockingCollection wraps a producer-consumer collection, supporting blocking operations, bounded capacity, and cancellation.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

/// <summary>
/// Producer/consumer pipeline built on <see cref="BlockingCollection{T}"/>, run once with the
/// default backing store and once with a <see cref="ConcurrentStack{T}"/> backing store.
/// </summary>
class PipelineExample
{
    /// <summary>An item flowing through the pipeline, identified by a sequence number.</summary>
    class DataItem
    {
        public int DataId { get; init; }
    }

    /// <summary>Runs the pipeline with the default backing collection, then with a stack.</summary>
    static async Task Main()
    {
        Console.WriteLine("Using default ConcurrentQueue backing:");
        await ExecutePipeline();

        Console.WriteLine("\nUsing ConcurrentStack backing:");
        await ExecutePipeline(new ConcurrentStack<DataItem>());
    }

    /// <summary>
    /// Runs one producer and three consumers over a <see cref="BlockingCollection{T}"/> and
    /// waits for all of them to finish.
    /// </summary>
    /// <param name="backing">
    /// Optional underlying store; when null the <see cref="BlockingCollection{T}"/> default is used.
    /// </param>
    static async Task ExecutePipeline(IProducerConsumerCollection<DataItem> backing = null)
    {
        // BlockingCollection is IDisposable; dispose it once every producer/consumer is done.
        using var collection = backing == null
            ? new BlockingCollection<DataItem>()
            : new BlockingCollection<DataItem>(backing);

        var producer = Task.Run(() => ProduceData(collection));

        Task[] consumers = new Task[3];
        for (int c = 0; c < consumers.Length; c++)
        {
            string consumerId = $"Consumer{c}";
            consumers[c] = Task.Run(() => ConsumeData(collection, consumerId));
        }

        // Consumers only finish after the producer has called CompleteAdding, so waiting for
        // them first guarantees nobody is still enumerating when the collection is disposed.
        await Task.WhenAll(consumers);
        // Surfaces any exception thrown by the producer.
        await producer;
    }

    /// <summary>Adds 8 items, waiting 80 ms before each, then marks the collection complete.</summary>
    /// <param name="collection">Collection shared with the consumers.</param>
    static async Task ProduceData(BlockingCollection<DataItem> collection)
    {
        try
        {
            for (int i = 0; i < 8; i++)
            {
                await Task.Delay(80);
                collection.Add(new DataItem { DataId = i });
                Console.WriteLine($"Produced item {i}.");
            }
        }
        finally
        {
            // Always signal completion, otherwise consumers blocked in
            // GetConsumingEnumerable would wait forever if production failed.
            collection.CompleteAdding();
        }
    }

    /// <summary>
    /// Reports every item taken from the collection, pausing a random 50-249 ms after each,
    /// until the collection is marked complete and empty.
    /// </summary>
    /// <param name="collection">Collection shared with the producer and the other consumers.</param>
    /// <param name="name">Label printed with every processed item.</param>
    static async Task ConsumeData(BlockingCollection<DataItem> collection, string name)
    {
        // GetConsumingEnumerable blocks the calling thread while the collection is empty.
        foreach (var item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine($"Item {item.DataId} processed by {name}.");
            // Random.Shared is thread-safe and avoids allocating a new generator per item.
            await Task.Delay(Random.Shared.Next(50, 250));
        }
    }
}

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improperly handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.