Partilhar via


Desenvolvendo um componente de fonte personalizado

Aplica-se a: SQL Server SSIS Integration Runtime no Azure Data Factory

O SQL Server Integration Services habilita desenvolvedores a escrever componentes de origem que podem ser conectados a fontes de dados personalizadas e fornecer dados dessas fontes a outros componentes em uma tarefa de fluxo de dados. A capacidade de criar fontes personalizadas é valiosa quando você precisa se conectar a fontes de dados que não podem ser acessadas através de uma das fontes existentes do Integration Services.

Componentes de origem têm uma ou mais saídas e zero entradas. Em tempo de design, os componentes de origem são usados para criar e configurar conexões, ler metadados de colunas a partir da fonte de dados externa e configurar as colunas da saída da origem com base na fonte de dados externa. Durante a execução, eles se conectam à fonte de dados externa e adicionam linhas a um buffer de saída. A tarefa de fluxo de dados fornece esse buffer de linhas de dados a componentes downstream.

Para obter uma visão geral do desenvolvimento de componentes de fluxo de dados, consulte Desenvolvendo um componente de fluxo de dados personalizado.

Tempo de design

A implementação da funcionalidade em tempo de design de um componente de origem envolve a especificação de uma conexão a uma fonte de dados externa, a adição e configuração de colunas de saída que refletem a fonte dados, e a confirmação de que o componente está pronto para execução. Por definição, um componente de origem tem zero entradas e uma ou mais saídas assíncronas.

Criando o componente

Os componentes de origem se conectam a fontes de dados externas através de objetos ConnectionManager definidos em um pacote. Eles indicam o seu requisito para um gerenciador de conexões por meio da adição de um elemento à coleção RuntimeConnectionCollection da propriedade ComponentMetaData. Essa coleção tem duas finalidades: manter referências a gerenciadores de conexões no pacote usado pelo componente e divulgar a necessidade de um gerenciador de conexões para o designer. Quando um IDTSRuntimeConnection100 é adicionado à coleção, o Editor Avançado exibe a guia Propriedades da Conexão que permite aos usuários selecionar ou criar uma conexão no pacote.

O exemplo de código a seguir mostra uma implementação de ProvideComponentProperties que adiciona uma saída e um objeto IDTSRuntimeConnection100 ao RuntimeConnectionCollection.

using System;  
using System.Collections;  
using System.Data;  
using System.Data.SqlClient;  
using System.Data.OleDb;  
using Microsoft.SqlServer.Dts.Runtime;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]  
    public class MyComponent : PipelineComponent  
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();  
            output.Name = "Output";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.NET";  
        }  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Runtime  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
  
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _  
Public Class MySourceComponent  
    Inherits PipelineComponent  
  
    Public Overrides Sub ProvideComponentProperties()  
  
        ' Allow for resetting the component.  
        RemoveAllInputsOutputsAndCustomProperties()  
        ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()  
        output.Name = "Output"  
  
        Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
        connection.Name = "ADO.NET"  
  
    End Sub  
End Class  

Conectando-se a uma fonte de dados externa

Depois da adição de uma conexão ao RuntimeConnectionCollection, substitua o método AcquireConnections para estabelecer uma conexão à fonte de dados externa. Esse método é chamado durante o tempo de design e execução. O componente deve estabelecer uma conexão com o gerenciador de conexões especificado pela conexão em tempo de execução e, depois, com a fonte de dados externa.

Depois de estabelecida, a conexão deve ser armazenada em cache interiormente pelo componente e liberada quando o método ReleaseConnections é chamado. O método ReleaseConnections é chamado em tempo de design e execução, como o método AcquireConnections. Os desenvolvedores substituem esse método e liberam a conexão estabelecida pelo componente durante o AcquireConnections.

O exemplo de código a seguir mostra um componente que se conecta a uma conexão ADO.NET no método AcquireConnections e fecha a conexão no método ReleaseConnections.

private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then  
  
        Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

Criando e configurando colunas de saída

As colunas de saída de um componente de origem refletem colunas da fonte de dados externa que o componente adiciona ao fluxo de dados durante a execução. Em tempo de design, você adiciona colunas de saída depois da configuração do componente para se conectar a uma fonte de dados externa. O método em tempo de design que um componente usa para adicionar as colunas à sua coleção de saídas varia de acordo com as necessidades do componente, embora elas não devam ser adicionadas durante o Validate ou o AcquireConnections. Por exemplo, um componente contendo uma instrução SQL em uma propriedade personalizada que controla os dados definidos para o componente pode adicionar suas colunas de saída durante o método SetComponentProperty. O componente verifica se possui uma conexão em cache e, em caso afirmativo, conecta-se à fonte de dados e gera suas colunas de saída.

Depois da criação de uma coluna de saída, defina suas propriedades de tipo de dados chamando o método SetDataTypeProperties. Esse método é necessário porque as propriedades DataType, Length, Precision e CodePage são somente leitura e elas dependem das definições umas das outras. Esse método força a definição consistente desses valores, e a tarefa de fluxo de dados valida que eles sejam definidos corretamente.

O DataType da coluna determina os valores que são definidos para as outras propriedades. A tabela a seguir mostra os requisitos nas propriedades dependentes para cada DataType. Os tipos de dados não listados têm as propriedades dependentes definidas como zero.

Tipo de dados Tamanho Escala Precisão CodePage
DT_DECIMAL 0 Maior que 0 e menor ou igual a 28. 0 0
DT_CY 0 0 0 0
DT_NUMERIC 0 Maior que 0 e menor ou igual a 28, e menor que a Precisão. Maior ou igual a 1 e menor ou igual a 38. 0
DT_BYTES Maior que 0. 0 0 0
DT_STR Maior que 0 e menor que 8000. 0 0 Não 0 e uma página de código válida.
DT_WSTR Maior que 0 e menor que 4.000. 0 0 0

Como as restrições nas propriedades de tipo de dados são baseadas no tipo de dados da coluna de saída, você deve escolher o tipo de dados correto do SSIS quando trabalha com tipos gerenciados. A classe base fornece três métodos auxiliares, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType e DataRecordTypeToBufferType, para ajudar desenvolvedores de componentes gerenciados a selecionar um tipo de dados SSIS, considerando um tipo gerenciado. Esses métodos convertem tipos de dados gerenciados em tipos de dados do SSIS e vice-versa.

O exemplo de código a seguir mostra como a coleção de colunas de saída de um componente é preenchida com base no esquema de uma tabela. Os métodos auxiliares da classe base são usados para definir o tipo de dados da coluna e as propriedades dependentes são definidas com base no tipo de dados.

SqlCommand sqlCommand;  
  
private void CreateColumnsFromDataTable()  
{  
    // Get the output.  
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
  
    // Start clean, and remove the columns from both collections.  
    output.OutputColumnCollection.RemoveAll();  
    output.ExternalMetadataColumnCollection.RemoveAll();  
  
    this.sqlCommand = sqlConnection.CreateCommand();  
    this.sqlCommand.CommandType = CommandType.Text;  
    this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;  
    SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);  
    DataTable dataTable = schemaReader.GetSchemaTable();  
  
    // Walk the columns in the schema,   
    // and for each data column create an output column and an external metadata column.  
    foreach (DataRow row in dataTable.Rows)  
    {  
        IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();  
        IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();  
  
        // Set column data type properties.  
        bool isLong = false;  
        DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);  
        dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);  
        int length = 0;  
        int precision = (short)row["NumericPrecision"];  
        int scale = (short)row["NumericScale"];  
        int codepage = dataTable.Locale.TextInfo.ANSICodePage;  
  
        switch (dt)  
        {  
            // The length cannot be zero, and the code page property must contain a valid code page.  
            case DataType.DT_STR:  
            case DataType.DT_TEXT:  
                length = precision;  
                precision = 0;  
                scale = 0;  
                break;  
  
            case DataType.DT_WSTR:  
                length = precision;  
                codepage = 0;  
                scale = 0;  
                precision = 0;  
                break;  
  
            case DataType.DT_BYTES:  
                precision = 0;  
                scale = 0;  
                codepage = 0;  
                break;  
  
            case DataType.DT_NUMERIC:  
                length = 0;  
                codepage = 0;  
  
                if (precision > 38)  
                    precision = 38;  
  
                if (scale > 6)  
                    scale = 6;  
                break;  
  
            case DataType.DT_DECIMAL:  
                length = 0;  
                precision = 0;  
                codepage = 0;  
                break;  
  
            default:  
                length = 0;  
                precision = 0;  
                codepage = 0;  
                scale = 0;  
                break;  
  
        }  
  
        // Set the properties of the output column.  
        outColumn.Name = (string)row["ColumnName"];  
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);  
    }  
}  
Private sqlCommand As SqlCommand  
  
Private Sub CreateColumnsFromDataTable()  
  
    ' Get the output.  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
  
    ' Start clean, and remove the columns from both collections.  
    output.OutputColumnCollection.RemoveAll()  
    output.ExternalMetadataColumnCollection.RemoveAll()  
  
    Me.sqlCommand = sqlConnection.CreateCommand()  
    Me.sqlCommand.CommandType = CommandType.Text  
    Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)  
  
    Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)  
    Dim dataTable As DataTable = schemaReader.GetSchemaTable()  
  
    ' Walk the columns in the schema,   
    ' and for each data column create an output column and an external metadata column.  
    For Each row As DataRow In dataTable.Rows  
  
        Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()  
        Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()  
  
        ' Set column data type properties.  
        Dim isLong As Boolean = False  
        Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))  
        dt = ConvertBufferDataTypeToFitManaged(dt, isLong)  
        Dim length As Integer = 0  
        Dim precision As Integer = CType(row("NumericPrecision"), Short)  
        Dim scale As Integer = CType(row("NumericScale"), Short)  
        Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage  
  
        Select Case dt  
  
            ' The length cannot be zero, and the code page property must contain a valid code page.  
            Case DataType.DT_STR  
            Case DataType.DT_TEXT  
                length = precision  
                precision = 0  
                scale = 0  
  
            Case DataType.DT_WSTR  
                length = precision  
                codepage = 0  
                scale = 0  
                precision = 0  
  
            Case DataType.DT_BYTES  
                precision = 0  
                scale = 0  
                codepage = 0  
  
            Case DataType.DT_NUMERIC  
                length = 0  
                codepage = 0  
  
                If precision > 38 Then  
                    precision = 38  
                End If  
  
                If scale > 6 Then  
                    scale = 6  
                End If  
  
            Case DataType.DT_DECIMAL  
                length = 0  
                precision = 0  
                codepage = 0  
  
            Case Else  
                length = 0  
                precision = 0  
                codepage = 0  
                scale = 0  
        End Select  
  
        ' Set the properties of the output column.  
        outColumn.Name = CStr(row("ColumnName"))  
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)  
    Next  
End Sub  

Validando o componente

Procure validar um componente de origem e verificar se as colunas definidas nas suas coleções de colunas de saída coincidem com as colunas da fonte de dados externa. Às vezes, pode ser impossível verificar as colunas de saída em relação à fonte de dados externa, como, por exemplo, em um estado desconectado ou quando é preferível evitar viagens de ida e volta demoradas ao servidor. Nessas situações, as colunas da saída ainda podem ser validadas através do ExternalMetadataColumnCollection do objeto de saída. Para obter mais informações, consulte Validando um componente de fluxo de dados.

Essa coleção existe em objetos de entrada e saída e você pode preenchê-la com as colunas da fonte de dados externa. Use essa coleção para validar as colunas de saída quando o Designer do SSIS estiver offline, quando o componente estiver desconectado ou quando a propriedade ValidateExternalMetadata for false. A coleção deve ser preenchida primeiro ao mesmo tempo que as colunas de saída são criadas. É relativamente fácil adicionar colunas de metadados externas à coleção pois a coluna de metadados externa inicialmente deve corresponder à coluna de saída. As propriedades de tipo de dados da coluna já deveriam ter sido definidas corretamente e as propriedades podem ser copiadas diretamente no objeto IDTSExternalMetadataColumn100.

O código de exemplo a seguir adiciona uma coluna de metadados externa que se baseia em uma coluna de saída recém-criada. Ele pressupõe que a coluna de saída já foi criada.

private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)  
{  
  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = outputColumn.Name;  
    externalColumn.Precision = outputColumn.Precision;  
    externalColumn.Length = outputColumn.Length;  
    externalColumn.DataType = outputColumn.DataType;  
    externalColumn.Scale = outputColumn.Scale;  
  
    // Map the external column to the output column.  
    outputColumn.ExternalMetadataColumnID = externalColumn.ID;  
  
}  
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)  
  
        ' Set the properties of the external metadata column.  
        Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()  
        externalColumn.Name = outputColumn.Name  
        externalColumn.Precision = outputColumn.Precision  
        externalColumn.Length = outputColumn.Length  
        externalColumn.DataType = outputColumn.DataType  
        externalColumn.Scale = outputColumn.Scale  
  
        ' Map the external column to the output column.  
        outputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
    End Sub  

Tempo de execução

Durante a execução, componentes adicionam linhas para gerar buffers que são criados pela tarefa de fluxo de dados e são fornecidos ao componente em PrimeOutput. Chamado uma vez para componentes de origem, o método recebe um buffer de saída para cada IDTSOutput100 do componente que é conectado a um componente downstream.

Localizando colunas no buffer

O buffer de saída para um componente contém as colunas definidas pelo componente e quaisquer colunas adicionadas à saída de um componente downstream. Por exemplo, se um componente de origem fornecer três colunas na saída e o próximo componente adicionar uma quarta coluna de saída, o buffer de saída fornecido para uso pelo componente de origem conterá essas quatro colunas.

A ordem das colunas em uma linha de buffer não é definida pelo índice da coluna de saída na coleção de colunas de saída. Uma coluna de saída só pode ser localizada com precisão em uma linha de buffer através do método FindColumnByLineageID do BufferManager. Esse método localiza a coluna com a ID de linhagem especificada no buffer especificado e retorna sua localização na linha. Os índices das colunas de saída costumam estar localizados no método PreExecute e são armazenados para uso durante o PrimeOutput.

O exemplo de código a seguir indica a localização das colunas de saída no buffer de saída durante uma chamada ao PreExecute e armazena-as em uma estrutura interna. O nome da coluna também é armazenado na estrutura e é usado no exemplo de código para o método PrimeOutput na próxima seção desse tópico.

ArrayList columnInformation;  
  
private struct ColumnInfo  
{  
    public int BufferColumnIndex;  
    public string ColumnName;  
}  
  
public override void PreExecute()  
{  
    this.columnInformation = new ArrayList();  
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
  
    foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)  
    {  
        ColumnInfo ci = new ColumnInfo();  
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);  
        ci.ColumnName = col.Name;  
        columnInformation.Add(ci);  
    }  
}  
Public Overrides Sub PreExecute()  
  
    Me.columnInformation = New ArrayList()  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
  
    For Each col As IDTSOutputColumn100 In output.OutputColumnCollection  
  
        Dim ci As ColumnInfo = New ColumnInfo()  
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)  
        ci.ColumnName = col.Name  
        columnInformation.Add(ci)  
    Next  
End Sub  

Processando linhas

São adicionadas linhas ao buffer de saída chamando o método AddRow que cria uma nova linha de buffer com valores vazios em suas colunas. O componente atribui valores a cada coluna. Os buffers de saída fornecidos a um componente são criados e monitorados pela tarefa de fluxo de dados. Quando ficam cheias, a linhas do buffer são transferidas para o próximo componente. Não é possível determinar quando um lote de linhas é enviado ao próximo componente pois a movimentação de linhas pela tarefa de fluxo de dados é transparente para o desenvolvedor de componentes, e a propriedade RowCount é sempre zero nos buffers de saída. Quando um componente de origem termina de adicionar linhas ao seu buffer de saída, ele notifica a tarefa de fluxo de dados, chamando o método SetEndOfRowset do PipelineBuffer, e as demais linhas do buffer são transmitidas ao próximo componente.

Apesar de o componente de origem ler linhas da fonte de dados externa, talvez você queira atualizar o contador de desempenho "Linhas lidas" ou "Bytes de BLOB lidos", chamando o método IncrementPipelinePerfCounter. Para obter mais informações, consulte Performance Counters.

O exemplo de código a seguir mostra um componente que adiciona linhas a um buffer de saída em PrimeOutput. Os índices das colunas de saída no buffer foram localizados através do PreExecute no exemplo de código anterior.

public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
{  
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
    PipelineBuffer buffer = buffers[0];  
  
    SqlDataReader dataReader = sqlCommand.ExecuteReader();  
  
    // Loop over the rows in the DataReader,   
    // and add them to the output buffer.  
    while (dataReader.Read())  
    {  
        // Add a row to the output buffer.  
        buffer.AddRow();  
  
        for (int x = 0; x < columnInformation.Count; x++)  
        {  
            ColumnInfo ci = (ColumnInfo)columnInformation[x];  
            int ordinal = dataReader.GetOrdinal(ci.ColumnName);  
  
            if (dataReader.IsDBNull(ordinal))  
                buffer.SetNull(ci.BufferColumnIndex);  
            else  
            {  
                buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];  
            }  
        }  
    }  
    buffer.SetEndOfRowset();  
}  
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())  
  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
    Dim buffer As PipelineBuffer = buffers(0)  
  
    Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()  
  
    ' Loop over the rows in the DataReader,   
    ' and add them to the output buffer.  
    While (dataReader.Read())  
  
        ' Add a row to the output buffer.  
        buffer.AddRow()  
  
        For x As Integer = 0 To columnInformation.Count  
  
            Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)  
  
            Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)  
  
            If (dataReader.IsDBNull(ordinal)) Then  
                buffer.SetNull(ci.BufferColumnIndex)  
            Else  
                buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)  
  
            End If  
        Next  
  
    End While  
  
    buffer.SetEndOfRowset()  
End Sub  

Amostra

O exemplo a seguir mostra um componente de origem simples que usa um gerenciador de conexões do Arquivo para carregar o conteúdo binário dos arquivos no fluxo de dados. Esse exemplo não demonstra todos os métodos e todas as funcionalidades discutidas nesse tópico. Ele demonstra os métodos importantes que todo componente de origem personalizado deve substituir, mas não contém código para a validação em tempo de design.

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
namespace BlobSrc  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]  
  public class BlobSrc : PipelineComponent  
  {  
    IDTSConnectionManager100 m_ConnMgr;  
    int m_FileNameColumnIndex = -1;  
    int m_FileBlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSOutput100 output = ComponentMetaData.OutputCollection.New();  
      output.Name = "BLOB File Inserter Output";  
  
      IDTSOutputColumn100 column = output.OutputColumnCollection.New();  
      column.Name = "FileName";  
      column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);  
  
      column = output.OutputColumnCollection.New();  
      column.Name = "FileBLOB";  
      column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_ConnMgr = conn.ConnectionManager;  
    }  
  
    public override void ReleaseConnections()  
    {  
      m_ConnMgr = null;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
  
      m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);  
      m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);  
    }  
  
    public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
    {  
      string strFileName = (string)m_ConnMgr.AcquireConnection(null);  
  
      while (strFileName != null)  
      {  
        buffers[0].AddRow();  
  
        buffers[0].SetString(m_FileNameColumnIndex, strFileName);  
  
        FileInfo fileInfo = new FileInfo(strFileName);  
        byte[] fileData = new byte[fileInfo.Length];  
        FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);  
        fs.Read(fileData, 0, fileData.Length);  
  
        buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);  
  
        strFileName = (string)m_ConnMgr.AcquireConnection(null);  
      }  
  
      buffers[0].SetEndOfRowset();  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper   
Namespace BlobSrc   
  
 <DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _   
 Public Class BlobSrc   
 Inherits PipelineComponent   
   Private m_ConnMgr As IDTSConnectionManager100   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_FileBlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New   
     output.Name = "BLOB File Inserter Output"   
     Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New   
     column.Name = "FileName"   
     column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)   
     column = output.OutputColumnCollection.New   
     column.Name = "FileBLOB"   
     column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)   
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_ConnMgr = conn.ConnectionManager   
   End Sub   
  
   Public  Overrides Sub ReleaseConnections()   
     m_ConnMgr = Nothing   
   End Sub   
  
   Public  Overrides Sub PreExecute()   
     Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)   
     m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)   
     m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)   
   End Sub   
  
   Public  Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())   
     Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)   
     While Not (strFileName Is Nothing)   
       buffers(0).AddRow   
       buffers(0).SetString(m_FileNameColumnIndex, strFileName)   
       Dim fileInfo As FileInfo = New FileInfo(strFileName)   
       Dim fileData(fileInfo.Length) As Byte   
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)   
       fs.Read(fileData, 0, fileData.Length)   
       buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)   
       strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)   
     End While   
     buffers(0).SetEndOfRowset   
   End Sub   
 End Class   
End Namespace  

Consulte Também

Desenvolvendo um componente de destino personalizado
Criando uma fonte com o componente Script