Sdílet prostřednictvím


Vytváření a přihlášení k odběru jednoduchých pozorovatelných sekvencí

Není nutné implementovat IObservable<T> rozhraní ručně vytvořit pozorovatelné sekvence. Podobně nemusíte ani implementovat IObserver<T> , abyste se přihlásili k odběru sekvence. Instalací sestavení reaktivního rozšíření můžete využít pozorovatelného typu, který poskytuje mnoho statických operátorů LINQ pro vytvoření jednoduché sekvence s nulovým, jedním nebo více prvky. Kromě toho Rx poskytuje metody rozšíření Subscribe, které z hlediska delegátů využívají různé kombinace obslužných rutin OnNext, OnError a OnCompleted.

Vytvoření a přihlášení k odběru jednoduché sekvence

Následující ukázka používá operátor Rozsah pozorovatelného typu k vytvoření jednoduché pozorovatelné kolekce čísel. Pozorovatel se přihlásí k odběru této kolekce pomocí Metody Subscribe pozorovatelné třídy a poskytuje akce, které jsou delegáty, které zpracovávají OnNext, OnError a OnCompleted.

Operátor Rozsah má několik přetížení. V našem příkladu vytvoří sekvenci celých čísel, která začíná x a následně vytvoří pořadová čísla y. 

Jakmile dojde k odběru, odešlou se hodnoty pozorovateli. Delegát OnNext pak vytiskne hodnoty.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

Když pozorovatel se přihlásí k odběru pozorovatelné sekvence, vlákno volání Subscribe metoda se může lišit od vlákna, ve kterém sekvence běží až do dokončení. Proto je volání Subscribe asynchronní v tom, že volající není blokován, dokud pozorování sekvence nedokončí. Podrobnější informace najdete v tématu Používání plánovačů .

Všimněte si, že Subscribe metoda vrátí IDisposable, takže se můžete odhlásit od sekvenci a snadno ji odstranit. Když vyvoláte metodu Dispose na pozorovatelné sekvenci, pozorovatel přestane naslouchat pozorovateli dat.  Za normálních okolností není nutné explicitně volat Dispose, pokud není nutné odběr předčasně zrušit nebo pokud má pozorovatelná sekvence zdroje delší životnost než pozorovatel. Předplatná v Rx jsou navržená pro scénáře požáru a zapomenutí bez použití finalizační metody. Když je instance IDisposable shromažďována uvolňováním paměti, Rx automaticky nevyhazuje předplatné. Mějte však na paměti, že výchozím chováním pozorovatelných operátorů je co nejdříve odstranit odběr (tj. při publikování zpráv OnCompleted nebo OnError). Kód var x = Observable.Zip(a,b).Subscribe(); například přihlásí x k odběru obou sekvencí a a b. Pokud vyvolá chybu, x se okamžitě odhlásí z odběru funkce b.

Vzorový kód můžete také upravit tak, aby používal operátor Create pozorovatelného typu, který vytvoří a vrátí pozorovatele ze zadaných delegátů akcí OnNext, OnError a OnCompleted. Tohoto pozorovatele pak můžete předat metodě Subscribe pozorovatelného typu. Následující ukázka ukazuje, jak to udělat.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

Kromě vytvoření pozorovatelné sekvence od začátku můžete převést existující enumerátory, události .NET a asynchronní vzory na pozorovatelné sekvence. Další témata v této části vám ukážou, jak to udělat.

Všimněte si, že toto téma ukazuje jenom několik operátorů, které můžou vytvořit pozorovatelnou sekvenci úplně od začátku. Další informace o dalších operátorech LINQ najdete v tématu Dotazování pozorovatelných sekvencí pomocí operátorů LINQ.

Použití časovače

Následující ukázka používá operátor Časovač k vytvoření sekvence. Sekvence vysune první hodnotu po uplynutí 5 sekundy a pak vysune následující hodnoty každou 1 sekundu. Pro ilustraci zřetězeme operátor časového razítka s dotazem tak, aby se každá vysunutá hodnota připojila k času publikování. Když se tak přihlásíme k odběru této zdrojové sekvence, můžeme získat její hodnotu i časové razítko.

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

Výstup bude podobný tomuto:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

Pomocí operátoru časového razítka jsme ověřili, že první položka je skutečně vysunutá 5 sekund po zahájení sekvence a každá položka se publikuje o 1 sekundu později.

Převod výčtové kolekce na pozorovatelnou sekvenci

Pomocí operátoru ToObservable můžete převést obecnou výčtovou kolekci na pozorovatelnou sekvenci a přihlásit se k jejímu odběru.

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

Studená vs. horká pozorovatelná

Studená pozorovatelná začnou běžet při odběru, tj. pozorovatelná posloupnost začne tlačit hodnoty pozorovatelům pouze při zavolání odběru. Hodnoty se také nesdílejí mezi předplatiteli. To se liší od horkých pozorovatelných událostí, jako jsou události přesunu myší nebo burzovní akcie, které již vytvářejí hodnoty ještě před aktivací předplatného. Když se pozorovatel přihlásí k odběru horké pozorovatelné sekvence, získá aktuální hodnotu v datovém proudu. Horká pozorovatelná sekvence se sdílí mezi všemi odběrateli a každému odběrateli se odešle další hodnota v pořadí. Například i v případě, že se nikdo neuplatní k určitému burzovnímu tickeru, bude burza dál aktualizovat svou hodnotu na základě pohybu trhu. Když předplatitel zaregistruje zájem o tento ticker, automaticky získá nejnovější tick.

Následující příklad ukazuje studenou pozorovatelnou sekvenci. V tomto příkladu pomocí operátoru Interval vytvoříme jednoduchou pozorovatelnou posloupnost čísel v určitých intervalech, v tomto případě každých 1 sekundu.

Dva pozorovatelé se pak přihlásí k odběru této sekvence a vypíše její hodnoty. Všimněte si, že pořadí se resetuje pro každého odběratele, ve kterém druhé předplatné restartuje sekvenci z první hodnoty.

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

V následujícím příkladu převedeme předchozí studenou pozorovatelnou sekvenci source na horkou pomocí operátoru Publish, který vrátí instanci IConnectableObservable, kterou pojmenujeme hot. Operátor Publish poskytuje mechanismus pro sdílení předplatných vysíláním jednoho předplatného více odběratelům. hot funguje jako proxy server a přihlašuje se k odběru sourcea pak, jakmile obdrží hodnoty od source, nasdílí je vlastním odběratelům. K vytvoření předplatného backing source a zahájení příjmu hodnot používáme metodu IConnectableObservable.Connect(). Vzhledem k tomu, že IConnectableObservable dědí IObservable, můžeme použít Možnost Přihlásit k odběru této horké sekvence ještě předtím, než začne běžet. Všimněte si, že v příkladu nebyla spuštěna horká sekvence při subscription1 přihlášení k odběru. Odběrateli se proto neodesílala žádná hodnota. Po volání connect se hodnoty nasdílí do subscription1. Po 3sekundovém subscription2 zpoždění se přihlásí k hot odběru a začne přijímat hodnoty okamžitě od aktuální pozice (v tomto případě 3) až do konce. Výstup vypadá takto:

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();