ReplaySubject<T> Constructor
Creates a replayable subject.
Namespace: System.Reactive.Subjects
Assembly: System.Reactive (in System.Reactive.dll)
Syntax
'Declaration
Public Sub New
'Usage
Dim instance As New ReplaySubject()
public ReplaySubject()
public:
ReplaySubject()
new : unit -> ReplaySubject
public function ReplaySubject()
Examples
In this example, we create a NewsHeadlineFeed class, which is just a mock news feed in the form of an observable sequence of strings. It uses the Generate operator to continuously generate random news headlines, each within three seconds of the previous one.
A ReplaySubject is created and subscribed to two news feeds of the NewsHeadlineFeed class. Before the subject is subscribed to the feeds, the Timestamp operator is used to timestamp each headline. So the sequence the ReplaySubject actually subscribes to is of type IObservable<Timestamped<string>>. With the headline sequence timestamped, subscribers can subscribe to the subject's observable interface to observe the data stream(s), or a subset of the stream(s) based on the timestamp.
A ReplaySubject buffers the items it receives, so a subscription created later can access items from the sequence that have already been buffered and published. A subscription is created to the ReplaySubject that receives only the local news headlines that occurred within the 10 seconds before the local news subscription was created. In effect, the ReplaySubject "replays" what happened in the previous 10 seconds.
A local news headline simply contains the newsLocation substring ("in your area.").
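Before the full example, the buffering behavior can be seen in isolation. The following is a minimal sketch, not part of the original sample: the class name ReplaySketch and the helper ReplayToLateSubscriber are hypothetical names chosen for illustration, and the code assumes the System.Reactive library is referenced. It pushes three values into a ReplaySubject created with the parameterless constructor and only then subscribes, so the late subscriber is expected to receive all three buffered values.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical helper class, used only for this illustration.
static class ReplaySketch
{
    // Pushes three values first, subscribes afterwards, and returns
    // whatever the late subscriber received.
    public static List<int> ReplayToLateSubscriber()
    {
        var seen = new List<int>();

        // Parameterless constructor: no explicit size or time limit is given.
        using (var subject = new ReplaySubject<int>())
        {
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);

            // This subscription is created after the values were published;
            // the subject replays its buffered items to it.
            IDisposable subscription = subject.Subscribe(x => seen.Add(x));
            subscription.Dispose();
        }

        return seen;
    }

    static void Main()
    {
        foreach (int value in ReplayToLateSubscriber())
        {
            Console.WriteLine(value);
        }
    }
}
```

Because no limit is passed to the constructor, nothing in this sketch bounds the buffer, which is why the full example below filters replayed headlines by timestamp rather than relying on the subject to discard old ones.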
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Threading;
namespace Example
{
class Program
{
static void Main()
{
//*****************************************************************************************************//
//*** A subject acts similar to a proxy in that it acts as both a subscriber and a publisher ***//
//*** Its IObserver interface can be used to subscribe to multiple streams or sequences of data. ***//
//*** The data is then published through its IObservable interface. ***//
//*** ***//
//*** In this example a simple ReplaySubject is used to subscribe to multiple news feeds ***//
//*** that provide random news headlines. Before the subject is subscribed to the feeds, we use ***//
//*** Timestamp operator to timestamp each headline. Subscribers can then subscribe to the subject ***//
//*** observable interface to observe the data stream(s) or a subset of the stream(s) based on ***//
//*** time. ***//
//*** ***//
//*** A ReplaySubject buffers items it receives. So a subscription created at a later time can ***//
//*** access items from the sequence which have already been published. ***//
//*** ***//
//*** A subscription is created to the ReplaySubject that receives only local news headlines which ***//
//*** occurred 10 seconds before the local news subscription was created. So we basically have the ***//
//*** ReplaySubject "replay" what happened 10 seconds earlier. ***//
//*** ***//
//*** A local news headline just contains the newsLocation substring ("in your area."). ***//
//*** ***//
//*****************************************************************************************************//
ReplaySubject<Timestamped<string>> myReplaySubject = new ReplaySubject<Timestamped<String>>();
//*****************************************************************//
//*** Create news feed #1 and subscribe the ReplaySubject to it ***//
//*****************************************************************//
NewsHeadlineFeed NewsFeed1 = new NewsHeadlineFeed("Headline News Feed #1");
NewsFeed1.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);
//*****************************************************************//
//*** Create news feed #2 and subscribe the ReplaySubject to it ***//
//*****************************************************************//
NewsHeadlineFeed NewsFeed2 = new NewsHeadlineFeed("Headline News Feed #2");
NewsFeed2.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);
//*****************************************************************************************************//
//*** Create a subscription to the subject's observable sequence. This subscription will filter for ***//
//*** only local headlines that occurred 10 seconds before the subscription was created. ***//
//*** ***//
//*** Since we are using a ReplaySubject with timestamped headlines, we can subscribe to the ***//
//*** headlines already past. The ReplaySubject will "replay" them for the localNewsSubscription ***//
//*** from its buffered sequence of headlines. ***//
//*****************************************************************************************************//
Console.WriteLine("Waiting for 10 seconds before subscribing to local news headline feed.\n");
Thread.Sleep(10000);
Console.WriteLine("\n*** Creating local news headline subscription at {0} ***\n", DateTime.Now.ToString());
Console.WriteLine("This subscription asks the ReplaySubject for the buffered headlines that\n" +
"occurred within the last 10 seconds.\n\nPress ENTER to exit.");
// Only headlines stamped inside the window [earliestHeadlineTime, latestHeadlineTime) are reported.
DateTime latestHeadlineTime = DateTime.Now;
DateTime earliestHeadlineTime = latestHeadlineTime - TimeSpan.FromSeconds(10);
IDisposable localNewsSubscription = myReplaySubject.Where(x => x.Value.Contains("in your area.") &&
(x.Timestamp >= earliestHeadlineTime) &&
(x.Timestamp < latestHeadlineTime)).Subscribe(x =>
{
Console.WriteLine("\n************************************\n" +
"***[ Local news headline report ]***\n" +
"************************************\n" +
"Time : {0}\n{1}\n\n", x.Timestamp.ToString(), x.Value);
});
Console.ReadLine();
//*******************************//
//*** Cancel the subscription ***//
//*******************************//
localNewsSubscription.Dispose();
//*************************************************************************//
//*** Unsubscribe all the ReplaySubject's observers and free resources. ***//
//*************************************************************************//
myReplaySubject.Dispose();
}
}
//*********************************************************************************//
//*** ***//
//*** The NewsHeadlineFeed class is just a mock news feed in the form of an ***//
//*** observable sequence in Reactive Extensions. ***//
//*** ***//
//*********************************************************************************//
class NewsHeadlineFeed
{
private string feedName; // Feedname used to label the stream
private IObservable<string> headlineFeed; // The actual data stream
private readonly Random rand = new Random(); // Used to stream random headlines.
//*** A list of predefined news events to combine with a simple location string ***//
static readonly string[] newsEvents = { "A tornado occurred ",
"Weather watch for snow storm issued ",
"A robbery occurred ",
"We have a lottery winner ",
"An earthquake occurred ",
"Severe automobile accident "};
//*** A list of predefined location strings to combine with a news event. ***//
static readonly string[] newsLocations = { "in your area.",
"in Dallas, Texas.",
"somewhere in Iraq.",
"Lincolnton, North Carolina",
"Redmond, Washington"};
public IObservable<string> HeadlineFeed
{
get { return headlineFeed; }
}
public NewsHeadlineFeed(string name)
{
feedName = name;
//*****************************************************************************************//
//*** Using the Generate operator to generate a continuous stream of headlines that occur ***//
//*** randomly within 3 seconds. ***//
//*****************************************************************************************//
headlineFeed = Observable.Generate(RandNewsEvent(),
evt => true,
evt => RandNewsEvent(),
evt => { Thread.Sleep(rand.Next(3000)); return evt; },
Scheduler.ThreadPool);
}
//****************************************************************//
//*** Some very simple formatting of the headline event string ***//
//****************************************************************//
private string RandNewsEvent()
{
return "Feedname : " + feedName + "\nHeadline : " + newsEvents[rand.Next(newsEvents.Length)] +
newsLocations[rand.Next(newsLocations.Length)];
}
}
}
The following output was generated with the example code. Because the news feeds are random, you may need to run the example more than once to see a local news headline.
Waiting for 10 seconds before subscribing to local news headline feed.
*** Creating local news headline subscription at 09/05/2011 4:07:48 AM ***
This subscription asks the ReplaySubject for the buffered headlines that
occurred within the last 10 seconds.
Press ENTER to exit.
************************************
***[ Local news headline report ]***
************************************
Time : 09/05/2011 4:07:42 AM -04:00
Feedname : Headline News Feed #2
Headline : We have a lottery winner in your area.
************************************
***[ Local news headline report ]***
************************************
Time : 09/05/2011 4:07:47 AM -04:00
Feedname : Headline News Feed #1
Headline : Weather watch for snow storm issued in your area.
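The time-window filter used by the local news subscription can also be tried without waiting on live feeds. The following is a sketch under stated assumptions: the class WindowFilterSketch, the helper LocalHeadlinesInWindow, and the fixed reference time are made up for illustration, and the headlines are stamped by hand with the Timestamped<T> constructor instead of the Timestamp operator so that the result does not depend on the clock. It assumes the System.Reactive library is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, used only for this illustration.
static class WindowFilterSketch
{
    // Buffers three hand-stamped headlines, then subscribes with the same
    // kind of filter as the example: local headlines from the last 10 seconds.
    public static List<string> LocalHeadlinesInWindow()
    {
        var matches = new List<string>();

        // Made-up reference time standing in for "now" at subscription time.
        var latest = new DateTimeOffset(2011, 9, 5, 4, 7, 48, TimeSpan.Zero);
        DateTimeOffset earliest = latest - TimeSpan.FromSeconds(10);

        using (var subject = new ReplaySubject<Timestamped<string>>())
        {
            // Local, but older than the 10 second window.
            subject.OnNext(new Timestamped<string>(
                "A robbery occurred in your area.", latest.AddSeconds(-30)));
            // Inside the window, but not local.
            subject.OnNext(new Timestamped<string>(
                "An earthquake occurred in Dallas, Texas.", latest.AddSeconds(-6)));
            // Local and inside the window.
            subject.OnNext(new Timestamped<string>(
                "We have a lottery winner in your area.", latest.AddSeconds(-5)));

            // All three items are replayed; the Where clause keeps only the match.
            IDisposable subscription = subject
                .Where(x => x.Value.Contains("in your area.")
                         && x.Timestamp >= earliest
                         && x.Timestamp < latest)
                .Subscribe(x => matches.Add(x.Value));
            subscription.Dispose();
        }

        return matches;
    }

    static void Main()
    {
        foreach (string headline in LocalHeadlinesInWindow())
        {
            Console.WriteLine(headline);
        }
    }
}
```

In this sketch the subject still replays every buffered headline to the new subscriber; the window is enforced only by the Where predicate, which is the same division of labor as in the example above.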