Dela via


Observatörer

Det finns situationer där ett enkelt meddelande-/svarsmönster inte räcker och klienten måste ta emot asynkrona meddelanden. En användare kanske till exempel vill få ett meddelande när ett nytt snabbmeddelande har publicerats av en vän.

Klientobservatörer är en mekanism som gör det möjligt att meddela klienter asynkront. Övervakningsgränssnitt måste ärva från IGrainObserver, och alla metoder måste returnera antingen void, Task, Task<TResult>, ValueTaskeller ValueTask<TResult>. En returtyp void av rekommenderas inte eftersom det kan uppmuntra användningen av async void på implementeringen, vilket är ett farligt mönster eftersom det kan leda till programkrascher om ett undantag utlöses från metoden. För scenarier med bästa möjliga insatser bör du i stället överväga att tillämpa OneWayAttribute på observatörens gränssnittsmetod. Detta gör att mottagaren inte skickar ett svar för metodanropet och gör att metoden returneras omedelbart på anropsplatsen, utan att vänta på ett svar från övervakaren. Ett korn anropar en metod på en observatör genom att anropa den som vilken korngränssnittsmetod som helst. Körningen Orleans säkerställer leverans av begäranden och svar. Ett vanligt användningsfall för observatörer är att värva en klient för att ta emot meddelanden när en händelse inträffar i Orleans programmet. Ett korn som publicerar sådana meddelanden bör tillhandahålla ett API för att lägga till eller ta bort observatörer. Dessutom är det vanligtvis praktiskt att exponera en metod som gör att en befintlig prenumeration kan avbrytas.

Kornutvecklare kan använda en verktygsklass, till exempel ObserverManager<TObserver> för att förenkla utvecklingen av observerade korntyper. Till skillnad från korn, som automatiskt återaktiveras efter behov efter fel, är klienter inte feltoleranta: en klient som misslyckas kanske aldrig återställs. Därför ObserverManager<T> tar verktyget bort prenumerationer efter en konfigurerad varaktighet. Klienter som är aktiva bör prenumerera på en timer igen för att hålla prenumerationen aktiv.

Om du vill prenumerera på ett meddelande måste klienten först skapa ett lokalt objekt som implementerar övervakningsgränssnittet. Den anropar sedan en metod på observatörsfabriken för CreateObjectReferenceatt omvandla objektet till en kornreferens, som sedan kan skickas till prenumerationsmetoden på meddelandekornet.

Den här modellen kan också användas av andra korn för att ta emot asynkrona meddelanden. Korn kan också implementera IGrainObserver gränssnitt. Till skillnad från i klientprenumerationsfallet implementerar det prenumererande kornet bara övervakningsgränssnittet och skickar en referens till sig själv (t.ex. this.AsReference<IMyGrainObserverInterface>()). Det finns inget behov CreateObjectReference() av eftersom korn redan är adresserbara.

Kodexempel

Anta att vi har ett korn som regelbundet skickar meddelanden till klienter. För enkelhetens skull är meddelandet i vårt exempel en sträng. Vi definierar först gränssnittet på klienten som ska ta emot meddelandet.

Gränssnittet ser ut så här

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Det enda som är speciellt är att gränssnittet ska ärva från IGrainObserver. Nu ska alla klienter som vill observera dessa meddelanden implementera en klass som implementerar IChat.

Det enklaste fallet skulle vara ungefär så här:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

På servern bör vi nu ha ett Korn som skickar dessa chattmeddelanden till klienter. Grain bör också ha en mekanism för klienter att prenumerera och avbryta prenumerationen för meddelanden. För prenumerationer kan kornet använda en instans av verktygsklassen ObserverManager<TObserver>.

Kommentar

ObserverManager<TObserver> är en del av Orleans sedan version 7.0. För äldre versioner kan följande implementering kopieras.

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Om du vill skicka ett meddelande till klienter kan Notify metoden för instansen ObserverManager<IChat> användas. Metoden använder en Action<T> metod eller ett lambda-uttryck (där T är av typen IChat här). Du kan anropa valfri metod i gränssnittet för att skicka den till klienter. I vårt fall har vi bara en metod, ReceiveMessage, och vår sändningskod på servern skulle se ut så här:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Nu har vår server en metod för att skicka meddelanden till observatörsklienter, två metoder för att prenumerera/avprenumerera och klienten har implementerat en klass som kan observera kornmeddelandena. Det sista steget är att skapa en observatörsreferens på klienten med hjälp av vår tidigare implementerade Chat klass och låta den ta emot meddelandena efter att ha prenumererat på den.

Koden skulle se ut så här:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Nu när vårt korn på servern anropar SendUpdateMessage metoden får alla prenumerationsklienter meddelandet. I vår klientkod tar instansen Chat i variabeln c emot meddelandet och matar ut det till konsolen.

Viktigt!

Objekt som skickas till CreateObjectReference lagras via en WeakReference<T> och kommer därför att samlas in skräp om det inte finns några andra referenser.

Användarna bör ha en referens för varje observatör som de inte vill ska samlas in.

Kommentar

Observatörer är i sig otillförlitliga eftersom en klient som är värd för en observatör kan misslyckas och observatörer som skapats efter återställningen har olika (randomiserade) identiteter. ObserverManager<TObserver> förlitar sig på regelbunden återprenumerering av observatörer, enligt beskrivningen ovan, så att inaktiva observatörer kan avlägsnas.

Körningsmodell

Implementeringar av IGrainObserver registreras via ett anrop till IGrainFactory.CreateObjectReference och varje anrop till den metoden skapar en ny referens som pekar på implementeringen. Orleans kommer att köra begäranden som skickas till var och en av dessa referenser en i taget, till slutförande. Observatörer är icke-återaktiverade och därför kommer samtidiga begäranden till en observatör inte att interfolieras av Orleans. Om det finns flera observatörer som tar emot begäranden samtidigt kan dessa begäranden köras parallellt. Körning av övervakningsmetoder påverkas inte av attribut som AlwaysInterleaveAttribute eller ReentrantAttribute: körningsmodellen kan inte anpassas av en utvecklare.