次の方法で共有


カスタム変換先コンポーネントの開発

適用対象: SQL Server Azure Data Factory の SSIS 統合ランタイム

開発者は、MicrosoftSQL Server Integration Services を使用すると、任意のカスタム データ ソースに接続してデータを格納するためのカスタム変換先コンポーネントを記述することができます。 カスタム変換先コンポーネントは、Integration Services に含まれている、既存の変換先コンポーネントを使用してもアクセスできないデータ ソースに接続する必要がある場合に役立ちます。

変換先コンポーネントには 1 つ以上の入力がありますが、出力はありません。 デザイン時に、外部データ ソースとの接続を作成して設定を行い、そこから列メタデータを読み込みます。 実行時には、外部データ ソースに接続し、データ フロー内で上流にあるコンポーネントから受け取った行を、この外部データ ソースに追加します。 コンポーネントの実行前に外部データ ソースが存在する場合は、変換先コンポーネントにより、コンポーネントが受け取る列のデータ型が、外部データ ソースの列のデータ型と一致することを確認する必要もあります。

このセクションでは、変換先コンポーネントの開発方法の詳細について説明し、重要な概念をわかりやすくするため、コード例を示します。 データ フロー コンポーネントの開発全般の概要については、「カスタム データ フロー コンポーネントの開発」を参照してください。

デザイン時

変換先コンポーネントのデザイン時の機能を実装する作業には、外部データ ソースへの接続の指定、およびコンポーネントが正しく設定されているかどうかの検証が含まれます。 定義上、変換先コンポーネントには 1 つの入力と、場合によって 1 つのエラー出力があります。

コンポーネントの作成

変換先コンポーネントは、パッケージで定義された ConnectionManager オブジェクトを使用して、外部データ ソースに接続します。 変換先コンポーネントは、接続マネージャーに関する要求を SSIS デザイナーやコンポーネントのユーザーに示すため、ComponentMetaDataRuntimeConnectionCollection コレクションに要素を追加します。 このコレクションには 2 つの目的があります。1 つは、接続マネージャーの必要性を SSIS デザイナーに通知することで、もう 1 つは、ユーザーが接続マネージャーを選択または作成した後に、コンポーネントが使用する、パッケージ内の接続マネージャーへの参照を保持することです。 IDTSRuntimeConnection100 がコレクションに追加されると、[詳細エディター][接続プロパティ] タブを表示します。このタブでは、コンポーネントが使用する接続をパッケージ内で選択したり、新しく作成したりすることができます。

次のコード例は、ProvideComponentProperties に入力を追加し、次に IDTSRuntimeConnection100 オブジェクトを追加する RuntimeConnectionCollection の実装を示します。

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]  
    public class DestinationComponent : PipelineComponent   
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.net";  
        }  
    }  
}  
Imports System  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _  
    Public Class DestinationComponent  
        Inherits PipelineComponent  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Reset the component.  
            Me.RemoveAllInputsOutputsAndCustomProperties()  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
            input.Name = "Input"  
  
            Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
            connection.Name = "ADO.net"  
  
        End Sub  
    End Class  
End Namespace  

外部データ ソースへの接続

RuntimeConnectionCollection に接続を追加した後、AcquireConnections メソッドをオーバーライドして、外部データ ソースへの接続を確立します。 このメソッドは、デザイン時と実行時に呼び出されます。 コンポーネントは、実行時の接続によって指定された接続マネージャーへの接続を確立し、続いて外部データ ソースへの接続を確立する必要があります。 接続が確立されると、コンポーネントは接続を内部にキャッシュし、ReleaseConnections が呼び出されると解放します。 開発者はこのメソッドをオーバーライドし、コンポーネントが確立した接続を AcquireConnections の実行中に解放します。 これらの ReleaseConnections メソッドおよび AcquireConnections メソッドは、どちらもデザイン時と実行時に呼び出されます。

次のコード例では、AcquireConnections メソッドで ADO.NET 接続へ接続し、次に ReleaseConnections で接続を閉じるコンポーネントを示します。

using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then  
  
        Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

コンポーネントの検証

コンポーネントの検証で説明しているように、変換先コンポーネントの開発者は検証を実行する必要があります。 また、コンポーネントの入力列コレクションで定義されている列のデータ型プロパティが、外部データ ソースの列と一致することを検証する必要もあります。 ただし、コンポーネントまたは SSIS デザイナーが接続されていない状態の場合や、サーバーへのラウンド トリップが許可されない場合など、外部データ ソースに対する入力列の検証が不可能または望ましくないこともあります。 このような状況でも、入力オブジェクトの ExternalMetadataColumnCollection を使用して、入力列コレクションの列を検証することができます。

このコレクションは入力オブジェクトおよび出力オブジェクトの両方に存在し、コンポーネント開発者が外部データ ソースの列から設定します。 SSIS デザイナーがオフラインである場合、コンポーネントが接続されていない場合、または ValidateExternalMetadata プロパティが false である場合は、このコレクションを使用して入力列を検証できます。

次のコード例は、既存の入力列に基づく外部メタデータ列を追加します。

private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)  
{  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = inputColumn.Name;  
    externalColumn.Precision = inputColumn.Precision;  
    externalColumn.Length = inputColumn.Length;  
    externalColumn.DataType = inputColumn.DataType;  
    externalColumn.Scale = inputColumn.Scale;  
  
    // Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;  
}  
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)  
  
    ' Set the properties of the external metadata column.  
    Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()  
    externalColumn.Name = inputColumn.Name  
    externalColumn.Precision = inputColumn.Precision  
    externalColumn.Length = inputColumn.Length  
    externalColumn.DataType = inputColumn.DataType  
    externalColumn.Scale = inputColumn.Scale  
  
    ' Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
End Sub  

実行時間

実行中は、データでいっぱいになった ProcessInput を上流コンポーネントから渡されるたびに、変換先コンポーネントは PipelineBuffer メソッドに対する呼び出しを受け取ります。 このメソッドは、渡されるバッファーがなくなり、EndOfRowset プロパティが true になるまで、繰り返して呼び出されます。 このメソッド内で、変換先コンポーネントはバッファー内の行と列を読み取り、外部データ ソースに追加します。

バッファー内の列の検索

コンポーネントの入力バッファーには、データ フロー内でそのコンポーネントの上流に位置するコンポーネントの出力列コレクションで定義された、すべての列が格納されています。 たとえば、変換元コンポーネントの出力に 3 つの列があり、その次のコンポーネントで出力列が 1 つ追加された場合、変換先コンポーネントが書き出す列が 2 つのみの場合でも、変換先コンポーネントに用意されたバッファーには 4 つの列が格納されます。

入力バッファー内の列の順序は、変換先コンポーネントの入力列コレクション内の列のインデックスでは定義されません。 バッファー行で列を確実に検索するには、FindColumnByLineageIDBufferManager メソッドを使用するしか方法がありません。 このメソッドは、指定されたバッファー内で指定された系列 ID の列を検索し、行内の位置を返します。 入力列のインデックスは、通常、PreExecute メソッド内で検索され、後で ProcessInput で使用するために開発者によってキャッシュされます。

次のコード例は、PreExecute でバッファー内の入力列の位置を検索し、配列に保存します。 続いてこの配列を ProcessInput の処理中に使用し、バッファー内の列の値を読み取ります。

int[] cols;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
  
    cols = new int[input.InputColumnCollection.Count];  
  
    for (int x = 0; x < input.InputColumnCollection.Count; x++)  
    {  
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);  
    }  
}  
Private cols As Integer()  
  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
    ReDim cols(input.InputColumnCollection.Count)  
  
    For x As Integer = 0 To input.InputColumnCollection.Count  
  
        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)  
    Next x  
  
End Sub  

行の処理

バッファー内の入力列を検索したら、列の値を読み取って外部データ ソースに書き出すことができます。

変換先コンポーネントが行を外部データ ソースに書き込んでいる間に、IncrementPipelinePerfCounter メソッドを呼び出して "Rows read" または "BLOB bytes read" パフォーマンス カウンターを更新することができます。 これらのパフォーマンス カウンターの詳細については、「 パフォーマンス カウンター」を参照してください。

次の例は、ProcessInput で提供されるバッファーから行を読み取るコンポーネントを示します。 バッファー内の列のインデックスは、先のコード例にある PreExecute 内で検索したものを使用しています。

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        while (buffer.NextRow())  
        {  
            foreach (int col in cols)  
            {  
                if (!buffer.IsNull(col))  
                {  
                    //  TODO: Read the column data.  
                }  
            }  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For Each col As Integer In cols  
  
                If buffer.IsNull(col) = False Then  
  
                    '  TODO: Read the column data.  
                End If  
  
            Next col  
        End While  
End Sub  

サンプル

次の例は、ファイル接続マネージャーを使用して、データ フローからのバイナリ データをファイルに保存する、簡単な変換先コンポーネントを示します。 ただし、この例ではここで説明したメソッドや機能のすべてが使われているわけではありません。 変換先コンポーネントが必ずオーバーライドしなければならない重要なメソッドは示していますが、デザイン時の検証のためのコードは含まれていません。

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace BlobDst  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]  
  public class BlobDst : PipelineComponent  
  {  
    string m_DestDir;  
    int m_FileNameColumnIndex = -1;  
    int m_BlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection.New();  
      input.Name = "BLOB Extractor Destination Input";  
      input.HasSideEffects = true;  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);  
  
      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')  
        m_DestDir += "\\";  
    }  
  
    public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)  
    {  
      IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);  
      IDTSCustomProperty100 custProp;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsFileName";  
      custProp.Value = (object)false;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsBLOB";  
      custProp.Value = (object)false;  
  
      return inputColumn;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
      IDTSCustomProperty100 custProp;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        custProp = column.CustomPropertyCollection["IsFileName"];  
        if ((bool)custProp.Value == true)  
        {  
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
  
        custProp = column.CustomPropertyCollection["IsBLOB"];  
        if ((bool)custProp.Value == true)  
        {  
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        string strFileName = buffer.GetString(m_FileNameColumnIndex);  
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);  
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);  
  
        strFileName = TranslateFileName(strFileName);  
  
        // Make sure directory exists before creating file.  
        FileInfo fi = new FileInfo(strFileName);  
        if (!fi.Directory.Exists)  
          fi.Directory.Create();  
  
        // Write the data to the file.  
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);  
        fs.Write(blobData, 0, blobLength);  
        fs.Close();  
      }  
    }  
  
    private string TranslateFileName(string fileName)  
    {  
      if (fileName.Length > 2 && fileName[1] == ':')  
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);  
      else  
        return m_DestDir + fileName;  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Namespace BlobDst   
  
 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _   
 Public Class BlobDst   
 Inherits PipelineComponent   
   Private m_DestDir As String   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_BlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
     input.Name = "BLOB Extractor Destination Input"   
     input.HasSideEffects = True   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)   
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then   
       m_DestDir += "\"   
     End If   
   End Sub   
  
   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100   
     Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)   
     Dim custProp As IDTSCustomProperty100   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsFileName"   
     custProp.Value = CType(False, Object)   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsBLOB"   
     custProp.Value = CType(False, Object)   
     Return inputColumn   
   End Function   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     Dim custProp As IDTSCustomProperty100   
     For Each column As IDTSInputColumn100 In inputColumns   
       custProp = column.CustomPropertyCollection("IsFileName")   
       If CType(custProp.Value, Boolean) = True Then   
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
       custProp = column.CustomPropertyCollection("IsBLOB")   
       If CType(custProp.Value, Boolean) = True Then   
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)   
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)   
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)   
       strFileName = TranslateFileName(strFileName)   
       Dim fi As FileInfo = New FileInfo(strFileName)   
       ' Make sure directory exists before creating file.  
       If Not fi.Directory.Exists Then   
         fi.Directory.Create   
       End If   
       ' Write the data to the file.  
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)   
       fs.Write(blobData, 0, blobLength)   
       fs.Close   
     End While   
   End Sub   
  
   Private Function TranslateFileName(ByVal fileName As String) As String   
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then   
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3)   
     Else   
       Return m_DestDir + fileName   
     End If   
   End Function   
 End Class   
End Namespace  

関連項目

カスタム変換元コンポーネントの開発
スクリプト コンポーネントによる変換先の作成