WCF 채널 모델을 사용하여 Oracle Database에서 폴링 기반 데이터 변경 메시지 받기
Oracle 데이터베이스용 Microsoft BizTalk 어댑터를 구성하여 Oracle 데이터베이스 테이블 또는 뷰에서 데이터 변경 내용을 폴링할 수 있습니다. 이러한 폴링 작업을 수행하기 위해 어댑터는 Oracle 테이블 또는 뷰에 대해 SQL 쿼리를 주기적으로 실행한 다음 선택적 PL/SQL 코드 블록을 실행합니다. 그런 다음 SQL 쿼리의 결과는 Oracle 데이터베이스 어댑터에서 코드에 인바운드 POLLINGSTMT 작업에서 강력한 형식의 결과 집합으로 반환됩니다. Oracle 데이터베이스 어댑터를 사용하여 Oracle 데이터베이스에서 폴링을 구성하고 수행하는 데 사용되는 메커니즘에 대한 자세한 내용은 Oracle 데이터베이스 어 댑터에서 폴링 기반 데이터 변경 메시지 받기를 참조하세요. 계속하기 전에 이 항목을 읽는 것이 좋습니다.
OracleDBBinding의 instance 바인딩 속성을 설정하여 Oracle 데이터베이스 어댑터를 폴링하고 Oracle 데이터베이스 테이블 또는 뷰로 구성합니다. 그런 다음 WCF 채널 모델에서 이 바인딩을 사용하여 IInputChannel 채널을 가져와 어댑터에서 POLLINGSTMT 작업을 받을 수 있는 채널 수신기를 빌드합니다.
WCF에서 IInputChannel 을 사용하여 작업을 수신하는 방법에 대한 개요는 서비스 Channel-Level 프로그래밍을 참조하세요.
이 항목의 섹션에서는 WCF 채널 모델을 사용하여 Oracle 데이터베이스 테이블 및 뷰에 대한 폴링을 수행하는 데 도움이 되는 정보를 제공합니다.
POLLINGSTMT 요청 메시지 사용
어댑터는 코드에서 POLLINGSTMT 작업을 호출하여 Oracle 데이터베이스를 폴링합니다. 즉, 어댑터는 IInputChannel 채널 셰이프를 통해 수신하는 POLLINGSTMT 요청 메시지를 보냅니다. POLLINGSTMT 요청 메시지에는 PollingStatement 바인딩 속성에 지정된 쿼리의 결과 집합이 포함됩니다. 다음 두 가지 방법 중 하나로 POLLINGSTMT 메시지를 사용할 수 있습니다.
노드 값 스트리밍을 사용하여 메시지를 사용하려면 응답 메시지에서 WriteBodyContents 메서드를 호출하고 노드 값 스트리밍을 구현하는 XmlDictionaryWriter 를 전달해야 합니다.
노드 스트리밍을 사용하여 메시지를 사용하려면 응답 메시지에서 GetReaderAtBodyContents 를 호출하여 XmlReader를 가져올 수 있습니다.
일반적으로 노드 값 스트리밍을 사용하여 Oracle LOB 데이터 열을 포함하는 결과 집합을 사용합니다.
POLLINGSTMT 작업의 메시지 구조에 대한 자세한 내용은 폴링 작업에 대한 메시지 스키마를 참조하세요.
Oracle 데이터베이스 어댑터가 LOB 데이터 스트리밍을 지원하는 방법에 대한 자세한 내용은 Oracle Database 어댑터에서 큰 개체 데이터 형식 스트리밍을 참조하세요.
LOB 데이터의 엔드 투 엔드 스트리밍을 지원하기 위해 코드에서 노드 값 스트리밍을 구현하는 방법에 대한 자세한 내용은 WCF 채널 모델을 사용하여 Oracle 데이터베이스 LOB 데이터 형식 스트리밍을 참조하세요.
이 항목에 사용된 예제 정보
이 항목의 예제에서는 SCOTT을 사용합니다. ACCOUNTACTIVITY 테이블 및 SCOTT. ACCOUNT_PKG. PROCESS_ACTIVITY 함수입니다. 이러한 아티팩트 생성 스크립트는 샘플과 함께 제공됩니다. 이 예제에서는 다음 작업을 수행합니다.
폴링 문의 일부로 ACCOUNTACTIVITY 테이블에서 모든 레코드를 선택하고 콘솔에 표시합니다.
설문 조사 후 문의 일부로 이 예제에서는 ACCOUNTACTIVITY 테이블에서 ACTIVITYHISTORY 테이블로 모든 레코드를 이동하는 PROCESS_ACTIVITY 함수를 호출합니다.
ACCOUNTACTIVITY 테이블에 대한 후속 설문 조사는 레코드를 반환하지 않습니다. 그러나 예제에서 폴링 작업의 일부로 더 많은 레코드를 반환하려면 ACCOUNTACTIVITY 테이블에 일부 레코드를 삽입해야 합니다. 샘플과 함께 제공되는 more_activity_data.sql 스크립트를 실행하여 수행할 수 있습니다.
샘플에 대한 자세한 내용은 어댑터 샘플을 참조하세요.
IInputChannel을 사용하여 Oracle 데이터베이스를 폴링하려면 어떻게 하나요?
WCF 채널 모델을 사용하여 데이터 변경 메시지를 수신하도록 Oracle 데이터베이스 테이블 또는 뷰를 폴링하려면 다음 단계를 수행합니다.
IInputChannel을 사용하여 데이터 변경 메시지를 받으려면
Visual Studio에서 Visual C# 프로젝트를 만듭니다. 이 항목의 경우 콘솔 애플리케이션을 만듭니다.
솔루션 탐색기 , ,
Microsoft.ServiceModel.Channels
System.ServiceModel
및System.Runtime.Serialization
에 대한 참조를Microsoft.Adapters.OracleDB
추가합니다.Program.cs 파일을 열고 다음 네임스페이스를 추가합니다.
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
System.Runtime.Serialization
System.IO
Microsoft.ServiceModel.Channels.Common
OracleDBBinding의 instance 만들고 폴링을 구성하는 데 필요한 바인딩 속성을 설정합니다. 최소한 InboundOperationType, PollingStatement 및 PollingInterval 바인딩 속성을 설정해야 합니다. 이 예제에서는 PostPollStatement 바인딩 속성도 설정합니다. 폴링을 구성하는 데 사용되는 바인딩 속성에 대한 자세한 내용은 Oracle Database 어댑터에서 폴링 기반 데이터 변경 메시지 받기를 참조하세요.
OracleDBBinding binding = new OracleDBBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PollingInterval = 30; binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
바인딩 매개 변수 컬렉션을 만들고 자격 증명을 설정합니다.
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "SCOTT"; credentials.UserName.Password = "TIGER"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
채널 수신기를 만들고 엽니다. OracleDBBinding에서 BuildChannelListener<IInputChannel> 메서드를 호출하여 수신기를 만듭니다. 연결 URI에서 PollingId 속성을 설정하여 POLLINGSTMT 작업의 대상 네임스페이스를 수정할 수 있습니다. 어댑터 연결 URI에 대한 자세한 내용은 Oracle 데이터베이스 연결 URI 만들기를 참조하세요.
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
수신기에서 AcceptChannel 메서드를 호출하여 IInputChannel 채널을 가져와서 엽니다.
IInputChannel channel = listener.AcceptChannel(); channel.Open();
채널에서 Receive 를 호출하여 어댑터에서 다음 POLLINGSTMT 메시지를 가져옵니다.
Message message = channel.Receive();
POLLINGSTMT 작업에서 반환된 결과 집합을 사용합니다. XmlReader 또는 XmlDictionaryWriter를 사용하여 메시지를 사용할 수 있습니다.
XmlReader reader = message.GetReaderAtBodyContents();
요청 처리를 완료하면 채널을 닫습니다.
channel.Close()
중요
POLLINGSTMT 작업 처리를 완료한 후 채널을 닫아야 합니다. 채널을 닫지 못하면 코드의 동작에 영향을 줄 수 있습니다.
데이터 변경 메시지 수신이 완료되면 수신기를 닫습니다.
listener.Close()
중요
수신기를 닫으면 수신기를 사용하여 만든 채널이 닫히지 않습니다. 수신기를 사용하여 만든 각 채널을 명시적으로 닫아야 합니다.
예제
다음 예제에서는 Oracle 데이터베이스 테이블 및 뷰를 폴링하고 WCF 채널 모델을 사용하여 POLLLINGSTMT 작업을 수신하도록 Oracle 데이터베이스 어댑터를 구성하는 방법을 보여 줍니다. POLLINGSTMT 작업에서 반환된 결과 집합은 XmlReader를 사용하여 콘솔에 기록됩니다.
using System;
using System.Collections.Generic;
using System.Text;
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
// Add this namespace for channel model
using System.ServiceModel.Channels;
using System.Xml;
using System.Runtime.Serialization;
using System.IO;
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;
namespace OraclePollingCM
{
class Program
{
static void Main(string[] args)
{
Uri connectionUri = new Uri("oracleDB://ADAPTER/");
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
// set timeout to receive POLLINGSTMT message
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
Console.WriteLine("Sample Started");
try
{
// Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the
// PollingStatement,and the PostPollStatement.
OracleDBBinding binding = new OracleDBBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingInterval = 30;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Create a binding parameter collection and set the credentials
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "SCOTT";
credentials.UserName.Password = "TIGER";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
Console.WriteLine("Opening listener");
// get a listener from the binding
listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
listener.Open();
Console.WriteLine("Opening channel");
// get a channel from the listener
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel opened -- waiting for polled data");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
// Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
Console.WriteLine("\nPolling data received for request number {0}", i+1);
Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");
while (reader.Read())
{
if (reader.IsStartElement())
{
switch (reader.Name)
{
case "POLLINGSTMTRECORD":
Console.Write("\n");
break;
case "TID":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "ACCOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "AMOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "TRANSDATE":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
default:
break;
}
}
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
// To save the polling data to a file you can REPLACE the code above with the following
//
// XmlDocument doc = new XmlDocument();
// doc.Load(reader);
// using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
// {
// doc.WriteTo(writer);
// }
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (TargetSystemException tex)
{
Console.WriteLine("Exception occurred on the Oracle Database");
Console.WriteLine(tex.InnerException.Message);
}
catch (ConnectionException cex)
{
Console.WriteLine("Exception occurred connecting to the Oracle Database");
Console.WriteLine(cex.InnerException.Message);
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}