사용자 지정 원본 구성 요소 개발
적용 대상: Azure Data Factory의 SQL Server SSIS Integration Runtime
SQL Server Integration Services는 개발자에게 사용자 지정 데이터 원본에 연결하고 해당 원본에서 데이터 흐름 태스크의 다른 구성 요소로 데이터를 제공할 수 있는 원본 구성 요소를 작성할 수 있는 기능을 제공합니다. 사용자 지정 원본을 만드는 기능은 기존 Integration Services 원본 중 하나를 사용하여 액세스할 수 없는 데이터 원본에 연결해야 하는 경우에 유용합니다.
원본 구성 요소에는 하나 이상의 출력과 0개의 입력이 있습니다. 디자인 타임에는 원본 구성 요소를 사용하여 연결을 만들고 구성하고, 외부 데이터 원본에서 열 메타데이터를 읽고, 외부 데이터 원본을 기반으로 원본의 출력 열을 구성합니다. 실행하는 동안 외부 데이터 원본에 연결하고 출력 버퍼에 행을 추가합니다. 그런 다음 데이터 흐름 태스크에서는 다운스트림 구성 요소에 이 데이터 행 버퍼를 제공합니다.
데이터 흐름 구성 요소 개발에 대한 일반적인 개요는 사용자 지정 데이터 흐름 구성 요소 개발을 참조 하세요.
디자인 타임
원본 구성 요소의 디자인 타임 기능을 구현하려면 외부 데이터 원본에 대한 연결을 지정하고, 데이터 원본을 반영하는 출력 열을 추가 및 구성하고, 구성 요소를 실행할 준비가 되었다는 유효성을 검사해야 합니다. 정의에 따라 원본 구성 요소에는 입력이 없으며 하나 이상의 비동기 출력이 있습니다.
구성 요소 만들기
원본 구성 요소는 패키지에 정의된 ConnectionManager 개체를 사용하여 외부 데이터 원본에 연결합니다. 원본 구성 요소에서는 RuntimeConnectionCollection 속성의 ComponentMetaData 컬렉션에 요소를 추가하여 연결 관리자가 필요함을 나타냅니다. 이 컬렉션은 구성 요소에서 사용하는 패키지의 연결 관리자에 대한 참조를 보관하고 디자이너에 연결 관리자의 필요성을 알리기 위해 두 가지 용도로 사용됩니다. 컬렉션에 IDTSRuntimeConnection100 추가된 경우 고급 편집기 사용자가 패키지에서 연결을 선택하거나 만들 수 있는 연결 속성 탭을 표시합니다.
다음 코드 예제에서는 출력을 추가하고 개체를 추가하는 구현 ProvideComponentProperties 을 IDTSRuntimeConnection100 RuntimeConnectionCollection보여줍니다.
using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
public class MyComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.NET";
}
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Allow for resetting the component.
RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.NET"
End Sub
End Class
외부 데이터 원본에 연결
RuntimeConnectionCollection에 연결을 추가한 후에는 AcquireConnections 메서드를 재정의하여 외부 데이터 원본에 대한 연결을 설정합니다. 이 메서드는 디자인 및 실행 시간 모두에 호출됩니다. 구성 요소에서는 런타임 연결에 지정된 연결 관리자에 대한 연결을 설정한 후 외부 데이터 원본에 대한 연결을 설정해야 합니다.
연결이 설정되면 구성 요소에 의해 내부적으로 캐시되고 메서드가 호출될 때 ReleaseConnections 해제되어야 합니다. ReleaseConnections 메서드는 디자인 및 실행 시 메서드와 같이 호출됩니다AcquireConnections. 개발자는 이 메서드를 재정의하고 구성 요소에서 설정한 연결을 해제합니다 AcquireConnections.
다음 코드 예제에서는 메서드에서 ADO.NET 연결에 AcquireConnections 연결 하 고 메서드에서 연결을 ReleaseConnections 닫는 구성 요소를 보여 줍니다.
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then
Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
출력 열 만들기 및 구성
원본 구성 요소의 출력 열은 실행 중에 구성 요소가 데이터 흐름에 추가하는 외부 데이터 원본의 열을 반영합니다. 디자인 타임에 구성 요소가 외부 데이터 원본에 연결하도록 구성된 후 출력 열을 추가합니다. 구성 요소에서 출력 컬렉션에 열을 추가하는 데 사용하는 디자인 타임 메서드는 구성 요소의 요구 사항에 따라 달라질 수 있지만 Validate 또는 AcquireConnections 실행 중에는 디자인 타임 메서드를 추가하면 안 됩니다. 예를 들어 구성 요소의 데이터 집합을 제어하는 사용자 지정 속성에서 SQL 문이 포함된 구성 요소는 SetComponentProperty 메서드 실행 중 출력 열을 추가할 수 있습니다. 구성 요소에서는 캐시된 연결이 있는지 여부를 확인하고, 캐시된 연결이 있으면 데이터 원본에 연결하여 출력 열을 생성합니다.
출력 열을 만든 후 메서드를 호출 SetDataTypeProperties 하여 해당 데이터 형식 속성을 설정합니다. 이 메서드는 , Length및 PrecisionCodePage 속성이 읽기 전용이며 각 속성이 다른 속성의 설정에 따라 달라지므로 필요합니다DataType. 이 메서드는 이러한 값을 일관되게 설정해야 하며 데이터 흐름 태스크에서 올바르게 설정되었는지 확인합니다.
DataType 열에 따라 다른 속성에 대해 설정된 값이 결정됩니다. 다음 표에서는 각 종속 속성에 대한 요구 사항을 보여 줍니다 DataType. 나열되지 않은 데이터 형식에는 해당 종속 속성이 0으로 설정됩니다.
DataType | Length | Scale | Precision | CodePage |
---|---|---|---|---|
DT_DECIMAL | 0 | 0보다 크고 28보다 작거나 같습니다. | 0 | 0 |
DT_CY | 0 | 0 | 0 | 0 |
DT_NUMERIC | 0 | 0보다 크고 28보다 작거나 같으며 전체 자릿수보다 작습니다. | 1보다 크거나 같고 38보다 작거나 같습니다. | 0 |
DT_BYTES | 0보다 큽다. | 0 | 0 | 0 |
DT_STR | 0보다 크고 8000보다 작습니다. | 0 | 0 | 0이 아니고 유효한 코드 페이지입니다. |
DT_WSTR | 0보다 크고 4000보다 작습니다. | 0 | 0 | 0 |
데이터 형식 속성에 대한 제한은 출력 열의 데이터 형식을 기반으로 하기 때문에 관리되는 형식으로 작업할 때 올바른 SSIS 데이터 형식을 선택해야 합니다. 기본 클래스는 관리되는 형식이 지정된 경우 관리되는 구성 요소 개발자가 SSIS 데이터 형식을 선택할 수 있도록 지원하는 세 가지 도우미 메서드 ConvertBufferDataTypeToFitManagedBufferTypeToDataRecordType및 DataRecordTypeToBufferType이를 제공합니다. 이러한 메서드는 관리되는 데이터 형식을 SSIS 데이터 형식으로 변환하고 그 반대의 경우도 마찬가지입니다.
다음 코드 예제에서는 테이블의 스키마에 따라 구성 요소의 출력 열 컬렉션이 채워지는 방법을 보여줍니다. 기본 클래스의 도우미 메서드는 열의 데이터 형식을 설정하는 데 사용되며 종속 속성은 데이터 형식에 따라 설정됩니다.
SqlCommand sqlCommand;
private void CreateColumnsFromDataTable()
{
// Get the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
// Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll();
output.ExternalMetadataColumnCollection.RemoveAll();
this.sqlCommand = sqlConnection.CreateCommand();
this.sqlCommand.CommandType = CommandType.Text;
this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
DataTable dataTable = schemaReader.GetSchemaTable();
// Walk the columns in the schema,
// and for each data column create an output column and an external metadata column.
foreach (DataRow row in dataTable.Rows)
{
IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();
// Set column data type properties.
bool isLong = false;
DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
int length = 0;
int precision = (short)row["NumericPrecision"];
int scale = (short)row["NumericScale"];
int codepage = dataTable.Locale.TextInfo.ANSICodePage;
switch (dt)
{
// The length cannot be zero, and the code page property must contain a valid code page.
case DataType.DT_STR:
case DataType.DT_TEXT:
length = precision;
precision = 0;
scale = 0;
break;
case DataType.DT_WSTR:
length = precision;
codepage = 0;
scale = 0;
precision = 0;
break;
case DataType.DT_BYTES:
precision = 0;
scale = 0;
codepage = 0;
break;
case DataType.DT_NUMERIC:
length = 0;
codepage = 0;
if (precision > 38)
precision = 38;
if (scale > 6)
scale = 6;
break;
case DataType.DT_DECIMAL:
length = 0;
precision = 0;
codepage = 0;
break;
default:
length = 0;
precision = 0;
codepage = 0;
scale = 0;
break;
}
// Set the properties of the output column.
outColumn.Name = (string)row["ColumnName"];
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
}
}
Private sqlCommand As SqlCommand
Private Sub CreateColumnsFromDataTable()
' Get the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
' Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll()
output.ExternalMetadataColumnCollection.RemoveAll()
Me.sqlCommand = sqlConnection.CreateCommand()
Me.sqlCommand.CommandType = CommandType.Text
Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)
Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
Dim dataTable As DataTable = schemaReader.GetSchemaTable()
' Walk the columns in the schema,
' and for each data column create an output column and an external metadata column.
For Each row As DataRow In dataTable.Rows
Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
' Set column data type properties.
Dim isLong As Boolean = False
Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
Dim length As Integer = 0
Dim precision As Integer = CType(row("NumericPrecision"), Short)
Dim scale As Integer = CType(row("NumericScale"), Short)
Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage
Select Case dt
' The length cannot be zero, and the code page property must contain a valid code page.
Case DataType.DT_STR
Case DataType.DT_TEXT
length = precision
precision = 0
scale = 0
Case DataType.DT_WSTR
length = precision
codepage = 0
scale = 0
precision = 0
Case DataType.DT_BYTES
precision = 0
scale = 0
codepage = 0
Case DataType.DT_NUMERIC
length = 0
codepage = 0
If precision > 38 Then
precision = 38
End If
If scale > 6 Then
scale = 6
End If
Case DataType.DT_DECIMAL
length = 0
precision = 0
codepage = 0
Case Else
length = 0
precision = 0
codepage = 0
scale = 0
End Select
' Set the properties of the output column.
outColumn.Name = CStr(row("ColumnName"))
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
Next
End Sub
구성 요소 유효성 검사
원본 구성 요소의 유효성을 검사하고 출력 열 컬렉션에 정의된 열이 외부 데이터 원본의 열과 일치하는지 확인해야 합니다. 때로는 연결이 끊어진 상태이거나 오랜 서버 왕복을 피하는 것이 바람직할 때와 같이 외부 데이터 원본을 기준으로 출력 열의 유효성을 검사하는 것이 불가능한 경우가 있습니다. 이러한 경우에도 출력 개체의 ExternalMetadataColumnCollection을 사용하여 출력의 열에 대한 유효성을 검사할 수 있습니다. 자세한 내용은 데이터 흐름 구성 요소에 대한 유효성 검사를 참조하세요.
이 컬렉션은 입력 및 출력 개체 둘 다에 존재하며 외부 데이터 원본의 열로 채울 수 있습니다. 이 컬렉션을 사용하여 SSIS Designer가 오프라인 상태이거나 구성 요소가 연결이 끊어지거나 속성이 false일 때 출력 열의 유효성을 ValidateExternalMetadata 검사할 수 있습니다. 먼저 출력 열이 만들어지는 동시에 컬렉션을 채워야 합니다. 외부 메타데이터 열은 처음에 출력 열과 일치해야 하므로 컬렉션에 외부 메타데이터 열을 추가하는 것이 비교적 쉽습니다. 열의 데이터 형식 속성이 이미 올바르게 설정되어 있어야 하며 속성을 개체에 직접 복사할 IDTSExternalMetadataColumn100 수 있습니다.
다음 예제 코드에서는 새로 만들어진 출력 열을 기반으로 외부 메타데이터 열을 추가합니다. 출력 열은 이미 만들어진 것으로 가정합니다.
private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
externalColumn.Name = outputColumn.Name;
externalColumn.Precision = outputColumn.Precision;
externalColumn.Length = outputColumn.Length;
externalColumn.DataType = outputColumn.DataType;
externalColumn.Scale = outputColumn.Scale;
// Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
externalColumn.Name = outputColumn.Name
externalColumn.Precision = outputColumn.Precision
externalColumn.Length = outputColumn.Length
externalColumn.DataType = outputColumn.DataType
externalColumn.Scale = outputColumn.Scale
' Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
실행 시간
실행하는 동안 구성 요소는 데이터 흐름 태스크에서 만들고 구성 요소에 제공된 출력 버퍼에 PrimeOutput행을 추가합니다. 원본 구성 요소에 대해 한 번 호출된 메서드는 다운스트림 구성 요소에 연결된 각 IDTSOutput100 구성 요소에 대한 출력 버퍼를 받습니다.
버퍼에서 열 찾기
구성 요소의 출력 버퍼에는 구성 요소에서 정의한 열과 다운스트림 구성 요소의 출력에 추가된 모든 열이 포함됩니다. 예를 들어 원본 구성 요소가 출력에 세 개의 열을 제공하고 다음 구성 요소가 네 번째 출력 열을 추가하는 경우 원본 구성 요소에서 사용하기 위해 제공되는 출력 버퍼에는 이러한 4개의 열이 포함됩니다.
버퍼 행의 열 순서는 출력 열 컬렉션에 있는 출력 열의 인덱스로 정의되지 않습니다. 따라서 FindColumnByLineageID의 BufferManager 메서드를 사용해야 버퍼 행에서 출력 열을 정확하게 찾을 수 있습니다. 이 메서드는 지정된 버퍼에서 지정된 계보 ID가 있는 열을 찾아 행의 위치를 반환합니다. 출력 열의 인덱스는 일반적으로 PreExecute 메서드 실행 중에 검색되며 PrimeOutput 실행 중에 사용할 수 있도록 저장됩니다.
다음 코드 예제에서는 호출 PreExecute하는 동안 출력 버퍼에서 출력 열의 위치를 찾아 내부 구조에 저장합니다. 또한 열의 이름은 구조에 저장되며 이 항목의 다음 섹션에서 PrimeOutput 메서드에 대한 코드 예제에 사용됩니다.
ArrayList columnInformation;
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
public override void PreExecute()
{
this.columnInformation = new ArrayList();
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
ci.ColumnName = col.Name;
columnInformation.Add(ci);
}
}
Public Overrides Sub PreExecute()
Me.columnInformation = New ArrayList()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
For Each col As IDTSOutputColumn100 In output.OutputColumnCollection
Dim ci As ColumnInfo = New ColumnInfo()
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
ci.ColumnName = col.Name
columnInformation.Add(ci)
Next
End Sub
행 처리
AddRow 메서드를 호출하여 출력 버퍼에 행을 추가하면 열에 빈 값이 있는 새 버퍼 행이 만들어집니다. 그런 다음 구성 요소는 개별 열에 값을 할당합니다. 구성 요소에 제공된 출력 버퍼는 데이터 흐름 태스크에 의해 생성되고 모니터링됩니다. 출력 버퍼가 가득 차면 버퍼의 행은 다음 구성 요소로 이동됩니다. 데이터 흐름 태스크에 의한 행 이동이 구성 요소 개발자에게 투명하고 RowCount 출력 버퍼에서 속성이 항상 0이므로 행 배치가 다음 구성 요소로 전송된 시기를 확인할 수 있는 방법은 없습니다. 원본 구성 요소가 출력 버퍼에 행 추가를 마치면 해당 메서드를 호출 SetEndOfRowset 하여 데이터 흐름 태스크에 알리고 버퍼의 PipelineBuffer나머지 행은 다음 구성 요소에 전달됩니다.
원본 구성 요소가 외부 데이터 원본에서 행을 읽는 동안 메서드를 호출 IncrementPipelinePerfCounter 하여 "행 읽기" 또는 "BLOB 바이트 읽기" 성능 카운터를 업데이트할 수 있습니다. 자세한 내용은 성능 카운터를 참조하세요.
다음 코드 예제에서는 출력 버퍼에 행을 추가하는 구성 요소를 보여 줍니다 PrimeOutput. 버퍼에 있는 출력 열의 인덱스는 이전 코드 예제에서 사용 PreExecute 했습니다.
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
PipelineBuffer buffer = buffers[0];
SqlDataReader dataReader = sqlCommand.ExecuteReader();
// Loop over the rows in the DataReader,
// and add them to the output buffer.
while (dataReader.Read())
{
// Add a row to the output buffer.
buffer.AddRow();
for (int x = 0; x < columnInformation.Count; x++)
{
ColumnInfo ci = (ColumnInfo)columnInformation[x];
int ordinal = dataReader.GetOrdinal(ci.ColumnName);
if (dataReader.IsDBNull(ordinal))
buffer.SetNull(ci.BufferColumnIndex);
else
{
buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
}
}
}
buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim buffer As PipelineBuffer = buffers(0)
Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()
' Loop over the rows in the DataReader,
' and add them to the output buffer.
While (dataReader.Read())
' Add a row to the output buffer.
buffer.AddRow()
For x As Integer = 0 To columnInformation.Count
Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)
Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)
If (dataReader.IsDBNull(ordinal)) Then
buffer.SetNull(ci.BufferColumnIndex)
Else
buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)
End If
Next
End While
buffer.SetEndOfRowset()
End Sub
Sample
다음 샘플에서는 파일 연결 관리자를 사용하여 파일의 이진 콘텐츠를 데이터 흐름에 로드하는 간단한 원본 구성 요소를 보여 줍니다. 이 샘플에서는 이 항목에서 설명하는 모든 메서드와 기능을 보여 주지 않습니다. 모든 사용자 지정 원본 구성 요소가 재정의해야 하지만 디자인 타임 유효성 검사를 위한 코드는 포함하지 않는 중요한 메서드를 보여 줍니다.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace BlobSrc
{
[DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
public class BlobSrc : PipelineComponent
{
IDTSConnectionManager100 m_ConnMgr;
int m_FileNameColumnIndex = -1;
int m_FileBlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "BLOB File Inserter Output";
IDTSOutputColumn100 column = output.OutputColumnCollection.New();
column.Name = "FileName";
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);
column = output.OutputColumnCollection.New();
column.Name = "FileBLOB";
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_ConnMgr = conn.ConnectionManager;
}
public override void ReleaseConnections()
{
m_ConnMgr = null;
}
public override void PreExecute()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
string strFileName = (string)m_ConnMgr.AcquireConnection(null);
while (strFileName != null)
{
buffers[0].AddRow();
buffers[0].SetString(m_FileNameColumnIndex, strFileName);
FileInfo fileInfo = new FileInfo(strFileName);
byte[] fileData = new byte[fileInfo.Length];
FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
fs.Read(fileData, 0, fileData.Length);
buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);
strFileName = (string)m_ConnMgr.AcquireConnection(null);
}
buffers[0].SetEndOfRowset();
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace BlobSrc
<DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _
Public Class BlobSrc
Inherits PipelineComponent
Private m_ConnMgr As IDTSConnectionManager100
Private m_FileNameColumnIndex As Integer = -1
Private m_FileBlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "BLOB File Inserter Output"
Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New
column.Name = "FileName"
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)
column = output.OutputColumnCollection.New
column.Name = "FileBLOB"
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)
Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_ConnMgr = conn.ConnectionManager
End Sub
Public Overrides Sub ReleaseConnections()
m_ConnMgr = Nothing
End Sub
Public Overrides Sub PreExecute()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)
m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)
While Not (strFileName Is Nothing)
buffers(0).AddRow
buffers(0).SetString(m_FileNameColumnIndex, strFileName)
Dim fileInfo As FileInfo = New FileInfo(strFileName)
Dim fileData(fileInfo.Length) As Byte
Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)
fs.Read(fileData, 0, fileData.Length)
buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)
strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)
End While
buffers(0).SetEndOfRowset
End Sub
End Class
End Namespace