Fast data push tuning
A common use of service broker is the "data push" scenario in which messages are asynchronously sent to a destination such as a data warehouse for storage and processing with minimal impact on the source application. Two frequent concerns are whether service broker can handle a proposed work load, and how to "tune" a broker application so that it can achieve the required performance. Since various applications impose different constraints and are hosted within differing computing and networking configurations, there is no "one size fits all" answer.
This sample offers a means of estimating the performance of a broker application and tuning it to suit a given load and configuration. The user does this by setting several application parameters, such as message volume, message size, and processing time, as well as several internal parameters, such as number of initiator transactions, number of dialogs, etc. On the initiator, the output is the time to send the specified number of messages and the time for the messages to be transmitted to the target. On the target, the output is the time to receive and process the messages, which allows the user to obtain an estimate of whether the initiator is overrunning the target, something to avoid for a sustained high volume message load.
The sample also implements a number of recommended practices for using service broker, and can serve as an example of how to build the service broker part of a data push application. Of particular significance, it is recommended that batch messaging be done where possible. On the initiator side, this refers to sending messages on a set of "reusable" dialogs to avoid the overhead of creating a dialog per message. The dialog pool sample shows how to do this. On the target, batching refers to receiving a set of messages at a time, which can significantly improve performance.
As an illustration of the effect of reusing dialogs, data pushes of 10000 messages were performed from an initiator instance to a target. Each message was 1000 bytes. Two extreme cases were evaluated and compared. In the first case, a dialog was created for each message. In the second, all messages were sent under the same dialog. Performance was measured in terms of the time for the application to send all messages, the time for the messages to be transmitted to the target, and the time for the target to process the messages. The results show that the single dialog case is approximately ten times faster for all of these metrics.
As mentioned for the above, these are "end of the continuum" cases. There are some trade-off considerations for using more or fewer dialogs. More dialogs can ease errror handling in the case of a dialog failure, for example, since fewer messages need recovery action. "Recycling" dialogs by periodically replacing dialogs in a shared pool is another way of minimizing the impact of dialog failures. This sample and the dialog pool sample show how recycling may be implemented. One of the major reasons to use more dialogs, however, it to achieve concurrent processing on the target. This is due to the fact that a dialog is associated with a conversation group lock which allows only a single receiving procedure on the target. Thus if all messages are in the same dialog, it will serialize message reception on the target. As an illustration of how more dialogs allow more target concurrency, again using the 10000 message data push, a processing time of 10ms per message is also imposed on the target. Using ten dialogs instead of one results in halving the total processing time on the target, with no significant impact on the sending times.
The previous illustrations underscore the purpose of this sample: to allow a user to tune an application to achieve performance goals given a particular system and networking configuration.
Parameters
----------------------------------------------------
-- The data push parameters.
--
-- Application parameters:
-- message_quantity: number of messages sent.
-- message_size: size of message in bytes.
-- message_processing_time: time for target to process a message.
-- Format: 'hh:mm:ss:xxx' hh=hours, mm=minutes, ss=seconds, xxx=milliseconds
--
-- Internal parameters:
-- number_initiator_transactions: number of initiator transactions used.
-- Notes: 1. Fewer is more efficient since each transaction entails an overhead.
-- 2. Messages are actually sent when transaction commits, so sending a large
-- number of messages in a transaction can result in increased latency.
-- initiator_transaction_delay: delay time between initiator transactions.
-- Format: 'hh:mm:ss:xxx' hh=hours, mm=minutes, ss=seconds, xxx=milliseconds
-- Notes: 1. A transaction can be thought of as a burst of message_quantity /
-- number_initiator_transactions messages. This delay specifies a time
-- to wait before the next transaction is run.
-- 2. This parameter can be used to simulate message traffic distributed
-- over time.
-- number_dialogs: number of dialogs used to send messages.
-- Notes: 1. Message ordering only guaranteed with a dialog.
-- 2. Multiple dialogs allows concurrent processing on target.
-- 3. Dialog creation is expensive; dialog reuse is employed here.
-- dialog_recycle_max_messages: maximum number messages sent on a dialog before
-- recycling the dialog. Recycling is defined as ending the old dialog and
-- beginning a new one. A value of -1 indicates no recycling.
-- Notes: 1. Larger is more efficient since is minimizes the overhead of
-- creating dialogs.
-- 2. Larger can complicate dialog error processing.
-- number_target_procedures: number of activated target procedures to receive messages.
-- Notes: 1. A target proc locks all messages in a dialog when it receives first message
-- for a dialog, blocking other procs from processing these messages.
-- 2. Thus more dialogs yields increased concurrent processing. However, unless
-- dialog recycling is used, this should be set to number_dialogs, which
-- can utilize a target proc for each dialog.
-- max_messages_per_receive: maximum number of messages per target receive call.
-- Notes: 1. Larger is more efficient, but can complicate transaction error processing.
-- 2. The maximum value can be set to message_quantity / number_dialogs.
--
-- Example:
--
-- I want to send 100000 messages in sets of 10000 with a delay of 10 seconds between
-- each set. This calls for 10 transactions. Each message is 100 bytes and the target
-- message processing time is 10 ms. The messages are independent of each other, so use
-- 5 dialogs and target procedures to get some concurrent processing on the target. Allow
-- each target proc to receive 2000 messages at a time. Do not recycle dialogs.
--
-- INSERT INTO data_push_parameters
-- VALUES
-- (
-- 100000,
-- 10000,
-- '00:00:00:010',
-- 10,
-- '00:00:10:000',
-- 5,
-- -1,
-- 5,
-- 2000
-- );
--
--
Running the Sample
This sample is normally run between two server instances on different machines using Windows transport security. However, it can easily be configured to perform a "loop around" data push in the same database by skipping the indicated sections of the initiator and target setup scripts. For the two server case, it is essential that the servers are configured to enable communication protocols. In this example, we will be using TCP, so use the SQL Server Configuration Manager to make sure TCP is enabled on both servers.
Edit the Common setup script and set the desired parameters. Make sure the edits are performed identically on both servers.
Run the scripts, in order:
Common setup (both servers).
Initiator setup.
Target setup.
Initiator send. The message sending start and end times are printed.
Target monitor. The message processing time is printed.
Cleanup. (both servers).
Scripts
--------------------------------------------------------------------
-- Script for fast data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
----------------------------------------------------
-- Common setup for fast data push.
-- Before running, replace the configuration-dependent
-- domain_name and partner_host names.
----------------------------------------------------
USE master;
GO
---------------------------------------------------------------------
-- Create the broker endpoint using Windows authentication.
-- On a secure network, encryption may be disabled to improve speed:
-- (AUTHENTICATION = Windows, ENCRYPTION = DISABLED)
-- This step can be skipped if services are in the same database instance.
---------------------------------------------------------------------
IF EXISTS (SELECT * FROM sys.endpoints WHERE name = 'service_broker_endpoint')
DROP ENDPOINT service_broker_endpoint;
GO
CREATE ENDPOINT service_broker_endpoint
STATE = STARTED
AS TCP (LISTENER_PORT = 4022)
FOR SERVICE_BROKER (AUTHENTICATION = Windows);
GO
-- A procedure to create a Windows login and grant it endpoint connection permission.
IF EXISTS (SELECT name FROM tempdb.sys.procedures WHERE name LIKE '#usp_windows_login_for_broker_endpoint%')
DROP PROCEDURE #usp_windows_login_for_broker_endpoint;
GO
CREATE PROCEDURE #usp_windows_login_for_broker_endpoint (
@domain_name VARCHAR(100),
@login_name VARCHAR(50),
@endpoint_name VARCHAR(50))
AS
BEGIN
SET NOCOUNT ON;
DECLARE @query VARCHAR(1000);
-- Create the login.
SET @query =
'IF EXISTS (SELECT * FROM sys.syslogins WHERE name = ''' + @domain_name + '\' + @login_name + ''')
DROP LOGIN [' + @domain_name + '\' + @login_name + ']';
EXEC (@query);
SET @query = 'CREATE LOGIN [' + @domain_name + '\' + @login_name + '] FROM Windows';
EXEC (@query);
-- Grant the login connection access to the endpoint.
SET @query = 'GRANT CONNECT ON ENDPOINT::' + @endpoint_name + ' TO [' + @domain_name + '\' + @login_name + ']';
EXEC (@query);
END;
GO
-- Create a login for the partner machine (partner_host) in the
-- shared domain (domain_name) and grant it endpoint connection permission.
-- This assumes the availability of Kerberos authentication.
-- Note: the '$' is significant.
EXEC #usp_windows_login_for_broker_endpoint 'domain_name', 'partner_host$', 'service_broker_endpoint';
GO
---------------------------------------------------------------------
-- Create the data push database.
---------------------------------------------------------------------
IF EXISTS (SELECT * FROM sys.databases WHERE name = 'data_push_database')
DROP DATABASE data_push_database;
GO
CREATE DATABASE data_push_database;
GO
USE data_push_database;
GO
-- Create messages and contract.
CREATE MESSAGE TYPE data_push_message VALIDATION = NONE;
CREATE MESSAGE TYPE end_of_stream;
CREATE CONTRACT data_push_contract
(
data_push_message SENT BY INITIATOR,
end_of_stream SENT BY INITIATOR
);
----------------------------------------------------
-- The data push parameters.
--
-- Application parameters:
-- message_quantity: number of messages sent.
-- message_size: size of message in bytes.
-- message_processing_time: time for target to process a message.
-- Format: 'hh:mm:ss:xxx' hh=hours, mm=minutes, ss=seconds, xxx=milliseconds
--
-- Internal parameters:
-- number_initiator_transactions: number of initiator transactions used.
-- Notes: 1. Fewer is more efficient since each transaction entails an overhead.
-- 2. Messages are actually sent when transaction commits, so sending a large
-- number of messages in a transaction can result in increased latency.
-- initiator_transaction_delay: delay time between initiator transactions.
-- Format: 'hh:mm:ss:xxx' hh=hours, mm=minutes, ss=seconds, xxx=milliseconds
-- Notes: 1. A transaction can be thought of as a burst of message_quantity /
-- number_initiator_transactions messages. This delay specifies a time
-- to wait before the next transaction is run.
-- 2. This parameter can be used to simulate message traffic distributed
-- over time.
-- number_dialogs: number of dialogs used to send messages.
-- Notes: 1. Message ordering only guaranteed with a dialog.
-- 2. Multiple dialogs allows concurrent processing on target.
-- 3. Dialog creation is expensive; dialog reuse is employed here.
-- dialog_recycle_max_messages: maximum number messages sent on a dialog before
-- recycling the dialog. Recycling is defined as ending the old dialog and
-- beginning a new one. A value of -1 indicates no recycling.
-- Notes: 1. Larger is more efficient since is minimizes the overhead of
-- creating dialogs.
-- 2. Larger can complicate dialog error processing.
-- number_target_procedures: number of activated target procedures to receive messages.
-- Notes: 1. A target proc locks all messages in a dialog when it receives first message
-- for a dialog, blocking other procs from processing these messages.
-- 2. Thus more dialogs yields increased concurrent processing. However, unless
-- dialog recycling is used, this should be set to number_dialogs, which
-- can utilize a target proc for each dialog.
-- max_messages_per_receive: maximum number of messages per target receive call.
-- Notes: 1. Larger is more efficient, but can complicate transaction error processing.
-- 2. The maximum value can be set to message_quantity / number_dialogs.
--
-- General note: for simplicity, @message_quantity should be evenly divisible
-- by @number_initiator_transactions x @number_dialogs, since this allows a
-- constant number of messages to be sent per dialog per transaction. "Remainder"
-- messages will not be sent to the target.
--
-- Example:
--
-- I want to send 100000 messages in sets of 10000 with a delay of 10 seconds between
-- each set. This calls for 10 transactions. Each message is 100 bytes and the target
-- message processing time is 10 ms. The messages are independent of each other, so use
-- 5 dialogs and target procedures to get some concurrent processing on the target. Allow
-- each target proc to receive 2000 messages at a time. Do not recycle dialogs.
--
-- INSERT INTO data_push_parameters
-- VALUES
-- (
-- 100000,
-- 10000,
-- '00:00:00:010',
-- 10,
-- '00:00:10:000',
-- 5,
-- -1,
-- 5,
-- 2000
-- );
--
--
CREATE TABLE data_push_parameters (
message_quantity BIGINT NOT NULL,
message_size INT NOT NULL,
message_processing_time CHAR(12) NOT NULL,
number_initiator_transactions INT NOT NULL,
initiator_transaction_delay CHAR(12) NOT NULL,
number_dialogs INT NOT NULL,
dialog_recycle_max_messages BIGINT NOT NULL,
number_target_procedures INT NOT NULL,
max_messages_per_receive BIGINT NOT NULL);
GO
-- Insert parameter values.
TRUNCATE TABLE data_push_parameters;
INSERT INTO data_push_parameters
(
message_quantity,
message_size,
message_processing_time,
number_initiator_transactions,
initiator_transaction_delay,
number_dialogs,
dialog_recycle_max_messages,
number_target_procedures,
max_messages_per_receive
)
VALUES
(
10000,
1000,
'00:00:00:000',
1,
'00:00:00:000',
1,
-1,
1,
1000
);
GO
-- Check parameters.
DECLARE @message_quantity BIGINT;
DECLARE @number_initiator_transactions INT;
DECLARE @number_dialogs INT;
DECLARE @i BIGINT;
DECLARE @string VARCHAR(50);
SET @message_quantity = (SELECT message_quantity FROM data_push_parameters);
SET @number_initiator_transactions = (SELECT number_initiator_transactions FROM data_push_parameters);
SET @number_dialogs = (SELECT number_dialogs FROM data_push_parameters);
SET @i = @message_quantity / (@number_dialogs * @number_initiator_transactions);
SET @i = @i * @number_dialogs * @number_initiator_transactions;
IF @message_quantity > @i
BEGIN
SET @i = @message_quantity - @i;
SET @string = (SELECT CAST( @i AS VARCHAR(50)));
PRINT 'Warning: @message_quantity is not evenly divisible by @number_dialogs * @number_initiator_transactions';
PRINT @string + ' messages will not be sent to the target';
END;
GO
--------------------------------------------------------------------
-- Script for fast data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
----------------------------------------------------
-- Initiator setup for fast data push.
-- Before running, customize the configuration-dependent
-- routing to the target service.
----------------------------------------------------
USE data_push_database;
GO
-- The data push procedure: send messages to target.
CREATE PROCEDURE usp_data_push
AS
BEGIN
SET NOCOUNT ON;
-- Get initiator parameters.
DECLARE @message_quantity BIGINT;
DECLARE @message_size INT;
DECLARE @number_initiator_transactions INT;
DECLARE @initiator_transaction_delay CHAR(12);
DECLARE @number_dialogs INT;
DECLARE @dialog_recycle_max_messages BIGINT;
SET @message_quantity = (SELECT message_quantity FROM data_push_parameters);
SET @message_size = (SELECT message_size FROM data_push_parameters);
SET @number_initiator_transactions = (SELECT number_initiator_transactions FROM data_push_parameters);
SET @initiator_transaction_delay = (SELECT initiator_transaction_delay FROM data_push_parameters);
SET @number_dialogs = (SELECT number_dialogs FROM data_push_parameters);
SET @dialog_recycle_max_messages = (SELECT dialog_recycle_max_messages FROM data_push_parameters);
-- Create a message payload of the requested size.
DECLARE @payload VARCHAR(MAX);
DECLARE @char VARCHAR(MAX);
SET @char = '0';
SELECT @payload = REPLICATE(@char, @message_size);
-- Loop controls.
DECLARE @messages_per_transaction BIGINT;
DECLARE @messages_per_dialog_transaction BIGINT;
DECLARE @transaction_counter INT;
DECLARE @message_counter BIGINT;
-- Compute messages to send per dialog per transaction:
-- @message_quantity / (@number_initiator_transactions x @number_dialogs)
-- Note that integer arithmetic may result in "remainder" messages that will not
-- be sent.
SET @messages_per_transaction = @message_quantity / @number_initiator_transactions;
SET @messages_per_dialog_transaction = @messages_per_transaction / @number_dialogs;
-- Error variables.
DECLARE @error_number INT;
DECLARE @error_message VARCHAR(4000);
-- Show start time.
SELECT GETDATE() AS 'Start sending';
-- Create a table containing requested number of dialogs.
DECLARE @dialogs TABLE (idx INT, handle UNIQUEIDENTIFIER, recycle_counter BIGINT);
DECLARE @idx INT;
DECLARE @handle UNIQUEIDENTIFIER;
DECLARE @recycle_counter BIGINT;
SET @idx = 0;
WHILE @idx < @number_dialogs
BEGIN
BEGIN DIALOG CONVERSATION @handle
FROM SERVICE initiator_service
TO SERVICE 'target_service'
ON CONTRACT data_push_contract
WITH ENCRYPTION = OFF;
INSERT INTO @dialogs (idx, handle, recycle_counter) VALUES (@idx, @handle, 0);
SET @idx = @idx + 1;
END
-- Loop through transactions.
SET @transaction_counter = 0;
WHILE @transaction_counter < @number_initiator_transactions
BEGIN
BEGIN TRANSACTION;
-- Loop through dialogs.
SET @idx = 0;
WHILE @idx < @number_dialogs
BEGIN
-- Send a batch of messages for dialog.
SET @handle = (SELECT handle FROM @dialogs WHERE idx = @idx);
SET @recycle_counter = (SELECT recycle_counter FROM @dialogs WHERE idx = @idx);
SET @message_counter = 0;
WHILE @message_counter < @messages_per_dialog_transaction
BEGIN
-- Time to recycle dialog?
IF @dialog_recycle_max_messages <> -1 AND
@recycle_counter = @dialog_recycle_max_messages
BEGIN
-- Inform target to end dialog.
SEND ON CONVERSATION @handle MESSAGE TYPE end_of_stream;
-- Replace the current dialog.
BEGIN DIALOG CONVERSATION @handle
FROM SERVICE initiator_service
TO SERVICE 'target_service'
ON CONTRACT data_push_contract
WITH ENCRYPTION = OFF;
UPDATE @dialogs SET handle = @handle WHERE idx = @idx;
SET @recycle_counter = 0;
END
-- Send a message.
BEGIN TRY
BEGIN
SEND ON CONVERSATION @handle MESSAGE TYPE data_push_message (@payload);
END
IF @dialog_recycle_max_messages <> -1
BEGIN
SET @recycle_counter = @recycle_counter + 1;
END
SET @message_counter = @message_counter + 1;
END TRY
BEGIN CATCH
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
-- Dialog is faulty?
DECLARE @dialog_error INT;
SET @dialog_error = 1;
DECLARE @dialog_state VARCHAR(2);
SET @dialog_state = (SELECT state FROM sys.conversation_endpoints
WHERE conversation_handle = @handle);
IF @@ROWCOUNT = 1
BEGIN
-- Good dialog is starting or conversing.
IF @dialog_state = 'SO' OR @dialog_state = 'CO'
BEGIN
SET @dialog_error = 0;
END
END
IF @dialog_error = 1
BEGIN
-- Record the error.
INSERT INTO initiator_processing_errors VALUES(@handle, @error_number,
@error_message, NULL, NULL, NULL, NULL, 0);
-- Replace dialog and continue sending.
BEGIN DIALOG CONVERSATION @handle
FROM SERVICE initiator_service
TO SERVICE 'target_service'
ON CONTRACT data_push_contract
WITH ENCRYPTION = OFF;
UPDATE @dialogs SET handle = @handle WHERE idx = @idx;
SET @recycle_counter = 0;
END
ELSE
BEGIN
-- Record the error and return error.
INSERT INTO initiator_processing_errors VALUES(@handle, @error_number,
@error_message, NULL, NULL, NULL, NULL, 0);
RETURN 1;
END
END CATCH
END
UPDATE @dialogs SET recycle_counter = @recycle_counter WHERE idx = @idx;
SET @idx = @idx + 1;
END
COMMIT;
SET @transaction_counter = @transaction_counter + 1;
-- Wait for next transaction.
IF @transaction_counter < @number_initiator_transactions
BEGIN
WAITFOR DELAY @initiator_transaction_delay;
END
END
-- Gracefully end dialogs by informing target.
BEGIN TRANSACTION;
SET @idx = 0;
WHILE @idx < @number_dialogs
BEGIN
SET @handle = (SELECT handle FROM @dialogs WHERE idx = @idx);
BEGIN
SEND ON CONVERSATION @handle MESSAGE TYPE end_of_stream;
END
SET @idx = @idx + 1;
END
COMMIT;
-- Show end time.
SELECT GETDATE() AS 'End sending';
RETURN 0;
END;
GO
-- Resends all pending messages in sys.transmission_queue
-- belonging to an old colversation on a new conversation.
CREATE PROCEDURE usp_resend_pending (@old_handle UNIQUEIDENTIFIER)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @message_type_name SYSNAME;
DECLARE @message_body VARCHAR(MAX);
-- Get a new dialog.
DECLARE @handle UNIQUEIDENTIFIER;
BEGIN DIALOG CONVERSATION @handle
FROM SERVICE initiator_service
TO SERVICE 'target_service'
ON CONTRACT data_push_contract
WITH ENCRYPTION = OFF;
-- Declare a cursor to iterate over all the pending messages.
-- It is important to keep the message order and to keep the original message type.
DECLARE cursor_pending CURSOR LOCAL FORWARD_ONLY READ_ONLY
FOR SELECT message_type_name, message_body
FROM sys.transmission_queue
WHERE conversation_handle = @old_handle
ORDER BY message_sequence_number;
OPEN cursorPending;
FETCH NEXT FROM cursor_pending INTO @message_type_name, @message_body;
WHILE (@@FETCH_STATUS = 0)
BEGIN
-- Resend the message on the new conversation
SEND ON CONVERSATION @handle MESSAGE TYPE @message_type_name (@message_body);
FETCH NEXT FROM cursor_pending INTO @message_type_name, @message_body;
END
CLOSE cursor_pending;
DEALLOCATE cursor_pending;
-- Signal end of stream to target.
SEND ON CONVERSATION @handle MESSAGE TYPE end_of_stream;
END;
GO
-- Activated store proc for the initiator to receive messages.
-- Dialogs are gracefully ended by the target after receiving
-- an end_of_stream message from the initiator; the end dialog
-- message is then processed here. This method is recommended
-- to avoid "fire and forget" message loss. One message per
-- invocation is OK here for expected low-volume load.
CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER,
@message_type_name SYSNAME,
@message_body VARCHAR(MAX);
-- Error variables.
DECLARE @error_number INT;
DECLARE @error_message VARCHAR(4000);
DECLARE @error_severity INT;
DECLARE @error_state INT;
DECLARE @error_procedure SYSNAME;
DECLARE @error_line INT;
BEGIN TRY
BEGIN TRANSACTION;
-- Wait 5 seconds for a message.
WAITFOR (
RECEIVE TOP(1)
@conversation_handle = conversation_handle,
@message_type_name = message_type_name,
@message_body = message_body
FROM initiator_queue), TIMEOUT 5000;
IF @@ROWCOUNT = 1
BEGIN
IF @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
-- Target is ending dialog normally.
END CONVERSATION @conversation_handle;
END
ELSE IF @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- Record the error.
WITH XMLNAMESPACES ('https://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
SELECT
@error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
@error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
INSERT INTO initiator_processing_errors VALUES(@conversation_handle, @error_number,
@error_message, NULL, NULL, NULL, NULL, 0);
-- Can messages be resent?
IF (@error_number IN (-8489, -8462, -9719, -28052))
BEGIN
-- Resend the messages on a new dialog.
EXEC usp_resend_pending @conversation_handle;
END
ELSE
BEGIN
-- Save the messages in a side table to be processed later.
INSERT INTO unsent_messages
SELECT message_type_name, message_body FROM sys.transmission_queue
WHERE conversation_handle = @conversation_handle;
END
-- End the conversation.
END CONVERSATION @conversation_handle;
END
END
COMMIT;
END TRY
BEGIN CATCH
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- Note: 5 consecutive rollbacks will disable the queue!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO initiator_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 1);
COMMIT;
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and commit transaction.
INSERT INTO initiator_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 0);
COMMIT;
END
END CATCH
END;
GO
-- Create the initiator queue with activated procedure.
CREATE QUEUE initiator_queue
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = initiator_queue_activated_procedure,
EXECUTE AS OWNER);
GO
-- Create initiator service.
CREATE SERVICE initiator_service ON QUEUE initiator_queue (data_push_contract);
GO
-- Any user can send on the service.
GRANT SEND ON SERVICE::initiator_service TO PUBLIC;
GO
-- This table stores unsent messages.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'unsent_messages')
DROP TABLE unsent_messages;
GO
CREATE TABLE unsent_messages ( message_type_name SYSNAME, message_body VARCHAR(MAX) );
GO
-- Table to store processing errors.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'initiator_processing_errors')
DROP TABLE initiator_processing_errors;
GO
CREATE TABLE initiator_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,
error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,
error_line INT, doomed_transaction TINYINT)
GO
---------------------------------------------------------------------
-- Routing.
-- Skip the following if services are in the same database instance.
---------------------------------------------------------------------
-- Create a route to the target service.
CREATE ROUTE target_route
WITH SERVICE_NAME = 'target_service',
ADDRESS = 'tcp://target_host:4022';
GO
-- In msdb, create an incoming route to the initiator service.
USE msdb;
GO
CREATE ROUTE initiator_route
WITH SERVICE_NAME = 'initiator_service',
ADDRESS = 'local';
GO
USE data_push_database;
GO
--------------------------------------------------------------------
-- Script for fast data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
----------------------------------------------------
-- Target setup for fast data push.
-- Before running, customize the configuration-dependent
-- routing to the initiator service.
----------------------------------------------------
--------------------------------------------------------------------
-- Poison message processing:
--
-- Although this should not occur here because messages are not
-- processed in any significant way, there are times that
-- an application might find itself temporarily unable to process
-- a message. The temptation is then to roll back the receive
-- transaction and try again. The danger of doing this is that
-- 5 consecutive roll backs on a queue will disable it.
-- If a queue does become disabled, possibly due to a doomed
-- transaction, a BROKER_QUEUE_DISABLED event notification can
-- be used as a recovery mechanism. You can also use the TRY-CATCH
-- construct to process transaction errors as shown below.
--------------------------------------------------------------------
USE data_push_database;
GO
-- Activated store proc for the target to receive messages.
CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
SET NOCOUNT ON;
-- Variable table for received messages.
DECLARE @receive_table TABLE(
queuing_order BIGINT,
conversation_handle UNIQUEIDENTIFIER,
message_type_name SYSNAME,
message_body VARCHAR(MAX));
-- Cursor for received message table.
DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
FOR SELECT
conversation_handle,
message_type_name,
message_body
FROM @receive_table ORDER BY queuing_order;
DECLARE @conversation_handle UNIQUEIDENTIFIER,
@message_type_name SYSNAME,
@message_body VARCHAR(MAX);
-- Count processed messages.
DECLARE @message_counter BIGINT;
SET @message_counter = 0;
-- Error variables.
DECLARE @error_number INT;
DECLARE @error_message VARCHAR(4000);
DECLARE @error_severity INT;
DECLARE @error_state INT;
DECLARE @error_procedure SYSNAME;
DECLARE @error_line INT;
-- Get target parameters.
DECLARE @message_processing_time CHAR(12);
SET @message_processing_time = (SELECT message_processing_time FROM data_push_parameters);
DECLARE @max_messages_per_receive BIGINT;
SET @max_messages_per_receive = (SELECT max_messages_per_receive FROM data_push_parameters);
-- Receive messages for available conversation groups.
BEGIN TRY
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
-- Receive max available messages into the table.
-- Wait 5 seconds for messages.
WAITFOR (
RECEIVE TOP(@max_messages_per_receive)
queuing_order,
conversation_handle,
message_type_name,
message_body
FROM target_queue
INTO @receive_table
), TIMEOUT 5000;
IF @@ROWCOUNT = 0
BEGIN
COMMIT;
BREAK;
END
-- Process the messages.
OPEN message_cursor;
WHILE (1=1)
BEGIN
FETCH NEXT FROM message_cursor
INTO @conversation_handle,
@message_type_name,
@message_body;
IF (@@FETCH_STATUS != 0) BREAK;
-- Process a message.
-- If an exception occurs, catch and attempt to recover.
BEGIN TRY
IF @message_type_name = 'data_push_message'
BEGIN
-- Process the message for the specified amount of time.
WAITFOR DELAY @message_processing_time;
SET @message_counter = @message_counter + 1;
END
ELSE IF @message_type_name = 'end_of_stream'
BEGIN
-- Initiator is signaling end of message stream: end the dialog.
END CONVERSATION @conversation_handle;
END
ELSE IF @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- If the message_type_name indicates that the message is an error,
-- record the error and end the conversation.
WITH XMLNAMESPACES ('https://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
SELECT
@error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
@error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
INSERT INTO target_processing_errors VALUES(@conversation_handle, @error_number,
@error_message, NULL, NULL, NULL, NULL, 0);
END CONVERSATION @conversation_handle;
END
END TRY
BEGIN CATCH
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 1);
COMMIT;
-- For this level of error, it is best to exit the proc
-- and give the queue monitor control.
-- Breaking to the outer catch will accomplish this.
RAISERROR ('Message processing error', 16, 1);
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and continue processing messages.
-- Failing message could also be put aside for later processing here.
-- Otherwise it will be discarded.
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 0);
END
END CATCH
END
CLOSE message_cursor;
DELETE @receive_table;
COMMIT;
END
END TRY
BEGIN CATCH
-- Process the error and exit the proc to give the queue monitor control
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 1);
COMMIT;
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and commit transaction.
-- Here you could also save anything else you want before exiting.
INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
@error_severity, @error_state, @error_procedure, @error_line, 0);
COMMIT;
END
END CATCH
-- Increment processed message counter.
BEGIN TRANSACTION;
DECLARE @counter BIGINT;
SET @counter = (SELECT TOP(1) counter FROM target_message_counter);
SET @counter = @counter + @message_counter;
UPDATE target_message_counter SET counter = @counter;
COMMIT;
END;
GO
-- Get number of activated target procedures parameter.
DECLARE @number_target_procedures INT;
SET @number_target_procedures = (SELECT number_target_procedures FROM data_push_parameters);
-- Create the target queue with specified number of activated procedures.
DECLARE @query VARCHAR(500);
DECLARE @string VARCHAR(50);
SET @string = (SELECT CAST( @number_target_procedures AS VARCHAR(50)));
SET @query = 'CREATE QUEUE target_queue
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = ' + @string + ',
PROCEDURE_NAME = target_queue_activated_procedure,
EXECUTE AS OWNER)';
EXEC (@query);
GO
-- Create target service.
CREATE SERVICE target_service ON QUEUE target_queue (data_push_contract);
GO
-- Any user can send on the service.
GRANT SEND ON SERVICE::target_service TO PUBLIC;
GO
-- Table to count processed messages.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_message_counter')
DROP TABLE message_counter;
GO
CREATE TABLE target_message_counter (counter BIGINT NOT NULL);
GO
INSERT INTO target_message_counter VALUES (0);
GO
-- Table to store processing errors.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
DROP TABLE target_processing_errors;
GO
CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,
error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,
error_line INT, doomed_transaction TINYINT)
GO
---------------------------------------------------------------------
-- Get size of a message queue.
-- Method used is faster than SQL count operator.
---------------------------------------------------------------------
CREATE PROCEDURE usp_get_queue_size ( @queue_name VARCHAR(50) )
AS
BEGIN
SELECT p.rows
FROM sys.objects AS o
JOIN sys.partitions AS p ON p.object_id = o.object_id
JOIN sys.objects AS q ON o.parent_object_id = q.object_id
WHERE q.name = @queue_name
AND p.index_id = 1;
END;
GO
---------------------------------------------------------------------
-- Routing.
-- Skip the following if services are in the same database instance.
---------------------------------------------------------------------
-- Create a route to the initiator service.
CREATE ROUTE initiator_route
WITH SERVICE_NAME = 'initiator_service',
ADDRESS = 'tcp://initiator_host:4022';
GO
-- In msdb, create an incoming route to the target service.
USE msdb;
GO
CREATE ROUTE target_route
WITH SERVICE_NAME = 'target_service',
ADDRESS = 'local';
GO
USE data_push_database;
GO
--------------------------------------------------------------------
-- Script for fast data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
---------------------------------------------------------------------
-- Initiator data push.
--
-- This script can also be run on multiple connections to achieve transactional
-- concurrency. Since each connection will send message_quantity messages
-- with number_initiator_transactions transactions and number_dialogs
-- dialogs, these parameter quantities should be divided by the number
-- of connections to get the same total quantities.
---------------------------------------------------------------------
USE data_push_database;
GO
-- Send the messages. Output sending time.
EXEC usp_data_push;
GO
---------------------------------------------------------------------
-- Wait for transmission queue to be empty, signifying the
-- reception and acknowledgement of all messages by the target.
-- Use this efficient checking method every 5 seconds.
---------------------------------------------------------------------
DECLARE @count BIGINT;
WHILE (1=1)
BEGIN
SET @count =
(SELECT p.rows
FROM sys.objects AS o
JOIN sys.partitions AS p ON p.object_id = o.object_id
WHERE o.name = 'sysxmitqueue');
IF (@count = 0) BREAK;
WAITFOR DELAY '00:00:05:000';
END;
SELECT GETDATE() AS 'End transmission';
GO
-- View processing errors.
SELECT * FROM initiator_processing_errors;
GO
-- View unsent messages.
SELECT * FROM unsent_messages;
GO
--------------------------------------------------------------------
-- Script for fast data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
----------------------------------------------------
-- Monitor target.
--
-- A count of processed messages is kept in the
-- target_message_counter table.
-- The usp_get_queue_size procedure can also be used to
-- monitor the size of the target queue.
----------------------------------------------------
USE data_push_database;
GO
-- Wait for message_quantity messages to be processed.
DECLARE @message_quantity BIGINT;
SET @message_quantity = (SELECT message_quantity FROM data_push_parameters);
DECLARE @count BIGINT;
WHILE (1=1)
BEGIN
SET @count = (SELECT TOP(1) counter FROM target_message_counter);
IF (@count >= @message_quantity) BREAK;
WAITFOR DELAY '00:00:05:000';
END;
SELECT GETDATE() AS 'Messages processed';
GO
-- View message processing errors.
SELECT * FROM target_processing_errors;
GO
-- Clear processed message counter for next run.
UPDATE target_message_counter SET counter = 0;
GO
--------------------------------------------------------------------
-- Script for data push sample.
--
-- This file is part of the Microsoft SQL Server Code Samples.
-- Copyright (C) Microsoft Corporation. All Rights reserved.
-- This source code is intended only as a supplement to Microsoft
-- Development Tools and/or on-line documentation. See these other
-- materials for detailed information regarding Microsoft code samples.
--
-- THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF
-- ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
-- PARTICULAR PURPOSE.
--------------------------------------------------------------------
----------------------------------------------------
-- Cleanup for fast data push.
-- Before running, replace the configuration-dependent
-- domain_name and partner_host names.
----------------------------------------------------
USE master;
GO
IF EXISTS (SELECT * FROM sys.databases WHERE name = 'data_push_database')
DROP DATABASE data_push_database;
GO
USE msdb;
GO
IF EXISTS (SELECT * FROM sys.routes WHERE name = 'initiator_route')
DROP ROUTE initiator_route;
GO
IF EXISTS (SELECT * FROM sys.routes WHERE name = 'target_route')
DROP ROUTE target_route;
GO
USE master;
IF EXISTS (SELECT * FROM sys.endpoints WHERE name = 'service_broker_endpoint')
DROP ENDPOINT service_broker_endpoint;
GO
IF EXISTS (SELECT * FROM sys.syslogins WHERE name = 'domain_name\partner_host$')
DROP LOGIN [domain_name\partner_host$];
GO
Comments
Anonymous
March 08, 2009
Hi, But by using two instances (default and named) on the same server, the parameters domain and host$ how must be valued? ThanksAnonymous
March 09, 2009
You should be able to use the same domain and host$ values for both instances. Of course they cannot use the same port for their endpoints, so that must be changed.Anonymous
November 18, 2009
Ok, I will use to check the broker itself. I' m checking the solution given inhttp://blogs.msdn.com/sql_service_broker/archive/2008/07/09/real-time-data-integration-with-service-broker-and-other-sql-techniques.aspx to populate my DataMart and I did a simple test to know how big is the impact on the operational database: I compare how much time it takes to insert 500 rows before and after implementing the solution. I have a big big difference. What can explain it? Is it supposed to be like that? Please help.