Orleans事务

Orleans支持针对持久 grain 状态的分布式 ACID 事务。 事务使用Microsoft.Orleans.Transactions NuGet 包实现。 本文中示例应用的源代码由四个项目组成:

  • 抽象:包含 grain 接口和共享类的类库。
  • grain:包含 grain 实现的类库。
  • 服务器:使用抽象和 grain 类库并充当Orleans silo 的控制台应用。
  • 客户端:使用表示Orleans客户端的抽象类库的 控制台应用。

设置

Orleans事务是选用的。 silo 和客户端必须配置为使用事务。 如果未配置它们,对 grain 实现上事务方法的任何调用都将接收OrleansTransactionsDisabledException。 要在 silo 上启用事务,请在 silo 主机生成器上调用SiloBuilderExtensions.UseTransactions

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

同样,要在客户端上启用事务,请在客户端主机生成器上调用ClientBuilderExtensions.UseTransactions

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

事务状态存储

要使用事务,需要配置数据存储。 为了支持具有事务的各种数据存储,使用了存储抽象ITransactionalStateStorage<TState>。 此抽象特定于事务的需求,与常规的 grain 存储 (IGrainStorage) 不同。 要使用特定于事务的存储,可使用ITransactionalStateStorage的任何实现(例如 Azure (AddAzureTableTransactionalStateStorage))配置 silo。

例如,请考虑以下主机生成器配置:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

对于开发,如果特定于事务的存储不可用于所需的数据存储,可以改用IGrainStorage实现。 对于未为其配置存储的任何事务状态,事务将尝试使用桥故障转移到 grain 存储。 通过到 grain 存储的桥访问事务状态效率较低,未来可能不受支持。 因此,建议仅将它用于开发目的。

Grain 接口

要使 grain 支持事务,必须使用TransactionAttribute属性将 grain 接口上的事务方法标记为事务的一部分。 该属性需要指示 grain 调用在事务环境中的行为方式,具体为以下TransactionOption值:

  • TransactionOption.Create:调用是事务性的,始终会新建事务上下文(即,它会启动新事务),即使在现有事务上下文中调用也是如此。
  • TransactionOption.Join:调用是事务性的,但只能在现有事务的上下文中调用。
  • TransactionOption.CreateOrJoin:调用是事务性的。 如果在事务的上下文中调用,则它将使用该上下文;否则它将创建新的上下文。
  • TransactionOption.Suppress:调用不是事务性的,但可以从事务中调用。 如果在事务的上下文中调用,则不会将上下文传递给调用。
  • TransactionOption.Supported:调用不是事务性的,但支持事务。 如果在事务的上下文中调用,则会将上下文传递给调用。
  • TransactionOption.NotAllowed:调用不是事务性的,无法从事务中调用。 如果在事务上下文中调用,它将引发NotSupportedException

调用可以标记为TransactionOption.Create,意味着调用将始终启动其事务。 例如,以下 ATM grain 中的Transfer操作始终启动涉及两个被引用帐户的新事务。

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

帐户 grain 上的事务操作WithdrawDeposit标记为TransactionOption.Join,指示它们只能在现有事务的上下文中调用,如果在IAtmGrain.Transfer期间调用它们,会发生这种情况。 GetBalance 调用标记为 CreateOrJoin,因此可以从现有事务中调用它(例如通过 IAtmGrain.Transfer 调用),也可以单独调用它。

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

重要注意事项

不能将OnActivateAsync标记为事务性,因为任何此类调用都需要在调用之前正确设置。 它只适用于 grain 应用程序 API。 这意味着,尝试将事务状态作为这些方法的一部分读取将在运行时引发异常。

粒度实现

grain 实现需要使用ITransactionalState<TState> facet 通过ACID 事务来管理 grain 状态。

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

对持久状态的所有读取或写入访问都必须通过传递给事务状态 facet 的同步函数来执行。 这样,事务系统便能以事务方式执行或取消这些操作。 要在 grain 中使用事务状态,可定义要持久保存的可序列化状态类,并在 grain 的构造函数中使用TransactionalStateAttribute声明事务状态。 后者声明状态名称和(可选)要使用的事务状态存储。 有关详细信息,请参阅设置

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

例如,Balance状态对象的定义如下:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

前面的状态对象:

  • 使用GenerateSerializerAttribute进行修饰,以指示Orleans代码生成器生成序列化程序。
  • 具有Value属性,该属性使用IdAttribute进行修饰以唯一识别成员。

然后,Balance状态对象用于AccountGrain实现,如下所示:

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

重要

必须使用ReentrantAttribute标记事务 grain,以确保事务上下文正确传递到 grain 调用。

在前面的示例中,TransactionalStateAttribute用于声明balance构造函数参数应与名为"balance"的事务状态相关联。 使用此声明,Orleans将注入ITransactionalState<TState>实例,该实例具有从名为"TransactionStore"的事务状态存储加载的状态。 可以通过 PerformUpdate 修改或者通过 PerformRead 读取状态。 事务基础结构将确保作为事务的一部分执行的任何此类更改,即使是分布在Orleans群集上的多个 grain,也会在完成创建事务的 grain 调用(前面示例中的IAtmGrain.Transfer)后全部提交或全部撤消。

从客户端调用事务方法

调用事务 grain 方法的推荐方式是使用ITransactionClient。 配置Orleans客户端时,会自动向依赖项注入服务提供程序注册ITransactionClientITransactionClient用于创建事务上下文并调用该上下文中的事务 grain 方法。 以下示例演示了如何使用ITransactionClient调用事务 grain 方法。

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

在前面的客户端代码中:

  • 使用UseOrleansClient配置IHostBuilder
    • IClientBuilder使用 localhost 聚类分析和事务。
  • IClusterClientITransactionClient接口从服务提供程序中检索。
  • fromto变量分配其IAccountGrain引用。
  • ITransactionClient用于创建事务,调用:
    • from帐户 grain 引用上的Withdraw
    • to帐户 grain 引用上的Deposit

事务始终提交,除非指定的transactionDelegate中引发异常或存在自相矛盾项transactionOption。 虽然调用事务 grain 方法的推荐方式是使用ITransactionClient,但也可以直接从另一个 grain 调用事务性 grain 方法。

从另一个 grain 调用事务方法

grain 接口上的事务方法的调用方式如同任何其他 grain 方法。 作为使用ITransactionClient的替代方法,下面的AtmGrain实现在IAccountGrain接口上调用Transfer方法(事务性)。

请考虑AtmGrain实现,它会解析两个引用的帐户 grain,并发出对WithdrawDeposit的相应调用:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

客户端应用代码可以按事务方式调用AtmGrain.Transfer,如下所示:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

在前面的调用中,IAtmGrain用于将 100 个货币单位从一个帐户转帐到另一个帐户。 转帐完成后,将查询这两个帐户以获取当前余额。 货币转帐以及两个帐户查询都作为 ACID 事务执行。

如前面的示例所示,事务可以像其他 grain 调用一样在 Task内返回值。 但在调用失败时,它们不会引发应用程序异常,而是引发OrleansTransactionExceptionTimeoutException。 如果应用程序在事务期间引发异常并且该异常导致事务失败(而不是由于其他系统故障而失败),则应用程序异常将是 OrleansTransactionException 的内部异常。

如果引发 OrleansTransactionAbortedException 类型的事务异常,则表示事务失败并可重试。 引发的任何其他异常都表示事务以未知状态终止。 由于事务是分布式操作,因此处于未知状态的事务可能已成功、失败或仍在进行中。 出于此原因,在验证状态或重试操作之前,建议允许调用超时期限 (SiloMessagingOptions.SystemResponseTimeout) 完全消逝,以避免发生级联式中止。