Orleans事务
Orleans支持针对持久 grain 状态的分布式 ACID 事务。 事务使用Microsoft.Orleans.Transactions NuGet 包实现。 本文中示例应用的源代码由四个项目组成:
- 抽象:包含 grain 接口和共享类的类库。
- grain:包含 grain 实现的类库。
- 服务器:使用抽象和 grain 类库并充当Orleans silo 的控制台应用。
- 客户端:使用表示Orleans客户端的抽象类库的 控制台应用。
设置
Orleans事务是选用的。 silo 和客户端必须配置为使用事务。 如果未配置它们,对 grain 实现上事务方法的任何调用都将接收OrleansTransactionsDisabledException。 要在 silo 上启用事务,请在 silo 主机生成器上调用SiloBuilderExtensions.UseTransactions:
var builder = Host.CreateDefaultBuilder(args)
UseOrleans((context, siloBuilder) =>
{
siloBuilder.UseTransactions();
});
同样,要在客户端上启用事务,请在客户端主机生成器上调用ClientBuilderExtensions.UseTransactions :
var builder = Host.CreateDefaultBuilder(args)
UseOrleansClient((context, clientBuilder) =>
{
clientBuilder.UseTransactions();
});
事务状态存储
要使用事务,需要配置数据存储。 为了支持具有事务的各种数据存储,使用了存储抽象ITransactionalStateStorage<TState>。 此抽象特定于事务的需求,与常规的 grain 存储 (IGrainStorage) 不同。 要使用特定于事务的存储,可使用ITransactionalStateStorage
的任何实现(例如 Azure (AddAzureTableTransactionalStateStorage))配置 silo。
例如,请考虑以下主机生成器配置:
await Host.CreateDefaultBuilder(args)
.UseOrleans((_, silo) =>
{
silo.UseLocalhostClustering();
if (Environment.GetEnvironmentVariable(
"ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
{
silo.AddAzureTableTransactionalStateStorage(
"TransactionStore",
options => options.ConfigureTableServiceClient(connectionString));
}
else
{
silo.AddMemoryGrainStorageAsDefault();
}
silo.UseTransactions();
})
.RunConsoleAsync();
对于开发,如果特定于事务的存储不可用于所需的数据存储,可以改用IGrainStorage
实现。 对于未为其配置存储的任何事务状态,事务将尝试使用桥故障转移到 grain 存储。 通过到 grain 存储的桥访问事务状态效率较低,未来可能不受支持。 因此,建议仅将它用于开发目的。
Grain 接口
要使 grain 支持事务,必须使用TransactionAttribute属性将 grain 接口上的事务方法标记为事务的一部分。 该属性需要指示 grain 调用在事务环境中的行为方式,具体为以下TransactionOption值:
- TransactionOption.Create:调用是事务性的,始终会新建事务上下文(即,它会启动新事务),即使在现有事务上下文中调用也是如此。
- TransactionOption.Join:调用是事务性的,但只能在现有事务的上下文中调用。
- TransactionOption.CreateOrJoin:调用是事务性的。 如果在事务的上下文中调用,则它将使用该上下文;否则它将创建新的上下文。
- TransactionOption.Suppress:调用不是事务性的,但可以从事务中调用。 如果在事务的上下文中调用,则不会将上下文传递给调用。
- TransactionOption.Supported:调用不是事务性的,但支持事务。 如果在事务的上下文中调用,则会将上下文传递给调用。
- TransactionOption.NotAllowed:调用不是事务性的,无法从事务中调用。 如果在事务上下文中调用,它将引发NotSupportedException。
调用可以标记为TransactionOption.Create
,意味着调用将始终启动其事务。 例如,以下 ATM grain 中的Transfer
操作始终启动涉及两个被引用帐户的新事务。
namespace TransactionalExample.Abstractions;
public interface IAtmGrain : IGrainWithIntegerKey
{
[Transaction(TransactionOption.Create)]
Task Transfer(string fromId, string toId, decimal amountToTransfer);
}
帐户 grain 上的事务操作Withdraw
和Deposit
标记为TransactionOption.Join
,指示它们只能在现有事务的上下文中调用,如果在IAtmGrain.Transfer
期间调用它们,会发生这种情况。 GetBalance
调用标记为 CreateOrJoin
,因此可以从现有事务中调用它(例如通过 IAtmGrain.Transfer
调用),也可以单独调用它。
namespace TransactionalExample.Abstractions;
public interface IAccountGrain : IGrainWithStringKey
{
[Transaction(TransactionOption.Join)]
Task Withdraw(decimal amount);
[Transaction(TransactionOption.Join)]
Task Deposit(decimal amount);
[Transaction(TransactionOption.CreateOrJoin)]
Task<decimal> GetBalance();
}
重要注意事项
不能将OnActivateAsync
标记为事务性,因为任何此类调用都需要在调用之前正确设置。 它只适用于 grain 应用程序 API。 这意味着,尝试将事务状态作为这些方法的一部分读取将在运行时引发异常。
粒度实现
grain 实现需要使用ITransactionalState<TState> facet 通过ACID 事务来管理 grain 状态。
public interface ITransactionalState<TState>
where TState : class, new()
{
Task<TResult> PerformRead<TResult>(
Func<TState, TResult> readFunction);
Task<TResult> PerformUpdate<TResult>(
Func<TState, TResult> updateFunction);
}
对持久状态的所有读取或写入访问都必须通过传递给事务状态 facet 的同步函数来执行。 这样,事务系统便能以事务方式执行或取消这些操作。 要在 grain 中使用事务状态,可定义要持久保存的可序列化状态类,并在 grain 的构造函数中使用TransactionalStateAttribute声明事务状态。 后者声明状态名称和(可选)要使用的事务状态存储。 有关详细信息,请参阅设置。
[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
public TransactionalStateAttribute(string stateName, string storageName = null)
{
// ...
}
}
例如,Balance
状态对象的定义如下:
namespace TransactionalExample.Abstractions;
[GenerateSerializer]
public record class Balance
{
[Id(0)]
public decimal Value { get; set; } = 1_000;
}
前面的状态对象:
- 使用GenerateSerializerAttribute进行修饰,以指示Orleans代码生成器生成序列化程序。
- 具有
Value
属性,该属性使用IdAttribute
进行修饰以唯一识别成员。
然后,Balance
状态对象用于AccountGrain
实现,如下所示:
namespace TransactionalExample.Grains;
[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
private readonly ITransactionalState<Balance> _balance;
public AccountGrain(
[TransactionalState(nameof(balance))]
ITransactionalState<Balance> balance) =>
_balance = balance ?? throw new ArgumentNullException(nameof(balance));
public Task Deposit(decimal amount) =>
_balance.PerformUpdate(
balance => balance.Value += amount);
public Task Withdraw(decimal amount) =>
_balance.PerformUpdate(balance =>
{
if (balance.Value < amount)
{
throw new InvalidOperationException(
$"Withdrawing {amount} credits from account " +
$"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
$" This account has {balance.Value} credits.");
}
balance.Value -= amount;
});
public Task<decimal> GetBalance() =>
_balance.PerformRead(balance => balance.Value);
}
重要
必须使用ReentrantAttribute标记事务 grain,以确保事务上下文正确传递到 grain 调用。
在前面的示例中,TransactionalStateAttribute用于声明balance
构造函数参数应与名为"balance"
的事务状态相关联。 使用此声明,Orleans将注入ITransactionalState<TState>实例,该实例具有从名为"TransactionStore"
的事务状态存储加载的状态。 可以通过 PerformUpdate
修改或者通过 PerformRead
读取状态。 事务基础结构将确保作为事务的一部分执行的任何此类更改,即使是分布在Orleans群集上的多个 grain,也会在完成创建事务的 grain 调用(前面示例中的IAtmGrain.Transfer
)后全部提交或全部撤消。
从客户端调用事务方法
调用事务 grain 方法的推荐方式是使用ITransactionClient
。 配置Orleans客户端时,会自动向依赖项注入服务提供程序注册ITransactionClient
。 ITransactionClient
用于创建事务上下文并调用该上下文中的事务 grain 方法。 以下示例演示了如何使用ITransactionClient
调用事务 grain 方法。
using IHost host = Host.CreateDefaultBuilder(args)
.UseOrleansClient((_, client) =>
{
client.UseLocalhostClustering()
.UseTransactions();
})
.Build();
await host.StartAsync();
var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();
var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;
while (!Console.KeyAvailable)
{
// Choose some random accounts to exchange money
var fromIndex = random.Next(accountNames.Length);
var toIndex = random.Next(accountNames.Length);
while (toIndex == fromIndex)
{
// Avoid transferring to/from the same account, since it would be meaningless
toIndex = (toIndex + 1) % accountNames.Length;
}
var fromKey = accountNames[fromIndex];
var toKey = accountNames[toIndex];
var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
var toAccount = client.GetGrain<IAccountGrain>(toKey);
// Perform the transfer and query the results
try
{
var transferAmount = random.Next(200);
await transactionClient.RunTransaction(
TransactionOption.Create,
async () =>
{
await fromAccount.Withdraw(transferAmount);
await toAccount.Deposit(transferAmount);
});
var fromBalance = await fromAccount.GetBalance();
var toBalance = await toAccount.GetBalance();
Console.WriteLine(
$"We transferred {transferAmount} credits from {fromKey} to " +
$"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
}
catch (Exception exception)
{
Console.WriteLine(
$"Error transferring credits from " +
$"{fromKey} to {toKey}: {exception.Message}");
if (exception.InnerException is { } inner)
{
Console.WriteLine($"\tInnerException: {inner.Message}\n");
}
Console.WriteLine();
}
// Sleep and run again
await Task.Delay(TimeSpan.FromMilliseconds(200));
}
在前面的客户端代码中:
- 使用
UseOrleansClient
配置IHostBuilder
。IClientBuilder
使用 localhost 聚类分析和事务。
IClusterClient
和ITransactionClient
接口从服务提供程序中检索。- 为
from
和to
变量分配其IAccountGrain
引用。 ITransactionClient
用于创建事务,调用:from
帐户 grain 引用上的Withdraw
。to
帐户 grain 引用上的Deposit
。
事务始终提交,除非指定的transactionDelegate
中引发异常或存在自相矛盾项transactionOption
。 虽然调用事务 grain 方法的推荐方式是使用ITransactionClient
,但也可以直接从另一个 grain 调用事务性 grain 方法。
从另一个 grain 调用事务方法
grain 接口上的事务方法的调用方式如同任何其他 grain 方法。 作为使用ITransactionClient
的替代方法,下面的AtmGrain
实现在IAccountGrain
接口上调用Transfer
方法(事务性)。
请考虑AtmGrain
实现,它会解析两个引用的帐户 grain,并发出对Withdraw
和Deposit
的相应调用:
namespace TransactionalExample.Grains;
[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
public Task Transfer(
string fromId,
string toId,
decimal amount) =>
Task.WhenAll(
GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}
客户端应用代码可以按事务方式调用AtmGrain.Transfer
,如下所示:
IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);
Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();
await atmOne.Transfer(from, to, 100);
uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();
在前面的调用中,IAtmGrain
用于将 100 个货币单位从一个帐户转帐到另一个帐户。 转帐完成后,将查询这两个帐户以获取当前余额。 货币转帐以及两个帐户查询都作为 ACID 事务执行。
如前面的示例所示,事务可以像其他 grain 调用一样在 Task
内返回值。 但在调用失败时,它们不会引发应用程序异常,而是引发OrleansTransactionException或TimeoutException。 如果应用程序在事务期间引发异常并且该异常导致事务失败(而不是由于其他系统故障而失败),则应用程序异常将是 OrleansTransactionException
的内部异常。
如果引发 OrleansTransactionAbortedException 类型的事务异常,则表示事务失败并可重试。 引发的任何其他异常都表示事务以未知状态终止。 由于事务是分布式操作,因此处于未知状态的事务可能已成功、失败或仍在进行中。 出于此原因,在验证状态或重试操作之前,建议允许调用超时期限 (SiloMessagingOptions.SystemResponseTimeout) 完全消逝,以避免发生级联式中止。