データ フロー コンポーネントの実行時のメソッド
実行時に、データ フロー タスクは、コンポーネントの順序の確認、実行プランの準備、および作業プランを実行するワーカー スレッドのプールの管理を行います。タスクは、データの行を変換元から読み込み、変換を使用して処理し、変換先に保存します。
メソッドの実行順序
データ フロー コンポーネントの実行中に、PipelineComponent 基本クラス内のメソッドのサブセットが呼び出されます。呼び出されるメソッドとその順序は常に同じです。ただし、PrimeOutput および ProcessInput メソッドは例外です。これら 2 つのメソッドは、コンポーネントの IDTSInput100 および IDTSOutput100 オブジェクトの存在と構成に基づいて呼び出されます。
次の一覧では、コンポーネントの実行中に呼び出される順にメソッドを示します。PrimeOutput は、常に ProcessInput の前に呼び出されます。
PrimeOutput メソッド
PrimeOutput メソッドは、コンポーネントに少なくとも 1 つの出力があり、その出力が IDTSPath100 オブジェクトを介して下流コンポーネントにアタッチされ、出力の SynchronousInputID プロパティが 0 の場合に呼び出されます。PrimeOutput メソッドは、非同期出力型の変換元コンポーネントと変換の場合に呼び出されます。後述の ProcessInput メソッドとは異なり、PrimeOutput メソッドは必要とされる各コンポーネントに対して 1 回だけ呼び出されます。
ProcessInput メソッド
ProcessInput メソッドは、IDTSPath100 オブジェクトによって上流コンポーネントにアタッチされる入力が少なくとも 1 つあるコンポーネントに対して呼び出されます。ProcessInput メソッドは、同期出力型の変換先コンポーネントと変換に対して呼び出されます。ProcessInput は、処理する行が上流コンポーネントから渡されなくなるまで繰り返し呼び出されます。
入力と出力の処理
データ フロー コンポーネントは、実行時に次のタスクを実行します。
変換コンポーネントは行を追加します。
同期出力型の変換コンポーネントは、変換元コンポーネントから渡される行を受け取ります。
非同期出力型の変換コンポーネントは、行を受け取って行を追加します。
変換先コンポーネントは、行を受け取って変換先に読み込みます。
実行中、データ フロー タスクは、コンポーネントのシーケンスの出力列コレクションで定義されたすべての列を含む PipelineBuffer オブジェクトを割り当てます。たとえば、データ フロー シーケンス内の 4 つの各コンポーネントが 1 つの出力列をその出力列コレクションに追加した場合、各コンポーネントに提供されているバッファには 4 つの列、つまりコンポーネントごとに 1 つの出力列が格納されます。この動作のため、使用しない列を含むバッファをコンポーネントが受け取る場合があります。
コンポーネントが受け取るバッファにはコンポーネントが使用しない列が含まれる場合もあるので、データ フロー タスクがコンポーネントに提供するバッファ内のコンポーネントの入力および出力列のコレクションから、使用する列を検索する必要があります。これを行うには、BufferManager プロパティの FindColumnByLineageID メソッドを使用します。パフォーマンスの理由により、このタスクは通常、PrimeOutput や ProcessInput ではなく、PreExecute メソッドで実行します。
PreExecute は PrimeOutput および ProcessInput メソッドの前に呼び出され、BufferManager がコンポーネントで使用できるようになった後に初めてコンポーネントで作業を実行できます。このメソッドの実行中、コンポーネントはバッファ内の列を検索してこの情報を内部的に格納し、列が PrimeOutput または ProcessInput メソッドのどちらかで使用できるようにします。
次のコード例では、PreExecute で、同期出力型の変換コンポーネントがバッファ内の入力列を検索する方法を示します。
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
行の追加
コンポーネントは、PipelineBuffer オブジェクトに行を追加することで、下流コンポーネントに行を提供します。データ フロー タスクは、下流コンポーネントに接続される IDTSOutput100 オブジェクトごとに 1 つずつ、出力バッファの配列を PrimeOutput メソッドに渡すパラメータとして提供します。非同期出力型の変換元コンポーネントと変換コンポーネントは、行をバッファに追加し、行の追加が完了したら SetEndOfRowset メソッドを呼び出します。データ フロー タスクは、コンポーネントに提供する出力バッファを管理し、バッファがいっぱいになると、バッファ内の行を自動的に次のコンポーネントに移動します。PrimeOutput メソッドは、繰り返し呼び出される ProcessInput メソッドと異なり、コンポーネントごとに 1 回ずつ呼び出されます。
次のコード例では、コンポーネントが PrimeOutput メソッドで出力バッファに行を追加し、SetEndOfRowset メソッドを呼び出す方法を示します。
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
出力バッファに行を追加するコンポーネントの開発の詳細については、「カスタム変換元コンポーネントの開発」および「非同期出力型のカスタム変換コンポーネントの開発」を参照してください。
行の受信
コンポーネントは、PipelineBuffer オブジェクト内の上流コンポーネントから行を受け取ります。データ フロー タスクでは、上流コンポーネントがデータ フローに追加する行が含まれる PipelineBuffer オブジェクトが、ProcessInput メソッドに渡されるパラメータとして用意されています。この入力バッファを使用して、バッファ内の行および列を検証して変更することはできますが、行を追加したり削除することはできません。ProcessInput メソッドは、渡されるバッファがなくなるまで繰り返し呼び出されます。このメソッドが最後に呼び出されたとき、EndOfRowset プロパティは true になります。バッファを次の行に進める NextRow メソッドを使用することにより、バッファ内の行のコレクションを反復処理できます。バッファがコレクション内の最後の行にある場合、このメソッドは false を返します。データの最後の行が処理された後にさらに操作を実行する必要がある場合を除き、EndOfRowset プロパティを確認する必要はありません。
次のテキストは、NextRow メソッドと EndOfRowset プロパティを使用する正しいパターンを示しています。
while (buffer.NextRow())
{
// 各行について何らかの処理を行う。
}
if (buffer.EndOfRowset)
{
// 必要に応じて、すべての行が処理された後に何らかの処理を行う。
}
次のコード例では、ProcessInput メソッドの実行中に、コンポーネントが入力バッファ内の行を処理する方法を示します。
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End Sub
入力バッファ内の行を受け取るコンポーネントの開発の詳細については、「カスタム変換先コンポーネントの開発」および「同期出力型のカスタム変換コンポーネントの開発」を参照してください。
|