跳转至

C# 实现“单飞阀门”:用 TaskCompletionSource 去重并发调用

在一个 C# 项目中,我遇到这样一个问题:某个函数会被不同线程在短时间内同时调用,它内部是一次耗时的网络请求。并发触发会导致重复请求,既浪费带宽又增加后端压力。我的目标是:在并发场景下,如果当前正在执行同一路径/同一类更新,其它并发调用应等待它完成并复用结果。

思路:单飞阀门(single-flight)

“单飞阀门”的核心是去重:在同一时间窗口内,只允许一个“真正的执行者”跑更新逻辑,其它调用全部等待这次执行完成并获得同一个结果。Go 生态把这种模式称为 single-flight;在 C# 中可用 Task/TaskCompletionSource 与原子操作轻松实现。

我的实现:全局单飞阀门(TCS + Interlocked)

下面是我在业务中采用的实现。要点:

  • 使用一个 TaskCompletionSource? 字段作为“闸门”。
  • 快路径:如果已有任务在执行,直接 await 等待并返回。
  • 竞争阶段:用 Interlocked.CompareExchange 原子地设置 TCS,只有一个赢家负责真正的更新。
  • 完成后 TrySetResult,并最终把字段置回 null,允许后续再次触发。
  • 根据业务决定是否把异常传播给等待方;我的场景选择记录日志并把结果视为 false。
/// <summary>
/// 1) 在类中增加一个 TaskCompletionSource<bool>? 字段作为“单飞(single-flight)”闸门。
///    - 任意时刻仅允许一个更新任务在执行。
///    - 其他并发调用发现已有任务在执行时,直接等待该任务完成后返回,不再重复执行更新逻辑。
/// 2) 在 UpdateResourceNodesAsync(IReadOnlyList<string>...) 方法中:
///    - 快速读取 _singleFlightUpdateTcs,若存在则 await 其 Task 并返回。
///    - 若不存在,则用 Interlocked.CompareExchange 原子地设置为新的 TCS,只有一个调用者能成功成为“赢家”。
///    - 赢家执行 UpdateResourceAvailabilityAsync;完成后 TrySetResult(true)。
///    - 发生异常或取消时记录日志并 TrySetResult(false),等待方不抛异常(本文示例语义)。
///    - 最后将 _singleFlightUpdateTcs 置回 null,使后续调用可再次触发更新。
/// 3) 等待方只需 await 该 TCS 的 Task,视为已完成一次更新,无需重复执行。
/// </summary>
private TaskCompletionSource<bool>? _singleFlightUpdateTcs;

/// <summary>
/// 更新业务资源节点状态
/// </summary>
private async Task UpdateResourceNodesAsync(IReadOnlyList<string> resourceKeys, CancellationToken token)
{
    // 快路径:已有更新在执行,等待其完成后直接返回(不抛异常)
    var existing = Volatile.Read(ref _singleFlightUpdateTcs);
    if (existing != null)
    {
        try
        {
            await existing.Task.ConfigureAwait(false);
        }
        catch
        {
            // 按当前需求吞掉异常即可
        }
        return;
    }

    // 竞争成为唯一执行者
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var winner = Interlocked.CompareExchange(ref _singleFlightUpdateTcs, tcs, null);
    if (winner != null)
    {
        // 已有执行者,等待其完成(不抛异常)
        try
        {
            await winner.Task.ConfigureAwait(false);
        }
        catch
        {
            // 按当前需求吞掉异常即可
        }
        return;
    }

    try
    {
        // 只有赢家会实际执行更新
        await UpdateResourceAvailabilityAsync(resourceKeys, token).ConfigureAwait(false);

        // 通知等待者:完成(成功)
        tcs.TrySetResult(true);
    }
    catch (OperationCanceledException)
    {
        // 取消也视为一次“完成”但未成功的更新;不抛异常到外部
        Log.Message("[ResourceUpdater] 更新被取消。");
        tcs.TrySetResult(false);
    }
    catch (Exception ex)
    {
        // 失败亦不通过异常对外通知,仅记录日志
        Log.Error("[ResourceUpdater] 更新节点及可用状态失败。", ex);
        tcs.TrySetResult(false);
    }
    finally
    {
        // 释放闸门,允许后续调用再次触发
        Interlocked.CompareExchange(ref _singleFlightUpdateTcs, null, tcs);
    }
}

关键点小结

  • 原子性:Interlocked.CompareExchange 确保只有一个赢家设置闸门。
  • 不阻塞线程池:通过 Task + await 协调,而非锁 + 阻塞等待。
  • 继续选项:TaskCreationOptions.RunContinuationsAsynchronously 避免在 TrySetResult 调用栈内同步执行延续导致栈爆或死锁。
  • 可观测性:保留日志,便于定位谁成为赢家、执行耗时与异常。

进阶:按 Key 的“单飞”通用封装(可复用)

如果你的“路径”是具体的资源 Key(如 URL、文件路径、设备 Id),建议按 Key 去重——相同 Key 合并,不同 Key 并行:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class SingleFlight<TKey, TResult> where TKey : notnull
{
    // 使用 Lazy<Task<TResult>> 确保真正执行仅发生一次,避免 GetOrAdd 工厂被并发多次调用导致的“幽灵任务”。
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TResult>>> _inflight = new();

    public Task<TResult> DoAsync(TKey key, Func<CancellationToken, Task<TResult>> action, CancellationToken ct = default)
    {
        var lazy = _inflight.GetOrAdd(
            key,
            static (_, state) => new Lazy<Task<TResult>>(
                () => state.action(state.ct),
                LazyThreadSafetyMode.ExecutionAndPublication),
            (action: action, ct));

        return AwaitAndCleanupAsync(key, lazy);
    }

    private async Task<TResult> AwaitAndCleanupAsync(TKey key, Lazy<Task<TResult>> lazy)
    {
        try
        {
            return await lazy.Value.ConfigureAwait(false);
        }
        finally
        {
            // 仅当字典中的值仍是当前 lazy 时才移除,避免误删后来者
            _inflight.TryRemove(new System.Collections.Generic.KeyValuePair<TKey, Lazy<Task<TResult>>>(key, lazy));
        }
    }
}

// 用法示意
// var sf = new SingleFlight<string, HttpResponseMessage>();
// var resp = await sf.DoAsync(url, ct => httpClient.GetAsync(url, ct), ct);

特点(修正后):

  • 所有等待者共享同一个 Task,异常会一致地传播;无需手工 TrySetException。
  • 借助 Lazy 确保真正执行仅发生一次,避免 ConcurrentDictionary 工厂在竞争下被多次调用导致的重复请求。
  • 任务完成后原子移除 inflight,下一次允许重新触发;并避免误删新一轮的 lazy。
  • 线程安全、无锁等待;可方便接入你的现有异步代码。

边界与实践建议

  • 结果缓存:单飞只“合并”同一时刻的并发,不替代结果缓存。若需要在时间窗口内复用结果,可叠加 MemoryCache/IMemoryCache。
  • 超时与取消:外部 CancellationToken 应参与执行体;避免赢家取消导致永远卡住(上文的 finally 会清理字典/闸门)。
  • 结果传播策略:是否吞掉异常取决于业务。若等待方需要感知失败,用 TrySetException 或让异常自然从共享 Task 传播。
  • 热点抖动:若更新很频繁,可加入“冷却时间”或节流(Throttle/Debounce),避免连续重复触发。

补充:等待方取消、TTL 兜底与上下文注意事项

  • 等待方取消:如果等待方不想一直等,可用 WhenAny 提前返回;不影响正在进行的共享执行。

    // existing 为单飞返回的共享任务
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var done = await Task.WhenAny(existing, Task.Delay(Timeout.InfiniteTimeSpan, cts.Token));
    if (done != existing)
    {
            // 超时/取消,选择放弃等待;可稍后再次调用触发或复用新的共享任务
            return;
    }
    await existing; // 确保观察异常并完成
    
  • TTL 兜底:若担心底层 action 卡死导致条目长驻(按文中实现,finally 才清理),可在执行体内部添加超时;或为字典条目叠加监控/清理机制。

    // 例:在 action 内部包一层超时
    async Task<TResult> WithTimeoutAsync<TResult>(Func<CancellationToken, Task<TResult>> inner, TimeSpan timeout, CancellationToken ct)
    {
            using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct);
            linked.CancelAfter(timeout);
            return await inner(linked.Token).ConfigureAwait(false);
    }
    
  • 同步上下文:示例中对 await 均使用 ConfigureAwait(false),避免在库代码中意外切回捕获的上下文导致死锁或切换开销(UI/ASP.NET 旧版场景尤需注意)。

  • 工具选择:

    • 全局阀门(TCS)适合“整批更新”类的单通道任务。
    • 按 Key single-flight(ConcurrentDictionary + Lazy)适合多资源并行且同 Key 去重的场景。

小结

并发下的重复调用是常见且隐蔽的性能/稳定性问题。“单飞阀门”提供了简单有效的去重思路:让一个赢家跑,其它等待者复用同一结果。基于 TaskCompletionSource 与原子交换可以轻松实现;当需要按 Key 合并时,使用 ConcurrentDictionary + Task 的模式更自然、可复用、异常语义更清晰。