Utilisation de sujets
Le type Objet<T> implémente À la fois IObservable<T> et IObserver<T>, dans le sens où il s’agit à la fois d’un observateur et d’un observable. Vous pouvez utiliser un objet pour abonner tous les observateurs, puis l’abonner à une source de données back-end. De cette façon, l’objet peut servir de proxy pour un groupe d’abonnés et une source. Vous pouvez utiliser des sujets pour implémenter une observable personnalisée avec mise en cache, mise en mémoire tampon et décalage temporel. En outre, vous pouvez utiliser des sujets pour diffuser des données à plusieurs abonnés.
Par défaut, les sujets n’effectuent aucune synchronisation entre les threads. Ils n’acceptent pas de planificateur, mais supposent plutôt que toutes les sérialisations et l’exactitude grammaticale sont gérées par l’appelant du sujet. Un sujet diffuse simplement à tous les observateurs abonnés dans la liste thread-safe des abonnés. Cela présente l’avantage de réduire la surcharge et d’améliorer les performances. Si, toutefois, vous souhaitez synchroniser les appels sortants aux observateurs à l’aide d’un planificateur, vous pouvez utiliser la méthode Synchronize pour ce faire.
Utilisation de sujets
Dans l’exemple suivant, nous créons un objet, nous nous abonneons à ce sujet, puis nous utilisons le même objet pour publier des valeurs sur l’observateur. Ce faisant, nous combinons la publication et l’abonnement dans la même source.
En plus d’effectuer un IObserver<T>, la méthode Subscribe a également une surcharge qui prend une action<T> pour onNext, ce qui signifie que l’action est exécutée chaque fois qu’un élément est publié. Dans notre exemple, chaque fois qu’OnNext est appelé, l’élément est écrit dans la console.
Subject<int> subject = new Subject<int>();
var subscription = subject.Subscribe(
x => Console.WriteLine("Value published: {0}", x),
() => Console.WriteLine("Sequence Completed."));
subject.OnNext(1);
subject.OnNext(2);
Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subscription.Dispose();
L’exemple suivant illustre la nature du proxy et de la diffusion d’un objet. Nous créons d’abord une séquence source qui produit un entier toutes les 1 secondes. Nous créons ensuite un Objet et le transmettons en tant qu’observateur à la source afin qu’elle reçoive toutes les valeurs envoyées par cette séquence source. Après cela, nous créons deux autres abonnements, cette fois avec l’objet comme source. Les subSubject1
abonnements et subSubject2
reçoivent ensuite toute valeur transmise (à partir de la source) par l’objet.
var source = Observable.Interval(TimeSpan.FromSeconds(1));
Subject<long> subject = new Subject<long>();
var subSource = source.Subscribe(subject);
var subSubject1 = subject.Subscribe(
x => Console.WriteLine("Value published to observer #1: {0}", x),
() => Console.WriteLine("Sequence Completed."));
var subSubject2 = subject.Subscribe(
x => Console.WriteLine("Value published to observer #2: {0}", x),
() => Console.WriteLine("Sequence Completed."));
Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subSubject1.Dispose();
subSubject2.Dispose();
Différents types de sujets
Le type Objet<T> dans la bibliothèque Rx est une implémentation de base de l’interface ISubject<T> (vous pouvez également implémenter l’interface ISubject<T> pour créer vos propres types de sujets). Il existe d’autres implémentations d’ISubject<T> qui offrent différentes fonctionnalités. Tous ces types stockent une partie (ou la totalité) des valeurs qui leur sont envoyées via OnNext et les diffusent à leurs observateurs. De cette façon, ils convertissent un observable chaud en un froid. Cela signifie que si vous vous abonnez à l’un de ces éléments plusieurs fois (c’est-à-dire s’abonner -> Se désabonner -> S’abonner à nouveau), au moins une de ces valeurs s’affiche à nouveau. Pour plus d’informations sur les observables à chaud et à froid, consultez la dernière section de la rubrique Création et abonnement à des séquences observables simples .
ReplaySubject stocke toutes les valeurs qu’il a publiées. Par conséquent, lorsque vous vous abonnez à celui-ci, vous recevez automatiquement un historique complet des valeurs qu’il a publiées, même si votre abonnement a pu être entré après que certaines valeurs ont été envoyées. BehaviourSubject est similaire à ReplaySubject, sauf qu’il a stocké uniquement la dernière valeur qu’il a publiée. BehaviourSubject nécessite également une valeur par défaut de type T lors de l’initialisation. Cette valeur est envoyée aux observateurs lorsqu’aucune autre valeur n’a encore été reçue par l’objet. Cela signifie que tous les abonnés recevront une valeur instantanément sur s’abonner, sauf si l’objet est déjà terminé. AsyncSubject est similaire aux sujets Replay et Behavior, mais il stocke uniquement la dernière valeur et ne la publie qu’une fois la séquence terminée. Vous pouvez utiliser le type AsyncSubject pour les situations où la source observable est à chaud et peut se terminer avant qu’un observateur puisse s’y abonner. Dans ce cas, AsyncSubject peut toujours fournir la dernière valeur et la publier pour tous les futurs abonnés.