你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

IReliableConcurrentQueue<T>.TryDequeueAsync 方法

定义

暂时将某个值从队列中取消排队。 如果队列为空,则取消排队操作将等待项变得可用。

public System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<T>> TryDequeueAsync(Microsoft.ServiceFabric.Data.ITransaction tx, System.Threading.CancellationToken cancellationToken = default, TimeSpan? timeout = default);
abstract member TryDequeueAsync : Microsoft.ServiceFabric.Data.ITransaction * System.Threading.CancellationToken * Nullable<TimeSpan> -> System.Threading.Tasks.Task<Microsoft.ServiceFabric.Data.ConditionalValue<'T>>
Public Function TryDequeueAsync (tx As ITransaction, Optional cancellationToken As CancellationToken = Nothing, Optional timeout As Nullable(Of TimeSpan) = Nothing) As Task(Of ConditionalValue(Of T))

参数

tx
ITransaction

要与此操作关联的事务。

cancellationToken
CancellationToken

要监视取消请求的标记。 默认为 None

timeout
Nullable<TimeSpan>

等待操作完成的时间量。 默认值为 NULL。 如果传递 null,将使用默认超时。

返回

表示异步取消排队操作的任务。 任务的结果是 T 类型的 ConditionalValue。如果某个值在给定时间内取消排队,则返回 HasValue 为 false 的 ConditionalValue,否则它将返回 HasValue 为 true 的 ConditionalValue,将 Value 作为 T 类型的取消排队项

例外

副本 (replica) 不再位于 中。

副本 (replica) 当前不可读。

副本 (replica) 出现暂时性故障。 对新事务重试操作

副本 (replica) 看到了除上面定义的类型以外的不可重试的故障。 清理并重新引发异常

操作无法在给定的超时时间内完成。 应中止事务,并创建一个新事务以重试。

tx 为 null。 请勿处理此异常。

操作已通过 cancellationToken取消。

事务在内部由系统出错。 对新事务重试操作

当方法调用对对象的当前状态无效时引发。 例如,使用的事务已终止:用户已提交或中止。 如果引发此异常,则很可能使用事务的服务代码中存在 bug。

示例

此示例演示如何取消排队,并在重试后无限次记录,直到取消令牌。

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var concurrentQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>(new Uri("fabric:/concurrentQueue"));

    // Assumption: values are being enqueued by another source (e.g. the communication listener).
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            using (var tx = this.StateManager.CreateTransaction())
            {
                var dequeueOutput = await concurrentQueue.TryDequeueAsync(tx, cancellationToken, TimeSpan.FromMilliseconds(100));
                await tx.CommitAsync();

                if (dequeueOutput.HasValue)
                {
                    Console.WriteLine("Dequeue # " + dequeueOutput);
                }
                else
                {
                    Console.WriteLine("Could not dequeue in the given time");
                }
            }
        }
        catch (TransactionFaultedException e)
        {
            // This indicates that the transaction was internally faulted by the system. One possible cause for this is that the transaction was long running
            // and blocked a checkpoint. Increasing the "ReliableStateManagerReplicatorSettings.CheckpointThresholdInMB" will help reduce the chances of running into this exception
            Console.WriteLine("Transaction was internally faulted, retrying the transaction: " + e);
        }
        catch (FabricNotPrimaryException e)
        {
            // Gracefully exit RunAsync as the new primary should have RunAsync invoked on it and continue work.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is not primary, exiting RunAsync: " + e);
            return;
        }
        catch (FabricNotReadableException e)
        {
            // Retry until the queue is readable or a different exception is thrown.
            Console.WriteLine("Queue is not readable, retrying the transaction: " + e);
        }
        catch (FabricObjectClosedException e)
        {
            // Gracefully exit RunAsync as this is happening due to replica close.
            // If instead dequeue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is closing, exiting RunAsync: " + e);
            return;
        }
        catch (TimeoutException e)
        {
            Console.WriteLine("Encountered TimeoutException during DequeueAsync, retrying the transaction: " + e);
        }
        catch (FabricTransientException e)
        {
            // Retry until the queue is writable or a different exception is thrown.
            Console.WriteLine("Queue is currently not writable, retrying the transaction: " + e);
        }

        // Delay and retry.
        await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
    }
}

注解

虽然 TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) 只能返回已提交相应 EnqueueAsync(ITransaction, T, CancellationToken, Nullable<TimeSpan>) 的值, TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) 但操作不会相互隔离。 将某个值取消排队后,其他事务无法将其取消排队,但不会阻止其他值取消排队。

当包含一个或多个 TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) 操作的事务中止时,取消排队的值将以任意顺序重新添加到队列的头部。 这将确保这些值很快再次取消排队,从而提高数据结构的公平性,但不强制实施严格的排序 (这需要降低允许的并发性,如 IReliableQueue<T>) 中所示。

适用于