同期出力型のカスタム変換コンポーネントの開発
同期出力型の変換コンポーネントは、上流コンポーネントから行を受け取り、これらの行の列の値を読み取ったり変更したりして、下流コンポーネントに渡します。このコンポーネントは、上流コンポーネントから提供される列から派生する、別の出力列も定義しますが、データ フローに行を追加することはありません。同期コンポーネントと非同期コンポーネントの相違点の詳細については、「同期および非同期変換について」を参照してください。
この種類のコンポーネントは、データがコンポーネントに提供されたときにインラインで変更され、処理する前にコンポーネントがすべての行を確認する必要がないタスクに適しています。通常、同期出力型の変換では、外部データ ソースへの接続、外部メタデータ列の管理、および出力バッファへの行の追加は行われないため、このコンポーネントを開発するのは最も簡単です。
同期出力型の変換コンポーネントを作成するには、コンポーネント用に選択された上流列を格納する IDTSInput90 オブジェクトと、コンポーネントによって作成された派生列を格納する IDTSOutput90 オブジェクトを追加します。さらに、デザイン時のメソッドを実装し、実行中に受信バッファ行内の列を読み取ったり変更するコードを設定します。
このセクションでは、カスタム変換コンポーネントの実装に必要な情報、および概念の理解に役立つコード例を提供します。同期出力型の変換コンポーネントのサンプルについては、「Change Case コンポーネント サンプル」を参照してください。この種類のコンポーネントの完全な例については、「Change Case コンポーネント サンプル」を参照してください。
デザイン時
このコンポーネントのデザイン時のコードでは、入力と出力の作成、コンポーネントが生成するすべての出力の追加、および、コンポーネントの構成の検証を行います。
コンポーネントの作成
コンポーネントの入力、出力、およびカスタム プロパティは、通常、ProvideComponentProperties メソッドの実行中に作成します。同期出力型の変換コンポーネントの入力および出力を追加するには、2 つの方法があります。メソッドの基本クラスの実装を使用して、作成された既定の入力および出力を変更する方法と、入力と出力を手動で明示的に追加する方法です。
次のコード例は、入力および出力オブジェクトを明示的に追加する ProvideComponentProperties を実装します。同様の処理を行うために呼び出す基本クラスについては、コメントに記載されています。
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "SynchronousComponent", ComponentType = ComponentType.Transform)]
public class SyncComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Add the input.
IDTSInput90 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
// Add the output.
IDTSOutput90 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
output.SynchronousInputID = input.ID;
// Alternatively, you can let the base class add the input and output
// and set the SynchronousInputID of the output to the ID of the input.
// base.ProvideComponentProperties();
}
}
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
<DtsPipelineComponent(DisplayName:="SynchronousComponent", ComponentType:=ComponentType.Transform)> _
Public Class SyncComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Add the input.
Dim input As IDTSInput90 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
' Add the output.
Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
output.SynchronousInputID = Input.ID
' Alternatively, you can let the base class add the input and output
' and set the SynchronousInputID of the output to the ID of the input.
' base.ProvideComponentProperties();
End Sub
End Class
出力列の作成と設定
同期出力型の変換コンポーネントでは、バッファに行は追加されません。ただし、コンポーネントの出力に出力列を追加することはできます。通常、コンポーネントが出力列を追加すると、新しい出力列の値は、実行時に上流コンポーネントから渡される 1 つ以上の列に含まれるデータから取得されます。
出力列が作成されたら、データ型のプロパティを設定する必要があります。出力列のデータ型のプロパティを設定するには特別な処理が必要で、その処理は、SetDataTypeProperties メソッドを呼び出して実行します。このメソッドが必要なのは、DataType、Length、Precision、および CodePage プロパティが個々に読み取り専用で、各プロパティの設定が他の設定に依存しているためです。このメソッドを使用すると、プロパティの値を確実に矛盾なく設定でき、データ フロー タスクにより、正しく設定されていることが検証されます。
列の DataType により、他のプロパティに設定される値が決定されます。次の表は、それぞれの DataType に対する、依存するプロパティの要件を示しています。ここに示されていないデータ型の場合、依存するプロパティは 0 に設定されます。
DataType | Length | Scale | Precision | CodePage |
---|---|---|---|---|
DT_DECIMAL |
0 |
0 より大きく、28 以下 |
0 |
0 |
DT_CY |
0 |
0 |
0 |
0 |
DT_NUMERIC |
0 |
0 より大きく 28 以下で、有効桁数の値未満 |
1 以上 38 以下 |
0 |
DT_BYTES |
0 より大きい |
0 |
0 |
0 |
DT_STR |
0 より大きく、8000 未満 |
0 |
0 |
0 以外の有効なコード ページ |
DT_WSTR |
0 より大きく、4000 未満 |
0 |
0 |
0 |
データ型プロパティの制約は出力列のデータ型に基づくため、マネージ型を処理する場合、Integration Services の正しいデータ型を選択する必要があります。基本クラスには、ConvertBufferDataTypeToFitManaged、BufferTypeToDataRecordType、および DataRecordTypeToBufferType の 3 つのヘルパ メソッドがあり、これを使用すると、マネージ コンポーネントの開発者は、マネージ型に対応する SSIS のデータ型を適切に選択できます。これらのメソッドは、マネージ データ型と SSIS のデータ型を相互に変換します。
実行時
一般的に、コンポーネントのランタイム部分の実装は、コンポーネントの入力および出力列をバッファ内で検索するタスクと、受信バッファ行内にあるこれらの列の値を読み取りまたは書き込みするタスクの 2 つに分類されます。
バッファ内の列の検索
実行中にコンポーネントに提供されるバッファ内の列数は、コンポーネントの入力または出力コレクション内の列数よりも大きい場合があります。各バッファには、データ フローのコンポーネントで定義されているすべての出力列が含まれているためです。バッファの列が入力または出力の列と正しく一致していることを確認するには、BufferManager の FindColumnByLineageID メソッドを使用する必要があります。このメソッドは、特定のバッファ内の列を系列 ID によって検索します。通常、列は PreExecute の実行中に検索されます。これは、BufferManager プロパティが使用できるようになる最初の実行時メソッドであるためです。
次のコード例は、PreExecute の実行中に、入力および出力列のコレクションで列のインデックスを検索するコンポーネントを示します。列インデックスは整数の配列に格納され、ProcessInput の実行中にコンポーネントからアクセスできます。
int []inputColumns;
int []outputColumns;
public override void PreExecute()
{
IDTSInput90 input = ComponentMetaData.InputCollection[0];
IDTSOutput90 output = ComponentMetaData.OutputCollection[0];
inputColumns = new int[input.InputColumnCollection.Count];
outputColumns = new int[output.OutputColumnCollection.Count];
for(int col=0; col < input.InputColumnCollection.Count; col++)
{
IDTSInputColumn90 inputColumn = input.InputColumnCollection[col];
inputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID);
}
for(int col=0; col < output.OutputColumnCollection.Count; col++)
{
IDTSOutputColumn90 outputColumn = output.OutputColumnCollection[col];
outputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID);
}
}
Public Overrides Sub PreExecute()
Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0)
Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection(0)
ReDim inputColumns(input.InputColumnCollection.Count)
ReDim outputColumns(output.OutputColumnCollection.Count)
For col As Integer = 0 To input.InputColumnCollection.Count
Dim inputColumn As IDTSInputColumn90 = input.InputColumnCollection(col)
inputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID)
Next
For col As Integer = 0 To output.OutputColumnCollection.Count
Dim outputColumn As IDTSOutputColumn90 = output.OutputColumnCollection(col)
outputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID)
Next
End Sub
行の処理
コンポーネントは、ProcessInput メソッドで、行および列を含む PipelineBuffer オブジェクトを受け取ります。このメソッドではバッファ内の行が繰り返され、PreExecute が読み取られて変更されている間に列が識別されます。このメソッドは、上流コンポーネントから行が渡されなくなるまで、データ フロー タスクによって繰り返し呼び出されます。
バッファ内の列は、配列のインデクサにアクセスする方法を使用するか、Get または Set メソッドのいずれかを使用することにより、個別に読み取りまたは書き込みが行われます。バッファ内の列のデータ型がわかっている場合には、Get および Set メソッドの方が効率的であり、こちらを使用するようお勧めします。
次のコード例では、受信行を処理する ProcessInput メソッドを実装します。
public override void ProcessInput( int InputID, PipelineBuffer buffer)
{
if( ! buffer.EndOfRowset)
{
while( buffer.NextRow())
{
for(int x=0; x < inputColumns.Length;x++)
{
if(!buffer.IsNull(inputColumns[x]))
{
object columnData = buffer[inputColumns[x]];
// TODO: Modify the column data.
buffer[inputColumns[x]] = columnData;
}
}
}
}
}
Public Overrides Sub ProcessInput(ByVal InputID As Integer, ByVal buffer As PipelineBuffer)
If buffer.EndOfRowset = False Then
While (buffer.NextRow())
For x As Integer = 0 To inputColumns.Length
if buffer.IsNull(inputColumns(x)) = false then
Dim columnData As Object = buffer(inputColumns(x))
' TODO: Modify the column data.
buffer(inputColumns(x)) = columnData
End If
Next
End While
End If
End Sub
サンプル
次のサンプルは、すべての文字列型の列の値を大文字に変換する、簡単な同期出力型の変換コンポーネントを示しています。ただし、この例ではここで説明したメソッドや機能のすべてが使われているわけではありません。同期出力型の変換コンポーネントで必ずオーバーライドしなければならない重要なメソッドは示していますが、デザイン時の検証のためのコードは含まれていません。同期出力型の変換コンポーネントの完全なサンプルについては、「Change Case コンポーネント サンプル」を参照してください。
using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace Uppercase
{
[DtsPipelineComponent(DisplayName = "Uppercase")]
public class Uppercase : PipelineComponent
{
ArrayList m_ColumnIndexList = new ArrayList();
public override void ProvideComponentProperties()
{
base.ProvideComponentProperties();
ComponentMetaData.InputCollection[0].Name = "Uppercase Input";
ComponentMetaData.OutputCollection[0].Name = "Uppercase Output";
}
public override void PreExecute()
{
IDTSInput90 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection90 inputColumns = input.InputColumnCollection;
foreach (IDTSInputColumn90 column in inputColumns)
{
if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
{
m_ColumnIndexList.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int columnIndex in m_ColumnIndexList)
{
string str = buffer.GetString(columnIndex);
buffer.SetString(columnIndex, str.ToUpper());
}
}
}
}
}
Imports System
Imports System.Collections
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace Uppercase
<DtsPipelineComponent(DisplayName="Uppercase")> _
Public Class Uppercase
Inherits PipelineComponent
Private m_ColumnIndexList As ArrayList = New ArrayList
Public Overrides Sub ProvideComponentProperties()
MyBase.ProvideComponentProperties
ComponentMetaData.InputCollection(0).Name = "Uppercase Input"
ComponentMetaData.OutputCollection(0).Name = "Uppercase Output"
End Sub
Public Overrides Sub PreExecute()
Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection90 = input.InputColumnCollection
For Each column As IDTSInputColumn90 In inputColumns
If column.DataType = DataType.DT_STR OrElse column.DataType = DataType.DT_WSTR Then
m_ColumnIndexList.Add(CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer))
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
For Each columnIndex As Integer In m_ColumnIndexList
Dim str As String = buffer.GetString(columnIndex)
buffer.SetString(columnIndex, str.ToUpper)
Next
End While
End Sub
End Class
End Namespace
参照
概念
非同期出力型のカスタム変換コンポーネントの開発
同期および非同期変換について
スクリプト コンポーネントによる同期変換の作成