Métodos de tempo de execução de um componente de fluxo de dados
No tempo de execução, a tarefa de fluxo de dados examina a sequência de componentes, prepara um plano de execução e gerencia um pool de threads de trabalho que executa o plano de trabalho. A tarefa carrega linhas de dados de origens, processa essas linhas através de transformações e as salva em destinos.
Sequência de execução do método
Durante a execução de um componente de fluxo de dados, um subconjunto dos métodos na classe base PipelineComponent é chamado. Os métodos e a sequência na qual eles são chamados são sempre os mesmos, com exceção dos métodos PrimeOutput e ProcessInput. Esses dois métodos são chamados com base na existência e na configuração dos objetos IDTSInput100 e IDTSOutput100 de um componente.
A lista seguinte mostra os métodos na ordem em que são chamados durante a execução do componente. Observe que PrimeOutput, quando chamado, é sempre chamado antes do ProcessInput.
Método PrimeOutput
O método PrimeOutput é chamado quando um componente tem pelo menos uma saída anexada a um componente downstream por meio de um objeto IDTSPath100, e a propriedade SynchronousInputID da saída é zero. O método PrimeOutput é chamado para componentes de origem e para transformações com saídas assíncronas. Ao contrário do método ProcessInput descrito abaixo, o método PrimeOutput só é chamado uma vez para cada componente que o requer.
Método ProcessInput
O método ProcessInput é chamado para componentes que têm pelo menos uma entrada anexada a um componente upstream por um objeto IDTSPath100. O método ProcessInput é chamado para componentes de destino e para transformações com saídas síncronas. ProcessInput é chamado repetidamente até que não haja mais nenhuma linha para processamento de componentes upstream.
Funcionando com entradas e saídas
Em tempo de execução, componentes de fluxo de dados executam as tarefas seguintes:
Componentes de origem adicionam linhas.
Componentes de transformação com saídas síncronas recebem linhas fornecidas por componentes de origem.
Componentes de transformação com saídas assíncronas recebem linhas e adicionam linhas.
Componentes de destino recebem linhas e as carregam em um destino.
Durante a execução, a tarefa de fluxo de dados aloca objetos PipelineBuffer que contêm todas as colunas definidas nas coleções de colunas de saída de uma sequência de componentes. Por exemplo, se cada um dos quatro componentes em uma sequência de fluxo de dados adicionar uma coluna de saída à sua coleção de colunas de saída, o buffer fornecido a cada componente conterá quatro colunas, uma para cada coluna de saída por componente. Por causa desse comportamento, às vezes um componente recebe buffers que contêm colunas que ele não usa.
Como os buffers recebidos pelo seu componente podem conter colunas que o componente não vai usar, você deve localizar as colunas que quer usar nas coleções de colunas de entrada e saída do seu componente no buffer fornecido ao componente pela tarefa de fluxo de dados. Para isso, use o método FindColumnByLineageID da propriedade BufferManager. Por razões de desempenho, essa tarefa é executada ordinariamente durante o método PreExecute, em lugar de em PrimeOutput ou ProcessInput.
PreExecute é chamado antes dos métodos PrimeOutput e ProcessInput, e é a primeira oportunidade para um componente realizar esse trabalho depois que BufferManager fica disponível para o componente. Durante esse método, o componente deve localizar suas colunas nos buffers e armazenar essas informações internamente, de forma que as colunas possam ser usadas nos métodos PrimeOutput ou ProcessInput.
O exemplo de código seguinte demonstra como um componente de transformação com uma saída síncrona localiza suas colunas de entrada no buffer durante o PreExecute.
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
Adicionando linhas
Os componentes fornecem linhas a componentes downstream acrescentando linhas aos objetos PipelineBuffer. A tarefa de fluxo de dados fornece uma matriz de buffers de saída – uma para cada objeto IDTSOutput100 que é conectado a um componente downstream – como um parâmetro para o método PrimeOutput. Os componentes de origem e os componentes de transformação com saídas assíncronas acrescentam linhas aos buffers e chamam o método SetEndOfRowset quando terminam de adicionar linhas. A tarefa de fluxo de dados gerencia os buffers de saída que fornece aos componentes e, quando um buffer fica cheio, automaticamente move as linhas do buffer para o próximo componente. O método PrimeOutput é chamado uma vez por componente, ao contrário do método ProcessInput, que é chamado repetidamente.
O exemplo de código seguinte demonstra como um componente acrescenta linhas aos seus buffers de saída durante o método PrimeOutput, em seguida chama o método SetEndOfRowset.
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
Para obter mais informações sobre como desenvolver componentes que adicionam linhas a buffers de saída, consulte Desenvolvendo um componente de fonte personalizado e Desenvolvendo um componente de transformação personalizado com saídas assíncronas.
Recebendo linhas
Os componentes recebem linhas de componentes upstream em objetos PipelineBuffer. A tarefa de fluxo de dados fornece um objeto PipelineBuffer que contém as linhas adicionadas ao fluxo de dados pelos componentes upstream como um parâmetro para o método ProcessInput. Esse buffer de entrada pode ser usado para examinar e modificar as linhas e colunas no buffer, mas não pode ser usado para adicionar ou remover linhas. O método ProcessInput é chamado repetidamente até que não haja mais nenhum buffer disponível. Na última vez que é chamado, a propriedade EndOfRowset é true. Você pode repetir a coleção de linhas no buffer usando o método NextRow, que avança o buffer à próxima linha. Esse método retorna false quando o buffer está na última linha da coleção. Não é necessário verificar a propriedade EndOfRowset, a menos que você tenha de executar uma ação adicional depois que as últimas linhas de dados forem processadas.
O texto abaixo mostra o padrão correto para usar o método NextRow e a propriedade EndOfRowset:
while (buffer.NextRow())
{
// Do something with each row.
}
if (buffer.EndOfRowset)
{
// Optionally, do something after all rows have been processed.
}
O exemplo de código a seguir demonstra como um componente processa as linhas em buffers de entrada durante o método ProcessInput.
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End Sub
Para obter mais informações sobre como desenvolver componentes que recebem linhas em buffers de entrada, consulte Desenvolvendo um componente de destino personalizado e Desenvolvendo um componente de transformação personalizado com saídas síncronas.
|
Consulte também
Conceitos
Métodos de tempo de design de um componente de fluxo de dados