CCR tips and tricks - part 19

When you want to throttle execution based on some resource, CCR makes this easy and also lets you add and remove resources as needed. For example, you may want to limit a computation to two concurrent instances, double that number under heavy load, and then reduce it again as the load drops. To do this you can use what I call the resource port pattern. It uses a joined receiver and a resource port to control how many handlers run concurrently: each item on the resource port acts as a token that allows one handler to run.

  1: [TestMethod]
  2: public void BasedOnResources()
  3: {
  4:     var resourcePort = new Port<int>();
  5:     var argumentPort = new Port<int>();
  6:     var resultPort = new Port<int>();
  7:     Arbiter.Activate(
  8:         dispatcherQueue,
  9:         Arbiter.JoinedReceive(
 10:             true,
 11:             resourcePort,
 12:             argumentPort,
 13:             (resource, argument) =>
 14:                 {
 15:                     try
 16:                     {
 17:                         resultPort.Post(resource + argument);
 18:                     }
 19:                     finally
 20:                     {
 21:                         resourcePort.Post(resource);
 22:                     }
 23:                 }));
 24:     resourcePort.Post(42);
 25:     for (int i = 0; i < 1000; i++)
 26:     {
 27:         argumentPort.Post(i);
 28:     }
 29:     resourcePort.Post(4711);
 30:     var mre = new ManualResetEvent(false);
 31:     Arbiter.Activate(dispatcherQueue, resultPort.Receive(_ => mre.Set()));
 32:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
 33:     mre.Reset();
 34:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(CcrServiceBase.EmptyHandler));
 35:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(_ => mre.Set()));
 36:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
 37:     Assert.AreNotEqual(1000, resultPort.ItemCount);
 38:     Assert.AreNotEqual(0, resultPort.ItemCount);
 39:     Assert.AreNotEqual(0, argumentPort.ItemCount);
 40: }

First, notice that the joined receive handler always reposts the resource when it finishes (line 21), even if the handler throws, because the post sits in a finally block. Then notice how we add a second resource (line 29), which allows two handlers to run concurrently. Finally, the resource port is drained to completely stop execution of the joined handler (lines 34 and 35). Execution of the joined receiver can be resumed by posting to the resource port again.
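
Scaling up, scaling down and resuming can be wrapped in two small helpers. The following is only a minimal sketch, not part of the test above: `AddResources`, `RemoveResources`, the class name, the token values and the sleep times are all made up for illustration, and it assumes a reference to the CCR assembly (`Microsoft.Ccr.Core`). Removal is not instantaneous, because a drain receiver can only take a token once it is back on the resource port.

```csharp
using System;
using System.Threading;
using Microsoft.Ccr.Core;

static class ResourcePortScaling
{
    // Hypothetical helper: each posted token allows one more concurrent handler.
    // The token values themselves carry no meaning in this sketch.
    static void AddResources(Port<int> resourcePort, int count)
    {
        for (int i = 0; i < count; i++)
        {
            resourcePort.Post(i);
        }
    }

    // Hypothetical helper: each one-time receiver takes one token and never
    // reposts it, so the number of concurrent handlers drops by one per receiver.
    static void RemoveResources(DispatcherQueue queue, Port<int> resourcePort, int count)
    {
        for (int i = 0; i < count; i++)
        {
            Arbiter.Activate(queue, resourcePort.Receive(_ => { }));
        }
    }

    static void Main()
    {
        using (var dispatcher = new Dispatcher(0, "resource port sketch"))
        using (var queue = new DispatcherQueue("resource port queue", dispatcher))
        {
            var resourcePort = new Port<int>();
            var argumentPort = new Port<int>();

            // Same joined receiver as in the test: one token plus one argument per run.
            Arbiter.Activate(
                queue,
                Arbiter.JoinedReceive(
                    true,
                    resourcePort,
                    argumentPort,
                    (resource, argument) =>
                    {
                        try
                        {
                            Thread.Sleep(10); // simulated work (assumption)
                        }
                        finally
                        {
                            resourcePort.Post(resource); // always hand the token back
                        }
                    }));

            for (int i = 0; i < 100; i++)
            {
                argumentPort.Post(i);
            }

            AddResources(resourcePort, 2);           // normal load: two tokens
            Thread.Sleep(100);
            AddResources(resourcePort, 2);           // heavy load: four tokens
            Thread.Sleep(100);
            RemoveResources(queue, resourcePort, 4); // drain every token to pause
            Thread.Sleep(100);
            Console.WriteLine("Arguments still waiting: " + argumentPort.ItemCount);
            AddResources(resourcePort, 1);           // resume with a single token
            Thread.Sleep(100);
        }
    }
}
```

The sleeps only give the dispatcher time to work in a console program; in real code you would wait on a result port or an event, as the test does with its `ManualResetEvent`.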