

Message Priority II

Message Priority

Part Deux

As I promised last week, here’s the code to demonstrate Service Broker priorities. First, a disclaimer. I generally try to make my sample code complete enough to serve as the basis of a real app, but in this case I decided simplicity was more important, so I went the other way and produced sample code that runs but is pretty much useless without significant effort to complete it. These samples just return messages as result sets, so they would have to be wrapped in more code – either a stored procedure or an external application – that actually processes the messages. If I can figure out how to attach a file, I’ll put the code in a zip file to save some typing.

Priority Dialogs

The first sample uses a high priority dialog and a low priority dialog. These dialogs are associated with different contracts that have the same messages but different names, so it’s easy to tell whether a dialog is high or low priority. This isn’t necessary but it makes the code clearer.

The first bit of code just sets up the Service Broker objects we’ll need and creates two little stored procedures to send high and low priority messages so you can see how it works:

CREATE DATABASE PrioritySample

GO

USE PrioritySample

GO

---------------------------------------------------------------------

-- Create the message types we will need for the conversation

---------------------------------------------------------------------

CREATE MESSAGE TYPE [//microsoft.com/Inventory/AddItem];

CREATE MESSAGE TYPE [//microsoft.com/Inventory/ItemAdded];

---------------------------------------------------------------------

-- Create a contract for the Add Item conversation

---------------------------------------------------------------------

CREATE CONTRACT [//microsoft.com/Inventory/AddItemContract]

      ([//microsoft.com/Inventory/AddItem] SENT BY INITIATOR,

      [//microsoft.com/Inventory/ItemAdded] SENT BY TARGET);

GO

---------------------------------------------------------------------

-- Create a contract for the Add Item High Priority conversation

---------------------------------------------------------------------

CREATE CONTRACT [//microsoft.com/Inventory/AddItemContractHP]

      ([//microsoft.com/Inventory/AddItem] SENT BY INITIATOR,

      [//microsoft.com/Inventory/ItemAdded] SENT BY TARGET);

GO

---------------------------------------------------------------------

-- Create the initiator queue.

---------------------------------------------------------------------

CREATE QUEUE ManufacturingQueue;

---------------------------------------------------------------------

-- Create the Manufacturing service.

---------------------------------------------------------------------

CREATE SERVICE [//microsoft.com/ManufacturingService] ON QUEUE ManufacturingQueue

----------------------------------------------------------------------

-- Create the Inventory Queue which will be the target of

-- the conversations.

----------------------------------------------------------------------

CREATE QUEUE InventoryQueue;

----------------------------------------------------------------------

-- Create the Inventory Service. Because this is the Target

-- service, the contract must be specified

----------------------------------------------------------------------

CREATE SERVICE [//microsoft.com/InventoryService] ON QUEUE InventoryQueue

      ([//microsoft.com/Inventory/AddItemContract],

      [//microsoft.com/Inventory/AddItemContractHP])

DECLARE           @conversationHandle uniqueidentifier

-- Begin LOW priority dialog

BEGIN DIALOG @conversationHandle

    FROM SERVICE [//microsoft.com/ManufacturingService]

    TO SERVICE '//microsoft.com/InventoryService'

    ON CONTRACT [//microsoft.com/Inventory/AddItemContract]

    WITH ENCRYPTION = OFF;

-- Begin HIGH priority dialog

BEGIN DIALOG @conversationHandle

    FROM SERVICE [//microsoft.com/ManufacturingService]

    TO SERVICE '//microsoft.com/InventoryService'

    ON CONTRACT [//microsoft.com/Inventory/AddItemContractHP]

    WITH ENCRYPTION = OFF;

GO

CREATE PROCEDURE SendHighPri

AS

DECLARE           @message_body varchar(MAX)

DECLARE     @HPDialog uniqueidentifier

-- Find the High Priority dialog handle

SELECT @HPDialog = conversation_handle FROM sys.conversation_endpoints E

      JOIN sys.service_contracts C

      ON E.service_contract_id = C.service_contract_id

      WHERE C.name = '//microsoft.com/Inventory/AddItemContractHP'

            AND E.is_initiator = 1;

-- Dummy message body - all our parts will be the same

SET @message_body = 'HIGH Priority Message'

BEGIN TRANSACTION;

-- Send message on the existing high priority dialog

SEND ON CONVERSATION @HPDialog

      MESSAGE TYPE [//microsoft.com/Inventory/AddItem] (@message_body);

COMMIT;

GO

CREATE PROCEDURE SendLowPri

AS

DECLARE           @message_body varchar(MAX)

DECLARE           @LPDialog uniqueidentifier

-- Find the Low Priority dialog handle

SELECT @LPDialog = conversation_handle FROM sys.conversation_endpoints E

      JOIN sys.service_contracts C

      ON E.service_contract_id = C.service_contract_id

      WHERE C.name = '//microsoft.com/Inventory/AddItemContract'

            AND E.is_initiator = 1;

-- Dummy message body - all our parts will be the same

SET @message_body = 'LOW Priority Message'

BEGIN TRANSACTION;

-- Send message

SEND ON CONVERSATION @LPDialog

      MESSAGE TYPE [//microsoft.com/Inventory/AddItem] (@message_body);

COMMIT;

GO

There are a few things worth noting here. First, the high priority dialog and the low priority dialog are created once and never ended. This makes it easy to determine which is which. I also have only two dialogs, which means the receive process will be single threaded. In most cases you will want to add more dialogs for efficiency. Adding more low priority dialogs is easy because I never receive from a particular low priority dialog, so you can add as many as you want without changing the code in the service. Adding more high priority dialogs is harder because the service receives all the available messages from the high priority dialog before receiving a low priority message. There would need to be a separate copy of the receive service for each high priority dialog to make this work. On the other hand, if high priority messages are rare (as they should be if the messages are really high priority), then a single high priority dialog may be sufficient. Here’s the logic to receive the messages in priority order:

CREATE PROCEDURE ReceivePriorityDialog

AS

declare           @HPdialog uniqueidentifier

-- Get the High Priority dialog handle

SELECT @HPDialog = conversation_handle

      FROM sys.conversation_endpoints E

      JOIN sys.service_contracts C

      ON E.service_contract_id = C.service_contract_id

      WHERE C.name = '//microsoft.com/Inventory/AddItemContractHP'

            AND E.is_initiator = 0;

WHILE (1 = 1)

BEGIN

      BEGIN TRANSACTION;

      -- Receive any High Priority messages in the queue

      RECEIVE CAST(message_body AS varchar(MAX)) AS MSG,

            conversation_handle

            FROM InventoryQueue

            WHERE conversation_handle = @HPDialog;

      -- Receive the next available message high or low priority

      -- This prevents blocking waiting for a low priority message

      WAITFOR (

            RECEIVE top(1) -- only process one low priority message at a time

                  CAST(message_body AS varchar(MAX)) MSG,

                  conversation_handle

                  FROM InventoryQueue

      ), TIMEOUT 10000

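      -- If we didn't get anything, the queue is empty so bail out.

      -- Commit rather than roll back so that any high priority messages

      -- received earlier in this transaction are not put back on the queue.

      if (@@ROWCOUNT = 0)

            BEGIN

                  COMMIT TRANSACTION

                  BREAK

            END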

      COMMIT TRANSACTION

END -- while

 

Note that after receiving all the high priority messages, only one more message is received. This isn’t very efficient, but it ensures that the queue is checked for high priority messages after every low priority message. Also notice that there is no WAITFOR clause on the high priority receive because I want to drop through to process the low priority messages with no delay. It’s safe to WAITFOR messages in the second statement because either a high or low priority message will wake up the receive statement.

It’s almost impossible to run the two send procedures fast enough to see priority differences, so the easiest way to see this working is to use the two send procedures to put a nice mixture of high and low priority messages on the queue and then run the ReceivePriorityDialog procedure to process the queue. You should see a single result set with all the high priority messages followed by a result set for each low priority message.
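As a rough sketch of that test, something like the following could be run against the PrioritySample database created above. The particular mix of calls is arbitrary, and the peek at the queue is an addition for illustration rather than part of the original sample.

```sql
USE PrioritySample;
GO

-- Queue up an arbitrary mixture of high and low priority messages.
EXEC SendLowPri;
EXEC SendLowPri;
EXEC SendHighPri;
EXEC SendLowPri;
EXEC SendHighPri;
GO

-- Optional: peek at the queue without removing anything.
-- A SELECT against a queue is read-only; only RECEIVE dequeues messages.
SELECT queuing_order,
       CAST(message_body AS varchar(MAX)) AS MSG
FROM InventoryQueue WITH (NOLOCK)
ORDER BY queuing_order;
GO

-- Drain the queue in priority order. The procedure returns once the
-- queue has stayed empty for the 10 second WAITFOR timeout.
EXEC ReceivePriorityDialog;
GO
```

Expect some empty result sets as well, because the high priority RECEIVE runs on every pass through the loop even when no high priority messages are waiting.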

Priority Queues

Another alternative implementation is to use a high priority and low priority queue instead of different dialogs on the same queue. This introduces more queues to keep track of but can make the processing logic simpler. Let’s start with the basic Service Broker objects and send procedures again:

CREATE DATABASE PrioritySample2

GO

USE PrioritySample2

GO

---------------------------------------------------------------------

-- Create the message types we will need for the conversation

---------------------------------------------------------------------

CREATE MESSAGE TYPE [//microsoft.com/Inventory/AddItem];

CREATE MESSAGE TYPE [//microsoft.com/Inventory/ItemAdded];

---------------------------------------------------------------------

-- Create a contract for the Add Item conversation

---------------------------------------------------------------------

CREATE CONTRACT [//microsoft.com/Inventory/AddItemContract]

      ([//microsoft.com/Inventory/AddItem] SENT BY INITIATOR,

      [//microsoft.com/Inventory/ItemAdded] SENT BY TARGET);

GO

---------------------------------------------------------------------

-- Create the initiator queue.

---------------------------------------------------------------------

CREATE QUEUE ManufacturingQueue;

---------------------------------------------------------------------

-- Create the Manufacturing service.

---------------------------------------------------------------------

CREATE SERVICE [//microsoft.com/ManufacturingService] ON QUEUE ManufacturingQueue

----------------------------------------------------------------------

-- Create the Inventory Queues which will be the target of

-- the conversations.

----------------------------------------------------------------------

CREATE QUEUE InventoryQueueLP;

CREATE QUEUE InventoryQueueHP;

----------------------------------------------------------------------

-- Create the Inventory Service. Because this is the Target

-- service, the contract must be specified

----------------------------------------------------------------------

CREATE SERVICE [//microsoft.com/InventoryServiceLP]

      ON QUEUE InventoryQueueLP

      ([//microsoft.com/Inventory/AddItemContract])

CREATE SERVICE [//microsoft.com/InventoryServiceHP]

      ON QUEUE InventoryQueueHP

      ([//microsoft.com/Inventory/AddItemContract])

GO

CREATE PROCEDURE SendHighPriQ

AS

DECLARE           @message_body varchar(MAX)

DECLARE           @conversationHandle uniqueidentifier

BEGIN DIALOG @conversationHandle

    FROM SERVICE [//microsoft.com/ManufacturingService]

    TO SERVICE '//microsoft.com/InventoryServiceHP'

    ON CONTRACT [//microsoft.com/Inventory/AddItemContract]

    WITH ENCRYPTION = OFF, LIFETIME = 3600;

SET @message_body = 'HIGH Priority Message'

BEGIN TRANSACTION;

-- Send message

SEND ON CONVERSATION @conversationHandle

      MESSAGE TYPE [//microsoft.com/Inventory/AddItem] (@message_body);

COMMIT;

GO

CREATE PROCEDURE SendLowPriQ

AS

DECLARE           @message_body varchar(MAX)

DECLARE           @conversationHandle uniqueidentifier

BEGIN DIALOG @conversationHandle

    FROM SERVICE [//microsoft.com/ManufacturingService]

    TO SERVICE '//microsoft.com/InventoryServiceLP'

    ON CONTRACT [//microsoft.com/Inventory/AddItemContract]

    WITH ENCRYPTION = OFF, LIFETIME = 3600;

SET @message_body = 'LOW Priority Message'

BEGIN TRANSACTION;

-- Send message

SEND ON CONVERSATION @conversationHandle

      MESSAGE TYPE [//microsoft.com/Inventory/AddItem] (@message_body);

COMMIT;

GO

Notice that this time I start a new dialog for every message. Notice especially that I never end the dialogs. DON’T DO THIS IN YOUR CODE. Those dialogs will be around forever. I get rid of them in the example code by dropping the database but that’s not very practical in a production system.
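For comparison, here is a minimal sketch of what ending the dialogs might look like, assuming one message per dialog as in the send procedures above. The variable names are hypothetical and the SELECT stands in for real message processing. The target ends its side after handling the message, and the initiator ends its side when the resulting EndDialog message arrives on ManufacturingQueue.

```sql
USE PrioritySample2;
GO

-- Target side (hypothetical sketch): receive one message from the high
-- priority queue and end the conversation once it has been handled.
DECLARE @handle uniqueidentifier;
DECLARE @msgType sysname;
DECLARE @body varchar(MAX);

BEGIN TRANSACTION;

WAITFOR (
      RECEIVE TOP(1)
            @handle  = conversation_handle,
            @msgType = message_type_name,
            @body    = CAST(message_body AS varchar(MAX))
            FROM InventoryQueueHP
), TIMEOUT 1000;

IF (@@ROWCOUNT > 0)
BEGIN
      IF (@msgType = N'//microsoft.com/Inventory/AddItem')
      BEGIN
            SELECT @body AS MSG;   -- placeholder for real processing
            END CONVERSATION @handle;
      END
      ELSE IF (@msgType IN (N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
                            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'))
            END CONVERSATION @handle;
END

COMMIT TRANSACTION;

-- Initiator side (hypothetical sketch): the target's END CONVERSATION
-- sends an EndDialog message back, so the initiator ends its half too.
DECLARE @initHandle uniqueidentifier;
DECLARE @initType sysname;

BEGIN TRANSACTION;

RECEIVE TOP(1)
      @initHandle = conversation_handle,
      @initType   = message_type_name
      FROM ManufacturingQueue;

IF (@@ROWCOUNT > 0
    AND @initType IN (N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
                      N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'))
      END CONVERSATION @initHandle;

COMMIT TRANSACTION;
GO
```

In a real service both halves would sit inside the receive loops rather than run as one-off batches. The point is only that each side has to call END CONVERSATION before the conversation endpoint can be cleaned up.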

I have two alternatives for the receive logic in this example. The first one is basically the same as the dialog sample so you can see how to do the same thing in this architecture:

CREATE PROCEDURE ReceivePriorityQueues

AS

declare           @HPdialog uniqueidentifier

WHILE (1 = 1)

BEGIN

      -- Receive any High Priority messages in the HP queue

      WHILE (1 = 1) -- Must loop because HP messages can

                              -- be in any number of conversation groups

      BEGIN

            BEGIN TRANSACTION;

            WAITFOR (

                  RECEIVE CAST(message_body AS varchar(MAX)) AS MSG,

                        conversation_handle

                        FROM InventoryQueueHP

            ), TIMEOUT 10; -- This 10 ms is here to keep this

                                    -- from being a tight loop

            if (@@ROWCOUNT = 0)

                  BEGIN

                        ROLLBACK TRANSACTION

                        BREAK

                  END

            COMMIT TRANSACTION

      END;

      -- Receive the next available message from low priority queue

            RECEIVE

                  CAST(message_body AS varchar(MAX)) MSG,

                  conversation_handle

                  FROM InventoryQueueLP

END -- while

There are a couple of things you should notice here. First, there’s now a WAITFOR clause on the high priority receive. This is there to prevent a tight loop of continuous receives when the queues are empty. The big disadvantage is that there’s a 10 ms delay before processing each low priority message. If there are more than a few low priority messages a second, this will be unacceptable. We can’t wait at the low priority receive as we did in the first example because this receive is on a different queue, so a high priority message won’t wake up the receive. This example probably doesn’t make sense unless the message traffic is pretty light.

A better way to handle high and low priority queues is to receive from both of them separately. This doesn’t absolutely enforce priority as the other examples did because it’s possible for one procedure to be processing a low priority message while the other one is processing a high priority message. On the other hand, if the reason for having priority is just to ensure that high priority messages don’t get backed up behind low priority messages, then this architecture will satisfy that requirement while ensuring that low priority messages also get processed. This is also the simplest implementation:

CREATE PROCEDURE ReceiveHPQueue

AS

declare           @HPdialog uniqueidentifier

WHILE (1 = 1)

BEGIN

      -- Receive any High Priority messages in the HP queue

      BEGIN TRANSACTION;

      WAITFOR (

            RECEIVE CAST(message_body AS varchar(MAX)) AS MSG,

                  conversation_handle

                  FROM InventoryQueueHP

      ), TIMEOUT 1000;

      if (@@ROWCOUNT = 0)

            BEGIN

                  ROLLBACK TRANSACTION

                  BREAK

            END

      COMMIT TRANSACTION

END -- while

GO

CREATE PROCEDURE ReceiveLPQueue

AS

declare           @HPdialog uniqueidentifier

WHILE (1 = 1)

BEGIN

      -- Receive any Low Priority messages in the LP queue

      BEGIN TRANSACTION;

      WAITFOR (

            RECEIVE CAST(message_body AS varchar(MAX)) AS MSG,

                  conversation_handle

                  FROM InventoryQueueLP

      ), TIMEOUT 1000;

      if (@@ROWCOUNT = 0)

            BEGIN

                  ROLLBACK TRANSACTION

                  BREAK

            END

      COMMIT TRANSACTION

END -- while

GO

ALTER QUEUE InventoryQueueLP

      WITH ACTIVATION (

            STATUS = ON,

            PROCEDURE_NAME = ReceiveLPQueue ,

            MAX_QUEUE_READERS = 1,

            EXECUTE AS SELF

      )

ALTER QUEUE InventoryQueueHP

      WITH ACTIVATION (

            STATUS = ON,

            PROCEDURE_NAME = ReceiveHPQueue ,

            MAX_QUEUE_READERS = 10,

            EXECUTE AS SELF

      )

Notice that this time I configured activation to ensure that the message processing procedure would start as required. I used the MAX_QUEUE_READERS parameter to ensure that the high priority queue would have more resources available than the low priority queue. These settings will have to be adjusted to satisfy the loads of the high and low priority queues.
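One way to see whether those settings fit the load is to compare the configured reader limit with the number of activated procedures actually running. The query below is a sketch using the sys.service_queues catalog view and the sys.dm_broker_activated_tasks dynamic management view. It assumes the caller has permission to read server state, and the column aliases are my own.

```sql
USE PrioritySample2;
GO

-- Compare configured activation settings with the readers currently running.
SELECT q.name AS queue_name,
       q.is_activation_enabled,
       q.activation_procedure,
       q.max_readers,
       COUNT(t.spid) AS running_readers
FROM sys.service_queues AS q
LEFT JOIN sys.dm_broker_activated_tasks AS t
       ON t.queue_id = q.object_id
      AND t.database_id = DB_ID()
WHERE q.is_ms_shipped = 0
GROUP BY q.name,
         q.is_activation_enabled,
         q.activation_procedure,
         q.max_readers;
GO
```

If running_readers sits at max_readers for the high priority queue while messages are still backing up, that suggests the limit is too low for the load.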

In general, the last implementation is the one I would normally recommend unless you don’t want low priority messages to be processed at all when high priority messages are on the queue. The first two-queue implementation has some problems and I wouldn’t normally use it unless there were a lot more high priority messages than low priority messages (and that by itself probably indicates priority abuse). The other thing to keep in mind when using one of the first two alternatives is that they are both open to a starvation condition if high priority messages arrive fast enough that no low priority messages ever get processed. This might be what you want to happen, but you should think through the implications of doing it this way.

Well, that’s what I know about priority. These examples could be extended to more than two priority levels, but the more levels you have, the messier they become. The key thing to remember is that each alternative will provide slightly different behavior, so it’s important to understand what behavior your application requires before you choose an alternative. While these examples used Service Broker, the same general principles can be used in any queued messaging system that doesn’t natively support priority.

If there’s enough interest in this topic, I’ll expand it into a white paper or article. Let me know what you think.
