Using the WCF SQL Adapter to read messages from SSB queues and submit them to BizTalk

In this post, I’m going to demonstrate how the Polling feature in the WCF SQL Adapter can be used to read messages from SQL Service Broker (SSB) queues and publish them in the BizTalk MessageBox. And to make it really simple, we’re going to do it without an orchestration.

(Note: most of the SSB portion of this post is based on the SSB tutorial available at https://msdn2.microsoft.com/en-us/library/bb839489.aspx.)

The scenario:

  1. An external source publishes messages into an SSB queue.
  2. At fixed intervals (the polling interval), the adapter checks the SSB queue to see if a message is available.
  3. If a message is available, the adapter performs the actual RECEIVE operation to remove the message from the SSB queue and publish it in BizTalk – these operations are performed within the same DTC transaction.
  4. A filter is used to send the data to a folder on the file system (using the File Adapter).

Creating the database artifacts required for the SSB conversation:

The artifacts which we need to create are:

  1. A message type, which denotes the format of the messages in the queue
  2. A contract, which denotes the conversation between a sender and a receiver (the contract also specifies the type of the message which will flow between them)
  3. The Initiator and Target queues, in which the messages are stored
  4. The Initiator and Target services, utilizing the above-mentioned queues.

I’m not going to explain the SQL statements below in detail, mainly because this post is not meant to be a tutorial on SSB – you can refer to SQL Server Books Online for more information on SSB.

USE master;
GO
ALTER DATABASE <your db name here>
SET ENABLE_BROKER;
GO
USE <your db name here>;
GO

CREATE MESSAGE TYPE
[//SqlAdapterSSBSample/RequestMessage]
VALIDATION = WELL_FORMED_XML;

CREATE CONTRACT [//SqlAdapterSSBSample/SampleContract]
([//SqlAdapterSSBSample/RequestMessage]
SENT BY INITIATOR
);

CREATE QUEUE InitiatorQueue1DB;

CREATE SERVICE
[//SqlAdapterSSBSample/InitiatorService]
ON QUEUE InitiatorQueue1DB;

CREATE QUEUE TargetQueue1DB;

CREATE SERVICE
[//SqlAdapterSSBSample/TargetService]
ON QUEUE TargetQueue1DB
([//SqlAdapterSSBSample/SampleContract]);

At this point, we’ve created the main database objects required for the SSB side of things.

Creating the BizTalk Artifacts

  1. Start the BizTalk Server 2006 Administration Console.
  2. Create a new BizTalk application – let’s call it “SSBPollingApplication”.
  3. Create a new 1-way Receive Port – call it “SqlReceivePort”. Create a new Receive Location – call it “SqlReceiveLocation”.
  4. Choose the Transport Type (in the Receive Location Configuration dialog) as “WCF-Custom”. Choose “PassThruReceive” for the Receive Pipeline.
  5. Click Configure; the WCF-Custom adapter configuration dialog pops up.
  6. In the General tab, enter the URI denoting your SQL Server. The format is “mssql://servername/instancename/databasename”. For example, since I am using the default instance of SQL Server on my machine, the instance name segment is left empty and the URI I entered is “mssql://localhost//SSBTestDb” (my database is named “SSBTestDb”).
  7. In the Binding Tab, choose the binding as “sqlBinding”. In the configuration properties which are displayed below, set these:
    1. pollingIntervalInSeconds = 2 (or whatever you're comfortable with)

    2. polledDataAvailableStatement = SELECT COUNT(*) FROM TargetQueue1DB WITH (NOLOCK)

    3. pollingStatement = (note, multi line statement follows):

      DECLARE @DlgHandle UNIQUEIDENTIFIER;
      DECLARE @RecvMsg XML;
      RECEIVE TOP (1)
      @DlgHandle=conversation_handle,
      @RecvMsg = CAST(message_body as XML)
      FROM TargetQueue1DB;
      IF NOT (@DlgHandle IS NULL)
      BEGIN
      END CONVERSATION @DlgHandle;
      SELECT @RecvMsg AS ReceivedMessage;
      END

    4. A brief explanation of the 2 SQL blocks above:

      1. In the polledDataAvailableStatement, we’re checking if there are any rows in the SSB queue. It is this statement which is going to execute every “pollingIntervalInSeconds” seconds. If a non-zero value is returned, the adapter interprets it to mean that data is available, and only then proceeds to execute the pollingStatement.
      2. In the pollingStatement, we are selecting and removing the first message in the SSB queue (RECEIVE removes the message, whereas SELECT would just peek at it). We’re also capturing the conversation handle for that message. If we did indeed pick up a message (someone else may have got to it first, in which case the handle is NULL), we end that specific conversation and return the message body.
  8. In the Behavior tab, right-click the ServiceBehavior node and choose “Add Extension”. Add the “sqlAdapterInboundTransactionBehavior” behavior. You can control the transaction isolation level (the default is ReadCommitted). This isolation level is applied to the DTC transaction spanning the BizTalk MessageBox and the SQL Server from which you’re pulling messages off the SSB queue.
  9. In the Other tab, choose None for credentials (if you want to use Integrated Security) – or specify a username + password.
  10. Click OK as many times as required to close the Receive Port configuration dialogs.
  11. Create a new static 1-way send port named “FileSendPort”. Configure the transport type as FILE, and configure the port to drop messages to a valid folder on your file system. Select PassThruTransmit as the Send pipeline. Also, click on “Filters” in the left pane. Add a filter condition:
    1. Property = BTS.ReceivePortName
    2. Value = SqlReceivePort
  12. Click OK to complete the configuration of the Send Port.
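
The interplay between polledDataAvailableStatement and pollingStatement configured in step 7 can be illustrated with a small simulation. This is only a sketch in Python, not the adapter's actual implementation: an in-memory deque stands in for TargetQueue1DB, and the names `data_available`, `receive_one`, `poll_once` and the `publish` callback are hypothetical. The `poll_while_data_found` flag is an assumption, loosely modeled on the pollWhileDataFound binding property, that keeps draining the queue instead of waiting for the next polling interval.

```python
from collections import deque


def data_available(queue):
    """Stand-in for polledDataAvailableStatement (SELECT COUNT(*) on the queue)."""
    return len(queue)


def receive_one(queue):
    """Stand-in for pollingStatement: RECEIVE TOP (1) removes and returns one message."""
    if not queue:
        # Another reader may have consumed the message between the two statements.
        return None
    return queue.popleft()


def poll_once(queue, publish, poll_while_data_found=False):
    """Run one polling cycle and return the number of messages published.

    `publish` is a hypothetical callback standing in for submission to BizTalk.
    """
    published = 0
    while data_available(queue) > 0:
        message = receive_one(queue)
        if message is None:
            break
        publish(message)
        published += 1
        if not poll_while_data_found:
            # In this sketch: one message per cycle, then wait for the next interval.
            break
    return published


if __name__ == "__main__":
    target_queue = deque(["<TestMessage>Hello, World</TestMessage>",
                          "<TestMessage>Second message</TestMessage>"])
    delivered = []
    poll_once(target_queue, delivered.append)
    print(delivered)
```

With the flag left at its default, each cycle in this sketch publishes at most one message, which mirrors the TOP (1) in the pollingStatement above.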

At this point, the configuration of the BizTalk application is complete. Start the application.

We’re now going to send messages to the SSB queue (I’m using SQL Server Management Studio – you could also use osql.exe). I used the following SQL block to send a message to the queue:

DECLARE @RequestMsg XML;
SELECT @RequestMsg = N'<TestMessage>Hello, World</TestMessage>';
DECLARE @DlgHandle UNIQUEIDENTIFIER;
BEGIN DIALOG @DlgHandle
FROM SERVICE
[//SqlAdapterSSBSample/InitiatorService]
TO SERVICE
N'//SqlAdapterSSBSample/TargetService'
ON CONTRACT
[//SqlAdapterSSBSample/SampleContract]
WITH ENCRYPTION = OFF;
SEND ON CONVERSATION @DlgHandle
MESSAGE TYPE
[//SqlAdapterSSBSample/RequestMessage]
(@RequestMsg);

Once you send the above message to the SSB queue, within a short while, you should see it in the folder specified in the FileSendPort. The message I see is:

<Polling xmlns="https://schemas.microsoft.com/Sql/2008/05/Polling/">
<PolledData>
<DataSet xmlns="https://schemas.datacontract.org/2004/07/System.Data">
<xs:schema id="NewDataSet" xmlns:xs="https://www.w3.org/2001/XMLSchema" xmlns:msdata="urn:schemas-microsoft-com:xml-msdata">
<xs:element msdata:IsDataSet="true" name="NewDataSet">
<xs:complexType>
<xs:sequence>
<xs:element minOccurs="0" maxOccurs="unbounded" name="NewTable">
<xs:complexType>
<xs:sequence>
<xs:element minOccurs="0" name="ReceivedMessage" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
<diffgr:diffgram xmlns:diffgr="urn:schemas-microsoft-com:xml-diffgram-v1">
<NewDataSet xmlns="">
<NewTable>
<ReceivedMessage><![CDATA[<TestMessage>Hello, World</TestMessage>]]></ReceivedMessage>
</NewTable>
</NewDataSet>
</diffgr:diffgram>
</DataSet>
</PolledData>
</Polling>

As can be seen above, the adapter has returned the message in System.Data.DataSet format. You could of course use an XPath query to extract only the body of the message. Here’s how:

  1. Navigate back to the SqlReceivePort configuration. Bring up the WCF-Custom configuration dialog (the dialog which contained the General tab, the Binding tab, the Behavior tab, etc).
  2. Navigate to the Messages Tab.
  3. In the “Inbound BizTalk Message Body” section:
    1. Select “Path”. Enter this XPath query: /*[local-name()='Polling']/*[local-name()='PolledData']/*[local-name()='DataSet']/*[local-name()='diffgram']/*[local-name()='NewDataSet']/*[local-name()='NewTable']/*[local-name()='ReceivedMessage']
    2. For the Node Encoding, select “String”.

The data in the file should now just be:

<TestMessage>Hello, World</TestMessage>
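
To see what the XPath query selects, the same walk can be reproduced outside BizTalk. The following is a rough Python sketch, not something the adapter runs: it follows the element names from the query by local name (ignoring namespaces, as local-name() does) and returns the text of the last node. The function names and the trimmed sample envelope (schema section omitted, namespaces copied from the output shown earlier) are illustrative assumptions.

```python
import xml.etree.ElementTree as ET

# Element names taken from the XPath query, in order.
PATH = ["Polling", "PolledData", "DataSet", "diffgram",
        "NewDataSet", "NewTable", "ReceivedMessage"]

# Trimmed copy of the polled envelope (the xs:schema block is left out).
SAMPLE = """<Polling xmlns="https://schemas.microsoft.com/Sql/2008/05/Polling/">
<PolledData>
<DataSet xmlns="https://schemas.datacontract.org/2004/07/System.Data">
<diffgr:diffgram xmlns:diffgr="urn:schemas-microsoft-com:xml-diffgram-v1">
<NewDataSet xmlns="">
<NewTable>
<ReceivedMessage><![CDATA[<TestMessage>Hello, World</TestMessage>]]></ReceivedMessage>
</NewTable>
</NewDataSet>
</diffgr:diffgram>
</DataSet>
</PolledData>
</Polling>"""


def local_name(tag):
    """Strip the '{namespace}' prefix ElementTree adds to qualified tags."""
    return tag.rsplit("}", 1)[-1]


def extract_body(envelope_xml, path=PATH):
    """Follow `path` by local element name; return the last node's text or None."""
    node = ET.fromstring(envelope_xml)
    if local_name(node.tag) != path[0]:
        return None
    for name in path[1:]:
        # Take the first child whose local name matches the next path step.
        node = next((c for c in node if local_name(c.tag) == name), None)
        if node is None:
            return None
    return node.text


if __name__ == "__main__":
    print(extract_body(SAMPLE))
```

Because the message body travels as character data inside ReceivedMessage, the extracted value is a string, which is why the “String” node encoding is selected in the dialog.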

Comments

  • Anonymous
    December 30, 2009
    Thank you very much for the post. This was really helpful. When I poll the queue using the WCF SQL adapter, I can only get one message per poll (even after removing the TOP (1)). So if my queue has 1000 messages, I have to poll 1000 times to get all of them. Is there a workaround? Can I set the "Inbound operation Type" to "Notification"? If so, can you please tell me how to configure it for the above example? Appreciate your response.
  • Anonymous
    February 24, 2010
    You can set pollWhileDataFound to True to receive multiple messages without having to wait for the polling interval.
  • Anonymous
    June 28, 2010
    Thanks for this posting. This came in handy at the right time and solved a similar issue I had.
  • Anonymous
    July 06, 2010
    I used your instructions to create a simple BT 2009 application to read a SQL 2008 database and write out to an XML file via the send port. In other words, I'm not reading an SSB queue; I'm simply querying a DB table. But my BT application is not writing out files through the send port, and I don't get any errors or anything that points me to the reason for this issue! Any ideas? Thanks in advance. Ben Bagheri, BenBagheri@yahoo.com, Dallas, TX
  • Anonymous
    July 06, 2010
    Thanks Mustansir. Disregard my previous inquiry. I figured it out. There was a typo in my DB instance name, and I'm not sure why I was not getting any errors or warnings for it! I'm good now; the typed polling works and the integration is flowing.
  • Anonymous
    October 11, 2010
    I'm getting this error even after increasing the timeouts to 24 days. The adapter "WCF-Custom" raised an error message. Details "System.ObjectDisposedException: Cannot access a disposed object. Object name: 'TransactionScope'.  at System.Transactions.TransactionScope.Complete()  at System.ServiceModel.Dispatcher.TransactionRpcFacet.ThreadLeave()  at System.ServiceModel.Dispatcher.TransactionBehavior.ClearCallContext(MessageRpc& rpc)  at System.ServiceModel.Dispatcher.ImmutableDispatchRuntime.ProcessMessage7(MessageRpc& rpc)  at System.ServiceModel.Dispatcher.MessageRpc.Process(Boolean isOperationContextSet)".