Методы времени выполнения для компонента потока данных
Во время выполнения задача потока данных проверяет последовательность компонентов, подготавливает план выполнения и управляет пулом рабочих потоков, выполняющих план работы. Задача загружает строки данных из источников, обрабатывает их с помощью преобразований, а затем сохраняет в соответствующие назначения.
Последовательность выполнения методов
Во время выполнения компонента потока данных вызывается поднабор методов в базовом классе PipelineComponent. Всегда вызываются одни и те же методы в одной и той же последовательности, за исключением методов PrimeOutput и ProcessInput. Эти два метода вызываются в зависимости от существования и конфигурации объектов IDTSInput100 и IDTSOutput100 компонента.
Следующий список содержит методы в том порядке, в котором они вызываются во время выполнения компонента. Обратите внимание на то, что если метод PrimeOutput вызывается, это всегда происходит перед вызовом метода ProcessInput.
Метод PrimeOutput
Метод PrimeOutput вызывается, когда в компоненте имеется, по меньшей мере, один выход, присоединенный к нижестоящему компоненту через объект IDTSPath100, а свойство SynchronousInputID этого выхода имеет нулевое значение. Метод PrimeOutput вызывается для исходных компонентов и для преобразований с асинхронными выходами. В отличие от метода ProcessInput, описываемого ниже, метод PrimeOutput вызывается только один раз для каждого компонента, где он требуется.
Метод ProcessInput
Метод ProcessInput вызывается для компонентов, имеющих, по меньшей мере, один вход, присоединенный к вышестоящему компоненту посредством объекта IDTSPath100. Метод ProcessInput вызывается для целевых компонентов и для преобразований с синхронными выходами. Метод ProcessInput вызывается повторно до тех пор, пока не будут обработаны все строки из вышестоящих компонентов.
Работа с входами и выходами
Во время выполнения компонентами потока данных выполняются следующие задачи:
Исходные компоненты добавляют строки.
Компоненты преобразования с синхронными выходами получают строки, передаваемые исходными компонентами.
Компоненты преобразования с асинхронными выходами получают строки и добавляют строки.
Целевые компоненты получают строки и затем передают их в назначение.
Во время выполнения задача потока данных выделяет объекты PipelineBuffer, которые содержат все столбцы, определенные в коллекциях выходных столбцов последовательности компонентов. Например, если каждый из четырех компонентов в последовательности потока данных добавляет один выходной столбец в его коллекцию выходных столбцов, буфер, предоставляемый каждому компоненту, содержит четыре столбца, по одному для каждого выходного столбца на компонент. В силу этого компонент иногда получает буферы, содержащие столбцы, которые им не используются.
Поскольку буферы, которые получает компонент, могут содержать не используемые им столбцы, необходимо найти столбцы, которые должны быть использованы в коллекциях входных и выходных столбцов компонента, в буфере, предоставляемом компоненту задачей потока данных. Для этого используется метод FindColumnByLineageID свойства BufferManager. Для улучшения производительности эта задача обычно выполняется во время выполнения метода PreExecute, а не методов PrimeOutput или ProcessInput.
Метод PreExecute вызывается перед методами PrimeOutput и ProcessInput и является для компонента первой возможностью выполнить эту работу после получения компонентом доступа к BufferManager. Во время выполнения этого метода компоненту необходимо найти его столбцы в буферах и сохранить эти данные внутренне, чтобы столбцы можно было использовать в методах PrimeOutput или ProcessInput.
В следующем примере кода показано, как компонент преобразования с синхронным выходом осуществляет поиск своих входных столбцов в буфере во время выполнения PreExecute.
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
Добавление строк
Компоненты предоставляют строки нижестоящим компонентам путем добавления строк в объекты PipelineBuffer. Задача потока данных предоставляет массив выходных буферов, по одному для каждого объекта IDTSOutput100, соединенного с нижестоящим компонентом, в качестве параметра метода PrimeOutput. Исходные компоненты и компоненты преобразования с асинхронными выходами добавляют строки в буферы и вызывают метод SetEndOfRowset по окончании добавления строк. Задача потока данных управляет выходными буферами, предоставляемыми ею компонентам, и, по мере заполнения буфера, автоматически перемещает строки в буфере в следующий компонент. Метод PrimeOutput вызывается один раз для каждого компонента, в отличие от метода ProcessInput, вызываемого неоднократно.
В следующем примере кода показано, как компонент добавляет строки в свои выходные буферы во время выполнения метода PrimeOutput, а затем вызывает метод SetEndOfRowset.
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
Дополнительные сведения о разработке компонентов, добавляющих строки в выходные буферы, см. в разделах Разработка пользовательского компонента источника и Разработка пользовательского компонента преобразования с асинхронными выходами.
Получение строк
Компоненты получают строки из вышестоящих компонентов в объектах PipelineBuffer. Задача потока данных передает объект PipelineBuffer, который содержит строки, добавленные в поток данных вышестоящими компонентами, в качестве параметра метода ProcessInput. Этот входной буфер можно использовать для проверки и изменения строк и столбцов в буфере, но непригоден для добавления или удаления строк. Метод ProcessInput вызывается повторно до тех пор, пока не останется ни одного доступного буфера. При последнем вызове свойство EndOfRowset имеет значение true. Можно просматривать коллекцию строк в буфере с помощью метода NextRow, при этом буфер перемещается в следующую строку. Этот метод возвращает значение false, когда буфер располагается в последней строке коллекции. Нет необходимости проверять значение свойства EndOfRowset, если не требуется выполнить дополнительное действие после обработки последних строк данных.
Ниже показан правильный шаблон использования метода NextRow и свойства EndOfRowset.
while (buffer.NextRow())
{
// Do something with each row.
}
if (buffer.EndOfRowset)
{
// Optionally, do something after all rows have been processed.
}
В следующем примере кода демонстрируется, как компонент обрабатывает строки во входных буферах во время выполнения метода ProcessInput.
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End Sub
Дополнительные сведения о разработке компонентов, получающих строки во входные буферы, см. в разделах Разработка пользовательского компонента назначения и Разработка пользовательского компонента преобразования с синхронными выходами.
|