사용자 지정 대상 구성 요소 개발
적용 대상: Azure Data Factory의 SQL Server SSIS Integration Runtime
Microsoft SQL Server Integration Services를 통해 개발자는 모든 사용자 지정 데이터 원본에 연결하고 데이터를 저장할 수 있는 사용자 지정 대상 구성 요소를 작성할 수 있습니다. 사용자 지정 대상 구성 요소는 Integration Services에 포함된 기존 원본 구성 요소 중 하나를 사용하여 액세스할 수 없는 데이터 원본에 연결해야 하는 경우에 유용합니다.
대상 구성 요소에는 하나 이상의 입력과 0개의 출력이 있습니다. 디자인 타임에 연결을 만들고 구성하며 외부 데이터 원본에서 열 메타데이터를 읽습니다. 실행하는 동안 외부 데이터 원본에 연결하고 데이터 흐름의 업스트림 구성 요소에서 받은 행을 외부 데이터 원본에 추가합니다. 구성 요소를 실행하기 전에 외부 데이터 원본이 있는 경우 대상 구성 요소는 구성 요소가 받는 열의 데이터 형식이 외부 데이터 원본에 있는 열의 데이터 형식과 일치하는지 확인해야 합니다.
이 섹션에서는 대상 구성 요소를 개발하는 방법에 대한 세부 정보를 설명하고 중요한 개념을 명확히 하는 코드 예제를 제공합니다. 데이터 흐름 구성 요소 개발에 대한 일반적인 개요는 사용자 지정 데이터 흐름 구성 요소 개발을 참조 하세요.
디자인 타임
대상 구성 요소의 디자인 타임 기능을 구현하려면 외부 데이터 원본에 대한 연결을 지정하고 구성 요소가 올바르게 구성되었는지 확인하는 작업이 포함됩니다. 정의에 따라 대상 구성 요소에는 하나의 입력이 있으며 하나의 오류 출력이 있을 수 있습니다.
구성 요소 만들기
대상 구성 요소는 패키지에 정의된 개체를 사용하여 ConnectionManager 외부 데이터 원본에 연결합니다. 대상 구성 요소는 요소를 컬렉션ComponentMetaData에 추가하여 RuntimeConnectionCollection SSIS 디자이너 및 구성 요소 사용자에게 연결 관리자에 대한 요구 사항을 나타냅니다. 이 컬렉션은 두 가지 용도로 사용됩니다. 첫째, SSIS 디자이너에 대한 연결 관리자의 필요성을 보급합니다. 그런 다음 사용자가 연결 관리자를 선택하거나 만든 후 구성 요소에서 사용 중인 패키지의 연결 관리자에 대한 참조를 보유합니다. 컬렉션에 IDTSRuntimeConnection100 추가되면 고급 편집기 연결 속성 탭을 표시하여 구성 요소에서 사용할 패키지의 연결을 선택하거나 만들라는 메시지를 표시합니다.
다음 코드 샘플에서는 입력을 추가한 다음 개체를 추가하는 구현 ProvideComponentProperties 을 IDTSRuntimeConnection100 RuntimeConnectionCollection보여줍니다.
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
public class DestinationComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.net";
}
}
}
Imports System
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
Public Class DestinationComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Reset the component.
Me.RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.net"
End Sub
End Class
End Namespace
외부 데이터 원본에 연결
연결이 추가된 후 메서드를 재정의 RuntimeConnectionCollectionAcquireConnections 하여 외부 데이터 원본에 대한 연결을 설정합니다. 이 메서드는 디자인 타임 및 런타임에 호출됩니다. 구성 요소에서는 런타임 연결에 지정된 연결 관리자에 대한 연결을 설정한 후 외부 데이터 원본에 대한 연결을 설정해야 합니다. 연결이 설정되면 구성 요소에서는 연결을 내부에 캐시했다가 ReleaseConnections가 호출되면 이를 해제해야 합니다. 개발자는 이 메서드를 재정의하고 구성 요소에서 설정한 연결을 해제합니다 AcquireConnections. ReleaseConnections 및 AcquireConnections 메서드는 모두 디자인 타임과 런타임에 호출됩니다.
다음 코드 예제에서는 메서드의 ADO.NET 연결에 AcquireConnections 연결한 다음 연결을 닫는 구성 요소를 보여 줍니다 ReleaseConnections.
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then
Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
구성 요소 유효성 검사
대상 구성 요소 개발자는 구성 요소 유효성 검사에 설명된 대로 유효성 검사를 수행해야 합니다. 또한 구성 요소의 입력 열 컬렉션에 정의된 열의 데이터 형식 속성이 외부 데이터 원본의 열과 일치하는지 확인해야 합니다. 때때로 구성 요소 또는 SSIS 디자이너가 연결이 끊어진 상태이거나 서버로의 왕복이 허용되지 않는 경우와 같이 외부 데이터 원본에 대한 입력 열을 확인하는 것은 불가능하거나 바람직하지 않을 수 있습니다. 이러한 경우 입력 개체를 사용하여 입력 열 컬렉션의 열의 유효성을 ExternalMetadataColumnCollection 계속 검사할 수 있습니다.
이 컬렉션은 입력 및 출력 개체 둘 다에 존재하며 외부 데이터 원본의 열에서 구성 요소 개발자가 채워야 합니다. 이 컬렉션을 사용하여 SSIS 디자이너가 오프라인 상태이거나 구성 요소의 연결이 끊어지거나 속성이 false일 때 입력 열의 유효성을 ValidateExternalMetadata 검사할 수 있습니다.
다음 예제 코드에서는 기존 입력 열을 기반으로 외부 메타데이터 열을 추가합니다.
private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();
externalColumn.Name = inputColumn.Name;
externalColumn.Precision = inputColumn.Precision;
externalColumn.Length = inputColumn.Length;
externalColumn.DataType = inputColumn.DataType;
externalColumn.Scale = inputColumn.Scale;
// Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()
externalColumn.Name = inputColumn.Name
externalColumn.Precision = inputColumn.Precision
externalColumn.Length = inputColumn.Length
externalColumn.DataType = inputColumn.DataType
externalColumn.Scale = inputColumn.Scale
' Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
실행 시간
실행하는 동안 대상 구성 요소는 업스트림 구성 요소에서 전체 PipelineBuffer 기능을 사용할 수 있게 될 때마다 메서드에 대한 호출 ProcessInput 을 받습니다. 이 메서드는 더 이상 사용할 수 있는 버퍼가 없고 EndOfRowset 속성이 true가 될 때까지 반복적으로 호출됩니다. 이 메서드 중에 대상 구성 요소는 버퍼의 열과 행을 읽고 외부 데이터 원본에 추가합니다.
버퍼에서 열 찾기
구성 요소의 입력 버퍼에는 데이터 흐름의 구성 요소에서 업스트림으로 구성 요소의 출력 열 컬렉션에 정의된 모든 열이 포함됩니다. 예를 들어 원본 구성 요소가 출력에 세 개의 열을 제공하고 다음 구성 요소가 추가 출력 열을 추가하는 경우 대상 구성 요소에 제공된 버퍼에는 대상 구성 요소가 두 개의 열만 쓰는 경우에도 4개의 열이 포함됩니다.
입력 버퍼의 열 순서는 대상 구성 요소의 입력 열 컬렉션에 있는 열의 인덱스로 정의되지 않습니다. 열은 의 메서드BufferManager를 사용해야 FindColumnByLineageID 만 버퍼 행에 안정적으로 배치할 수 있습니다. 이 메서드는 지정된 버퍼에서 지정된 계보 ID가 있는 열을 찾아 행의 위치를 반환합니다. 입력 열의 인덱스는 일반적으로 PreExecute 메서드 실행 중에 검색되며 나중에 ProcessInput 실행 중에 사용할 수 있도록 개발자에 의해 캐시됩니다.
다음 코드 예제에서는 버 PreExecute 퍼에서 입력 열의 위치를 찾아 배열에 저장합니다. 이 배열은 이후 ProcessInput 실행 중에 버퍼의 열 값을 읽는 데 사용됩니다.
int[] cols;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
cols = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
}
}
Private cols As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim cols(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
Next x
End Sub
행 처리
버퍼에서 입력 열을 찾은 후에는 해당 입력 열을 읽고 외부 데이터 원본에 쓸 수 있습니다.
대상 구성 요소가 외부 데이터 원본에 행을 쓰는 동안 메서드를 호출 IncrementPipelinePerfCounter 하여 "행 읽기" 또는 "BLOB 바이트 읽기" 성능 카운터를 업데이트할 수 있습니다. 자세한 내용은 성능 카운터를 참조하세요.
다음 예제에서는 제공된 버퍼에서 행을 읽는 구성 요소를 보여 줍니다 ProcessInput. 버퍼에 있는 열의 인덱스는 이전 코드 예제에서 배치 PreExecute 되었습니다.
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int col in cols)
{
if (!buffer.IsNull(col))
{
// TODO: Read the column data.
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For Each col As Integer In cols
If buffer.IsNull(col) = False Then
' TODO: Read the column data.
End If
Next col
End While
End Sub
Sample
다음 샘플에서는 파일 연결 관리자를 사용하여 데이터 흐름의 이진 데이터를 파일에 저장하는 간단한 대상 구성 요소를 보여줍니다. 이 샘플에서는 이 항목에서 설명하는 모든 메서드와 기능을 보여 주지 않습니다. 모든 사용자 지정 대상 구성 요소가 재정의해야 하지만 디자인 타임 유효성 검사를 위한 코드는 포함하지 않는 중요한 메서드를 보여 줍니다.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace BlobDst
{
[DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
public class BlobDst : PipelineComponent
{
string m_DestDir;
int m_FileNameColumnIndex = -1;
int m_BlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "BLOB Extractor Destination Input";
input.HasSideEffects = true;
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);
if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
m_DestDir += "\\";
}
public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
IDTSCustomProperty100 custProp;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsFileName";
custProp.Value = (object)false;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsBLOB";
custProp.Value = (object)false;
return inputColumn;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
IDTSCustomProperty100 custProp;
foreach (IDTSInputColumn100 column in inputColumns)
{
custProp = column.CustomPropertyCollection["IsFileName"];
if ((bool)custProp.Value == true)
{
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
custProp = column.CustomPropertyCollection["IsBLOB"];
if ((bool)custProp.Value == true)
{
m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
string strFileName = buffer.GetString(m_FileNameColumnIndex);
int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);
strFileName = TranslateFileName(strFileName);
// Make sure directory exists before creating file.
FileInfo fi = new FileInfo(strFileName);
if (!fi.Directory.Exists)
fi.Directory.Create();
// Write the data to the file.
FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
fs.Write(blobData, 0, blobLength);
fs.Close();
}
}
private string TranslateFileName(string fileName)
{
if (fileName.Length > 2 && fileName[1] == ':')
return m_DestDir + fileName.Substring(3, fileName.Length - 3);
else
return m_DestDir + fileName;
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Namespace BlobDst
<DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _
Public Class BlobDst
Inherits PipelineComponent
Private m_DestDir As String
Private m_FileNameColumnIndex As Integer = -1
Private m_BlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
input.Name = "BLOB Extractor Destination Input"
input.HasSideEffects = True
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)
If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then
m_DestDir += "\"
End If
End Sub
Public Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100
Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)
Dim custProp As IDTSCustomProperty100
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsFileName"
custProp.Value = CType(False, Object)
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsBLOB"
custProp.Value = CType(False, Object)
Return inputColumn
End Function
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
Dim custProp As IDTSCustomProperty100
For Each column As IDTSInputColumn100 In inputColumns
custProp = column.CustomPropertyCollection("IsFileName")
If CType(custProp.Value, Boolean) = True Then
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
custProp = column.CustomPropertyCollection("IsBLOB")
If CType(custProp.Value, Boolean) = True Then
m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)
Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)
Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)
strFileName = TranslateFileName(strFileName)
Dim fi As FileInfo = New FileInfo(strFileName)
' Make sure directory exists before creating file.
If Not fi.Directory.Exists Then
fi.Directory.Create
End If
' Write the data to the file.
Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)
fs.Write(blobData, 0, blobLength)
fs.Close
End While
End Sub
Private Function TranslateFileName(ByVal fileName As String) As String
If fileName.Length > 2 AndAlso fileName(1) = ":"C Then
Return m_DestDir + fileName.Substring(3, fileName.Length - 3)
Else
Return m_DestDir + fileName
End If
End Function
End Class
End Namespace