Поделиться через


IConnectableObservable<T>. Метод Connect

Подключает наблюдаемый объект .

Пространство имен:System.Reactive.Subjects
Сборки: System.Reactive (в System.Reactive.dll)

Синтаксис

'Declaration
Function Connect As IDisposable
'Usage
Dim instance As IConnectableObservable
Dim returnValue As IDisposable

returnValue = instance.Connect()
IDisposable Connect()
IDisposable^ Connect()
abstract Connect : unit -> IDisposable 
function Connect() : IDisposable

Возвращаемое значение

Тип: System.IDisposable
Объект IDisposable, используемый для отключения наблюдаемого объекта.

Примеры

В следующем примере мы преобразуем холодный наблюдаемый источник последовательности в горячий с помощью оператора Publish, который возвращает экземпляр IConnectableObservable T>, который называется горячим<. Оператор Publish предоставляет механизм для совместного использования подписок путем трансляции одной подписки нескольким подписчикам. Hot выступает в качестве прокси-сервера и подписывается на источник, а затем, получая значения из источника, отправляет их своим подписчикам. Чтобы установить подписку на резервный источник и начать получать значения, мы используем метод IConnectableObservable.Connect(). Так как IConnectableObservable наследует IObservable, мы можем использовать подписку на эту горячую последовательность еще до запуска. Обратите внимание, что в примере горячая последовательность не запущена, когда подписка subscription1 на нее подписывается. Таким образом, значение не передается подписчику. После вызова Connect значения отправляются в подписку subscription1. После задержки в 3 секунды подписка subscription2 подписывается на горячую и начинает получать значения сразу из текущей позиции (в данном случае 3) до конца. Выходные данные выглядят следующим образом.

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));   //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(     // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();

См. также:

Ссылка

Интерфейс IConnectableObservable<T>

Пространство имен System.Reactive.Subjects