推荐用 Channel 实现线程安全异步生产者消费者队列,它无锁、轻量、原生支持 async/await;避免误用 ConcurrentQueue 或 BlockingCollection 的同步阻塞操作。
Channel 实现线程安全的异步生产者消费者队列直接上结论:C# 6+(.NET Core 2.1+)推荐用 System.Threading.Channels.Channel,它专为高并发异步场景设计,比手写 BlockingCollection + Task 组合更轻量、无锁、原生支持 async/await。
常见错误是试图用 ConcurrentQueue 自己封装 awaitable 操作——它本身不提供异步等待能力,强行加 Task.Delay 或轮询会浪费 CPU;也有人误用 BlockingCollection 的 Take(),它会同步阻塞线程,破坏 async 上下文。
Channel.CreateBounded() 创建有界通道,溢出时可配置拒绝策略(如 DropWrite 或抛异常)Channel.CreateUnbounded() 适合写入吞吐优先、内存可控的场景Writer.WriteAsync(item),消费端用 Reader.ReadAsync() —— 两者都返回 ValueTask,无分配开销Reader.ReadAsync() 会完成并返回 default(T),需配合 TryRead 或检查 WaitToReadAsync().IsCompletedSuccessfully
var channel = Channel.CreateBounded(100); // 生产者 _ = Task.Run(async () => { for (int i = 0; i < 5; i++) { await channel.Writer.WriteAsync($"msg-{i}"); await Task.Delay(100); } channel.Writer.Complete(); });
// 消费者 _ = Task.Run(async () => { await foreach (var msg in channel.Reader.ReadAllAsync()) { Console.WriteLine($"Received: {msg}"); } });
消费者任务崩溃或未捕获异常会导致 ReadAllAsync() 提前退出,但通道可能仍有未读项;更隐蔽的问题是:生产者调用 Complete() 后,若消费者没读完就退出,剩余数据会丢失。
await foreach 外层包 try/catch,否则异常会静默终止迭代channel.Writer.Complete() 触发消费者退出——应另设信号(如 CancellationToken)协调停机Reader.Completion 是 Task,反映消费者侧是否完成(包括异常终止),可用于 await channel.Reader.Completion 等待消费结束await channel.Reader.Completion —— 它不会等未读项,只等迭代器退出Channel?不是所有“队列”需求都适合 Channel。它定位是“流式数据传输”,不提供随机访问、计数查询、中间件插拔等能力。
Channel.Reader.Count 只在有界通道且未被并发写入时可靠;无界通道返回 -1Channel 是单消费者语义;此时该用 ServiceBus 或 RabbitMQ
Channel 纯内存,崩溃即丢;必须外接存储层Channel 不可用,只能降级用 BlockingCollection + GetConsumingEnumerable()(但无法真正异步)BlockingCollection 的异步包装陷阱有人用 Task.Run(() => collection.Take()) 伪异步,这本质是线程池抢占,增加调度开销且无法取消;正确做法是仅在必须兼容旧框架时,用 TryTake 配合短时 Task.Delay 循环,但务必设超时和取消令牌。
await Task.Run(() => collection.Take()) —— 这违背 async/await 减少线程占用的初衷BlockingCollection,消费循环应类似:while (collection.TryTake(out var item, 10, token)) { ... }
BlockingCollection 的 Add 在有界模式下可能阻塞线程,而 Channel.Writer.WriteAsync 在满时默认返回 ValueTask 并挂起,更可控实际落地时,最易忽略的是消费者异常传播路径和通道生命周期管理——写个 await foreach 很容易,但谁负责捕获异常?谁决定何时 Complete()?这些边界不厘清,上线后就会出现消息静默丢失或消费者假死。