Parallel Query Execution with PLINQ and Concurrent Data Structures in .NET
Understanding PLINQ Fundamentals
PLINQ extends Language Integrated Query capabilities in .NET by enabling parallel execution across available CPU cores. This extension transforms standard LINQ operations into multi-threaded processes that automatically partition data and distribute workload through the thread pool.
A critical concept shared between LINQ and PLINQ is deferred execution. Queries remain dormant until enumeration triggers actual processing. Consider this inventory management example:
internal void FilterInventory(List<Product> warehouseStock)
{
    // Executes immediately due to ToList()
    var highValueItems = warehouseStock.Where(p => p.Price > 1000).ToList();

    // Deferred execution - query defined but not executed
    IEnumerable<Product> discountedItems = warehouseStock.Where(p => p.DiscountPercent > 20);

    // Execution occurs during enumeration
    foreach (var item in discountedItems)
    {
        // Process each discounted product
    }
}
The first query materializes instantly, while the second waits until the foreach loop begins iteration. PLINQ follows identical deferred execution principles.
Performance Characteristics and Trade-offs
Not every LINQ query benefits from parallelization. The overhead of thread management can outweigh gains for trivial operations on small datasets. Effective PLINQ scenarios involve:
- Large-scale collections (thousands+ elements)
- CPU-intensive transformations per element
- Independent operations without shared state
Hardware configuration significantly impacts results. Multi-core systems see substantial improvements as PLINQ can create more partitions. However, ordering operations like OrderBy introduce synchronization costs that may reduce throughput. Queries where element sequence is irrelevant achieve optimal performance.
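The trade-off is easy to observe empirically. A rough timing sketch along these lines (the class and method names are illustrative, and a serious measurement would add warm-up runs or use a tool like BenchmarkDotNet) shows parallelization paying off only when per-element work is substantial:

```csharp
using System;
using System.Diagnostics;
using System.Linq;

internal static class PlinqBenchmark
{
    // A deliberately CPU-heavy transform; trivial per-element work
    // would let thread-management overhead dominate.
    private static double ExpensiveTransform(int n)
    {
        double acc = 0;
        for (int i = 1; i <= 10_000; i++)
            acc += Math.Sqrt(n + i);
        return acc;
    }

    internal static (long SequentialMs, long ParallelMs) Compare(int elementCount)
    {
        var source = Enumerable.Range(0, elementCount).ToArray();

        var sw = Stopwatch.StartNew();
        var seqSum = source.Select(ExpensiveTransform).Sum();
        long seqMs = sw.ElapsedMilliseconds;

        sw.Restart();
        var parSum = source.AsParallel().Select(ExpensiveTransform).Sum();
        long parMs = sw.ElapsedMilliseconds;

        return (seqMs, parMs);
    }
}
```

On a single-core machine the two timings will be close or the parallel run slower; the gap widens with core count.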
Core PLINQ Operators
The ParallelEnumerable class exposes PLINQ functionality through specialized operators. Two essential methods control execution flow:
- AsParallel() - Enables parallel execution for subsequent operations
- AsSequential() - Reverts to sequential processing
Examine this user analytics scenario processing millions of records:
internal class UserProfile
{
    public string GivenName { get; set; } = "";
    public string FamilyName { get; set; } = "";
    public int Years { get; set; }
}

internal IEnumerable<IGrouping<string, UserProfile>> AnalyzeUsers(List<UserProfile> userData)
{
    return userData.AsParallel()
                   .Where(u => u.Years >= 18)
                   .AsSequential()
                   .GroupBy(u => u.FamilyName);
}
The Where clause executes in parallel while GroupBy runs sequentially. The optimal split depends on data characteristics and requires empirical testing.
Syntax Variations
PLINQ supports both method chaining and query comprehension syntaxes. These equivalent queries filter active users:
Method Syntax:
var activeUsers = userProfiles.AsParallel().Where(u => u.IsActive);
Query Syntax:
var activeUsers = from profile in userProfiles.AsParallel()
                  where profile.IsActive
                  select profile;
Both produce identical results. Method syntax appears throughout remaining examples for consistency.
Transforming Sequential LINQ to PLINQ
Migrating existing queries requires minimal changes. Insert AsParallel() into the call chain:
// Sequential
var results = source.Where(x => x.Value > threshold);
// Parallel
var results = source.AsParallel().Where(x => x.Value > threshold);
For order-dependent operations, append AsOrdered():
var orderedResults = dataset.AsParallel().AsOrdered()
                            .Where(x => x.Category == "Premium");
To explicitly discard ordering for performance:
var unorderedResults = dataset.AsParallel().AsUnordered()
                              .Select(x => x.Transform());
Parallel Iteration with ForAll
For independent, side-effecting operations, PLINQ's ForAll method can replace traditional loops with better throughput. Consider order processing:
internal void ProcessOrdersSequentially(List<Order> orders)
{
    foreach (var order in orders)
    {
        if (order.Status != OrderStatus.Pending) continue;
        ValidateAndShip(order);
    }
}

internal void ProcessOrdersWithPLINQ(List<Order> orders)
{
    orders.AsParallel()
          .Where(o => o.Status == OrderStatus.Pending)
          .ForAll(ValidateAndShip);
}
ForAll bypasses the final merge step that enumerating the query with foreach would require: output stays in its partitions and each partition's worker thread invokes the delegate directly. This reduces synchronization overhead significantly.
Exception Management Strategies
PLINQ aggregates unhandled exceptions from all partitions into an AggregateException. Always wrap PLINQ queries in try/catch blocks:
internal class OrderProcessor
{
    private void ValidateAndShip(Order shipment)
    {
        if (shipment.TotalWeight > 1000)
            throw new InvalidOperationException($"Order {shipment.Id} exceeds weight limit");

        // Shipping logic
    }

    internal void ProcessBatch(List<Order> batch)
    {
        try
        {
            batch.AsParallel()
                 .Where(o => o.Status == OrderStatus.Pending)
                 .ForAll(ValidateAndShip);
        }
        catch (AggregateException aggEx)
        {
            foreach (var ex in aggEx.InnerExceptions)
            {
                Console.WriteLine($"Processing failed: {ex.Message}");
            }
        }
    }
}
Unhandled exceptions abort the entire query. For fault-tolerant processing, handle exceptions within the delegate itself.
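One way to keep a batch running when individual orders fail is to catch inside the delegate, so no AggregateException ever surfaces. This sketch reuses the Order shape from the example above; the Failures collection is an illustrative addition, not part of the original API:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

internal enum OrderStatus { Pending, Shipped, Failed }

internal class Order
{
    public int Id { get; set; }
    public OrderStatus Status { get; set; }
    public double TotalWeight { get; set; }
}

internal class FaultTolerantProcessor
{
    // Failed order ids land in a thread-safe bag instead of aborting the query.
    internal ConcurrentBag<int> Failures { get; } = new();

    internal void ProcessBatch(Order[] batch)
    {
        batch.AsParallel()
             .Where(o => o.Status == OrderStatus.Pending)
             .ForAll(order =>
             {
                 try
                 {
                     if (order.TotalWeight > 1000)
                         throw new InvalidOperationException($"Order {order.Id} exceeds weight limit");
                     order.Status = OrderStatus.Shipped;
                 }
                 catch (InvalidOperationException)
                 {
                     // Handled inside the delegate: the partition keeps processing.
                     order.Status = OrderStatus.Failed;
                     Failures.Add(order.Id);
                 }
             });
    }
}
```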
Data Ordering Control
PLINQ provides granular control over element sequencing. Create a test class to observe ordering behavior:
internal class CustomerRecord
{
    public string Identifier { get; set; } = "";
    public int PriorityScore { get; set; }
    public bool RequiresFastTrack { get; set; }
}

internal IEnumerable<CustomerRecord> GetPriorityCustomers(List<CustomerRecord> customers)
{
    // No ordering specified - fastest execution
    return customers.AsParallel()
                    .Where(c => c.RequiresFastTrack && c.PriorityScore > 50);
}

internal IEnumerable<CustomerRecord> GetPriorityCustomersOrdered(List<CustomerRecord> customers)
{
    // Preserves source sequence
    return customers.AsParallel().AsOrdered()
                    .Where(c => c.RequiresFastTrack && c.PriorityScore > 50);
}
Only the AsOrdered() variant guarantees predictable sequence. AsUnordered() explicitly tells PLINQ to ignore order for maximum throughput.
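A quick sketch makes the guarantee concrete. Assuming an integer source (class and method names here are illustrative), AsOrdered returns filtered elements in their original relative positions even though partitions complete in arbitrary order:

```csharp
using System.Linq;

internal static class OrderingDemo
{
    // AsOrdered makes PLINQ track element indices so filtered results
    // come back in source order, at some bookkeeping cost.
    internal static int[] EvensInSourceOrder(int[] source) =>
        source.AsParallel().AsOrdered()
              .Where(n => n % 2 == 0)
              .ToArray();
}
```

Without AsOrdered, the same call would return the same set of elements in a nondeterministic order.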
Merge Option Configuration
The WithMergeOptions method controls how results stream from parallel partitions:
NotBuffered
Each element is yielded as soon as it is produced, ideal for streaming scenarios. Operators that must see all input before producing output, such as OrderBy, cannot honor this option and buffer regardless.
AutoBuffered
Returns variable-sized batches. Default behavior for most operators, balancing throughput and latency.
FullyBuffered
Waits for complete query execution before returning any data. Maximizes total throughput but increases time-to-first-result.
Default
Equivalent to omitting WithMergeOptions. Lets PLINQ choose optimal strategy.
Practical Merge Option Example
internal class DataAnalyzer
{
    private double threshold = 100; // cutoff for streamed metrics (example value)

    internal IEnumerable<Metric> StreamProcessing(List<Metric> measurements)
    {
        return measurements.AsParallel()
                           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                           .Where(m => m.Value > threshold);
    }

    internal IEnumerable<Metric> BatchProcessing(List<Metric> measurements)
    {
        return measurements.AsParallel()
                           .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
                           .OrderBy(m => m.Timestamp);
    }
}
Concurrent Collections Integration
When PLINQ queries modify shared collections, use thread-safe concurrent collections from System.Collections.Concurrent:
- ConcurrentBag<T> - Unordered collection for object pooling
- ConcurrentDictionary<TKey, TValue> - Thread-safe key-value store
- BlockingCollection<T> - Producer-consumer boundary with bounding
These eliminate manual synchronization for add/remove operations.
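For instance, a parallel tally sketched against ConcurrentDictionary (the class and method names below are illustrative) can use AddOrUpdate to resolve the read-modify-write race that a plain Dictionary would lose updates to:

```csharp
using System.Collections.Concurrent;
using System.Linq;

internal static class WordCounter
{
    // Every partition updates the shared dictionary through AddOrUpdate,
    // which performs the read-modify-write atomically per key.
    internal static ConcurrentDictionary<string, int> CountByFirstLetter(string[] words)
    {
        var counts = new ConcurrentDictionary<string, int>();
        words.AsParallel().ForAll(w =>
            counts.AddOrUpdate(w[..1], 1, (_, current) => current + 1));
        return counts;
    }
}
```

The same loop writing `counts[key]++` against a Dictionary<string, int> would corrupt state or drop increments under concurrency.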
Synchronization Primitive Usage
For fine-grained control without collection overhead, apply lightweight synchronization primitives. This example uses SpinLock to protect critical sections:
internal class ConcurrentUpdater
{
    private SpinLock _lock = new SpinLock();

    internal void UpdateRecords(List<UserProfile> profiles)
    {
        profiles.AsParallel()
                .Where(p => p.Years > 100)
                .ForAll(CappedUpdate);
    }

    private void CappedUpdate(UserProfile profile)
    {
        bool lockTaken = false;
        try
        {
            // SpinLock is a struct: never copy it, and always pair Enter with Exit.
            _lock.Enter(ref lockTaken);
            if (profile.Years > 100)
                profile.Years = 100;
        }
        finally
        {
            if (lockTaken) _lock.Exit();
        }
    }
}
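When the shared state reduces to a single numeric value, even the SpinLock can be dropped in favor of the Interlocked class. A minimal sketch, with illustrative names, counting elements processed across all partitions:

```csharp
using System.Linq;
using System.Threading;

internal static class LockFreeCounter
{
    private static long _total;

    internal static long TotalProcessed => Interlocked.Read(ref _total);

    // Interlocked.Increment performs the read-modify-write atomically,
    // so no explicit lock is needed around the shared counter.
    internal static void CountItems(int[] values)
    {
        values.AsParallel().ForAll(_ => Interlocked.Increment(ref _total));
    }
}
```

A plain `_total++` in the delegate would be a data race and silently lose increments under contention.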