Freigeben über


Aggregationsintervall-Persistenz

Aggregationsintervalle können über mehrere benannte persistente Datenobjekte verfügen. Diese Zustandsobjekte werden während der Aktivierung der Aggregationsintervalle aus dem Speicher geladen, sodass sie während der Anforderungen verfügbar sind. Die Aggregationsintervall-Persistenz verwendet ein erweiterbares Plug-In-Modell, sodass Speicheranbieter für jede Datenbank verwendet werden können. Dieses Persistenzmodell ist der Einfachheit halber konzipiert und soll nicht alle Datenzugriffsmuster abdecken. Aggregationsintervalle können auch direkt auf Datenbanken zugreifen, ohne das Aggregationsintervall-Persistenzmodell zu verwenden.

Im obigen Diagramm verfügt UserGrain über einen Profil-Zustand und einen Warenkorb-Zustand, die jeweils in einem separaten Speichersystem gespeichert sind.

Ziele

  1. Mehrere benannte persistente Datenobjekte pro Aggregationsintervall.
  2. Mehrere konfigurierte Speicheranbieter, von denen jeder über eine andere Konfiguration verfügen und von einem anderen Speichersystem unterstützt werden kann.
  3. Speicheranbieter können von der Community entwickelt und veröffentlicht werden.
  4. Speicheranbieter haben die vollständige Kontrolle darüber, wie sie Aggregationsintervall-Zustandsdaten im persistenten Sicherungsspeicher speichern. Corollary: Orleans bietet keine umfassende ORM-Speicherlösung, sondern ermöglicht es benutzerdefinierten Speicheranbietern, bestimmte ORM-Anforderungen bei Bedarf zu unterstützen.

Pakete

Orleans-Aggregationsintervall-Speicheranbieter finden Sie auf NuGet. Offiziell verwaltete Pakete umfassen:

API

Aggregationsintervalle interagieren mit ihrem persistenten Zustand, IPersistentState<TState>wobeiTState der serialisierbare Zustandstyp ist:

public interface IPersistentState<TState> where TState : new()
{
    TState State { get; set; }
    string Etag { get; }
    Task ClearStateAsync();
    Task WriteStateAsync();
    Task ReadStateAsync();
}

Instanzen von IPersistentState<TState> werden als Konstruktorparameter in das Aggregationsintervall eingefügt. Diese Parameter können mit einem PersistentStateAttribute Attribut kommentiert werden, um den Namen des eingefügten Zustands und den Namen des Speicheranbieters zu identifizieren, der ihn bereitstellt. Im folgenden Beispiel wird dies veranschaulicht, indem zwei benannte Zustände in den UserGrain Konstruktor eingefügt werden:

public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<ProfileState> _profile;
    private readonly IPersistentState<CartState> _cart;

    public UserGrain(
        [PersistentState("profile", "profileStore")] IPersistentState<ProfileState> profile,
        [PersistentState("cart", "cartStore")] IPersistentState<CartState> cart)
    {
        _profile = profile;
        _cart = cart;
    }
}

Verschiedene Aggregationsintervall-Typen können unterschiedliche konfigurierte Speicheranbieter verwenden, auch wenn beide denselben Typ haben. Beispielsweise zwei verschiedene Azure Table Storage-Anbieterinstanzen, die mit unterschiedlichen Azure Storage-Konten verbunden sind.

Lesen des Zustands

Der Aggregationsintervall-Zustand wird automatisch gelesen, wenn der Aggregationsintervall aktiviert wird, aber Aggregationsintervalle sind dafür verantwortlich, den Schreibvorgang für jeden geänderten Aggregationsintervall-Zustand bei Bedarf explizit auszulösen.

Wenn ein Aggregationsintervall den aktuellen Zustand für diesen Aggregationsintervall explizit aus dem Sicherungsspeicher lesen möchte, sollte der Aggregationsintervalle die ReadStateAsync Methode aufrufen. Dadurch wird der Aggregationsintervall-Zustand aus dem persistenten Speicher über den Speicheranbieter neu geladen, und die vorherige Arbeitsspeicher-Kopie des Aggregationsintervall-Zustands wird überschrieben und ersetzt, wenn die Task von ReadStateAsync() abgeschlossen ist.

Auf den Wert des Zustands wird mithilfe der State Eigenschaft zugegriffen. Die folgende Methode greift beispielsweise auf den im obigen Code deklarierten Profilzustand zu:

public Task<string> GetNameAsync() => Task.FromResult(_profile.State.Name);

Es ist nicht erforderlich, während des normalen Betriebs ReadStateAsync() aufzurufen. Der Zustand wird während der Aktivierung automatisch geladen. ReadStateAsync() kann jedoch verwendet werden, um den Zustand zu aktualisieren, der extern geändert wird.

Ausführliche Informationen zu Den Mechanismen zur Fehlerbehandlung finden Sie weiter unten im Abschnitt Fehlerzustände .

Schreiben des Zustands

Der Zustand kann mithilfe der State-Eigenschaft geändert werden. Der geänderte Zustand wird nicht automatisch beibehalten. Stattdessen entscheidet der Entwickler, wann der Zustand beibehalten werden soll, indem er die WriteStateAsync -Methode aufruft. Die folgende Methode aktualisiert z. B. eine Eigenschaft auf State und behält den aktualisierten Zustand bei:

public async Task SetNameAsync(string name)
{
    _profile.State.Name = name;
    await _profile.WriteStateAsync();
}

Konzeptionell nimmt die Orleans Runtime eine vollständige Kopie des Aggregationsintervall-Zustand-Datenobjekts für seine Verwendung bei allen Schreibvorgängen. Im Hintergrund kann die Runtime Optimierungsregeln und Heuristiken verwenden, um unter bestimmten Umständen die Durchführung einiger oder aller vollständigen Kopien zu vermeiden, sofern die erwartete logische Isolationssemantik erhalten bleibt.

Ausführliche Informationen zu den Mechanismen zur Fehlerbehandlung finden Sie weiter unten im Abschnitt Fehlerzustände .

Status löschen

Die ClearStateAsync Methode löscht den Aggregationsintervall-Zustand im Speicher. Abhängig vom Anbieter kann dieser Vorgang den Aggregationsintervall-Zustand optional vollständig löschen.

Erste Schritte

Bevor ein Aggregationsintervall Persistenz verwenden kann, muss im Silo ein Speicheranbieter konfiguriert werden.

Konfigurieren Sie zunächst Speicheranbieter, einen für den Profilstatus und einen für den Warenkorbzustand:

var host = new HostBuilder()
    .UseOrleans(siloBuilder =>
    {
        siloBuilder.AddAzureTableGrainStorage(
            name: "profileStore",
            configureOptions: options =>
            {
                // Use JSON for serializing the state in storage
                options.UseJson = true;

                // Configure the storage connection key
                options.ConnectionString =
                    "DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1";
            })
            .AddAzureBlobGrainStorage(
                name: "cartStore",
                configureOptions: options =>
                {
                    // Use JSON for serializing the state in storage
                    options.UseJson = true;

                    // Configure the storage connection key
                    options.ConnectionString =
                        "DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2";
                });
    })
    .Build();

Nachdem nun ein Speicheranbieter mit dem Namen "profileStore" konfiguriert wurde, können wir von einem Aggregationsintervall aus auf diesen Anbieter zugreifen.

Der persistente Zustand kann einem Aggregationsintervall auf zwei primäre Arten hinzugefügt werden:

  1. Durch Einfügen von IPersistentState<TState> in den Konstruktor des Aggregationsintervalls.
  2. Durch Erben von Grain<TGrainState>.

Die empfohlene Möglichkeit zum Hinzufügen von Speicher zu einem Aggregationsintervall besteht darin, IPersistentState<TState> zu einem zugeordneten [PersistentState("stateName", "providerName")]-Attribut in den Konstruktor des Aggregationsintervalls einzufügen. Einzelheiten zu Grain<TState> finden Sie weiter unten. Dies wird weiterhin unterstützt, gilt aber als Legacy-Ansatz.

Deklarieren Sie eine Klasse, die den Status des Aggregationsintervalls enthält:

[Serializable]
public class ProfileState
{
    public string Name { get; set; }

    public Date DateOfBirth
}

Fügen Sie IPersistentState<ProfileState> in den Konstruktor des Aggregationsintervalls ein:

public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<ProfileState> _profile;

    public UserGrain(
        [PersistentState("profile", "profileStore")]
        IPersistentState<ProfileState> profile)
    {
        _profile = profile;
    }
}

Hinweis

Der Profilzustand wird zum Zeitpunkt des Einfügens in den Konstruktor nicht geladen, sodass der Zugriff darauf zu diesem Zeitpunkt ungültig ist. Der Zustand wird geladen, bevor OnActivateAsync aufgerufen wird.

Nachdem der Aggregationsintervall nun einen persistenten Zustand aufweist, können wir Methoden hinzufügen, um den Zustand zu lesen und zu schreiben:

public class UserGrain : Grain, IUserGrain
    {
    private readonly IPersistentState<ProfileState> _profile;

    public UserGrain(
        [PersistentState("profile", "profileStore")]
        IPersistentState<ProfileState> profile)
    {
        _profile = profile;
    }

    public Task<string> GetNameAsync() => Task.FromResult(_profile.State.Name);

    public async Task SetNameAsync(string name)
    {
        _profile.State.Name = name;
        await _profile.WriteStateAsync();
    }
}

Fehlerzustände für Persistenzvorgänge

Fehlerzustände für Lesevorgänge

Fehler, die vom Speicheranbieter beim anfänglichen Lesen von Zustandsdaten für diesen bestimmten Aggregationsintervall zurückgegeben werden, schlagen beim Aktivierungsvorgang für diesen Aggregationsintervall fehl. In diesem Fall wird es keinen Aufruf der Rückrufmethode für den OnActivateAsync() Lebenszyklus dieses Aggregationsintervalls geben. Die ursprüngliche Anforderung an den Aggregationsintervall, der die Aktivierung verursacht hat, wird auf die gleiche Weise wie jeder andere Fehler während der Aktivierung des Aggregationsintervalls an den Aufrufer zurückgegeben. Fehler, die vom Speicheranbieter beim Lesen von Zustandsdaten für ein bestimmtes Aggregationsintervall auftreten, führen zu einer Ausnahme von ReadStateAsync()Task. Der Aggregationsintervall kann die Task-Ausnahme behandeln oder ignorieren, genau wie jede andere Task in Orleans.

Jeder Versuch, eine Nachricht an ein Aggregationsintervall zu senden, das zum Startzeitpunkt des Silos aufgrund einer fehlenden/fehlerhaften Speicheranbieterkonfiguration nicht geladen werden konnte, gibt den permanenten Fehler BadProviderConfigException zurück.

Fehlerzustände für Schreibvorgänge

Fehler, die vom Speicheranbieter beim Lesen von Zustandsdaten für ein bestimmtes Aggregationsintervall auftreten, führen zu einer Ausnahme, die von WriteStateAsync()Task ausgelöst wird. In der Regel bedeutet dies, dass die Aggregationsintervall-Aufruf-Ausnahme an den Clientaufrufer zurückgegeben wird, vorausgesetzt, dass WriteStateAsync()Task ordnungsgemäß mit der endgültigen Rückgabe Task für diese Aggregationsintervall-Methode verkettet ist. In bestimmten erweiterten Szenarien ist es jedoch möglich, Aggregationsintervall-Code zu schreiben, um solche Schreibfehler speziell zu behandeln, genau wie alle anderen fehlerhaften Task.

Aggregationsintervalle, die Fehlerbehandlung/Wiederherstellungscode ausführen, müssen Ausnahmen/fehlerhafte WriteStateAsync()Tasks abfangen und nicht erneut auslösen, um anzugeben, dass sie den Schreibfehler erfolgreich behandelt haben.

Empfehlungen

Verwenden der JSON-Serialisierung oder eines anderen versionstoleranten Serialisierungsformats

Code wird weiterentwickelt, und dies umfasst häufig auch Speichertypen. Um diese Änderungen zu berücksichtigen, sollte ein geeignetes Serialisierungsmodul konfiguriert werden. Für die meisten Speicheranbieter steht eine UseJson Option oder eine ähnliche Option zur Verwendung von JSON als Serialisierungsformat zur Verfügung. Stellen Sie sicher, dass beim Entwickeln von Datenverträgen bereits gespeicherte Daten weiterhin geladen werden können.

Verwenden von Aggregationsintervall-<TState> zum Hinzufügen von Speicher zu einem Aggregationsintervall

Wichtig

Die Verwendung von Grain<T> zum Hinzufügen von Speicher zu einem Aggregationsintervall wird als Legacy-Funktionalität betrachtet: Aggregationsintervall-Speicher sollte mit IPersistentState<T> hinzugefügt werden, wie zuvor beschrieben.

Bei Aggregationsintervall-Klassen, die von Grain<T> erben (wobei T ein anwendungsspezifischer Datentyp ist, der persistiert werden muss), wird der Status automatisch aus dem angegebenen Speicher geladen.

Solche Aggregationsintervalle werden mit einem StorageProviderAttribute gekennzeichnet, das eine benannte Instanz eines Speicheranbieters angibt, der zum Lesen/Schreiben der Zustandsdaten für diesen Aggregationsintervall verwendet werden soll.

[StorageProvider(ProviderName="store1")]
public class MyGrain : Grain<MyGrainState>, /*...*/
{
  /*...*/
}

Die Grain<T> Basisklasse definierte die folgenden Methoden für Unterklassen, die aufgerufen werden sollen:

protected virtual Task ReadStateAsync() { /*...*/ }
protected virtual Task WriteStateAsync() { /*...*/ }
protected virtual Task ClearStateAsync() { /*...*/ }

Das Verhalten dieser Methoden entspricht dem ihrer Gegenstücke auf IPersistentState<TState>, die zuvor definiert wurden.

Hinzufügen eines Speicheranbieters

Es gibt zwei Teile der Zustandspersistenz-APIs: die API, die dem Aggregationsintervall über IPersistentState<T> oder Grain<T> verfügbar gemacht wird, und die Speicheranbieter-API, die in IGrainStorage zentriert ist – die Schnittstelle, die Speicheranbieter implementieren müssen:

/// <summary>
/// Interface to be implemented for a storage able to read and write Orleans grain state data.
/// </summary>
public interface IGrainStorage
{
    /// <summary>Read data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">State data object to be populated for this grain.</param>
    /// <returns>Completion promise for the Read operation on the specified grain.</returns>
    Task ReadStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);

    /// <summary>Write data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">State data object to be written for this grain.</param>
    /// <returns>Completion promise for the Write operation on the specified grain.</returns>
    Task WriteStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);

    /// <summary>Delete / Clear data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">Copy of last-known state data object for this grain.</param>
    /// <returns>Completion promise for the Delete operation on the specified grain.</returns>
    Task ClearStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);
}

Erstellen Sie einen benutzerdefinierten Speicheranbieter, indem Sie diese Schnittstelle implementieren und diese Implementierung registrieren . Ein Beispiel für eine vorhandene Speicheranbieterimplementierung finden Sie unter AzureBlobGrainStorage.

Speicheranbietersemantik

Ein nicht transparenter anbieterspezifischer Etag Wert (string) kann von einem Speicheranbieter als Teil der Metadaten für den Aggregationsintervall-Zustand festgelegt werden, die beim Lesen des Zustands aufgefüllt wurden. Einige Anbieter können dies bei null belassen, wenn sie nicht Etagverwenden.

Jeder Versuch, einen Schreibvorgang auszuführen, wenn der Speicheranbieter einen EtagVerstoß gegen eine Einschränkung feststellt, sollte dazu führen, dass der SchreibvorgangTask mit einem vorübergehenden Fehler InconsistentStateException und einem Umbruch der zugrunde liegenden Speicherausnahme abgebrochen wird.

public class InconsistentStateException : OrleansException
{
    public InconsistentStateException(
    string message,
    string storedEtag,
    string currentEtag,
    Exception storageException)
        : base(message, storageException)
    {
        StoredEtag = storedEtag;
        CurrentEtag = currentEtag;
    }

    public InconsistentStateException(
        string storedEtag,
        string currentEtag,
        Exception storageException)
        : this(storageException.Message, storedEtag, currentEtag, storageException)
    {
    }

    /// <summary>The Etag value currently held in persistent storage.</summary>
    public string StoredEtag { get; }

    /// <summary>The Etag value currently held in memory, and attempting to be updated.</summary>
    public string CurrentEtag { get; }
}

Alle anderen Fehlerbedingungen eines Speichervorgangs müssen dazu führen, dass die zurückgegebene Task mit einer Ausnahme unterbrochen wird, die das zugrunde liegende Speicherproblem angibt. In vielen Fällen kann diese Ausnahme an den Aufrufer zurückgegeben werden, der den Speichervorgang durch Aufrufen einer Methode für den Aggregationsintervall ausgelöst hat. Es ist wichtig zu überlegen, ob der Aufrufer diese Ausnahme deserialisieren kann oder nicht. Beispielsweise hat der Client möglicherweise nicht die spezifische Persistenzbibliothek geladen, die den Ausnahmetyp enthält. Aus diesem Grund ist es ratsam, Ausnahmen in Ausnahmen zu konvertieren, die an den Aufrufer zurückgegeben werden können.

Datenzuordnung

Einzelne Speicheranbieter sollten entscheiden, wie der Aggregationsintervall-Zustand am besten gespeichert werden soll. Blob (verschiedene Formate/serialisierte Formulare) oder Spalten pro Feld sind naheliegende Optionen.

Registrieren eines Speicheranbieters

Die Orleans-Runtime löst einen Speicheranbieter vom Dienstanbieter (IServiceProvider), wenn ein Aggregationsintervall erstellt wird. Die Runtime löst eine Instanz von IGrainStorage auf. Wenn der Speicheranbieter benannt wird, z. B. über das [PersistentState(stateName, storageName)]-Attribut, wird eine benannte Instanz von IGrainStorage aufgelöst.

Um eine benannte Instanz von IGrainStorage zu registrieren, verwenden Sie die AddSingletonNamedService Erweiterungsmethode, die dem Beispiel des Anbieters AzureTableGrainStorage hier folgt.