System.Threading.Channels 多執行緒佇列 示範

今天上午在「ASP.NET Core 使用 BackgroundService 建立背景服務佇列」這篇文章下面有網友留言,補充可以使用 System.Threading.Channels 來達成,趕快來研究一下

System.Threading.Channels 是一個輕量、高效的通訊機制,使用 生產者-消費者 模式,在不同的執行緒中處理寫入和讀取,並且可以控制資料佇列上限。

安裝套件

在 .NET Core 3.0 (不含)以下的需要使用 NuGet 安裝 System.Threading.Channels 套件,或是使用 .NET CLI 執行以下指令安裝
	
dotnet add package System.Threading.Channels
    

而在 .NET Core 3.0 (含)以上的版本可以直接使用,不需要安裝。

基本使用示範

下面建立一個無限容量的 Channel ,在 WriteToChannel 中會持續寫入資料,而在 ReadFromChannel 中會將資料讀取並處理,中間加入 Task.Delay 模擬處理時間。 執行後會發現資料全部寫入後大約還有 7 個任務未處理,會依序慢慢將資料讀取出來處理,很簡單就達成「背景服務佇列」樣的功能。
    
public static void Main(string[] args)
{
    Console.WriteLine("Hello World!");

    var channel = Channel.CreateUnbounded<string>();

    var writerTask = WriteToChannel(channel.Writer);

    var readerTask = ReadFromChannel(channel.Reader);

    Task.WaitAll(writerTask, readerTask);
    channel.Writer.Complete();
}

public static async Task WriteToChannel(ChannelWriter<string> writer)
{
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"Writing message {i}");
        await writer.WriteAsync($"Message {i}");
        await Task.Delay(100); // 模擬處理時間
    }
}

public static async Task ReadFromChannel(ChannelReader<string> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var message))
        {
            Console.WriteLine(message);
            
            // 佇列中的數量
            Console.WriteLine($"Count: {reader.Count}");
            
            await Task.Delay(200); // 模擬處理時間
        }
    }
}
    

上面範例是建立無限容量的 channel ,不過如果資料處理速度不夠快,並且一直持續有新的資料進來,時間一長很可能塞滿全部記憶體空間,我們可以建立有限容量的 channel 來避開這個問題,當容量達到上限時就無法寫入 channel ,會持續的等待,直到有空間可以寫入。
    
// 無限容量
var channel = Channel.CreateUnbounded<string>();

// 有限容量/指定容量
var channel = Channel.CreateBounded<string>(5);
    

ASP.NET Core 使用示範

下面示範使用 System.Threading.Channels 達成和 ASP.NET Core 使用 BackgroundService 建立背景服務佇列 這篇同樣的功能

建立 EmailDto 類別,用來儲存寄送 Email 會用到的資訊:
    
public record EmailDto(string To, string Subject, string Body);
    

建立 BackgroundService ,從 Channel 中嘗試讀取 EmailDto ,有的話就會執行,這裡使用 Task.Delay 來模擬處理需要的時間:
    
public class EmailSenderBackgroundService : BackgroundService
{
    private readonly Channel<EmailDto> _channel;

    public EmailSenderBackgroundService(Channel<EmailDto> channel)
    {
        _channel = channel;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // 等待並處理通道中的郵件寄送任務
        await foreach (var emailJob in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            // 模擬郵件寄送過程
            Console.WriteLine($"Sending email to {emailJob.To}, subject: {emailJob.Subject}, body: {emailJob.Body}, time: {DateTime.Now:: HH:mm:ss}, Count: {_channel.Reader.Count}");
            await Task.Delay(500); // 模擬寄送郵件需要 500 毫秒
        }
    }
}
    

建立 EmailController,呼叫此 API 就會將 Email 資訊加入 Channel,等待寄送:
    
using System.Threading.Channels;
using Microsoft.AspNetCore.Mvc;

    
[ApiController]
[Route("[controller]")]
public class EmailController : ControllerBase
{
    private readonly Channel<EmailDto> _channel;

    public EmailController(Channel<EmailDto> channel)
    {
        _channel = channel;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] EmailDto dto)
    {
        await _channel.Writer.WriteAsync(dto);
        return Ok();
    }
}
    

在 Program.cs 中註冊服務:
    
using System.Threading.Channels;


var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<Channel<EmailDto>>(Channel.CreateUnbounded<EmailDto>());
builder.Services.AddHostedService<EmailSenderBackgroundService>();

var app = builder.Build();

    

完成!

參考資料:
Microsoft.Learn - System.Threading.Channels library

留言

張貼留言

如果有任何問題、建議、想說的話或文章題目推薦,都歡迎留言或來信: a@ruyut.com