Mastering TPL: Best Practices for Task Parallel Library
What is the Task Parallel Library (TPL)?
The Task Parallel Library (TPL) is a powerful framework introduced in .NET that simplifies parallel and concurrent programming. It provides a set of APIs and patterns for creating and managing parallelism, allowing developers to write efficient and scalable code for multi-core processors and parallel computing scenarios.
TPL primarily revolves around the concept of tasks, which are lightweight units of work. These tasks can be executed concurrently, asynchronously, or in parallel, depending on the requirements of the application. Tasks are designed to abstract low-level thread management and synchronization, making it easier for developers to harness the full potential of modern hardware without dealing with the intricacies of threads and locks.
Why is TPL Required?
TPL is required for several reasons:
- Performance Optimization: With the increasing prevalence of multi-core processors, sequential code execution may not fully utilize available CPU resources. TPL enables developers to parallelize operations, improving application performance and responsiveness.
- Simplicity: TPL abstracts complex threading details, making parallel and asynchronous programming more accessible. Developers can focus on the logic of their application rather than managing threads manually.
- Scalability: TPL scales well for both compute-bound and I/O-bound operations. It enables applications to efficiently handle concurrent tasks, such as serving multiple client requests or processing large datasets.
- Responsive User Interfaces: For applications with user interfaces (e.g., desktop or web apps), TPL helps prevent UI freezes by allowing time-consuming tasks to run in the background without blocking the main thread.
Pros of TPL:
- Simplicity: TPL simplifies parallel programming by abstracting the complexity of thread management and synchronization.
- Performance: TPL can significantly improve the performance of applications by leveraging multi-core processors.
- Maintainability: Parallel code written with TPL is often more readable and maintainable compared to low-level thread-based code.
- Resource Management: TPL efficiently manages system resources, preventing thread and resource exhaustion.
- Cancellation and Exception Handling: TPL provides mechanisms for task cancellation and exception handling, making code more robust.
Cons of TPL:
- Learning Curve: Learning to use TPL effectively can be challenging for beginners, especially those new to parallel and asynchronous programming.
- Overhead: In some cases, there may be overhead associated with creating and managing tasks, which can impact performance.
- Complex Debugging: Debugging parallel code can be more complex, as race conditions and concurrency issues may arise.
- Thread Starvation: In scenarios with excessive parallelism, thread starvation or contention can occur, affecting performance.
Having discussed the background of TPL, let's now look at some examples.
Section 1: Beginner Level
1.1 Getting Started with Tasks
- Basics of creating and running tasks.
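The embedded code for this example did not survive the export. Below is a minimal sketch reconstructed from the explanation that follows (the class name `TPLBeginnerExample` and the printed messages are taken from the text):

```csharp
using System;
using System.Threading.Tasks;

public class TPLBeginnerExample
{
    public static void Main(string[] args)
    {
        // Create and start a task that prints a message
        Task task = Task.Run(() => { Console.WriteLine("Hello, TPL!"); });

        // Block until the task finishes
        task.Wait();

        Console.WriteLine("Task has completed.");
    }
}
```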
Explanation:
- In this example, we have a `TPLBeginnerExample` class with a `Main` method, which serves as our entry point.
- Inside the `Main` method, we create a simple task using `Task.Run`. The lambda expression `() => { Console.WriteLine("Hello, TPL!"); }` represents the work that the task will perform: printing "Hello, TPL!" to the console.
- We then call the `Wait` method on the task to wait for it to complete. This is a simple way to ensure that the task finishes before moving on.
- After waiting for the task to complete, we print "Task has completed." to the console.
- When you run this code, you will see the output "Hello, TPL!" followed by "Task has completed."
This example demonstrates the basic concept of creating and running a task using the Task Parallel Library (TPL). It’s a simple introduction to tasks and provides a foundation for more advanced TPL concepts in later sections.
1.2 Task Continuation
- Task continuations for executing code after a task completes.
Understanding Task Continuation
Task continuation is a mechanism for chaining tasks together, where one task starts automatically when another task completes. It allows you to define a sequence of operations that should occur in a specific order, often used for handling the results or errors of a preceding task.
Using `ContinueWith` to Handle Task Results
`ContinueWith` is a method available on tasks that allows you to specify an action to execute when the task completes. You can think of it as "continuing" with a specific action after the original task finishes. Here's a basic example:
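The original example block is missing from this export; here is a sketch reconstructed from the explanation below (the value 27 and the names `originalTask`, `continuationTask`, and `previousTask` come from the text; the `Task.Delay` is a stand-in for the time-consuming computation):

```csharp
using System;
using System.Threading.Tasks;

public class ContinuationExample
{
    public static void Main(string[] args)
    {
        // A task that performs a (simulated) time-consuming computation
        Task<int> originalTask = Task.Run(() =>
        {
            Task.Delay(500).Wait(); // simulate work
            return 27;
        });

        // Runs only after originalTask completes; it receives the completed
        // task as 'previousTask'
        Task<string> continuationTask = originalTask.ContinueWith(previousTask =>
        {
            return $"The result of the original task is {previousTask.Result}.";
        });

        continuationTask.Wait();
        Console.WriteLine(continuationTask.Result);
    }
}
```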
Explanation:
- We start with an `originalTask` that performs a time-consuming computation and returns an integer result (in this case, 27).
- We create a `continuationTask` using `originalTask.ContinueWith`. This continuation task defines what should happen after `originalTask` completes.
- In the continuation task, we access the result of the previous task using `previousTask.Result`, and we use that result to construct a string message.
- We wait for the continuation task to complete using `continuationTask.Wait()`, and then we print the result of the continuation task to the console.
This demonstrates how `ContinueWith` allows you to specify what should occur after the completion of a previous task. You can use it to handle results, perform additional work, or propagate exceptions.
Keep in mind that while `ContinueWith` is a powerful tool, .NET also provides a more modern and expressive way of handling task continuations using `async/await`. Async/await simplifies asynchronous programming and makes it more readable, especially when dealing with multiple task continuations.
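For comparison, here is a hedged sketch (not from the original article) of the same result-handling chain written with `async/await` — the code after the `await` plays the role of the continuation:

```csharp
using System;
using System.Threading.Tasks;

public class AwaitContinuationExample
{
    public static async Task Main(string[] args)
    {
        // The statements after 'await' run only once the task has completed,
        // just like a ContinueWith continuation — but without nested callbacks.
        int result = await Task.Run(() => 27);
        Console.WriteLine($"The result of the original task is {result}.");
    }
}
```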
1.3 Parallel.For and Parallel.ForEach
- Usage of `Parallel.For` and `Parallel.ForEach` for simple parallelism.
Parallel.For and Parallel.ForEach Overview
- `Parallel.For`: This construct is used to parallelize a `for` loop. It divides the iterations of the loop into multiple tasks that can run concurrently on different threads. It's suitable for scenarios where you have a fixed number of iterations and want to distribute them among available processors.
- `Parallel.ForEach`: This construct is used to parallelize a `foreach` loop. It processes elements of a collection in parallel, spreading the work across multiple threads. It's especially useful when working with collections, such as arrays or lists.
Using `Parallel.For`
Here's an example of using `Parallel.For` to calculate the sum of integers from 1 to 100 in parallel:
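The example code is not present in this export; the sketch below follows the description, except that it uses `Interlocked.Add` for the shared update — the plain `sum += i` described in the explanation is a data race when run on multiple threads:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class ParallelForExample
{
    public static void Main(string[] args)
    {
        int sum = 0;

        // Iterations 1..100 are partitioned into chunks that run concurrently
        // on thread-pool threads (the upper bound 101 is exclusive).
        Parallel.For(1, 101, i =>
        {
            Interlocked.Add(ref sum, i); // thread-safe shared update
        });

        Console.WriteLine($"Sum: {sum}"); // 5050
    }
}
```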
In this example:
- `Parallel.For` divides the range from 1 to 100 into chunks and assigns each chunk to a separate task, allowing the iterations to run in parallel.
- The lambda expression is the action executed for each iteration, adding each `i` to the shared `sum` variable. Beware that a plain `sum += i` from multiple threads is a data race; the shared update must be protected with `Interlocked.Add` or a lock.
Using `Parallel.ForEach`
Here's an example of using `Parallel.ForEach` to process elements of a collection in parallel:
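Again the original snippet is missing; here is a sketch based on the explanation below, with the collection contents (1 through 10) assumed and a lock added because the plain `sumSquares += square` described in the text is not thread-safe:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class ParallelForEachExample
{
    public static void Main(string[] args)
    {
        List<int> numbers = Enumerable.Range(1, 10).ToList(); // assumed input
        double sumSquares = 0;
        object gate = new object();

        // Elements of 'numbers' are processed on multiple threads
        Parallel.ForEach(numbers, number =>
        {
            double square = Math.Pow(number, 2);
            lock (gate) // protect the shared accumulator
            {
                sumSquares += square;
            }
        });

        Console.WriteLine($"Sum of squares: {sumSquares}"); // 385 for 1..10
    }
}
```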
Explanation:
- `Parallel.ForEach` processes each element of the `numbers` collection in parallel, spreading the work across available threads.
- The lambda body is executed for each element: it calculates the square of each number and accumulates the result in the shared `sumSquares` variable. As with `Parallel.For`, a plain `sumSquares += square` from multiple threads is a data race, so the shared update must be protected with a lock or an interlocked operation.
`Parallel.For` and `Parallel.ForEach` are powerful tools for simple parallelism, and they can significantly improve the performance of operations that can be parallelized. However, keep in mind that not all loops are suitable for parallelization, and care should be taken when dealing with shared data and synchronization.
Section 2: Intermediate Level
2.1 Async/Await with TPL
- Use of `async/await` with TPL for asynchronous task execution.
Combining `async/await` with TPL
`async/await` is a powerful feature in C# that simplifies asynchronous programming. It allows you to write asynchronous code in a more sequential and readable manner. However, there are scenarios where you may want to offload CPU-bound or long-running operations to a separate thread to prevent blocking the main thread. This is where TPL comes into play.
Using `Task.Run` and `await`
`Task.Run` is a TPL method that allows you to execute a delegate on a separate thread pool thread. You can use `Task.Run` to offload CPU-bound work, and then `await` the result asynchronously to ensure your application remains responsive. Here's an example:
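The example code is absent from this export; below is a sketch assembled from the explanation that follows (the loop bound of one million is an assumption):

```csharp
using System;
using System.Threading.Tasks;

public class AsyncAwaitTplExample
{
    public static async Task Main(string[] args)
    {
        Console.WriteLine($"Main thread ID: {Environment.CurrentManagedThreadId}");

        // Offload the CPU-bound work to a thread-pool thread and await it
        double result = await Task.Run(() =>
        {
            Console.WriteLine($"Worker thread ID: {Environment.CurrentManagedThreadId}");
            double total = 0;
            for (int i = 0; i < 1_000_000; i++)
            {
                total += Math.Sqrt(i); // simulate a CPU-bound operation
            }
            return total;
        });

        Console.WriteLine($"Result: {result}"); // printed after the work completes
    }
}
```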
Explanation:
- We start by printing the ID of the main thread using `Environment.CurrentManagedThreadId` to demonstrate that the main thread is separate from the thread created by `Task.Run`.
- Inside the `Task.Run` delegate, we simulate a CPU-bound operation (calculating square roots in a loop). This operation is offloaded to a separate thread pool thread.
- We use `await` to asynchronously wait for the result of the CPU-bound operation, ensuring that the main thread remains responsive and doesn't get blocked.
- Finally, we print the result of the CPU-bound operation after it completes.
By combining `async/await` with `Task.Run`, you can achieve the following:
- Offload CPU-bound work to separate threads, preventing the main thread from blocking.
- Maintain the responsiveness of your application.
- Simplify asynchronous code by using the familiar `async/await` syntax.
This combination is particularly useful in scenarios where you need to keep the user interface responsive while performing computationally intensive tasks asynchronously.
2.2 Task.WhenAll and Task.WhenAny
- Use of `Task.WhenAll` and `Task.WhenAny` for parallelizing multiple tasks.
Using `Task.WhenAll` for Parallel Execution
`Task.WhenAll` is a TPL method that allows you to await multiple tasks concurrently. It returns a task that completes when all the input tasks have completed. This is useful when you have a collection of tasks and want to wait for all of them to finish. Here's an example of parallel downloads using `Task.WhenAll`:
```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

public class WhenAllExample
{
    public static async Task Main(string[] args)
    {
        List<Task<string>> downloadTasks = new List<Task<string>>
        {
            DownloadAsync("https://example.com/page1"),
            DownloadAsync("https://example.com/page2"),
            DownloadAsync("https://example.com/page3")
        };

        // Wait for all downloads to complete
        string[] results = await Task.WhenAll(downloadTasks);

        Console.WriteLine("Downloads completed:");
        foreach (string result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static async Task<string> DownloadAsync(string url)
    {
        // Note: in production code, reuse a single HttpClient instance
        // rather than creating one per request
        using (HttpClient client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}
```
Explanation:
- We create a list of tasks (`downloadTasks`) that represent parallel downloads of three web pages.
- We use `Task.WhenAll(downloadTasks)` to asynchronously wait for all download tasks to complete. This allows us to efficiently download multiple web pages concurrently.
- After all downloads are complete, we process the results and print them to the console.
Using `Task.WhenAny` for Fastest Completion
`Task.WhenAny` is a TPL method that allows you to await the first task that completes among a collection of tasks. It returns a task that represents the first completed task. This is useful when you want to use the result of the task that finishes first. Here's an example of parallel API calls using `Task.WhenAny`:
```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

public class WhenAnyExample
{
    public static async Task Main(string[] args)
    {
        List<Task<string>> apiCallTasks = new List<Task<string>>
        {
            CallApiAsync("https://api.example.com/data1"),
            CallApiAsync("https://api.example.com/data2"),
            CallApiAsync("https://api.example.com/data3")
        };

        // Wait for the first API call to complete; WhenAny returns the
        // completed task, which we then await to get its result
        Task<string> firstCompletedTask = await Task.WhenAny(apiCallTasks);
        string result = await firstCompletedTask;

        Console.WriteLine("First API call completed with result:");
        Console.WriteLine(result);
    }

    public static async Task<string> CallApiAsync(string url)
    {
        using (HttpClient client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}
```
Explanation:
- We create a list of tasks (`apiCallTasks`) that represent parallel API calls to different endpoints.
- We use `Task.WhenAny(apiCallTasks)` to asynchronously wait for the first API call to complete. This allows us to retrieve data from the fastest responding API endpoint.
- After the first API call is complete, we process and print the result to the console.
Both `Task.WhenAll` and `Task.WhenAny` are powerful tools for managing multiple asynchronous tasks concurrently. They can significantly improve the efficiency and responsiveness of applications that require parallel processing of data or interactions with external services.
2.3 Task.Factory and TaskCreationOptions
- Use the `Task.Factory` class for advanced task creation.
Introducing Task.Factory for Advanced Task Creation
`Task.Factory` is a class in the TPL that provides advanced options for creating and configuring tasks. While you can create tasks using the `Task.Run` method or the `Task` constructor, `Task.Factory` offers more control over task creation and customization.
With `Task.Factory`, you can:
- Set custom scheduling options.
- Specify child tasks and parent-child relationships.
- Configure exception handling and cancellation policies.
- Create tasks with specific options and state.
Using TaskCreationOptions for Configuring Tasks
TaskCreationOptions
is an enumeration that allows you to specify various options and behaviors when creating tasks. You can pass a combination of these options to the Task.Factory.StartNew
method to configure how the task should behave. Some common options include:
TaskCreationOptions.LongRunning
: Indicates that the task represents a long-running operation, and the TPL should create a dedicated thread for it. This can be useful for CPU-bound tasks.TaskCreationOptions.AttachedToParent
: Specifies that the new task should be attached to its parent task, so it's considered a child task. This is useful when you want to propagate exceptions or synchronization to the parent task.TaskCreationOptions.PreferFairness
: Suggests that the TPL scheduler should try to schedule the task fairly, which can be beneficial for preventing thread starvation.
Here's an example that demonstrates the use of `Task.Factory` and `TaskCreationOptions`:
```csharp
using System;
using System.Threading.Tasks;

public class TaskFactoryExample
{
    public static void Main(string[] args)
    {
        // Creating a task with Task.Factory
        Task task1 = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Task started.");
        }, TaskCreationOptions.None);

        // Creating a long-running task
        Task task2 = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Long-running task started.");
            // Simulate a CPU-bound operation
            for (int i = 0; i < 1000000; i++)
            {
                Math.Sqrt(i);
            }
        }, TaskCreationOptions.LongRunning);

        // Creating an attached child task
        Task parentTask = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Parent task started.");
            // Create an attached child task; the parent is not considered
            // complete until this child finishes
            Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Child task started.");
            }, TaskCreationOptions.AttachedToParent);
        });

        // Wait for all tasks (including the attached child) to complete
        Task.WaitAll(task1, task2, parentTask);
        Console.WriteLine("All tasks completed.");
    }
}
```
Explanation:
- We use `Task.Factory.StartNew` to create tasks with different options.
- The first task has no specific options (`TaskCreationOptions.None`).
- The second task is marked as `LongRunning`, indicating that it represents a long-running operation.
- The third task is a child task attached to the parent task using `AttachedToParent`; because it is attached, waiting on the parent also waits for the child to finish.
By using `Task.Factory` and `TaskCreationOptions`, you have fine-grained control over how tasks are created and behave, allowing you to tailor task creation to your specific requirements and scenarios.
Section 3: Advanced Level
3.1 TaskSchedulers
- Advanced task scheduling using custom `TaskScheduler` implementations.
Custom TaskScheduler Implementations
A custom `TaskScheduler` is a class that inherits from `TaskScheduler` and overrides its methods to control how tasks are scheduled and executed. Some common methods to override include:
- `QueueTask`: This method is responsible for queuing a task for execution on the custom scheduler. You can implement custom logic to decide when and where tasks are executed.
- `TryExecuteTaskInline`: This method allows you to execute a task synchronously on the current thread if it's compatible with the scheduler's policies.
- `GetScheduledTasks`: This method returns an enumerable collection of tasks that are currently scheduled to be executed by the custom scheduler.
Scenarios for Custom Task Schedulers
Custom task schedulers can be beneficial in various scenarios, including:
- Thread Affinity: When you need to ensure that a specific task always runs on a particular thread, such as for UI updates or when interacting with legacy code that requires single-threaded access.
- Load Balancing: In scenarios where you have multiple custom threads or resources, you can implement load balancing logic to distribute tasks evenly across them.
- Resource Management: When tasks require access to limited resources, such as database connections or hardware devices, a custom scheduler can help manage and pool those resources efficiently.
- Priority Scheduling: If you need to prioritize certain tasks over others, a custom scheduler can be tailored to execute high-priority tasks first.
- Logging and Monitoring: Custom schedulers can be used to implement logging, monitoring, and instrumentation of task execution for debugging or performance analysis.
- Custom Policies: When you have unique scheduling policies or constraints specific to your application, a custom scheduler allows you to enforce those policies.
Example Scenario: Thread Affinity
Here’s a simplified example illustrating the use of a custom task scheduler for thread affinity:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class CustomTaskSchedulerExample
{
    public static void Main(string[] args)
    {
        // Create a custom task scheduler with thread affinity
        var scheduler = new ThreadAffinityTaskScheduler();

        // Queue tasks to run on the custom scheduler
        Task.Factory.StartNew(() => PrintThreadId("Task 1"), CancellationToken.None, TaskCreationOptions.None, scheduler);
        Task.Factory.StartNew(() => PrintThreadId("Task 2"), CancellationToken.None, TaskCreationOptions.None, scheduler);

        Console.ReadKey();
    }

    public static void PrintThreadId(string taskName)
    {
        Console.WriteLine($"{taskName} is running on Thread ID {Environment.CurrentManagedThreadId}");
    }
}

public class ThreadAffinityTaskScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        // For demonstration, execute the task synchronously on the thread
        // that queued it (thread affinity)
        TryExecuteTask(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // Don't execute inline
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}
```
Explanation:
In this example, the `ThreadAffinityTaskScheduler` custom scheduler executes each task synchronously on the thread that queued it, demonstrating a simple form of thread affinity. In practice, more complex logic can be implemented based on specific requirements.
Custom task schedulers provide advanced control and flexibility in task execution, allowing you to tailor the scheduling behavior to the unique demands of your application. However, they should be used judiciously, as they introduce complexity and require careful consideration of thread safety and performance.
3.2 Parallel Patterns
- Advanced parallel patterns such as MapReduce, Producer-Consumer, and Dataflow.
In this section, we’ll explore advanced parallel patterns, including MapReduce, Producer-Consumer, and Dataflow. These patterns are essential for building efficient and scalable concurrent systems. We’ll provide code examples and real-world use cases for each pattern.
1. MapReduce Pattern
Description: The MapReduce pattern involves breaking down a large data processing task into smaller sub-tasks (mapping) and then aggregating the results (reducing). It’s commonly used for distributed data processing.
Code Example:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class MapReduceExample
{
    public static async Task Main(string[] args)
    {
        List<int> data = Enumerable.Range(1, 1000).ToList();

        // Map: Square each number (each element becomes its own task)
        var mappedData = await Task.WhenAll(data.Select(x => SquareAsync(x)));

        // Reduce: Sum all squared values
        int sum = mappedData.Sum();
        Console.WriteLine("MapReduce Result: " + sum);
    }

    public static async Task<int> SquareAsync(int number)
    {
        await Task.Delay(10); // Simulate async work
        return number * number;
    }
}
```
Real-World Use Case: MapReduce is widely used in distributed data processing frameworks like Apache Hadoop and Apache Spark for tasks like log analysis, data indexing, and large-scale data transformations.
2. Producer-Consumer Pattern
Description: The Producer-Consumer pattern involves two types of threads: producers that produce data and consumers that consume data. It’s used for efficient data sharing and synchronization between multiple threads.
Code Example:
```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ProducerConsumerExample
{
    public static async Task Main(string[] args)
    {
        // A bounded buffer: Add blocks when 10 items are already queued
        var buffer = new BlockingCollection<int>(boundedCapacity: 10);

        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= 20; i++)
            {
                buffer.Add(i);
                Console.WriteLine($"Produced: {i}");
            }
            buffer.CompleteAdding(); // Signal that production is finished
        });

        var consumer = Task.Run(() =>
        {
            // GetConsumingEnumerable blocks until items arrive and ends
            // once CompleteAdding has been called and the buffer drains
            foreach (var item in buffer.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumed: {item}");
            }
        });

        await Task.WhenAll(producer, consumer);
    }
}
```
Real-World Use Case: Producer-Consumer is commonly used in scenarios like task scheduling, job processing queues, and parallel data processing pipelines.
3. Dataflow Pattern
Description: The Dataflow pattern is a model for message-passing concurrency, where data flows through a network of processing nodes (blocks). Each block performs a specific operation on the data and can be connected to other blocks, creating a dataflow network.
Code Example:
```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // Requires the System.Threading.Tasks.Dataflow NuGet package

public class DataflowExample
{
    public static async Task Main(string[] args)
    {
        var multiplyBlock = new TransformBlock<int, int>(x => x * 2);
        var addBlock = new TransformBlock<int, int>(x => x + 3);
        var printBlock = new ActionBlock<int>(x => Console.WriteLine($"Result: {x}"));

        // Propagate completion through the pipeline so that awaiting
        // printBlock.Completion can actually finish
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        multiplyBlock.LinkTo(addBlock, linkOptions);
        addBlock.LinkTo(printBlock, linkOptions);

        multiplyBlock.Post(5);    // Start the dataflow
        multiplyBlock.Complete(); // Signal that no more data is coming

        await printBlock.Completion; // Prints "Result: 13"
    }
}
```
Real-World Use Case: Dataflow patterns are employed in scenarios such as data processing pipelines, real-time stream processing, and concurrent data transformations.
These advanced parallel patterns are crucial for solving complex concurrency and parallelism challenges in various applications, from data processing to concurrent system design. Understanding these patterns and how to apply them can significantly improve the efficiency and scalability of your software.
3.3 Exception Handling Strategies
- Advanced exception handling strategies for tasks and parallel code.
In advanced parallel programming scenarios, handling exceptions effectively is crucial for building robust and reliable applications. Exception handling in tasks and parallel code can become more complex due to the possibility of multiple concurrent exceptions. In this section, we’ll discuss advanced exception handling strategies and demonstrate how to aggregate exceptions and gracefully handle errors.
1. Aggregating Exceptions
When running multiple tasks in parallel, it's common to encounter exceptions from multiple tasks simultaneously. To handle these exceptions effectively, you can aggregate them into a single exception. .NET provides the `AggregateException` class for this purpose.
Here's an example of aggregating exceptions from multiple tasks:
```csharp
using System;
using System.Threading.Tasks;

public class ExceptionHandlingExample
{
    public static void Main(string[] args)
    {
        try
        {
            var task1 = Task.Run(() => ThrowException("Task 1"));
            var task2 = Task.Run(() => ThrowException("Task 2"));

            // Task.WaitAll (unlike 'await Task.WhenAll') throws an
            // AggregateException containing ALL of the tasks' exceptions
            Task.WaitAll(task1, task2);
        }
        catch (AggregateException ex)
        {
            foreach (var innerException in ex.InnerExceptions)
            {
                Console.WriteLine($"Caught exception: {innerException.Message}");
            }
        }
    }

    public static void ThrowException(string taskName)
    {
        throw new InvalidOperationException($"{taskName} encountered an error.");
    }
}
```
In this example, both `task1` and `task2` throw exceptions, and `Task.WaitAll` aggregates them into a single `AggregateException` when caught. Note that `await Task.WhenAll` would instead rethrow only the first exception; with `await`, the full set is available via the combined task's `Exception` property.
2. Handling Exceptions Gracefully
When dealing with parallel code, it's essential to handle exceptions gracefully to prevent application crashes and ensure proper cleanup. You can use a `try-catch` block within each task to catch and log exceptions while allowing other tasks to continue running. Note that you never set a task's status yourself: a task transitions to `TaskStatus.Faulted` automatically when an exception escapes its delegate unhandled.
Here’s an example of handling exceptions within tasks:
```csharp
using System;
using System.Threading.Tasks;

public class ExceptionHandlingExample
{
    public static async Task Main(string[] args)
    {
        var task1 = Task.Run(() => HandleException("Task 1"));
        var task2 = Task.Run(() => HandleException("Task 2"));
        await Task.WhenAll(task1, task2);
    }

    public static void HandleException(string taskName)
    {
        try
        {
            // Some operation that might throw an exception
            throw new InvalidOperationException($"{taskName} encountered an error.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Caught exception in {taskName}: {ex.Message}");
            // Log or report the exception
        }
    }
}
```
In this example, each task handles exceptions internally, logs them, and continues executing. This approach allows other tasks to proceed even if some tasks encounter exceptions.
3. CancellationToken and TaskStatus
You can also use `CancellationToken` to gracefully cancel tasks when exceptions occur or to signal cancellation based on external conditions. Additionally, you can check the `TaskStatus` to determine if a task has completed successfully or encountered an exception.
Here's an example using `CancellationToken` and `TaskStatus`:
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class ExceptionHandlingExample
{
    public static async Task Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        var task1 = Task.Run(() => HandleException("Task 1", cts.Token));
        var task2 = Task.Run(() => HandleException("Task 2", cts.Token));

        try
        {
            await Task.WhenAll(task1, task2);
        }
        catch
        {
            // Swallow here; the per-task status check below reports the errors
        }

        if (task1.Status == TaskStatus.Faulted || task2.Status == TaskStatus.Faulted)
        {
            Console.WriteLine("At least one task encountered an exception.");
            // Handle or report the error
        }
    }

    public static void HandleException(string taskName, CancellationToken token)
    {
        try
        {
            // Simulate an operation
            Thread.Sleep(1000);

            // Check for cancellation request
            token.ThrowIfCancellationRequested();

            // Some operation that might throw an exception
            throw new InvalidOperationException($"{taskName} encountered an error.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{taskName} was canceled.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Caught exception in {taskName}: {ex.Message}");
            throw; // Rethrow so the task transitions to TaskStatus.Faulted
        }
    }
}
```
In this example, we use a `CancellationToken` to signal cancellation if needed, and we check the `TaskStatus` to identify any tasks that encountered exceptions.
These advanced exception handling strategies help you create robust parallel and concurrent code that gracefully handles errors, logs exceptions, and ensures that your application remains stable in the face of exceptions from multiple concurrent tasks.
Asynchronous Streams in C#
C# 8.0 introduced support for asynchronous streams, allowing you to work with sequences of data asynchronously. An asynchronous stream represents an asynchronous operation that yields a sequence of values over time. You can use the `await foreach` construct to iterate over an asynchronous stream.
Here’s a simplified example of processing data from an asynchronous stream:
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class AsyncStreamProcessingExample
{
    public static async Task Main()
    {
        await foreach (var dataPoint in GetDataStream())
        {
            // Process each data point asynchronously
            await ProcessDataPointAsync(dataPoint);
        }
    }

    public static async IAsyncEnumerable<int> GetDataStream()
    {
        for (int i = 1; i <= 10; i++)
        {
            // Simulate fetching data asynchronously
            await Task.Delay(100);
            yield return i;
        }
    }

    public static async Task ProcessDataPointAsync(int dataPoint)
    {
        // Simulate asynchronous data processing
        await Task.Delay(50);
        Console.WriteLine($"Processed data point: {dataPoint}");
    }
}
```
Explanation:
- `GetDataStream` is an asynchronous method that returns an asynchronous stream of integers. It uses the `yield return` statement to produce data points over time.
- `ProcessDataPointAsync` simulates asynchronous data processing for each data point.
- In the `Main` method, we use `await foreach` to asynchronously iterate over the data stream and process each data point as it becomes available.
This pattern allows you to work with real-time data sources and process data as it arrives, making it suitable for applications that need to handle continuous data streams.
Real-World Use Cases
Asynchronous stream processing is commonly used in various real-world scenarios:
- IoT Data Processing: IoT devices generate a continuous stream of data. Asynchronous stream processing is ideal for ingesting, analyzing, and reacting to IoT data in real time.
- Log and Event Streaming: Processing logs and events from applications and systems can benefit from asynchronous stream processing, enabling real-time monitoring, alerting, and analysis.
- Financial Market Data: In the financial industry, real-time processing of market data streams is crucial for making timely trading decisions.
- Online Gaming: Online multiplayer games often require real-time processing of player actions, updates, and events.
- Social Media Analytics: Analyzing social media streams for trends, sentiments, and user interactions can benefit from asynchronous stream processing.
When dealing with asynchronous streams, it’s important to consider factors like backpressure handling, error handling, and scaling to handle high-throughput data sources efficiently. Reactive programming libraries like Rx can provide additional tools for handling complex asynchronous stream scenarios.
I have also created a few enterprise courses on Microservices and Docker. Below I have pasted the links. In case you are interested in doing parallel coding with me, join me there.
To learn about this clean architecture design in detail, you can read my article here.
Course Links:
Free Course:- https://www.udemy.com/course/getting-started-with-clean-architecture-using-net-core
Creating .Net Core Microservices using Clean Architecture:- https://bit.ly/clean-architecture-net
Docker & Kubernetes for .Net and Angular Developers:- https://www.udemy.com/course/docker-for-net-and-angular-developers/?couponCode=LIMITED-OFFER
Github Link: https://github.com/rahulsahay19/eShopping
Thanks,
Happy Learning