在数据流组件中使用错误输出
适用范围:SQL Server Azure 数据工厂中的 SSIS Integration Runtime
称为错误输出的特殊的 IDTSOutput100 对象可添加到组件中,以使组件重定向无法在执行期间处理的行。 组件可能遇到的问题通常分为错误或截断,这些问题特定于每个组件。 提供错误输出的组件为组件用户处理错误条件提供了灵活性,既可以筛选出结果集中的错误行,也可以在出现问题时中止组件运行,还可以忽略错误并继续。
若要实现和支持组件中的错误输出,首先必须将组件的 UsesDispositions 属性设置为 true。 然后必须向其 IsErrorOut 属性已设置为 true 的组件添加一个输出。 最后,该组件必须包含在出现错误或截断时将行重定位到错误输出的代码。 本主题将介绍这三个步骤,并说明同步和异步错误输出之间的差异。
创建错误输出
创建错误输出的方法为调用 OutputCollection 的 New 方法,然后将新输出的 IsErrorOut 属性设置为 true。 如果是异步输出,则无需再对该输出执行任何操作。 如果是同步输出,并且还存在与同一输入同步的另一输出,则还必须设置 ExclusionGroup 和 SynchronousInputID 属性。 这两个属性的值应该和与同一输入同步的另一输出的值相同。 如果这些属性未设置为非零值,则输入提供的行将发送到与该输入同步的两个输出。
组件在执行期间遇到错误或截断时,将基于出现错误的输入或输出或者输入列或输出列的 ErrorRowDisposition 和 TruncationRowDisposition 属性继续执行。 这些属性的值在默认情况下应设置为 RD_NotUsed。 组件的错误输出连接到下游组件时,此属性由组件用户设置,并允许用户控制组件处理错误或截断的方式。
填充错误列
创建错误输出时,数据流任务会自动向输出列集合添加两列。 组件将使用这些列指定导致错误或截断的列的 ID,并提供特定于组件的错误代码。 这些列是自动生成的,但是包含在列中的值必须由组件设置。
设置这些列的值所用的方法取决于该错误输出是同步还是异步的。 具有同步输出的组件调用 DirectErrorRow 方法(在下一部分有更加详细的讨论),并以参数的形式提供错误代码和错误列的值。 具有异步输出的组件在设置这些列的值时有两种选择。 可以调用输出缓冲区的 SetErrorInfo 方法,并提供值,或使用 FindColumnByLineageID 在缓冲区中查找错误列,并直接设置这些列的值。 但是,因为这些列的名称可能已经更改,或它们在输出列集合中的位置可能已被修改,所以后一种方法可能不可靠。 SetErrorInfo 方法将自动在这些错误列中设置值,无需手动查找它们。
如果需要获取与特定错误代码对应的错误说明,可以使用 GetErrorDescription 接口的 IDTSComponentMetaData100 方法,该接口可通过组件的 ComponentMetaData 属性来访问。
下面的代码示例演示具有一个输入和两个输出(包括一个错误输出)的组件。 第一个示例演示如何创建与输入同步的错误输出。 第二个示例演示如何创建异步错误输出。
public override void ProvideComponentProperties()
{
// Specify that the component has an error output.
ComponentMetaData.UsesDispositions = true;
// Create the input.
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";
// Create the default output.
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
output.SynchronousInputID = input.ID;
output.ExclusionGroup = 1;
// Create the error output.
IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();
errorOutput.IsErrorOut = true;
errorOutput.Name = "ErrorOutput";
errorOutput.SynchronousInputID = input.ID;
errorOutput.ExclusionGroup = 1;
}
Public Overrides Sub ProvideComponentProperties()
' Specify that the component has an error output.
ComponentMetaData.UsesDispositions = True
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
' Create the input.
input.Name = "Input"
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."
' Create the default output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "Output"
output.SynchronousInputID = input.ID
output.ExclusionGroup = 1
' Create the error output.
Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New
errorOutput.IsErrorOut = True
errorOutput.Name = "ErrorOutput"
errorOutput.SynchronousInputID = input.ID
errorOutput.ExclusionGroup = 1
End Sub
下面的代码示例创建一个异步错误输出。
public override void ProvideComponentProperties()
{
// Specify that the component has an error output.
ComponentMetaData.UsesDispositions = true;
// Create the input.
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed;
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution.";
// Create the default output.
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
// Create the error output.
IDTSOutput100 errorOutput = ComponentMetaData.OutputCollection.New();
errorOutput.Name = "ErrorOutput";
errorOutput.IsErrorOut = true;
}
Public Overrides Sub ProvideComponentProperties()
' Specify that the component has an error output.
ComponentMetaData.UsesDispositions = True
' Create the input.
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
' Create the default output.
input.Name = "Input"
input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed
input.ErrorOrTruncationOperation = "A string describing the possible error or truncation that may occur during execution."
' Create the error output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "Output"
Dim errorOutput As IDTSOutput100 = ComponentMetaData.OutputCollection.New
errorOutput.Name = "ErrorOutput"
errorOutput.IsErrorOut = True
End Sub
将行重定向到错误输出
向组件添加错误输出后,必须提供代码,用于处理特定于组件的错误或截断条件,并将错误或截断行重定向到错误输出。 您可以采用两种方法执行此操作,具体方法取决于该错误输出是同步还是异步的。
重定向具有同步输出的行
行是通过调用 DirectErrorRow 类的 PipelineBuffer 方法发送到同步输出的。 该方法调用包括的参数有错误输出的 ID、组件定义的错误代码以及组件无法处理的列的索引。
下面的代码示例演示如何使用 DirectErrorRow 方法将缓冲区中的行定向到同步错误输出。
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
// This code sample assumes the component has two outputs, one the default,
// the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo
// is 0, then the default output is the second output in the collection.
int defaultOutputID = -1;
int errorOutputID = -1;
int errorOutputIndex = -1;
GetErrorOutputInfo(ref errorOutputID,ref errorOutputIndex);
if (errorOutputIndex == 0)
defaultOutputID = ComponentMetaData.OutputCollection[1].ID;
else
defaultOutputID = ComponentMetaData.OutputCollection[0].ID;
while (buffer.NextRow())
{
try
{
// TODO: Implement code to process the columns in the buffer row.
// Ideally, your code should detect potential exceptions before they occur, rather
// than having a generic try/catch block such as this.
// However, because the error or truncation implementation is specific to each component,
// this sample focuses on actually directing the row, and not a single error or truncation.
// Unless an exception occurs, direct the row to the default
buffer.DirectRow(defaultOutputID);
}
catch
{
// Yes, has the user specified to redirect the row?
if (input.ErrorRowDisposition == DTSRowDisposition.RD_RedirectRow)
{
// Yes, direct the row to the error output.
// TODO: Add code to include the errorColumnIndex.
buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex);
}
else if (input.ErrorRowDisposition == DTSRowDisposition.RD_FailComponent || input.ErrorRowDisposition == DTSRowDisposition.RD_NotUsed)
{
// No, the user specified to fail the component, or the error row disposition was not set.
throw new Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.");
}
else
{
// No, the user specified to ignore the failure so
// direct the row to the default output.
buffer.DirectRow(defaultOutputID);
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
' This code sample assumes the component has two outputs, one the default,
' the other the error output. If the errorOutputIndex returned from GetErrorOutputInfo
' is 0, then the default output is the second output in the collection.
Dim defaultOutputID As Integer = -1
Dim errorOutputID As Integer = -1
Dim errorOutputIndex As Integer = -1
GetErrorOutputInfo(errorOutputID, errorOutputIndex)
If errorOutputIndex = 0 Then
defaultOutputID = ComponentMetaData.OutputCollection(1).ID
Else
defaultOutputID = ComponentMetaData.OutputCollection(0).ID
End If
While buffer.NextRow
Try
' TODO: Implement code to process the columns in the buffer row.
' Ideally, your code should detect potential exceptions before they occur, rather
' than having a generic try/catch block such as this.
' However, because the error or truncation implementation is specific to each component,
' this sample focuses on actually directing the row, and not a single error or truncation.
' Unless an exception occurs, direct the row to the default
buffer.DirectRow(defaultOutputID)
Catch
' Yes, has the user specified to redirect the row?
If input.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow Then
' Yes, direct the row to the error output.
' TODO: Add code to include the errorColumnIndex.
buffer.DirectErrorRow(errorOutputID, 0, errorColumnIndex)
Else
If input.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent OrElse input.ErrorRowDisposition = DTSRowDisposition.RD_NotUsed Then
' No, the user specified to fail the component, or the error row disposition was not set.
Throw New Exception("An error occurred, and the DTSRowDisposition is either not set, or is set to fail component.")
Else
' No, the user specified to ignore the failure so
' direct the row to the default output.
buffer.DirectRow(defaultOutputID)
End If
End If
End Try
End While
End Sub
重定向具有异步输出的行
与对同步错误输出那样将行定向到输出不同,具有异步输出的组件通过向输出 PipelineBuffer 显式添加行来将行发送到错误输出。 实现使用异步错误输出的组件需要在 PrimeOutput 方法中将列添加到为下游组件提供的错误输出,并缓存为组件提供的错误输出的输出缓冲区。 开发具有异步输出的自定义转换组件主题中详细介绍了实现具有异步输出的组件的详细信息。 如果列未显式添加到错误输出,则添加到输出缓冲区的缓冲区行只包含两个错误列。
若要将行发送到异步错误输出,必须将行添加到错误输出缓冲区。 有时,行可能已经添加到非错误输出缓冲区,所以您必须使用 RemoveRow 方法删除此行。 然后设置输出缓冲区列的值,最后调用 SetErrorInfo 方法以提供特定于组件的错误代码和错误列的值。
下面的示例演示如何对具有异步输出的组件使用错误输出。 出现模拟错误时,组件调用 SetErrorInfo 方法执行以下操作:将行添加到错误输出缓冲区、将之前已添加到非错误输出缓冲区的值复制到错误输出缓冲区、删除已添加到非错误输出缓冲区的行,最后设置错误代码和错误列的值。
int []columnIndex;
int errorOutputID = -1;
int errorOutputIndex = -1;
public override void PreExecute()
{
IDTSOutput100 defaultOutput = null;
this.GetErrorOutputInfo(ref errorOutputID, ref errorOutputIndex);
foreach (IDTSOutput100 output in ComponentMetaData.OutputCollection)
{
if (output.ID != errorOutputID)
defaultOutput = output;
}
columnIndex = new int[defaultOutput.OutputColumnCollection.Count];
for(int col =0 ; col < defaultOutput.OutputColumnCollection.Count; col++)
{
IDTSOutputColumn100 column = defaultOutput.OutputColumnCollection[col];
columnIndex[col] = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID);
}
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
for( int x=0; x < outputs; x++ )
{
if (outputIDs[x] == errorOutputID)
this.errorBuffer = buffers[x];
else
this.defaultBuffer = buffers[x];
}
int rows = 100;
Random random = new Random(System.DateTime.Now.Millisecond);
for (int row = 0; row < rows; row++)
{
try
{
defaultBuffer.AddRow();
for (int x = 0; x < columnIndex.Length; x++)
defaultBuffer[columnIndex[x]] = random.Next();
// Simulate an error.
if ((row % 2) == 0)
throw new Exception("A simulated error.");
}
catch
{
// Add a row to the error buffer.
errorBuffer.AddRow();
// Get the values from the default buffer
// and copy them to the error buffer.
for (int x = 0; x < columnIndex.Length; x++)
errorBuffer[columnIndex[x]] = defaultBuffer[columnIndex[x]];
// Set the error information.
errorBuffer.SetErrorInfo(errorOutputID, 1, 0);
// Remove the row that was added to the default buffer.
defaultBuffer.RemoveRow();
}
}
if (defaultBuffer != null)
defaultBuffer.SetEndOfRowset();
if (errorBuffer != null)
errorBuffer.SetEndOfRowset();
}
Private columnIndex As Integer()
Private errorOutputID As Integer = -1
Private errorOutputIndex As Integer = -1
Public Overrides Sub PreExecute()
Dim defaultOutput As IDTSOutput100 = Nothing
Me.GetErrorOutputInfo(errorOutputID, errorOutputIndex)
For Each output As IDTSOutput100 In ComponentMetaData.OutputCollection
If Not (output.ID = errorOutputID) Then
defaultOutput = output
End If
Next
columnIndex = New Integer(defaultOutput.OutputColumnCollection.Count) {}
Dim col As Integer = 0
While col < defaultOutput.OutputColumnCollection.Count
Dim column As IDTSOutputColumn100 = defaultOutput.OutputColumnCollection(col)
columnIndex(col) = BufferManager.FindColumnByLineageID(defaultOutput.Buffer, column.LineageID)
System.Math.Min(System.Threading.Interlocked.Increment(col),col-1)
End While
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim x As Integer = 0
While x < outputs
If outputIDs(x) = errorOutputID Then
Me.errorBuffer = buffers(x)
Else
Me.defaultBuffer = buffers(x)
End If
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
Dim rows As Integer = 100
Dim random As Random = New Random(System.DateTime.Now.Millisecond)
Dim row As Integer = 0
While row < rows
Try
defaultBuffer.AddRow
Dim x As Integer = 0
While x < columnIndex.Length
defaultBuffer(columnIndex(x)) = random.Next
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
' Simulate an error.
If (row Mod 2) = 0 Then
Throw New Exception("A simulated error.")
End If
Catch
' Add a row to the error buffer.
errorBuffer.AddRow
' Get the values from the default buffer
' and copy them to the error buffer.
Dim x As Integer = 0
While x < columnIndex.Length
errorBuffer(columnIndex(x)) = defaultBuffer(columnIndex(x))
System.Math.Min(System.Threading.Interlocked.Increment(x),x-1)
End While
' Set the error information.
errorBuffer.SetErrorInfo(errorOutputID, 1, 0)
' Remove the row that was added to the default buffer.
defaultBuffer.RemoveRow
End Try
System.Math.Min(System.Threading.Interlocked.Increment(row),row-1)
End While
If Not (defaultBuffer Is Nothing) Then
defaultBuffer.SetEndOfRowset
End If
If Not (errorBuffer Is Nothing) Then
errorBuffer.SetEndOfRowset
End If
End Sub