A simple feedback loop in Rx

Feedback loops are very common and incredibly useful. Unsurprisingly, my colleagues and I keep finding these loops in distributed systems. I thought I’d blog about a simple pattern to express them in Rx. Many thanks to Valery Mizonov, Erik Meijer, and Colin Meek for a fruitful discussion about this topic.

Consider a data source emitting a strictly increasing sequence of natural numbers. Assume the data source can also filter out odd or even numbers. We would like to signal the data source to flip the filter based on a condition detected by a data consumer. That means we need a mechanism for communicating against the direction of the stream. For this purpose we will use a BehaviorSubject.

A BehaviorSubject is a type of subject that remembers the last event it has received and hands it to each new subscriber. One can also declare an initial state, such as:

    BehaviorSubject<bool> behaviorSubject = new BehaviorSubject<bool>(true);
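To make the "remembers the last event" behavior concrete, here is a minimal sketch. It assumes a console project that references the Rx library (System.Reactive); the class name and the printed labels are only for illustration.

```csharp
using System;
using System.Reactive.Subjects;

class BehaviorSubjectDemo
{
    static void Main()
    {
        // Initial state is true, as in the declaration above.
        var subject = new BehaviorSubject<bool>(true);

        // A subscriber immediately receives the current value (here, the initial state).
        using (subject.Subscribe(v => Console.WriteLine("first subscriber: " + v)))
        {
            // The first subscriber also sees this new value.
            subject.OnNext(false);
        }

        // A late subscriber receives only the most recent value, not the full history.
        using (subject.Subscribe(v => Console.WriteLine("late subscriber: " + v)))
        {
        }
    }
}
```

The late subscriber sees only `False`, which is exactly the property the feedback loop relies on: whoever asks the subject gets the latest state.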

We will use the last state of the subject as the gating logic that decides whether to produce odd or even numbers. Here is a sample observable source implementing this behavior:

    var data = from x in Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(100)
               where (x % 2 == 0) == behaviorSubject.First()
               select x;

Finally, we would like to express the logic that sends an event to the subject as a result of a condition in the consumer. Consider, for example, flipping between even and odd numbers every time the consumer receives 10 events. Here is an example implementation of this logic:

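    // Note: this only builds the pipeline; nothing runs until printer is subscribed.
    var printer = data
        .Do(Console.WriteLine)                   // print each number as it passes through
        .Select((_, count) => count)             // replace the value with its zero-based index
        .Where(count => (count + 1) % 10 == 0)   // let through every 10th event
        .Do(_ => behaviorSubject.OnNext(!behaviorSubject.First()));  // flip the filter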

Notice the use of .Do() to side-effect (or “tee”) console output, and the call to OnNext() on the BehaviorSubject.

Two practices to notice: (1) We exclusively use the subject to relay the state change from the consumer to the producer, and (2) we use side-effects to conduct this communication.
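For readers who want to run the loop end to end, the snippets can be assembled into a small console program. This is a sketch rather than a definitive implementation: it assumes a project that references the Rx library (System.Reactive), and the `Program` class, the `Subscribe()` call, and the `Console.ReadLine()` used to keep the process alive are my additions for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        // true means "produce even numbers"; the consumer flips this state.
        var behaviorSubject = new BehaviorSubject<bool>(true);

        // Producer: a timer-driven sequence, gated by the subject's latest state.
        var data = from x in Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(100)
                   where (x % 2 == 0) == behaviorSubject.First()
                   select x;

        // Consumer: print each number and flip the filter after every 10th event.
        var printer = data
            .Do(Console.WriteLine)
            .Select((_, count) => count)
            .Where(count => (count + 1) % 10 == 0)
            .Do(_ => behaviorSubject.OnNext(!behaviorSubject.First()));

        // The query is lazy: subscribing is what starts the timer and the loop.
        using (printer.Subscribe())
        {
            Console.WriteLine("Press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

Running it should print ten even numbers, then ten odd numbers, and so on, until the underlying interval has produced its 100 ticks. Disposing the subscription (here via `using`) stops the loop early.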