Desarrollar un componente de origen personalizado
SQL ServerIntegration Services permite a los desarrolladores escribir componentes de origen que pueden conectar a los orígenes de datos personalizados y proporcionar datos de estos orígenes a otros componentes en una tarea de flujo de datos. La capacidad para crear orígenes personalizados es importante cuando es necesario conectar a orígenes de datos a los que no se puede tener acceso utilizando uno de los orígenes de Integration Services existentes.
Los componentes de origen tienen una o más salidas y ninguna entrada. En tiempo de diseño, los componentes de origen se utilizan para crear y configurar conexiones, leer metadatos de columna del origen de datos externo y configurar las columnas de salida del origen en función del origen de datos externo. Durante la ejecución, conectan al origen de datos externo y agregan filas a un búfer de salida. A continuación, la tarea de flujo de datos proporciona este búfer de filas de datos a componentes de nivel inferior.
Para obtener un componente de origen de ejemplo, vea los ejemplos de Integration Services en Codeplex. Para obtener información general sobre el desarrollo de componentes de flujo de datos, vea Desarrollar un componente de flujo de datos personalizado.
Tiempo de diseño
La implementación de la funcionalidad en tiempo de diseño de un componente de origen implica especificar una conexión a un origen de datos externo, agregar y configurar columnas de salida que reflejen el origen de datos y validar que el componente esté listo para ejecutarse. Por definición, un componente de origen no tiene ninguna entrada y una o más salidas asincrónicas.
Crear el componente
Los componentes de origen se conectan a los orígenes de datos externos utilizando objetos ConnectionManager definidos en un paquete. Indican su requisito de un administrador de conexión agregando un elemento a la colección RuntimeConnectionCollection de la propiedad ComponentMetaData. Esta colección sirve a dos fines: incluir referencias a administradores de conexión del paquete que utiliza el componente y anunciar la necesidad de un administrador de conexión al diseñador. Cuando se ha agregado un contenedor IDTSRuntimeConnection100 a la colección, el Editor avanzado muestra la ficha Propiedades de conexión, que permite a los usuarios seleccionar o crea una conexión en el paquete.
En el ejemplo de código siguiente se muestra una implementación de ProvideComponentProperties que agrega una salida y agrega un objeto IDTSRuntimeConnection100 a RuntimeConnectionCollection.
using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
public class MyComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.NET";
}
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Allow for resetting the component.
RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.NET"
End Sub
End Class
Conectar a un origen de datos externo
Una vez agregada una conexión a RuntimeConnectionCollection, invalide el método AcquireConnections para establecer una conexión al origen de datos externo. A este método se le llama en tiempo de diseño y en tiempo de ejecución. El componente debe establecer una conexión al administrador de conexión especificado por la conexión en tiempo de ejecución y, como consecuencia, al origen de datos externo.
Una vez establecida la conexión, el componente debe almacenarla en caché y liberarla al llamar al método ReleaseConnections. Al método ReleaseConnections se le llama en tiempo de diseño y en tiempo de ejecución, como el método AcquireConnections. Los programadores invalidan este método y liberan la conexión establecida por el componente durante AcquireConnections.
En el ejemplo de código siguiente se muestra un componente que conecta a una conexión de ADO.NET en el método AcquireConnections y cierra la conexión en el método ReleaseConnections.
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then
Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Crear y configurar columnas de salida
Las columnas de salida de un componente de origen reflejan las columnas del origen de datos externo que el componente agrega al flujo de datos durante la ejecución. En tiempo de diseño, agrega columnas de salida una vez configurado el componente para que se conecte a un origen de datos externo. El método en tiempo de diseño que un componente utiliza para agregar las columnas a su colección de salida puede variar en función de las necesidades del componente, aunque no se deben agregar durante Validate o AcquireConnections. Por ejemplo, un componente que contiene una instrucción SQL en una propiedad personalizada que controla el conjunto de datos para el componente puede agregar sus columnas de salida durante el método SetComponentProperty. El componente comprueba si incluye una conexión almacenada en la caché, y, si es así, se conecta al origen de datos y genera sus columnas de salida.
Una vez creada una columna de salida, establezca sus propiedades de tipo de datos llamando al método SetDataTypeProperties. Este método es necesario porque las propiedades DataType, Length, Precision y CodePage son de sólo lectura y cada propiedad depende de los valores de la otra. Este método impone la necesidad de que estos valores se establezcan de forma coherente y la tarea de flujo de datos valida que se establezcan correctamente.
El DataType de la columna determina los valores que se establecen para otras propiedades. En la tabla siguiente se muestran los requisitos de las propiedades dependientes para cada DataType. En los tipos de datos no enumerados, sus propiedades dependientes se establecen en cero.
Tipo de datos |
Longitud |
Escala |
Precisión |
CodePage |
---|---|---|---|---|
DT_DECIMAL |
0 |
Mayor que 0 y menor o igual que 28. |
0 |
0 |
DT_CY |
0 |
0 |
0 |
0 |
DT_NUMERIC |
0 |
Mayor que 0 y menor o igual que 28 y menor que Precisión. |
Mayor o igual que 1 y menor o igual que 38. |
0 |
DT_BYTES |
Mayor que 0. |
0 |
0 |
0 |
DT_STR |
Mayor que 0 y menor que 8.000. |
0 |
0 |
Distinto de 0 y una página de códigos válida. |
DT_WSTR |
Mayor que 0 y menor que 4.000. |
0 |
0 |
0 |
Dado que las restricciones en las propiedades de tipo de datos se basan en el tipo de datos de la columna de salida, debe elegir el tipo de datos de SSIS correcto al trabajar con tipos administrados. La clase base proporciona tres métodos de aplicación auxiliar, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType y DataRecordTypeToBufferType, para ayudar a los programadores de componentes administrados a seleccionar un tipo de datos de SSIS dado un tipo administrado. Estos métodos convierten los tipos de datos administrados en tipos de datos de SSIS y viceversa.
En el ejemplo de código siguiente se muestra cómo se rellena la colección de columnas de salida de un componente en función del esquema de una tabla. Los métodos de aplicación auxiliar de la clase base se utilizan para establecer el tipo de datos de la columna; las propiedades dependientes se establecen en función del tipo de datos.
SqlCommand sqlCommand;
private void CreateColumnsFromDataTable()
{
// Get the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
// Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll();
output.ExternalMetadataColumnCollection.RemoveAll();
this.sqlCommand = sqlConnection.CreateCommand();
this.sqlCommand.CommandType = CommandType.Text;
this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
DataTable dataTable = schemaReader.GetSchemaTable();
// Walk the columns in the schema,
// and for each data column create an output column and an external metadata column.
foreach (DataRow row in dataTable.Rows)
{
IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();
// Set column data type properties.
bool isLong = false;
DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
int length = 0;
int precision = (short)row["NumericPrecision"];
int scale = (short)row["NumericScale"];
int codepage = dataTable.Locale.TextInfo.ANSICodePage;
switch (dt)
{
// The length cannot be zero, and the code page property must contain a valid code page.
case DataType.DT_STR:
case DataType.DT_TEXT:
length = precision;
precision = 0;
scale = 0;
break;
case DataType.DT_WSTR:
length = precision;
codepage = 0;
scale = 0;
precision = 0;
break;
case DataType.DT_BYTES:
precision = 0;
scale = 0;
codepage = 0;
break;
case DataType.DT_NUMERIC:
length = 0;
codepage = 0;
if (precision > 38)
precision = 38;
if (scale > 6)
scale = 6;
break;
case DataType.DT_DECIMAL:
length = 0;
precision = 0;
codepage = 0;
break;
default:
length = 0;
precision = 0;
codepage = 0;
scale = 0;
break;
}
// Set the properties of the output column.
outColumn.Name = (string)row["ColumnName"];
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
}
}
Private sqlCommand As SqlCommand
Private Sub CreateColumnsFromDataTable()
' Get the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
' Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll()
output.ExternalMetadataColumnCollection.RemoveAll()
Me.sqlCommand = sqlConnection.CreateCommand()
Me.sqlCommand.CommandType = CommandType.Text
Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)
Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
Dim dataTable As DataTable = schemaReader.GetSchemaTable()
' Walk the columns in the schema,
' and for each data column create an output column and an external metadata column.
For Each row As DataRow In dataTable.Rows
Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
' Set column data type properties.
Dim isLong As Boolean = False
Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
Dim length As Integer = 0
Dim precision As Integer = CType(row("NumericPrecision"), Short)
Dim scale As Integer = CType(row("NumericScale"), Short)
Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage
Select Case dt
' The length cannot be zero, and the code page property must contain a valid code page.
Case DataType.DT_STR
Case DataType.DT_TEXT
length = precision
precision = 0
scale = 0
Case DataType.DT_WSTR
length = precision
codepage = 0
scale = 0
precision = 0
Case DataType.DT_BYTES
precision = 0
scale = 0
codepage = 0
Case DataType.DT_NUMERIC
length = 0
codepage = 0
If precision > 38 Then
precision = 38
End If
If scale > 6 Then
scale = 6
End If
Case DataType.DT_DECIMAL
length = 0
precision = 0
codepage = 0
Case Else
length = 0
precision = 0
codepage = 0
scale = 0
End Select
' Set the properties of the output column.
outColumn.Name = CStr(row("ColumnName"))
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
Next
End Sub
Validar el componente
Debe validar un componente de origen y comprobar que las columnas definidas en sus colecciones de columnas de salida coinciden con las columnas del origen de datos externo. En ocasiones, la comprobación de las columnas de salida en el origen de datos externo puede resultar imposible, como en un estado desconectado, o cuando es preferible evitar largos viajes de ida y vuelta (round trip) al servidor. En estas situaciones, las columnas de la salida se pueden seguir validando mediante la propiedad ExternalMetadataColumnCollection del objeto de salida. Para obtener más información, vea Validar un componente de flujo de datos.
Esta colección existe tanto en los objetos de entrada como en los objetos de salida, y se puede rellenar con las columnas del origen de datos externo. Puede utilizar esta colección para validar las columnas de salida cuando el Diseñador SSIS no tiene conexión, cuando el componente está desconectado o cuando la propiedad ValidateExternalMetadata es false. La colección se debe rellenar por primera vez al mismo tiempo que se crean las columnas de salida. Agregar columnas de metadatos externas a la colección resulta relativamente fácil porque la columna de metadatos externa debe coincidir inicialmente con la columna de salida. Las propiedades de tipo de datos se deberán haber establecido correctamente y se pueden copiar directamente en el objeto IDTSExternalMetadataColumn100.
En el siguiente código de ejemplo se agrega una columna de metadatos externa basada en una columna de salida recién creada. Se supone que la columna de salida ya se ha creado.
private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
externalColumn.Name = outputColumn.Name;
externalColumn.Precision = outputColumn.Precision;
externalColumn.Length = outputColumn.Length;
externalColumn.DataType = outputColumn.DataType;
externalColumn.Scale = outputColumn.Scale;
// Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
externalColumn.Name = outputColumn.Name
externalColumn.Precision = outputColumn.Precision
externalColumn.Length = outputColumn.Length
externalColumn.DataType = outputColumn.DataType
externalColumn.Scale = outputColumn.Scale
' Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Tiempo de ejecución
Durante la ejecución, los componentes agregan filas a los búferes de salida que crea la tarea de flujo de datos y que se proporcionan al componente en PrimeOutput. El método, al que se llama una vez para los componentes de origen, recibe un búfer de salida para cada objeto IDTSOutput100 del componente conectado a un componente de nivel inferior.
Localizar columnas en el búfer
El búfer de salida de un componente contiene las columnas definidas por el componente y cualquier columna agregada a la salida de un componente de nivel inferior. Por ejemplo, si un componente de origen proporciona tres columnas en su salida y el componente siguiente agrega una cuarta columna de salida, el búfer de salida que proporciona el componente de origen contiene estas cuatro columnas.
El índice de la columna de salida de la colección de columnas de salida no define el orden de las columnas de una fila de búfer. Una columna de salida sólo se puede localizar de forma precisa en una fila de búfer mediante el método FindColumnByLineageID de BufferManager. Este método localiza la columna con el identificador de linaje especificado en el búfer especificado y devuelve su ubicación en la fila. Los índices de las columnas de salida se encuentran normalmente en el método PreExecute y se almacenan para su uso durante PrimeOutput.
En el ejemplo de código siguiente se busca la ubicación de las columnas de salida del búfer de salida durante una llamada a PreExecute y se almacenan en una estructura interna. El nombre de la columna también se almacena en la estructura y se utiliza en el ejemplo de código para el método PrimeOutput en la sección siguiente de este tema.
ArrayList columnInformation;
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
public override void PreExecute()
{
this.columnInformation = new ArrayList();
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
ci.ColumnName = col.Name;
columnInformation.Add(ci);
}
}
Public Overrides Sub PreExecute()
Me.columnInformation = New ArrayList()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
For Each col As IDTSOutputColumn100 In output.OutputColumnCollection
Dim ci As ColumnInfo = New ColumnInfo()
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
ci.ColumnName = col.Name
columnInformation.Add(ci)
Next
End Sub
Procesar las filas
Las filas se agregan al búfer de salida llamando al método AddRow, que crea una nueva fila de búfer con valores vacíos en sus columnas. A continuación, el componente asigna los valores a las columnas individuales. La tarea de flujo de datos crea y supervisa los búferes de salida que se proporcionan a un componente. Cuando se llenan, las filas del búfer se mueven al componente siguiente. No hay ninguna manera de determinar cuándo se ha enviado un lote de filas al componente siguiente porque el movimiento de filas de la tarea de flujo de datos es transparente para el programador de componentes y la propiedad RowCount siempre es cero en los búferes de salida. Cuando un componente de origen ha terminado de agregar filas a su búfer de salida, lo notifica a la tarea de flujo de datos mediante una llamada al método SetEndOfRowset de PipelineBuffer y las filas restantes del búfer se pasan al componente siguiente.
Mientras el componente de origen lee las filas del origen de datos externo, puede actualizar los contadores de rendimiento "Filas leídas" o "Bytes BLOB leídos" mediante una llamada al método IncrementPipelinePerfCounter. Para obtener más información, vea Supervisar el rendimiento del motor de flujo de datos.
En el ejemplo de código siguiente se muestra un componente que agrega filas a un búfer de salida en PrimeOutput. Los índices de las columnas de salida del búfer se localizaron con PreExecute del ejemplo de código anterior.
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
PipelineBuffer buffer = buffers[0];
SqlDataReader dataReader = sqlCommand.ExecuteReader();
// Loop over the rows in the DataReader,
// and add them to the output buffer.
while (dataReader.Read())
{
// Add a row to the output buffer.
buffer.AddRow();
for (int x = 0; x < columnInformation.Count; x++)
{
ColumnInfo ci = (ColumnInfo)columnInformation[x];
int ordinal = dataReader.GetOrdinal(ci.ColumnName);
if (dataReader.IsDBNull(ordinal))
buffer.SetNull(ci.BufferColumnIndex);
else
{
buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
}
}
}
buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim buffer As PipelineBuffer = buffers(0)
Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()
' Loop over the rows in the DataReader,
' and add them to the output buffer.
While (dataReader.Read())
' Add a row to the output buffer.
buffer.AddRow()
For x As Integer = 0 To columnInformation.Count
Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)
Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)
If (dataReader.IsDBNull(ordinal)) Then
buffer.SetNull(ci.BufferColumnIndex)
Else
buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)
End If
Next
End While
buffer.SetEndOfRowset()
End Sub
Ejemplo
En el ejemplo siguiente se muestra un componente de origen simple que utiliza un administrador de conexión de archivos para cargar el contenido binario de los archivos en el flujo de datos. En este ejemplo no se muestran todos los métodos ni funcionalidad tratados en este tema. Muestra los métodos importantes que cada componente de origen personalizado debe invalidar, pero no contiene código para la validación en tiempo de diseño. Para obtener un ejemplo de componente de origen más completo, vea los ejemplos de Integration Services en Codeplex.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace BlobSrc
{
[DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
public class BlobSrc : PipelineComponent
{
IDTSConnectionManager100 m_ConnMgr;
int m_FileNameColumnIndex = -1;
int m_FileBlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "BLOB File Inserter Output";
IDTSOutputColumn100 column = output.OutputColumnCollection.New();
column.Name = "FileName";
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);
column = output.OutputColumnCollection.New();
column.Name = "FileBLOB";
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_ConnMgr = conn.ConnectionManager;
}
public override void ReleaseConnections()
{
m_ConnMgr = null;
}
public override void PreExecute()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
string strFileName = (string)m_ConnMgr.AcquireConnection(null);
while (strFileName != null)
{
buffers[0].AddRow();
buffers[0].SetString(m_FileNameColumnIndex, strFileName);
FileInfo fileInfo = new FileInfo(strFileName);
byte[] fileData = new byte[fileInfo.Length];
FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
fs.Read(fileData, 0, fileData.Length);
buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);
strFileName = (string)m_ConnMgr.AcquireConnection(null);
}
buffers[0].SetEndOfRowset();
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace BlobSrc
<DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _
Public Class BlobSrc
Inherits PipelineComponent
Private m_ConnMgr As IDTSConnectionManager100
Private m_FileNameColumnIndex As Integer = -1
Private m_FileBlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "BLOB File Inserter Output"
Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New
column.Name = "FileName"
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)
column = output.OutputColumnCollection.New
column.Name = "FileBLOB"
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)
Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_ConnMgr = conn.ConnectionManager
End Sub
Public Overrides Sub ReleaseConnections()
m_ConnMgr = Nothing
End Sub
Public Overrides Sub PreExecute()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)
m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)
While Not (strFileName Is Nothing)
buffers(0).AddRow
buffers(0).SetString(m_FileNameColumnIndex, strFileName)
Dim fileInfo As FileInfo = New FileInfo(strFileName)
Dim fileData(fileInfo.Length) As Byte
Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)
fs.Read(fileData, 0, fileData.Length)
buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)
strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)
End While
buffers(0).SetEndOfRowset
End Sub
End Class
End Namespace
|