How to implement a partitioned SendBatch method for Azure Service Bus entities

Introduction

When a developer calls the SendBatch or SendBatchAsync methods exposed by the MessageSender, QueueClient, TopicClient, and EventHubClient classes in Microsoft.ServiceBus.dll with a batch whose size exceeds the maximum allowed for a BrokeredMessage or EventData batch (256 KB at the time of writing), the call throws a MessageSizeExceededException. This library contains synchronous and asynchronous extension methods for the MessageSender, QueueClient, TopicClient, and EventHubClient classes that can send a batch larger than that limit. The implementation partitions the batch into one or more batches, each no larger than the maximum size allowed, and sends them sequentially in a loop to preserve the chronological order of the messages in the original batch. The code can be found here.
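The partitioning step described above can be sketched independently of the Service Bus types. This is a minimal illustration, not the library's code: the BatchPartitioner class, the Partition method, and the sizeOf and maxBatchSize parameters are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: illustrates the partitioning idea only.
public static class BatchPartitioner
{
    // Splits items into consecutive batches whose summed size stays at or
    // below maxBatchSize. The original order of the items is preserved.
    public static IEnumerable<List<T>> Partition<T>(
        IEnumerable<T> items, Func<T, long> sizeOf, long maxBatchSize)
    {
        // Validate eagerly, before the lazy iterator starts.
        if (items == null) throw new ArgumentNullException("items");
        if (sizeOf == null) throw new ArgumentNullException("sizeOf");
        return PartitionIterator(items, sizeOf, maxBatchSize);
    }

    private static IEnumerable<List<T>> PartitionIterator<T>(
        IEnumerable<T> items, Func<T, long> sizeOf, long maxBatchSize)
    {
        var batch = new List<T>();
        long batchSize = 0;

        foreach (var item in items)
        {
            var size = sizeOf(item);

            // Close the current batch only if it already holds something;
            // a single oversized item therefore ends up in a batch of its own.
            if (batch.Count > 0 && batchSize + size > maxBatchSize)
            {
                yield return batch;
                batch = new List<T>();
                batchSize = 0;
            }

            batch.Add(item);
            batchSize += size;
        }

        // Emit the last, possibly partial, batch.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

With the types used in this article, sizeOf would presumably be something like m => m.Size for a BrokeredMessage, and each returned list would be passed to SendBatch or SendBatchAsync in turn.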

GitHub

The solution is also available on GitHub.

Solution

The ServiceBusExtensions library contains 4 classes:

  • MessageSenderExtensions: this class exposes the SendPartitionedBatch and SendPartitionedBatchAsync extension methods for the MessageSender class.
  • QueueClientExtensions: this class exposes the SendPartitionedBatch and SendPartitionedBatchAsync extension methods for the QueueClient class.
  • TopicClientExtensions: this class exposes the SendPartitionedBatch and SendPartitionedBatchAsync extension methods for the TopicClient class.
  • EventHubClientExtensions: this class exposes the SendPartitionedBatch and SendPartitionedBatchAsync extension methods for the EventHubClient class.

The TestClient project contains a console application that can be used to test the library.

ServiceBusExtensions Library

The following table contains the code of the QueueClientExtensions class. The code for the MessageSenderExtensions and TopicClientExtensions classes is very similar, so I will omit it for simplicity.

 #region Using Directives
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.ServiceBus.Messaging;
using System.Threading.Tasks;
#endregion

namespace Microsoft.AzureCat.ServiceBusExtensions
{
  /// <summary>
  /// This class contains extension methods for the QueueClient class
  /// </summary>
  public static class QueueClientExtensions
  {
    #region Private Constants
    //*******************************
    // Formats
    //*******************************
    private const string BrokeredMessageListCannotBeNullOrEmpty = "The brokeredMessageEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[QueueClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[QueueClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    #endregion

    #region Public Methods
    /// <summary>
    /// Sends a set of brokered messages (for batch processing). 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="queueClient">The current QueueClient object.</param>
    /// <param name="brokeredMessageEnumerable">The collection of brokered messages to send.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    public async static Task SendPartitionedBatchAsync(this QueueClient queueClient, IEnumerable<BrokeredMessage> brokeredMessageEnumerable, bool trace = false)
    {
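      // Check for null before materializing the sequence; ToList() would otherwise throw first
      if (brokeredMessageEnumerable == null)
      {
        throw new ArgumentNullException("brokeredMessageEnumerable", BrokeredMessageListCannotBeNullOrEmpty);
      }
      var brokeredMessageList = brokeredMessageEnumerable as IList<BrokeredMessage> ?? brokeredMessageEnumerable.ToList();
      if (!brokeredMessageList.Any())
      {
        throw new ArgumentException(BrokeredMessageListCannotBeNullOrEmpty, "brokeredMessageEnumerable");
      }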

      var batchList = new List<BrokeredMessage>();
      long batchSize = 0;

      foreach (var brokeredMessage in brokeredMessageList)
      {
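        // Flush only a non-empty batch, so an oversized first message never triggers an empty send
        if (batchList.Count > 0 && (batchSize + brokeredMessage.Size) > Constants.MaxBathSizeInBytes)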
        {
          // Send current batch
          await queueClient.SendBatchAsync(batchList);
          Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));

          // Initialize a new batch
          batchList = new List<BrokeredMessage> { brokeredMessage };
          batchSize = brokeredMessage.Size;
        }
        else
        {
          // Add the BrokeredMessage to the current batch
          batchList.Add(brokeredMessage);
          batchSize += brokeredMessage.Size;
        }
      }
      // The final batch is sent outside of the loop
      await queueClient.SendBatchAsync(batchList);
      Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
    }

    /// <summary>
    /// Sends a set of brokered messages (for batch processing). 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="queueClient">The current QueueClient object.</param>
    /// <param name="brokeredMessageEnumerable">The collection of brokered messages to send.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    public static void SendPartitionedBatch(this QueueClient queueClient, IEnumerable<BrokeredMessage> brokeredMessageEnumerable, bool trace = false)
    {
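      // Check for null before materializing the sequence; ToList() would otherwise throw first
      if (brokeredMessageEnumerable == null)
      {
        throw new ArgumentNullException("brokeredMessageEnumerable", BrokeredMessageListCannotBeNullOrEmpty);
      }
      var brokeredMessageList = brokeredMessageEnumerable as IList<BrokeredMessage> ?? brokeredMessageEnumerable.ToList();
      if (!brokeredMessageList.Any())
      {
        throw new ArgumentException(BrokeredMessageListCannotBeNullOrEmpty, "brokeredMessageEnumerable");
      }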

      var batchList = new List<BrokeredMessage>();
      long batchSize = 0;

      foreach (var brokeredMessage in brokeredMessageList)
      {
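        // Flush only a non-empty batch, so an oversized first message never triggers an empty send
        if (batchList.Count > 0 && (batchSize + brokeredMessage.Size) > Constants.MaxBathSizeInBytes)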
        {
          // Send current batch
          queueClient.SendBatch(batchList);
          Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));

          // Initialize a new batch
          batchList = new List<BrokeredMessage> { brokeredMessage };
          batchSize = brokeredMessage.Size;
        }
        else
        {
          // Add the BrokeredMessage to the current batch
          batchList.Add(brokeredMessage);
          batchSize += brokeredMessage.Size;
        }
      }
      // The final batch is sent outside of the loop
      queueClient.SendBatch(batchList);
      Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));
    }
    #endregion
  }
}

The following table contains the code of the EventHubClientExtensions class. Note: all the event data sent in a batch using the EventHubClient.SendBatch or EventHubClient.SendBatchAsync methods must have the same PartitionKey. When using one of these methods, the Event Hubs service inserts all the event data in the batch into the same partition of the event hub. Hence, they must share the same value in the PartitionKey property, because that property determines the partition to which event data is sent.

 #region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team  
//
// This sample is supplemental to the technical guidance published on the community
// blog at https://blogs.msdn.com/b/paolos/. 
// 
// Author: Paolo Salvatori
//=======================================================================================
// Copyright © 2015 Microsoft Corporation. All rights reserved.
// 
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER 
// EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF 
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. YOU BEAR THE RISK OF USING IT.
//=======================================================================================
#endregion

#region Using Directives
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.ServiceBus.Messaging; 
using System.Threading.Tasks;
#endregion

namespace Microsoft.AzureCat.ServiceBusExtensions
{
  /// <summary>
  /// This class contains extension methods for the EventHubClient class
  /// </summary>
  public static class EventHubClientExtensions
  {
    #region Private Constants
    //*******************************
    // Formats
    //*******************************
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    #endregion

    #region Public Methods
    /// <summary>
    /// Asynchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="eventHubClient">The current EventHubClient object.</param>
    /// <param name="eventDataEnumerable">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    public async static Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> eventDataEnumerable, bool trace = false)
    {
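      // Check for null before materializing the sequence; ToList() would otherwise throw first
      if (eventDataEnumerable == null)
      {
        throw new ArgumentNullException("eventDataEnumerable", EventDataListCannotBeNullOrEmpty);
      }
      var eventDataList = eventDataEnumerable as IList<EventData> ?? eventDataEnumerable.ToList();
      if (!eventDataList.Any())
      {
        throw new ArgumentException(EventDataListCannotBeNullOrEmpty, "eventDataEnumerable");
      }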

      var batchList = new List<EventData>();
      long batchSize = 0;

      foreach (var eventData in eventDataList)
      {
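        // Flush only a non-empty batch, so an oversized first event never triggers an empty send
        if (batchList.Count > 0 && (batchSize + eventData.SerializedSizeInBytes) > Constants.MaxBathSizeInBytes)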
        {
          // Send current batch
          await eventHubClient.SendBatchAsync(batchList);
          Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));

          // Initialize a new batch
          batchList = new List<EventData> { eventData };
          batchSize = eventData.SerializedSizeInBytes;
        }
        else
        {
          // Add the EventData to the current batch
          batchList.Add(eventData);
          batchSize += eventData.SerializedSizeInBytes;
        }
      }
      // The final batch is sent outside of the loop
      await eventHubClient.SendBatchAsync(batchList);
      Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
    }

    /// <summary>
    /// Synchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="eventHubClient">The current EventHubClient object.</param>
    /// <param name="eventDataEnumerable">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> eventDataEnumerable, bool trace = false)
    {
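      // Check for null before materializing the sequence; ToList() would otherwise throw first
      if (eventDataEnumerable == null)
      {
        throw new ArgumentNullException("eventDataEnumerable", EventDataListCannotBeNullOrEmpty);
      }
      var eventDataList = eventDataEnumerable as IList<EventData> ?? eventDataEnumerable.ToList();
      if (!eventDataList.Any())
      {
        throw new ArgumentException(EventDataListCannotBeNullOrEmpty, "eventDataEnumerable");
      }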

      var batchList = new List<EventData>();
      long batchSize = 0;

      foreach (var eventData in eventDataList)
      {
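        // Flush only a non-empty batch, so an oversized first event never triggers an empty send
        if (batchList.Count > 0 && (batchSize + eventData.SerializedSizeInBytes) > Constants.MaxBathSizeInBytes)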
        {
          // Send current batch
          eventHubClient.SendBatch(batchList);
          Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));

          // Initialize a new batch
          batchList = new List<EventData> { eventData };
          batchSize = eventData.SerializedSizeInBytes;
        }
        else
        {
          // Add the EventData to the current batch
          batchList.Add(eventData);
          batchSize += eventData.SerializedSizeInBytes;
        }
      }
      // The final batch is sent outside of the loop
      eventHubClient.SendBatch(batchList);
      Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));
    } 
    #endregion
  }
}
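Because every event in a batch must share the same PartitionKey, a caller holding events with mixed keys would need to group them before sending. The following is a rough sketch of that grouping step using a generic helper; PartitionKeyGrouping, GroupByPartitionKey, and keyOf are hypothetical names that are not part of the ServiceBusExtensions library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: groups items by a key selector before batching.
public static class PartitionKeyGrouping
{
    // Returns one entry per distinct key, in order of first appearance.
    // LINQ's GroupBy keeps the relative order of the items inside each group,
    // and items whose key is null are collected into a single group.
    public static List<KeyValuePair<string, List<T>>> GroupByPartitionKey<T>(
        IEnumerable<T> events, Func<T, string> keyOf)
    {
        if (events == null) throw new ArgumentNullException("events");
        if (keyOf == null) throw new ArgumentNullException("keyOf");

        return events
            .GroupBy(keyOf)
            .Select(g => new KeyValuePair<string, List<T>>(g.Key, g.ToList()))
            .ToList();
    }
}
```

With EventData instances, keyOf would be e => e.PartitionKey, and each group's list could then be passed to SendPartitionedBatchAsync, one group at a time.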




TestClient



The following picture shows the TestClient console application that can be used to test each extension method defined by the ServiceBusExtensions library.

[Screenshot: TestClient console application]

In the appSettings section of the configuration file you can define the following settings:

  • connectionString: the connection string of the Service Bus namespace.
  • messageSizeInBytes: the size of individual BrokeredMessage and EventData messages.
  • messageCountInBatch: the number of messages in a batch.
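As an illustration, the appSettings section could look like the fragment below. The connection string is a placeholder to replace with your own namespace and key; the two numeric values simply repeat the defaults the TestClient falls back to when a setting is missing or not a valid integer.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- Placeholder: replace with the connection string of your own Service Bus namespace -->
    <add key="connectionString" value="Endpoint=sb://YOUR-NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR-KEY-NAME;SharedAccessKey=YOUR-KEY" />
    <!-- Size in bytes of each generated message body -->
    <add key="messageSizeInBytes" value="16384" />
    <!-- Number of messages created for each test batch -->
    <add key="messageCountInBatch" value="100" />
  </appSettings>
</configuration>
```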

Upon start, the client application creates the following entities in the target Service Bus namespace, if they don't already exist.

  • batchtestqueue: this is the queue used to test the extension methods contained in the QueueClientExtensions and MessageSenderExtensions classes.
  • batchtesttopic: this is the topic used to test the extension methods contained in the TopicClientExtensions class.
  • batchtesteventhub: this is the event hub used to test the extension methods contained in the EventHubClientExtensions class.

Then, the user can use the menu shown by the application to select one of the tests. Each test first tries to use the original SendBatchAsync method exposed by the MessageSender, QueueClient, TopicClient, or EventHubClient class. If the batch size is greater than the maximum allowed size, the method call throws a MessageSizeExceededException. The test then calls the SendPartitionedBatchAsync method, which splits the original batch into one or more batches, each no larger than the maximum allowed size, and sends them in the original order to the target entity.

For your convenience, the following table includes the code of the TestClient console application.

 #region Using Directives
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

#endregion

namespace Microsoft.AzureCat.ServiceBusExtensions.TestClient
{
  /// <summary>
  /// This class can be used to test the extension methods defined in the ServiceBusExtensions library.
  /// </summary>
  public class Program
  {
    #region Private Constants
    //***************************
    // Configuration Parameters
    //***************************
    private const string ConnectionString = "connectionString";
    private const string MessageSizeInBytes = "messageSizeInBytes";
    private const string MessageCountInBatch = "messageCountInBatch";

    //***************************
    // Entities
    //***************************
    private const string QueueName = "batchtestqueue";
    private const string TopicName = "batchtesttopic";
    private const string SubscriptionName = "auditing";
    private const string EventHubName = "batchtesteventhub";

    //***************************
    // Default Values
    //***************************
    private const int DefaultMessageCountInBatch = 100;
    private const int DefaultMessageSizeInBytes = 16384;

    //***************************
    // Formats
    //***************************
    private const string ParameterFormat = "{0}: [{1}]";
    private const string PressKeyToExit = "Press a key to exit.";
    private const string MenuChoiceFormat = "Select a numeric key between 1 and {0}";
    private const string ConnectionStringCannotBeNull = "The Service Bus connection string has not been defined in the configuration file.";
    private const string QueueCreatedFormat = "Queue [{0}] successfully created.";
    private const string TopicCreatedFormat = "Topic [{0}] successfully created.";
    private const string SubscriptionCreatedFormat = "Subscription [{0}] successfully created.";
    private const string EventHubCreatedFormat = "Event Hub [{0}] successfully created.";
    private const string QueueAlreadyExistsFormat = "Queue [{0}] already exists.";
    private const string TopicAlreadyExistsFormat = "Topic [{0}] already exists.";
    private const string SubscriptionAlreadyExistsFormat = "Subscription [{0}] already exists.";
    private const string EventHubAlreadyExistsFormat = "Event Hub [{0}] already exists.";
    private const string CallingMessageSenderSendBatchAsync = "Calling MessageSender.SendBatchAsync...";
    private const string MessageSenderSendBatchAsyncCalled = "MessageSender.SendBatchAsync called.";
    private const string CallingMessageSenderSendPartitionedBatchAsync = "Calling MessageSender.SendPartitionedBatchAsync...";
    private const string MessageSenderSendPartitionedBatchAsyncCalled = "MessageSender.SendPartitionedBatchAsync called.";
    private const string CallingQueueClientSendBatchAsync = "Calling QueueClient.SendBatchAsync...";
    private const string QueueClientSendBatchAsyncCalled = "QueueClient.SendBatchAsync called.";
    private const string CallingQueueClientSendPartitionedBatchAsync = "Calling QueueClient.SendPartitionedBatchAsync...";
    private const string QueueClientSendPartitionedBatchAsyncCalled = "QueueClient.SendPartitionedBatchAsync called.";
    private const string CallingTopicClientSendBatchAsync = "Calling TopicClient.SendBatchAsync...";
    private const string TopicClientSendBatchAsyncCalled = "TopicClient.SendBatchAsync called.";
    private const string CallingTopicClientSendPartitionedBatchAsync = "Calling TopicClient.SendPartitionedBatchAsync...";
    private const string TopicClientSendPartitionedBatchAsyncCalled = "TopicClient.SendPartitionedBatchAsync called.";
    private const string CallingEventHubClientSendBatchAsync = "Calling EventHubClient.SendBatchAsync...";
    private const string EventHubClientSendBatchAsyncCalled = "EventHubClient.SendBatchAsync called.";
    private const string CallingEventHubClientSendPartitionedBatchAsync = "Calling EventHubClient.SendPartitionedBatchAsync...";
    private const string EventHubClientSendPartitionedBatchAsyncCalled = "EventHubClient.SendPartitionedBatchAsync called.";

    //***************************
    // Menu Items
    //***************************
    private const string MessageSenderSendPartitionedBatchAsyncTest = "MessageSender.SendPartitionedBatchAsync Test";
    private const string QueueClientSendPartitionedBatchAsyncTest = "QueueClient.SendPartitionedBatchAsync Test";
    private const string TopicClientSendPartitionedBatchAsyncTest = "TopicClient.SendPartitionedBatchAsync Test";
    private const string EventHubClientSendPartitionedBatchAsyncTest = "EventHubClient.SendPartitionedBatchAsync Test";
    private const string Exit = "Exit";
    #endregion

    #region Private Static Fields
    private static string connectionString;
    private static int messageSizeInBytes;
    private static int messageCountInBatch;
    private static MessagingFactory messagingFactory;
    private static readonly List<string> menuItemList = new List<string>
    {
      MessageSenderSendPartitionedBatchAsyncTest,
      QueueClientSendPartitionedBatchAsyncTest,
      TopicClientSendPartitionedBatchAsyncTest,
      EventHubClientSendPartitionedBatchAsyncTest,
      Exit
    }; 
    #endregion

    #region Main Method
    public static void Main()
    {
      try
      {
        if (ReadConfiguration() && CreateEntitiesAsync().Result)
        {
          // Add ConsoleTraceListener
          Trace.Listeners.Add(new ConsoleTraceListener());

          // Create MessagingFactory object
          messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);

          int key;
          while ((key = ShowMenu()) != menuItemList.Count - 1)
          {
            switch (menuItemList[key])
            {
              case MessageSenderSendPartitionedBatchAsyncTest:
                // Test MessageSender.SendPartitionedBatchAsync method
                MessageSenderTest().Wait();
                break;
              case QueueClientSendPartitionedBatchAsyncTest:
                // Test QueueClient.SendPartitionedBatchAsync method
                QueueClientTest().Wait();
                break;
              case TopicClientSendPartitionedBatchAsyncTest:
                // Test TopicClient.SendPartitionedBatchAsync method
                TopicClientTest().Wait();
                break;
              case EventHubClientSendPartitionedBatchAsyncTest:
                // Test EventHubClient.SendPartitionedBatchAsync method
                EventHubClientTest().Wait();
                break;
              case Exit:
                break;
            }
          }
        }
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
      PrintMessage(PressKeyToExit);
      Console.ReadLine();
    } 
    #endregion

    #region Private Static Methods
    private async static Task MessageSenderTest()
    {
      // Create MessageSender object
      var messageSender = await messagingFactory.CreateMessageSenderAsync(QueueName);

      // Test MessageSender.SendBatchAsync: if the batch size is greater than the max batch size
      // the method throws a  MessageSizeExceededException
      try
      {
        PrintMessage(CallingMessageSenderSendBatchAsync);
        await messageSender.SendBatchAsync(CreateBrokeredMessageBatch());
        PrintMessage(MessageSenderSendBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }

      try
      {
        // Send the batch using the SendPartitionedBatchAsync method
        PrintMessage(CallingMessageSenderSendPartitionedBatchAsync);
        await messageSender.SendPartitionedBatchAsync(CreateBrokeredMessageBatch(), true);
        PrintMessage(MessageSenderSendPartitionedBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
    }

    private async static Task QueueClientTest()
    {
      // Create QueueClient object
      var queueClient = messagingFactory.CreateQueueClient(QueueName);

      // Test QueueClient.SendBatchAsync: if the batch size is greater than the max batch size
      // the method throws a  MessageSizeExceededException
      try
      {
        PrintMessage(CallingQueueClientSendBatchAsync);
        await queueClient.SendBatchAsync(CreateBrokeredMessageBatch());
        PrintMessage(QueueClientSendBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }

      try
      {
        // Send the batch using the SendPartitionedBatchAsync method
        PrintMessage(CallingQueueClientSendPartitionedBatchAsync);
        await queueClient.SendPartitionedBatchAsync(CreateBrokeredMessageBatch(), true);
        PrintMessage(QueueClientSendPartitionedBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
    }

    private async static Task TopicClientTest()
    {
      // Create TopicClient object
      var topicClient = messagingFactory.CreateTopicClient(TopicName);

      // Test TopicClient.SendBatchAsync: if the batch size is greater than the max batch size
      // the method throws a  MessageSizeExceededException
      try
      {
        PrintMessage(CallingTopicClientSendBatchAsync);
        await topicClient.SendBatchAsync(CreateBrokeredMessageBatch());
        PrintMessage(TopicClientSendBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }

      try
      {
        // Send the batch using the SendPartitionedBatchAsync method
        PrintMessage(CallingTopicClientSendPartitionedBatchAsync);
        await topicClient.SendPartitionedBatchAsync(CreateBrokeredMessageBatch(), true);
        PrintMessage(TopicClientSendPartitionedBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
    }

    private async static Task EventHubClientTest()
    {
      // Create EventHubClient object
      var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, EventHubName);

      // Test EventHubClient.SendBatchAsync: if the batch size is greater than the max batch size
      // the method throws a  MessageSizeExceededException
      try
      {
        PrintMessage(CallingEventHubClientSendBatchAsync);
        await eventHubClient.SendBatchAsync(CreateEventDataBatch());
        PrintMessage(EventHubClientSendBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }

      try
      {
        // Send the batch using the SendPartitionedBatchAsync method
        PrintMessage(CallingEventHubClientSendPartitionedBatchAsync);
        await eventHubClient.SendPartitionedBatchAsync(CreateEventDataBatch(), true);
        PrintMessage(EventHubClientSendPartitionedBatchAsyncCalled);
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
    }

    private static IEnumerable<BrokeredMessage> CreateBrokeredMessageBatch()
    {
      var messageList = new List<BrokeredMessage>();
      for (var i = 0; i < messageCountInBatch; i++)
      {
        messageList.Add(new BrokeredMessage(Encoding.UTF8.GetBytes(new string('A', messageSizeInBytes))));
      }
      return messageList;
    }

    private static IEnumerable<EventData> CreateEventDataBatch()
    {
      var messageList = new List<EventData>();
      for (var i = 0; i < messageCountInBatch; i++)
      {
        // Note: the partition key in this sample is null.
        // it's mandatory that all event data in a batch have the same PartitionKey
        messageList.Add(new EventData(Encoding.UTF8.GetBytes(new string('A', messageSizeInBytes))));
      }
      return messageList;
    }

    private async static Task<bool> CreateEntitiesAsync()
    {
      try
      {
        // Create NamespaceManager object
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        
        // Create test queue
        if (!await namespaceManager.QueueExistsAsync(QueueName))
        {
          await namespaceManager.CreateQueueAsync(new QueueDescription(QueueName)
          {
            EnableBatchedOperations = true,
            EnableExpress = true,
            EnablePartitioning = true,
            EnableDeadLetteringOnMessageExpiration = true
          });
          PrintMessage(string.Format(QueueCreatedFormat, QueueName));
        }
        else
        {
          PrintMessage(string.Format(QueueAlreadyExistsFormat, QueueName));
        }
        
        // Create test topic
        if (!await namespaceManager.TopicExistsAsync(TopicName))
        {
          await namespaceManager.CreateTopicAsync(new TopicDescription(TopicName)
          {
            EnableBatchedOperations = true,
            EnableExpress = true,
            EnablePartitioning = true,
          });
          PrintMessage(string.Format(TopicCreatedFormat, TopicName));
        }
        else
        {
          PrintMessage(string.Format(TopicAlreadyExistsFormat, TopicName));
        }
        
        // Create test subscription
        if (!await namespaceManager.SubscriptionExistsAsync(TopicName, SubscriptionName))
        {
          await namespaceManager.CreateSubscriptionAsync(new SubscriptionDescription(TopicName, SubscriptionName)
          {
            EnableBatchedOperations = true
          }, new RuleDescription(new TrueFilter()));
          PrintMessage(string.Format(SubscriptionCreatedFormat, SubscriptionName));
        }
        else
        {
          PrintMessage(string.Format(SubscriptionAlreadyExistsFormat, SubscriptionName));
        }

        // Create test event hub
        if (!await namespaceManager.EventHubExistsAsync(EventHubName))
        {
          await namespaceManager.CreateEventHubAsync(new EventHubDescription(EventHubName)
          {
            PartitionCount = 16,
            MessageRetentionInDays = 1
          });
          PrintMessage(string.Format(EventHubCreatedFormat, EventHubName));
        }
        else
        {
          PrintMessage(string.Format(EventHubAlreadyExistsFormat, EventHubName));
        }
      }
      catch (Exception ex)
      {
        PrintException(ex);
        return false;
      }
      return true;
    }

    private static int ShowMenu()
    {
      // Print Menu Header
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("[");
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.Write("Menu");
      Console.ForegroundColor = ConsoleColor.Green;
      Console.WriteLine("]");
      Console.ResetColor();

      // Print Menu Items
      for (var i = 0; i < menuItemList.Count; i++)
      {
        Console.ForegroundColor = ConsoleColor.Green;
        Console.Write("[");
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.Write("{0}", i + 1);
        Console.ForegroundColor = ConsoleColor.Green;
        Console.Write("]");
        Console.ResetColor();
        Console.Write(": ");
        Console.WriteLine(menuItemList[i]);
        Console.ResetColor();
      }

      // Select an option
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("[");
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.Write(MenuChoiceFormat, menuItemList.Count);
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("]");
      Console.ResetColor();
      Console.Write(": ");

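      // Accept only keys that map to an existing menu item; a larger digit
      // would index past the end of menuItemList in Main
      var key = 'a';
      while (key < '1' || key >= '1' + menuItemList.Count)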
      {
        key = Console.ReadKey(true).KeyChar;
      }
      Console.WriteLine();
      return key - '1';
    }

    private static bool ReadConfiguration()
    {
      try
      {
        // Set window size
        Console.SetWindowSize(120, 40);

        // Read connectionString setting
        connectionString = ConfigurationManager.AppSettings[ConnectionString];
        if (string.IsNullOrWhiteSpace(connectionString))
        {
          throw new ArgumentException(ConnectionStringCannotBeNull);
        }
        PrintMessage(string.Format(ParameterFormat, ConnectionString, connectionString));

        // Read messageSizeInBytes setting
        int value;
        var setting = ConfigurationManager.AppSettings[MessageSizeInBytes];
        // Fall back to the default when the setting is missing, malformed, or not positive
        messageSizeInBytes = int.TryParse(setting, out value) && value > 0 ?
                value :
                DefaultMessageSizeInBytes;
        PrintMessage(string.Format(ParameterFormat, MessageSizeInBytes, messageSizeInBytes));

        // Read messageCountInBatch setting
        setting = ConfigurationManager.AppSettings[MessageCountInBatch];
        // Fall back to the default when the setting is missing, malformed, or not positive
        messageCountInBatch = int.TryParse(setting, out value) && value > 0 ?
                value :
                DefaultMessageCountInBatch;
        PrintMessage(string.Format(ParameterFormat, MessageCountInBatch, messageCountInBatch));
        return true;
      }
      catch (Exception ex)
      {
        PrintException(ex);
      }
      return false;
    }

    private static void PrintMessage(string message, [CallerMemberName] string memberName = "")
    {
      if (string.IsNullOrWhiteSpace(message) || string.IsNullOrWhiteSpace(memberName))
      {
        return;
      }
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("[");
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.Write(memberName);
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("]");
      Console.ResetColor();
      Console.Write(": ");
      Console.WriteLine(message);
    }

    private static void PrintException(Exception ex,
                       [CallerFilePath] string sourceFilePath = "",
                       [CallerMemberName] string memberName = "",
                       [CallerLineNumber] int sourceLineNumber = 0)
    {
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("[");
      Console.ForegroundColor = ConsoleColor.Yellow;
      // CallerFilePath is captured at compile time and may not exist on the machine
      // running the program, so extract the file name without touching the disk.
      var fileName = Path.GetFileName(sourceFilePath);
      Console.Write(string.IsNullOrWhiteSpace(fileName) ? "Unknown" : fileName);
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write(":");
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.Write(string.IsNullOrWhiteSpace(memberName) ? "Unknown" : memberName);
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write(":");
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.Write(sourceLineNumber.ToString(CultureInfo.InvariantCulture));
      Console.ForegroundColor = ConsoleColor.Green;
      Console.Write("]");
      Console.ResetColor();
      Console.Write(": ");
      Console.WriteLine(ex != null && !string.IsNullOrWhiteSpace(ex.Message) ? ex.Message : "An error occurred.");
      var aggregateException = ex as AggregateException;
      if (aggregateException == null)
      {
        return;
      }
      foreach (var exception in aggregateException.InnerExceptions)
      {
        PrintException(exception);
      }
    }
    #endregion
  }
}
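The PrintMessage and PrintException helpers above rely on the caller-info attributes from System.Runtime.CompilerServices. As a minimal sketch of how those attributes behave, the following stand-alone class builds a "[file:member:line]" prefix similar to the one PrintException writes. The CallerInfoDemo, Prefix, and SendSomething names are hypothetical and are not part of the TesterClient project.

```csharp
using System;
using System.IO;
using System.Runtime.CompilerServices;

public static class CallerInfoDemo
{
    // Hypothetical helper: returns a "[file:member:line]" prefix as a string
    // instead of writing it to the console.
    public static string Prefix([CallerFilePath] string sourceFilePath = "",
                                [CallerMemberName] string memberName = "",
                                [CallerLineNumber] int sourceLineNumber = 0)
    {
        // Only the file name is kept; the full path is a compile-time value.
        var fileName = Path.GetFileName(sourceFilePath);
        return string.Format("[{0}:{1}:{2}]",
            string.IsNullOrWhiteSpace(fileName) ? "Unknown" : fileName,
            string.IsNullOrWhiteSpace(memberName) ? "Unknown" : memberName,
            sourceLineNumber);
    }

    public static string SendSomething()
    {
        // No arguments are passed: the compiler substitutes this source file's path,
        // the name "SendSomething", and the line number of this call.
        return Prefix();
    }
}
```

Because the values are filled in by the compiler at the call site, a caller that passes them explicitly (for example empty strings) overrides them, which is why the helpers fall back to "Unknown".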

Comments

  • Anonymous
    October 14, 2015
    The SerializedSizeInBytes property doesn't seem to be accurate: the sum of the SerializedSizeInBytes values is 261757, but I'm getting this kind of error: Error: Microsoft.ServiceBus.Messaging.MessageSizeExceededException: The received message (delivery-id:0, size:266841 bytes) exceeds the limit (262144 bytes) currently allowed on the link.   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)   at Microsoft.ServiceBus.Messaging.EventHubClient.SendBatch(IEnumerable`1 eventDataList)

  • Anonymous
    October 14, 2015
    Hi Vladimir, try the version from GitHub. The original calculation didn't take into account the size of custom properties. This should be fixed on GitHub. If not, you can make the fix. :) Ciao, Paolo

  • Anonymous
    October 21, 2015
    Hi, I see you are not providing the partition key when you create the events. When sending events in batches, is it possible to leave the partition key unset? What exactly happens in Event Hubs when events sent in the same batch have the partition key set to null? Will they be stored in the same partition? Thanks.

  • Anonymous
    October 21, 2015
    Ciao Andrea, as I said in the article, all the events sent in a batch using the EventHubClient.SendBatch or EventHubClient.SendBatchAsync methods need to have the same PartitionKey. Hence, the value of the PartitionKey property needs to be equal for all the events in the batch. This includes the null value. Hope this helps! Ciao, Paolo

  • Anonymous
    October 21, 2015
    Ciao Paolo, I read the article; I was wondering what exactly happens inside Event Hubs when all the events in a batch have a null partition key. In that case, can you confirm that all events in the same batch will be stored in the same partition, and that the partition itself is selected in round-robin fashion as explained in the documentation? Ref: azure.microsoft.com/.../event-hubs-programming-guide ("If you do not provide a value for PartitionKey, sent events are distributed to partitions using a round-robin model") Thanks, Andrea

  • Anonymous
    October 21, 2015
    Yep, confirmed: you described the actual behavior perfectly. Ciao, Paolo
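
The first exchange in the thread above points out that a size calculation which ignores custom properties can underestimate the batch size. The following is a minimal sketch of that idea, not the code published on GitHub: it partitions a sequence into ordered batches using a caller-supplied size estimate, and offers a rough estimate of the extra space taken by custom properties. The BatchPartitioner class, its method names, and the way property sizes are estimated are all assumptions made for illustration; the real wire encoding adds further overhead, so a safety margin below the limit is advisable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class BatchPartitioner
{
    // The 262144-byte (256 KB) limit quoted in the error message above.
    public const long MaxBatchSizeInBytes = 262144;

    // Rough, hypothetical estimate of the space used by custom properties:
    // UTF-8 length of each key plus UTF-8 length of the value's string form.
    public static long EstimatePropertiesSize(IDictionary<string, object> properties)
    {
        if (properties == null) return 0;
        return properties.Sum(p =>
            (long)Encoding.UTF8.GetByteCount(p.Key) +
            Encoding.UTF8.GetByteCount(Convert.ToString(p.Value) ?? string.Empty));
    }

    // Splits items into consecutive batches whose estimated total size stays
    // within maxBatchSize, preserving the original order of the items.
    public static List<List<T>> Partition<T>(IEnumerable<T> items,
                                             Func<T, long> sizeOf,
                                             long maxBatchSize)
    {
        if (items == null) throw new ArgumentNullException("items");
        if (sizeOf == null) throw new ArgumentNullException("sizeOf");

        var batches = new List<List<T>>();
        var current = new List<T>();
        long currentSize = 0;
        foreach (var item in items)
        {
            var size = sizeOf(item);
            if (size > maxBatchSize)
            {
                throw new ArgumentException("A single item exceeds the maximum batch size.");
            }
            // Close the current batch when adding this item would exceed the limit.
            if (current.Count > 0 && currentSize + size > maxBatchSize)
            {
                batches.Add(current);
                current = new List<T>();
                currentSize = 0;
            }
            current.Add(item);
            currentSize += size;
        }
        if (current.Count > 0) batches.Add(current);
        return batches;
    }
}
```

With the Service Bus library, and assuming the EventData.SerializedSizeInBytes and EventData.Properties members, the size selector could be something like e => e.SerializedSizeInBytes + BatchPartitioner.EstimatePropertiesSize(e.Properties), with a limit somewhat below MaxBatchSizeInBytes. Each resulting batch would then be sent in order, as the article describes.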