Orleans transakcí
Orleans podporuje distribuované transakce ACID proti trvalému stavu zrnitosti. Transakce se implementují pomocí Microsoftu .Orleans Transakce balíčku NuGet. Zdrojový kód ukázkové aplikace v tomto článku se skládá ze čtyř projektů:
- Abstrakce: Knihovna tříd obsahující rozhraní grain a sdílené třídy.
- Zrna: Knihovna tříd obsahující implementace zrnitosti.
- Server: Konzolová aplikace, která využívá abstrakce a knihovny tříd zrn a funguje jako Orleans sila.
- Klient: Konzolová aplikace, která využívá knihovnu Orleans abstrakcí tříd, která představuje klienta.
Nastavení
Orleans transakce jsou opt-in. Pro použití transakcí musí být nakonfigurovaný silo i klient. Pokud nejsou nakonfigurované, obdrží všechna volání transakčních metod v implementaci agregačního intervalu OrleansTransactionsDisabledException. Pokud chcete povolit transakce na silu, zavolejte SiloBuilderExtensions.UseTransactions tvůrce hostitelů sila:
var builder = Host.CreateDefaultBuilder(args)
UseOrleans((context, siloBuilder) =>
{
siloBuilder.UseTransactions();
});
Podobně pokud chcete povolit transakce na klientovi, zavolejte ClientBuilderExtensions.UseTransactions tvůrce hostitelů klienta:
var builder = Host.CreateDefaultBuilder(args)
UseOrleansClient((context, clientBuilder) =>
{
clientBuilder.UseTransactions();
});
Úložiště transakčního stavu
Pokud chcete používat transakce, musíte nakonfigurovat úložiště dat. K podpoře různých úložišť dat s transakcemi se používá abstrakce ITransactionalStateStorage<TState> úložiště. Tato abstrakce je specifická pro potřebytransakcíchIGrainStorage Pokud chcete použít úložiště specifické pro transakce, nakonfigurujte silo pomocí jakékoli implementace ITransactionalStateStorage
, jako je Azure (AddAzureTableTransactionalStateStorage).
Představte si například následující konfiguraci tvůrce hostitelů:
await Host.CreateDefaultBuilder(args)
.UseOrleans((_, silo) =>
{
silo.UseLocalhostClustering();
if (Environment.GetEnvironmentVariable(
"ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
{
silo.AddAzureTableTransactionalStateStorage(
"TransactionStore",
options => options.ConfigureTableServiceClient(connectionString));
}
else
{
silo.AddMemoryGrainStorageAsDefault();
}
silo.UseTransactions();
})
.RunConsoleAsync();
Pro účely vývoje platí, že pokud pro úložiště dat, které potřebujete, není k dispozici úložiště specifické pro transakce, můžete místo toho použít IGrainStorage
implementaci. V případě jakéhokoli transakčního stavu, který nemá nakonfigurované úložiště, se transakce pokusí převzít služby při selhání do úložiště grain pomocí mostu. Přístup k transakčnímu stavu prostřednictvím mostu do úložiště zrnek je méně efektivní a nemusí být v budoucnu podporován. Proto doporučujeme toto použít pouze pro účely vývoje.
Rozhraní pro agregační intervaly
Aby bylo možné podporovat transakce, transakční metody v rozhraní grain musí být označeny jako součást transakce pomocí TransactionAttribute. Atribut musí indikovat, jak se volání agregace chová v transakčním prostředí, jak je podrobně popsáno s následujícími TransactionOption hodnotami:
- TransactionOption.Create: Volání je transakční a vždy vytvoří nový kontext transakce (spustí novou transakci), i když je volána v existujícím kontextu transakce.
- TransactionOption.Join: Volání je transakční, ale lze volat pouze v kontextu existující transakce.
- TransactionOption.CreateOrJoin: Volání je transakční. Pokud je volána v kontextu transakce, použije tento kontext, jinak vytvoří nový kontext.
- TransactionOption.Suppress: Volání není transakční, ale lze volat z transakce. Pokud je volána v kontextu transakce, kontext se nepředá volání.
- TransactionOption.Supported: Volání není transakční, ale podporuje transakce. Pokud je volána v kontextu transakce, kontext se předá volání.
- TransactionOption.NotAllowed: Volání není transakční a nelze volat z transakce. Pokud je volána v kontextu transakce, vyvolá NotSupportedException.
Volání lze označit jako TransactionOption.Create
, což znamená, že volání vždy spustí svou transakci. Například Transfer
operace v níže uvedeném agregačním intervalu atm vždy spustí novou transakci, která zahrnuje dva odkazované účty.
namespace TransactionalExample.Abstractions;
public interface IAtmGrain : IGrainWithIntegerKey
{
[Transaction(TransactionOption.Create)]
Task Transfer(string fromId, string toId, decimal amountToTransfer);
}
Transakční operace Withdraw
a agregační agregace účtů jsou označeny TransactionOption.Join
, což znamená, že je lze volat pouze v kontextu existující transakce, což by byl případ, kdyby byly volány během IAtmGrain.Transfer
Deposit
. Volání GetBalance
je označené CreateOrJoin
tak, aby bylo možné volat z existující transakce, například prostřednictvím IAtmGrain.Transfer
, nebo samostatně.
namespace TransactionalExample.Abstractions;
public interface IAccountGrain : IGrainWithStringKey
{
[Transaction(TransactionOption.Join)]
Task Withdraw(decimal amount);
[Transaction(TransactionOption.Join)]
Task Deposit(decimal amount);
[Transaction(TransactionOption.CreateOrJoin)]
Task<decimal> GetBalance();
}
Důležitá poznámka
OnActivateAsync
Před voláním nelze označit jako transakční jako jakékoli takové volání, které vyžaduje správné nastavení. Existuje pouze pro rozhraní API odstupňované aplikace. To znamená, že pokus o čtení transakčního stavu jako součást těchto metod vyvolá výjimku v modulu runtime.
Implementace agregačních intervalů
Implementace zrnitosti potřebuje ke ITransactionalState<TState> správě stavu zrnitosti prostřednictvím transakcí ACID omezující vlastnost.
public interface ITransactionalState<TState>
where TState : class, new()
{
Task<TResult> PerformRead<TResult>(
Func<TState, TResult> readFunction);
Task<TResult> PerformUpdate<TResult>(
Func<TState, TResult> updateFunction);
}
Veškerý přístup pro čtení nebo zápis k trvalému stavu musí být proveden prostřednictvím synchronních funkcí předaných omezující vlastnosti transakčního stavu. To umožňuje transakčnímu systému provádět nebo rušit tyto operace transakční transakce. Chcete-li použít transakční stav v rámci agregačního intervalu, definujete serializovatelnou třídu stavu, která má být zachována a deklarovat transakční stav v konstruktoru grainu pomocí TransactionalStateAttribute. Druhá deklaruje název stavu a volitelně i úložiště transakčního stavu, které se má použít. Další informace naleznete v tématu Instalace.
[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
public TransactionalStateAttribute(string stateName, string storageName = null)
{
// ...
}
}
Jako příklad Balance
je objekt stavu definován takto:
namespace TransactionalExample.Abstractions;
[GenerateSerializer]
public record class Balance
{
[Id(0)]
public decimal Value { get; set; } = 1_000;
}
Předchozí stav objekt:
- Je vyzdoben pomocí GenerateSerializerAttribute pokyn generátoru Orleans kódu k vygenerování serializátoru.
Value
Má vlastnost, která je zdobenaIdAttribute
jedinečnou identifikací člena.
Objekt Balance
stavu se pak použije v implementaci AccountGrain
následujícím způsobem:
namespace TransactionalExample.Grains;
[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
private readonly ITransactionalState<Balance> _balance;
public AccountGrain(
[TransactionalState(nameof(balance))]
ITransactionalState<Balance> balance) =>
_balance = balance ?? throw new ArgumentNullException(nameof(balance));
public Task Deposit(decimal amount) =>
_balance.PerformUpdate(
balance => balance.Value += amount);
public Task Withdraw(decimal amount) =>
_balance.PerformUpdate(balance =>
{
if (balance.Value < amount)
{
throw new InvalidOperationException(
$"Withdrawing {amount} credits from account " +
$"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
$" This account has {balance.Value} credits.");
}
balance.Value -= amount;
});
public Task<decimal> GetBalance() =>
_balance.PerformRead(balance => balance.Value);
}
Důležité
Transakční agregační agregační interval musí být označený, ReentrantAttribute aby se zajistilo, že kontext transakce je správně předán do volání agregačního intervalu.
V předchozím příkladu TransactionalStateAttribute se používá k deklaraci, že balance
konstruktor parametr by měl být přidružen k transakční stav s názvem "balance"
. S touto deklarací Orleans vloží ITransactionalState<TState> instanci se stavem načteným z úložiště transakčního stavu s názvem "TransactionStore"
. Stav lze upravit prostřednictvím PerformUpdate
nebo přečíst prostřednictvím PerformRead
. Infrastruktura transakcí zajistí, že všechny takové změny provedené jako součást transakce, a to i mezi více zrn distribuovanými v clusteru Orleans , budou potvrzeny nebo všechny budou vráceny zpět po dokončení volání agregace, která vytvořila transakci (IAtmGrain.Transfer
v předchozím příkladu).
Volání metod transakcí z klienta
Doporučeným způsobem volání metody agregační transakce je použít ITransactionClient
. Služba ITransactionClient
injektáže závislostí se při konfiguraci klienta automaticky zaregistruje u poskytovatele Orleans služby injektáže závislostí. Slouží ITransactionClient
k vytvoření kontextu transakce a k volání transakčních metod agregačního intervalu v tomto kontextu. Následující příklad ukazuje, jak použít ITransactionClient
k volání transakčních metod agregačního intervalu.
using IHost host = Host.CreateDefaultBuilder(args)
.UseOrleansClient((_, client) =>
{
client.UseLocalhostClustering()
.UseTransactions();
})
.Build();
await host.StartAsync();
var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();
var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;
while (!Console.KeyAvailable)
{
// Choose some random accounts to exchange money
var fromIndex = random.Next(accountNames.Length);
var toIndex = random.Next(accountNames.Length);
while (toIndex == fromIndex)
{
// Avoid transferring to/from the same account, since it would be meaningless
toIndex = (toIndex + 1) % accountNames.Length;
}
var fromKey = accountNames[fromIndex];
var toKey = accountNames[toIndex];
var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
var toAccount = client.GetGrain<IAccountGrain>(toKey);
// Perform the transfer and query the results
try
{
var transferAmount = random.Next(200);
await transactionClient.RunTransaction(
TransactionOption.Create,
async () =>
{
await fromAccount.Withdraw(transferAmount);
await toAccount.Deposit(transferAmount);
});
var fromBalance = await fromAccount.GetBalance();
var toBalance = await toAccount.GetBalance();
Console.WriteLine(
$"We transferred {transferAmount} credits from {fromKey} to " +
$"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
}
catch (Exception exception)
{
Console.WriteLine(
$"Error transferring credits from " +
$"{fromKey} to {toKey}: {exception.Message}");
if (exception.InnerException is { } inner)
{
Console.WriteLine($"\tInnerException: {inner.Message}\n");
}
Console.WriteLine();
}
// Sleep and run again
await Task.Delay(TimeSpan.FromMilliseconds(200));
}
V předchozím klientském kódu:
- Konfiguruje se
IHostBuilder
sUseOrleansClient
.- Používá
IClientBuilder
clustering localhost a transakce.
- Používá
- Rozhraní
IClusterClient
aITransactionClient
rozhraní se načítají od poskytovatele služeb. - Proměnné
from
jsouto
přiřazeny jejichIAccountGrain
odkazy. - Slouží
ITransactionClient
k vytvoření transakce, volání:Withdraw
v referenčních informacích o agregačním intervalufrom
účtu.Deposit
v referenčních informacích o agregačním intervaluto
účtu.
Transakce jsou vždy potvrzeny, pokud není výjimka, která je vyvolán v transactionDelegate
zadaném nebo v rozporu transactionOption
. I když doporučený způsob volání transakčních odstupňovaných metod je použít ITransactionClient
, můžete také volat transakční odstupňované metody přímo z jiného zrnitého.
Volání metod transakcí z jiného agregace
Transakční metody v rozhraní grain jsou volána jako jakákoli jiná metoda agregace. Jako alternativní přístup pomocí ITransactionClient
, AtmGrain
implementace níže volá metodu Transfer
(což je transakční) v IAccountGrain
rozhraní.
Zvažte implementaci AtmGrain
, která řeší dvě odkazovaná zrna účtů a provádí příslušná volání Withdraw
a Deposit
:
namespace TransactionalExample.Grains;
[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
public Task Transfer(
string fromId,
string toId,
decimal amount) =>
Task.WhenAll(
GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}
Kód klientské aplikace může volat AtmGrain.Transfer
transakčním způsobem následujícím způsobem:
IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);
Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();
await atmOne.Transfer(from, to, 100);
uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();
V předchozích voláních IAtmGrain
se používá k přenosu 100 jednotek měny z jednoho účtu do druhého. Po dokončení převodu se oba účty dotazují, aby získaly aktuální zůstatek. Převod měny i dotazy na účty se provádějí jako transakce ACID.
Jak je znázorněno v předchozím příkladu, transakce mohou vracet hodnoty v rámci , Task
jako jsou jiné volání agregace. Ale při selhání volání nebudou vyvolány výjimky aplikace, ale spíše nebo OrleansTransactionException TimeoutException. Pokud aplikace vyvolá výjimku během transakce a tato výjimka způsobí selhání transakce (na rozdíl od selhání kvůli jiným systémovým selháním), výjimka aplikace bude vnitřní výjimkou OrleansTransactionException
.
Pokud je vyvolána výjimka transakce typu OrleansTransactionAbortedException, transakce selhala a lze ji opakovat. Jakákoli jiná vyvolaná výjimka značí, že transakce byla ukončena neznámým stavem. Vzhledem k tomu, že transakce jsou distribuované operace, transakce v neznámém stavu mohla být úspěšná, neúspěšná nebo stále probíhá. Z tohoto důvodu je vhodné před ověřením stavu nebo opakováním operace umožnit předáníSiloMessagingOptions.SystemResponseTimeout časového limitu časového limitu () volání, abyste se vyhnuli kaskádovým přerušením.