Développement d'un composant source personnalisé
SQL Server Integration Services permet aux développeurs d'écrire des composants sources capables de se connecter à des sources de données personnalisées et de fournir des données, à partir de ces sources, à d'autres composants dans une tâche de flux de données. La possibilité de créer des sources personnalisées est particulièrement utile lorsque vous devez vous connecter à des sources de données qui ne sont pas accessibles à l'aide de l'une des sources Integration Services existantes.
Les composants sources possèdent une ou plusieurs sorties et zéro entrée. Au moment de la conception, les composants sources permettent de créer et configurer des connexions, lire des métadonnées de colonne à partir de la source de données externe et configurer les colonnes de sortie de la source en fonction de la source de données externe. Pendant l'exécution, ils se connectent à la source de données externe et ajoutent des lignes à une mémoire tampon de sortie. La tâche de flux de données fournit ensuite cette mémoire tampon de lignes de données aux composants en aval.
Pour obtenir un exemple de composant source, consultez les exemples Integration Services sur Codeplex. Pour une vue d'ensemble du développement de composants de flux de données, consultez Développement d'un composant de flux de données personnalisé.
Moment de la conception
Pour implémenter les fonctionnalités au moment de la conception d'un composant source, il est nécessaire de spécifier une connexion à une source de données externe, d'ajouter et configurer des colonnes de sortie qui reflètent la source de données et de confirmer que le composant est prêt à être exécuté. Par définition, un composant source possède zéro entrée et une ou plusieurs sorties asynchrones.
Création du composant
Les composants sources se connectent à des sources de données externes à l'aide d'objets ConnectionManager définis dans un package. Ils indiquent leur besoin d'un gestionnaire de connexions en ajoutant un élément à la collection RuntimeConnectionCollection de la propriété ComponentMetaData. Cette collection remplit deux fonctions : conserver les références aux gestionnaires de connexions dans le package utilisé par le composant et publier la nécessité d'un gestionnaire de connexions sur le concepteur. Lorsque IDTSRuntimeConnection100 est ajouté à la collection, l'éditeur avancé affiche l'onglet Propriétés de connexion, qui permet aux utilisateurs de sélectionner ou créer une connexion dans le package.
L'exemple de code suivant présente une implémentation de ProvideComponentProperties qui ajoute une sortie, puis ajoute un objet IDTSRuntimeConnection100 à RuntimeConnectionCollection.
using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
public class MyComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.NET";
}
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Allow for resetting the component.
RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.NET"
End Sub
End Class
Connexion à une source de données externe
Lorsqu'une connexion a été ajoutée à RuntimeConnectionCollection, vous pouvez substituer la méthode AcquireConnections pour établir une connexion à la source de données externe. Cette méthode est appelée au moment de la conception et de l'exécution. Le composant doit établir une connexion au gestionnaire de connexions spécifié par la connexion au moment de l'exécution, et par la suite, à la source de données externe.
Une fois la connexion établie, elle doit être mise en cache en interne par le composant et libérée lorsque la méthode ReleaseConnections est appelée. La méthode ReleaseConnections est appelée au moment de la conception et de l'exécution, comme la méthode AcquireConnections. Les développeurs substituent cette méthode et libèrent la connexion établie par le composant pendant l'exécution de la méthode AcquireConnections.
L'exemple de code suivant montre un composant qui établit une connexion ADO.NET dans la méthode AcquireConnections, puis ferme la connexion dans la méthode ReleaseConnections.
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then
Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Création et configuration de colonnes de sortie
Les colonnes de sortie d'un composant source reflètent les colonnes de la source de données externe que le composant ajoute au flux de données pendant l'exécution. Au moment de la conception, vous devez ajouter des colonnes de sortie une fois que le composant a été configuré pour se connecter à une source de données externe. La méthode au moment de la conception qu'un composant utilise pour ajouter les colonnes à sa collection de sortie peut varier selon les besoins du composant, bien qu'elles ne doivent pas être ajoutées pendant l'exécution des méthodes Validate ou AcquireConnections. Par exemple, un composant contenant une instruction SQL dans une propriété personnalisée qui contrôle le jeu de données du composant peut ajouter ses colonnes de sortie pendant l'exécution de la méthode SetComponentProperty. Le composant vérifie s'il possède une connexion mise en cache, et si c'est le cas, il se connecte à la source de données et génère ses colonnes de sortie.
Après avoir créé une colonne de sortie, définissez ses propriétés de type de données en appelant la méthode SetDataTypeProperties. Cette méthode est nécessaire car les propriétés DataType, Length, Precision et CodePage sont en lecture seule et que chaque propriété dépend des paramètres de l'autre. Cette méthode impose l'obligation de définir ces valeurs de manière cohérente et la tâche de flux de données valide le fait qu'elles sont définies de manière correcte.
La propriété DataType de la colonne détermine les valeurs définies pour les autres propriétés. Le tableau suivant indique les conditions requises sur les propriétés dépendantes de chaque propriété DataType. Les types de données non répertoriés ont leurs propriétés dépendantes définies sur zéro.
DataType |
Longueur |
Échelle |
Précision |
CodePage |
---|---|---|---|---|
DT_DECIMAL |
0 |
Supérieur à 0 et inférieur ou égal à 28. |
0 |
0 |
DT_CY |
0 |
0 |
0 |
0 |
DT_NUMERIC |
0 |
Supérieur à 0 et inférieur ou égale à 28 et inférieur à Précision. |
Supérieur ou égal à 1 et inférieur ou égal à 38. |
0 |
DT_BYTES |
Supérieur à 0. |
0 |
0 |
0 |
DT_STR |
Supérieur à 0 et inférieur à 8000. |
0 |
0 |
Différent de 0 et une page de codes valide. |
DT_WSTR |
Supérieur à 0 et inférieur à 4000. |
0 |
0 |
0 |
Étant donné que les restrictions sur les propriétés de type de données sont basées sur le type de données de la colonne de sortie, vous devez choisir le type de données SSIS correct lorsque vous utilisez des types managés. La classe de base fournit trois méthodes d'assistance, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType et DataRecordTypeToBufferType, qui aident les développeurs de composants managés à sélectionner un type de données SSIS en fonction d'un type managé. Ces méthodes convertissent des types de données managés en types de données SSIS, et vice versa.
L'exemple de code suivant montre comment la collection de colonnes de sortie d'un composant est remplie selon le schéma d'une table. Les méthodes d'assistance de la classe de base permettent de définir le type de données de la colonne, qui lui-même définit les propriétés dépendantes.
SqlCommand sqlCommand;
private void CreateColumnsFromDataTable()
{
// Get the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
// Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll();
output.ExternalMetadataColumnCollection.RemoveAll();
this.sqlCommand = sqlConnection.CreateCommand();
this.sqlCommand.CommandType = CommandType.Text;
this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
DataTable dataTable = schemaReader.GetSchemaTable();
// Walk the columns in the schema,
// and for each data column create an output column and an external metadata column.
foreach (DataRow row in dataTable.Rows)
{
IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();
// Set column data type properties.
bool isLong = false;
DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
int length = 0;
int precision = (short)row["NumericPrecision"];
int scale = (short)row["NumericScale"];
int codepage = dataTable.Locale.TextInfo.ANSICodePage;
switch (dt)
{
// The length cannot be zero, and the code page property must contain a valid code page.
case DataType.DT_STR:
case DataType.DT_TEXT:
length = precision;
precision = 0;
scale = 0;
break;
case DataType.DT_WSTR:
length = precision;
codepage = 0;
scale = 0;
precision = 0;
break;
case DataType.DT_BYTES:
precision = 0;
scale = 0;
codepage = 0;
break;
case DataType.DT_NUMERIC:
length = 0;
codepage = 0;
if (precision > 38)
precision = 38;
if (scale > 6)
scale = 6;
break;
case DataType.DT_DECIMAL:
length = 0;
precision = 0;
codepage = 0;
break;
default:
length = 0;
precision = 0;
codepage = 0;
scale = 0;
break;
}
// Set the properties of the output column.
outColumn.Name = (string)row["ColumnName"];
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
}
}
Private sqlCommand As SqlCommand
Private Sub CreateColumnsFromDataTable()
' Get the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
' Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll()
output.ExternalMetadataColumnCollection.RemoveAll()
Me.sqlCommand = sqlConnection.CreateCommand()
Me.sqlCommand.CommandType = CommandType.Text
Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)
Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
Dim dataTable As DataTable = schemaReader.GetSchemaTable()
' Walk the columns in the schema,
' and for each data column create an output column and an external metadata column.
For Each row As DataRow In dataTable.Rows
Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
' Set column data type properties.
Dim isLong As Boolean = False
Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
Dim length As Integer = 0
Dim precision As Integer = CType(row("NumericPrecision"), Short)
Dim scale As Integer = CType(row("NumericScale"), Short)
Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage
Select Case dt
' The length cannot be zero, and the code page property must contain a valid code page.
Case DataType.DT_STR
Case DataType.DT_TEXT
length = precision
precision = 0
scale = 0
Case DataType.DT_WSTR
length = precision
codepage = 0
scale = 0
precision = 0
Case DataType.DT_BYTES
precision = 0
scale = 0
codepage = 0
Case DataType.DT_NUMERIC
length = 0
codepage = 0
If precision > 38 Then
precision = 38
End If
If scale > 6 Then
scale = 6
End If
Case DataType.DT_DECIMAL
length = 0
precision = 0
codepage = 0
Case Else
length = 0
precision = 0
codepage = 0
scale = 0
End Select
' Set the properties of the output column.
outColumn.Name = CStr(row("ColumnName"))
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
Next
End Sub
Validation du composant
Vous devez valider un composant source et vérifier que les colonnes définies dans ses collections de colonnes de sortie correspondent aux colonnes au niveau de la source de données externe. Il est parfois impossible de vérifier les colonnes de sortie par rapport à la source de données externe, notamment lorsque le composant est déconnecté ou lorsqu'il est préférable d'éviter de longs allers-retours au serveur. Dans ce cas, il est toujours possible de valider les colonnes dans la sortie à l'aide de la propriété ExternalMetadataColumnCollection de l'objet de sortie. Pour plus d'informations, consultez Validation d'un composant de flux de données.
Cette collection existe sur les objets d'entrée et de sortie et vous pouvez la remplir avec les colonnes de la source de données externe. Cette collection permet de valider les colonnes de sortie lorsque le concepteur SSIS est hors connexion, lorsque le composant est déconnecté ou lorsque la propriété ValidateExternalMetadata a la valeur false. La collection doit d'abord être remplie à mesure que les colonnes de sortie sont créées. Étant donné que la colonne de métadonnées externe doit correspondre initialement à la colonne de sortie, il est relativement facile de l'ajouter à la collection. Les propriétés de type de données de la colonne, qui doivent déjà être définies correctement, peuvent être copiées directement dans l'objet IDTSExternalMetadataColumn100.
L'exemple de code suivant ajoute une colonne de métadonnées externe basée sur une colonne de sortie créée récemment. Il suppose que la colonne de sortie a déjà été créée.
private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
externalColumn.Name = outputColumn.Name;
externalColumn.Precision = outputColumn.Precision;
externalColumn.Length = outputColumn.Length;
externalColumn.DataType = outputColumn.DataType;
externalColumn.Scale = outputColumn.Scale;
// Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
externalColumn.Name = outputColumn.Name
externalColumn.Precision = outputColumn.Precision
externalColumn.Length = outputColumn.Length
externalColumn.DataType = outputColumn.DataType
externalColumn.Scale = outputColumn.Scale
' Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Moment de l'exécution
Pendant l'exécution, les composants ajoutent des lignes dans les mémoires tampons de sortie créées par la tâche de flux de données et fournies au composant dans PrimeOutput. Appelée une fois pour des composants sources, la méthode reçoit une mémoire tampon de sortie pour chaque IDTSOutput100 du composant connecté à un composant en aval.
Recherche de colonnes dans la mémoire tampon
La mémoire tampon de sortie d'un composant contient les colonnes définies par le composant et toutes les colonnes ajoutées à la sortie d'un composant en aval. Par exemple, si un composant source fournit trois colonnes dans sa sortie et que le composant suivant ajoute une quatrième colonne de sortie, la mémoire tampon de sortie destinée au composant source contient ces quatre colonnes.
L'ordre des colonnes dans une ligne de mémoire tampon n'est pas défini par l'index de la colonne de sortie dans la collection de colonnes de sortie. Seule la méthode FindColumnByLineageID de BufferManager permet de placer correctement une colonne de sortie dans une ligne de mémoire tampon. Cette méthode recherche la colonne avec l'ID de lignage spécifié dans la mémoire tampon spécifiée et retourne son emplacement dans la ligne. Les index des colonnes de sortie se trouvent généralement dans la méthode PreExecute et sont stockés pour être utilisés lors de l'exécution de la méthode PrimeOutput.
L'exemple de code suivant recherche l'emplacement des colonnes de sortie dans la mémoire tampon de sortie pendant un appel à PreExecute et stocke l'information dans une structure interne. Le nom de la colonne, également stocké dans la structure, est utilisé dans l'exemple de code de la méthode PrimeOutput présenté dans la section suivante de cette rubrique.
ArrayList columnInformation;
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
public override void PreExecute()
{
this.columnInformation = new ArrayList();
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
ci.ColumnName = col.Name;
columnInformation.Add(ci);
}
}
Public Overrides Sub PreExecute()
Me.columnInformation = New ArrayList()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
For Each col As IDTSOutputColumn100 In output.OutputColumnCollection
Dim ci As ColumnInfo = New ColumnInfo()
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
ci.ColumnName = col.Name
columnInformation.Add(ci)
Next
End Sub
Traitement des lignes
Des lignes sont ajoutées à la mémoire tampon de sortie en appelant la méthode AddRow, qui crée une ligne de mémoire tampon dont les colonnes contiennent des valeurs vides. Le composant attribue ensuite des valeurs aux colonnes individuelles. Les mémoires tampons de sortie fournies à un composant sont créées et analysées par la tâche de flux de données. À mesure que les mémoires tampons se remplissent, les lignes qu'elles contiennent sont déplacées vers le composant suivant. Il est impossible de déterminer le moment de l'envoi d'un lot de lignes au composant suivant car le déplacement des lignes par la tâche de flux de données est transparent pour les développeurs de composants. De plus, la propriété RowCount a toujours la valeur zéro sur les mémoires tampons de sortie. Lorsqu'un composant source a terminé d'ajouter des lignes à sa mémoire tampon de sortie, il le notifie à la tâche de flux de données en appelant la méthode SetEndOfRowset de PipelineBuffer, et les lignes restantes dans la mémoire tampon sont transférées au composant suivant.
Pendant que le composant source lit des lignes de la source de données externe, vous pouvez mettre à jour les compteurs de performance « Lignes lues » ou « Octets BLOB lus » en appelant la méthode IncrementPipelinePerfCounter. Pour plus d'informations, consultez Analyse des performances du moteur de flux de données.
L'exemple de code suivant montre un composant qui ajoute des lignes à une mémoire tampon de sortie dans PrimeOutput. La recherche des index des colonnes de sortie dans la mémoire tampon a été effectuée à l'aide de la méthode PreExecute dans l'exemple de code précédent.
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
PipelineBuffer buffer = buffers[0];
SqlDataReader dataReader = sqlCommand.ExecuteReader();
// Loop over the rows in the DataReader,
// and add them to the output buffer.
while (dataReader.Read())
{
// Add a row to the output buffer.
buffer.AddRow();
for (int x = 0; x < columnInformation.Count; x++)
{
ColumnInfo ci = (ColumnInfo)columnInformation[x];
int ordinal = dataReader.GetOrdinal(ci.ColumnName);
if (dataReader.IsDBNull(ordinal))
buffer.SetNull(ci.BufferColumnIndex);
else
{
buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
}
}
}
buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim buffer As PipelineBuffer = buffers(0)
Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()
' Loop over the rows in the DataReader,
' and add them to the output buffer.
While (dataReader.Read())
' Add a row to the output buffer.
buffer.AddRow()
For x As Integer = 0 To columnInformation.Count
Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)
Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)
If (dataReader.IsDBNull(ordinal)) Then
buffer.SetNull(ci.BufferColumnIndex)
Else
buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)
End If
Next
End While
buffer.SetEndOfRowset()
End Sub
Exemple
L'exemple suivant montre un composant source simple qui utilise un gestionnaire de connexions de fichiers pour charger le contenu binaire des fichiers dans le flux de données. Cet exemple ne contient pas toutes les méthodes et fonctionnalités présentées dans cette rubrique. Il illustre les méthodes importantes que chaque composant source personnalisé doit substituer, mais ne contient pas de code pour la validation au moment de la conception. Pour obtenir un exemple de composant source plus complet, consultez les exemples Integration Services sur Codeplex.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace BlobSrc
{
[DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
public class BlobSrc : PipelineComponent
{
IDTSConnectionManager100 m_ConnMgr;
int m_FileNameColumnIndex = -1;
int m_FileBlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "BLOB File Inserter Output";
IDTSOutputColumn100 column = output.OutputColumnCollection.New();
column.Name = "FileName";
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);
column = output.OutputColumnCollection.New();
column.Name = "FileBLOB";
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_ConnMgr = conn.ConnectionManager;
}
public override void ReleaseConnections()
{
m_ConnMgr = null;
}
public override void PreExecute()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
string strFileName = (string)m_ConnMgr.AcquireConnection(null);
while (strFileName != null)
{
buffers[0].AddRow();
buffers[0].SetString(m_FileNameColumnIndex, strFileName);
FileInfo fileInfo = new FileInfo(strFileName);
byte[] fileData = new byte[fileInfo.Length];
FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
fs.Read(fileData, 0, fileData.Length);
buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);
strFileName = (string)m_ConnMgr.AcquireConnection(null);
}
buffers[0].SetEndOfRowset();
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace BlobSrc
<DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _
Public Class BlobSrc
Inherits PipelineComponent
Private m_ConnMgr As IDTSConnectionManager100
Private m_FileNameColumnIndex As Integer = -1
Private m_FileBlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "BLOB File Inserter Output"
Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New
column.Name = "FileName"
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)
column = output.OutputColumnCollection.New
column.Name = "FileBLOB"
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)
Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_ConnMgr = conn.ConnectionManager
End Sub
Public Overrides Sub ReleaseConnections()
m_ConnMgr = Nothing
End Sub
Public Overrides Sub PreExecute()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)
m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)
While Not (strFileName Is Nothing)
buffers(0).AddRow
buffers(0).SetString(m_FileNameColumnIndex, strFileName)
Dim fileInfo As FileInfo = New FileInfo(strFileName)
Dim fileData(fileInfo.Length) As Byte
Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)
fs.Read(fileData, 0, fileData.Length)
buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)
strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)
End While
buffers(0).SetEndOfRowset
End Sub
End Class
End Namespace
|