Compartir vía


Desarrollar un componente de destino personalizado

Se aplica a: SQL Server SSIS Integration Runtime en Azure Data Factory

Microsoft SQL Server Integration Services permite a los desarrolladores escribir componentes de destino personalizados que pueden conectarse a cualquier origen de datos personalizado y almacenar datos en él. Los componentes de destino personalizados resultan útiles si necesitar conectarse a orígenes de datos a los que no se puede tener acceso mediante uno de los componentes de origen existentes incluidos con Integration Services.

Los componentes de destino tienen una o más entradas y ninguna salida. En tiempo de diseño, crean y configuran conexiones y leen metadatos de columna del origen de datos externo. Durante la ejecución, conectan a su origen de datos externo y agregan filas que se reciben de los componentes de nivel superior en el flujo de datos al origen de datos externo. Si el origen de datos externo existe antes de la ejecución del componente, el componente de destino también debe asegurarse de que los tipos de datos de las columnas que recibe el componente coinciden con los tipos de datos de las columnas en el origen de datos externo.

En esta sección se tratan los detalles de cómo desarrollar componentes de destino y se proporcionan ejemplos de código para aclarar conceptos importantes. Para obtener información general sobre el desarrollo de componentes de flujo de datos, vea Developing a Custom Data Flow Componen (Desarrollo de un componente de flujo de datos personalizado).

Tiempo de diseño

La implementación de la funcionalidad en tiempo de diseño de un componente de destino implica especificar una conexión a un origen de datos externo y validar que el componente se ha configurado correctamente. Por definición, un componente de destino tiene una entrada y posiblemente una salida de error.

Crear el componente

Los componentes de destino se conectan a los orígenes de datos externos utilizando objetos ConnectionManager definidos en un paquete. El componente de destino indica su requisito de un administrador de conexiones al Diseñador SSIS y a los usuarios del componente mediante la adición de un elemento a la colección RuntimeConnectionCollection de ComponentMetaData. Esta colección sirve a dos fines: anuncia la necesidad de un administrador de conexiones al Diseñador SSIS y, después de que el usuario ha seleccionado o creado un administrador de conexiones, incluye una referencia al administrador de conexiones en el paquete que utiliza el componente. Cuando se agrega un objeto IDTSRuntimeConnection100 a la colección, el Editor avanzado muestra la pestaña Propiedades de conexión, para solicitar al usuario que seleccione o cree en el paquete una conexión para que la use el componente.

En el ejemplo de código siguiente se muestra una implementación de ProvideComponentProperties que agrega una entrada y, a continuación, agrega un objeto IDTSRuntimeConnection100 a RuntimeConnectionCollection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]  
    public class DestinationComponent : PipelineComponent   
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.net";  
        }  
    }  
}  
Imports System  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _  
    Public Class DestinationComponent  
        Inherits PipelineComponent  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Reset the component.  
            Me.RemoveAllInputsOutputsAndCustomProperties()  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
            input.Name = "Input"  
  
            Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
            connection.Name = "ADO.net"  
  
        End Sub  
    End Class  
End Namespace  

Conectar a un origen de datos externo

Una vez agregada una conexión a RuntimeConnectionCollection, invalide el método AcquireConnections para establecer una conexión al origen de datos externo. La llamada a este método se realiza en tiempo de diseño y en tiempo de ejecución. El componente debe establecer una conexión al administrador de conexiones especificado por la conexión en tiempo de ejecución y, como consecuencia, al origen de datos externo. Una vez establecida, el componente debe almacenar internamente en la memoria caché la conexión y liberarla cuando se llame a ReleaseConnections. Los programadores invalidan este método y liberan la conexión establecida por el componente durante AcquireConnections. Se llama a ambos métodos, ReleaseConnections y AcquireConnections, en tiempo de diseño y en tiempo de ejecución.

En el ejemplo de código siguiente se muestra un componente que conecta a una conexión de ADO.NET en el método AcquireConnections y, a continuación, cierra la conexión en el método ReleaseConnections.

using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then  
  
        Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

Validar el componente

Los programadores de componentes de destino deben realizar la validación tal como se describe en Component Validation (Validación de componentes). Además, deben comprobar que las propiedades de tipo de datos de las columnas definidas en la colección de columnas de entrada del componente coinciden con las columnas en el origen de datos externo. En ocasiones, la comprobación de las columnas de entrada con respecto al origen de datos externo puede resultar imposible o no ser deseable; por ejemplo si el componente o el Diseñador SSIS están en un estado desconectado o si no resultan aceptables los viajes de ida y vuelta (round trip) al servidor. En estas situaciones, las columnas de la colección de columnas de entrada se pueden validar mediante la propiedad ExternalMetadataColumnCollection del objeto de entrada.

Esta colección existe en los objetos de entrada y salida; el programador del componente debe rellenarla a partir de las columnas del origen de datos externo. Esta colección se puede usar para validar las columnas de entrada cuando el Diseñador SSIS está en modo sin conexión, cuando el componente está desconectado o cuando la propiedad ValidateExternalMetadata es false.

En el siguiente código de ejemplo se agrega una columna de metadatos externa basada en una columna de entrada existente.

private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)  
{  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = inputColumn.Name;  
    externalColumn.Precision = inputColumn.Precision;  
    externalColumn.Length = inputColumn.Length;  
    externalColumn.DataType = inputColumn.DataType;  
    externalColumn.Scale = inputColumn.Scale;  
  
    // Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;  
}  
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)  
  
    ' Set the properties of the external metadata column.  
    Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()  
    externalColumn.Name = inputColumn.Name  
    externalColumn.Precision = inputColumn.Precision  
    externalColumn.Length = inputColumn.Length  
    externalColumn.DataType = inputColumn.DataType  
    externalColumn.Scale = inputColumn.Scale  
  
    ' Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
End Sub  

Tiempo de ejecución

Durante la ejecución, el componente de destino recibe una llamada al método ProcessInput cada vez que está disponible un objeto PipelineBuffer completo del componente de nivel superior. Se llama a este método repetidamente hasta que no existen más búferes disponibles y la propiedad EndOfRowset es true. Durante este método, los componentes de destino leen las columnas y filas del búfer y las agregan al origen de datos externo.

Localizar columnas en el búfer

El búfer de entrada para un componente contiene todas las columnas definidas en las colecciones de columnas de salida de los componentes de nivel superior del componente en el flujo de datos. Por ejemplo, si un componente de origen proporciona tres columnas en su salida y el componente siguiente agrega una columna de salida adicional, el búfer proporcionado al componente de destino contiene cuatro columnas, aunque el componente de destino escriba solamente dos columnas.

El índice de la columna de la colección de columnas de entrada del componente de destino no define el orden de las columnas del búfer de entrada. Las columnas solo se pueden localizar en un búfer de manera confiable mediante el método FindColumnByLineageID de BufferManager. Este método localiza la columna con el identificador de linaje especificado en el búfer especificado y devuelve su ubicación en la fila. Normalmente los índices de las columnas de entrada se encuentran durante el método PreExecute y el programador los almacena en memoria caché para su uso posterior durante ProcessInput.

En el ejemplo de código siguiente se busca la ubicación de las columnas de entrada en el búfer durante PreExecute y se almacenan en una matriz. Posteriormente, la matriz se utiliza durante ProcessInput para leer los valores de las columnas en el búfer.

int[] cols;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
  
    cols = new int[input.InputColumnCollection.Count];  
  
    for (int x = 0; x < input.InputColumnCollection.Count; x++)  
    {  
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);  
    }  
}  
Private cols As Integer()  
  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
    ReDim cols(input.InputColumnCollection.Count)  
  
    For x As Integer = 0 To input.InputColumnCollection.Count  
  
        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)  
    Next x  
  
End Sub  

Procesar las filas

Una vez localizadas en el búfer las columnas de entrada, se pueden leer y escribir en el origen de datos externo.

Mientras el componente de destino escribe las filas del origen de datos externo, puede actualizar los contadores de rendimiento "Filas leídas" o "Bytes BLOB leídos" mediante una llamada al método IncrementPipelinePerfCounter. Para más información, consulte Performance Counters.

En el ejemplo siguiente se muestra un componente que lee las filas del búfer proporcionado en ProcessInput. Los índices de las columnas en el búfer se localizaron durante PreExecute en el ejemplo de código anterior.

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        while (buffer.NextRow())  
        {  
            foreach (int col in cols)  
            {  
                if (!buffer.IsNull(col))  
                {  
                    //  TODO: Read the column data.  
                }  
            }  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For Each col As Integer In cols  
  
                If buffer.IsNull(col) = False Then  
  
                    '  TODO: Read the column data.  
                End If  
  
            Next col  
        End While  
End Sub  

Muestra

En el ejemplo siguiente se muestra un componente de destino simple que utiliza un administrador de conexiones de archivos para guardar los datos binarios del flujo de datos en archivos. En este ejemplo no se muestran todos los métodos ni funcionalidad tratados en este tema. Muestra los métodos importantes que cada componente de destino personalizado debe invalidar, pero no contiene código para la validación en tiempo de diseño.

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace BlobDst  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]  
  public class BlobDst : PipelineComponent  
  {  
    string m_DestDir;  
    int m_FileNameColumnIndex = -1;  
    int m_BlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection.New();  
      input.Name = "BLOB Extractor Destination Input";  
      input.HasSideEffects = true;  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);  
  
      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')  
        m_DestDir += "\\";  
    }  
  
    public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)  
    {  
      IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);  
      IDTSCustomProperty100 custProp;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsFileName";  
      custProp.Value = (object)false;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsBLOB";  
      custProp.Value = (object)false;  
  
      return inputColumn;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
      IDTSCustomProperty100 custProp;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        custProp = column.CustomPropertyCollection["IsFileName"];  
        if ((bool)custProp.Value == true)  
        {  
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
  
        custProp = column.CustomPropertyCollection["IsBLOB"];  
        if ((bool)custProp.Value == true)  
        {  
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        string strFileName = buffer.GetString(m_FileNameColumnIndex);  
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);  
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);  
  
        strFileName = TranslateFileName(strFileName);  
  
        // Make sure directory exists before creating file.  
        FileInfo fi = new FileInfo(strFileName);  
        if (!fi.Directory.Exists)  
          fi.Directory.Create();  
  
        // Write the data to the file.  
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);  
        fs.Write(blobData, 0, blobLength);  
        fs.Close();  
      }  
    }  
  
    private string TranslateFileName(string fileName)  
    {  
      if (fileName.Length > 2 && fileName[1] == ':')  
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);  
      else  
        return m_DestDir + fileName;  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Namespace BlobDst   
  
 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _   
 Public Class BlobDst   
 Inherits PipelineComponent   
   Private m_DestDir As String   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_BlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
     input.Name = "BLOB Extractor Destination Input"   
     input.HasSideEffects = True   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)   
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then   
       m_DestDir += "\"   
     End If   
   End Sub   
  
   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100   
     Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)   
     Dim custProp As IDTSCustomProperty100   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsFileName"   
     custProp.Value = CType(False, Object)   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsBLOB"   
     custProp.Value = CType(False, Object)   
     Return inputColumn   
   End Function   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     Dim custProp As IDTSCustomProperty100   
     For Each column As IDTSInputColumn100 In inputColumns   
       custProp = column.CustomPropertyCollection("IsFileName")   
       If CType(custProp.Value, Boolean) = True Then   
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
       custProp = column.CustomPropertyCollection("IsBLOB")   
       If CType(custProp.Value, Boolean) = True Then   
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)   
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)   
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)   
       strFileName = TranslateFileName(strFileName)   
       Dim fi As FileInfo = New FileInfo(strFileName)   
       ' Make sure directory exists before creating file.  
       If Not fi.Directory.Exists Then   
         fi.Directory.Create   
       End If   
       ' Write the data to the file.  
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)   
       fs.Write(blobData, 0, blobLength)   
       fs.Close   
     End While   
   End Sub   
  
   Private Function TranslateFileName(ByVal fileName As String) As String   
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then   
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3)   
     Else   
       Return m_DestDir + fileName   
     End If   
   End Function   
 End Class   
End Namespace  

Vea también

Desarrollar un componente de origen personalizado
Creating a Destination with the Script Component (Crear un destino con el componente de script)