Поделиться через


REDUCE Expression (U-SQL)

Summary

U-SQL provides the ability to write custom rowset reducers in C# using the user-defined operator extensibility framework by implementing an IReducer. See U-SQL Programmability Guide: User-Defined Reducer for more information.

A reducer, unlike an aggregator that operates on columns and produces a scalar value, provides a way to implement custom grouping and aggregation on a rowset and returning a rowset.

A reducer is being invoked with the REDUCE expression that provide the necessary information about both the input rowset, the grouping columns, the expected result schema as well as additional information that is useful for optimization.

A reducer is invoked once per group and produces zero or one or multiple rows per group in return.

A reducer provides limited optimization support, because an optimizer cannot reason about the procedural C# code defining the reducer. For example, it cannot push predicates through to earlier statements unless the column used in the predicate is marked as read only. Therefore, it is recommended to instead use a user-defined aggregator or use the optional READONLY or REQUIRED clauses.

Syntax

Reduce_Expression :=                                                                                      
    'REDUCE' Input_Rowset   
    ['PRESORT' Identifier_List] 
    ('ALL' | 'ON' Identifier_List) 
    Produce_Clause  
    [Readonly_Clause]  
    [Required_Clause]
    USING_Clause.

Remarks

  • Input_Rowset
    Specifies the input rowset that the reducer will operate on as either the reference to a rowset name or by a nested rowset expression:

Syntax

  Input_Rowset :=                                                                                     
      Rowset | Rowset_Expression.

with the following semantics:

  • Rowset
    The two simplest rowset sources are a rowset variable such as @rowset that has been defined in a previous statement of the script or a table that has been created in the account’s catalog:

Syntax

    Rowset :=                                                                                      
        Rowset_Variable | Identifier.

A table can be referenced either with its fully qualified 3-part name, within the current database context with a 2-part name, or within the current database and schema context with a single-part name.

Syntax

    Rowset_Expression :=                                                                      
        '(' Query_Expression ')'
    |   Function_Call
    |   External_Rowset_Expression.

The UDO programming model makes both the values and the schema of the input rowset available in the context of the reducer's implementation.

  • PRESORT
    The optional PRESORT clause guarantees the rows are ordered by the given identifier.

  • ALL
    The optional ALL indicates that the whole input rowset will become the group to be reduced. Similar to a GROUP BY ALL.

  • ON Identifier_List
    This option specifies the list of columns that define the groups.

Syntax

  Identifier_List :=                                                                                  
      Quoted_or_Unquoted_Identifier                                               
      {',' Quoted_or_Unquoted_Identifier}.

If the columns are not part of the input rowset’s columns or are not comparable, an error is raised.

  • Produce_Clause
    Specifies the rowset schema returned by the REDUCE expression.

Syntax

  Produce_Clause :=                                                                                   
      'PRODUCE' Column_Definition_List.
  • Column_Definition_List
    This list defines the schema of the reducer. The returned columns are defined as a pair of column names and column types:

Syntax

    Column_Definition_List :=                                                                      
        Column_Definition { ',' Column_Definition}.
Column_Definition := Quoted_or_Unquoted_Identifier Built_in_Type.

Each column has an identifier that can be either a quoted or unquoted identifier. A column is typed with one of the U-SQL types that the reducer supports.

The UDO programming model makes the specified rowset schema available to the implementation of the reducer. An error is raised if the reducer is producing a schema that is incompatible with the specified return schema.

  • Readonly_Clause
    The optional READONLY clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.

    The optional READONLY clause specifies the columns are read only for the reducer and will be passed through to the output using either the same name or the specified column name in parenthesis. Only columns in the reduce expression’s ON clause can be marked READONLY, otherwise the error “E_CSC_USER_UDOREADONLYNOTKEYCOLUMN: Column '…' cannot be marked as READONLY” is raised.

Syntax

  Readonly_Clause :=                                                                                  
      'READONLY' Star_Or_Readonly_Column_List.
Star_Or_Readonly_Column_List := '*' | Readonly_Column_List.
Readonly_Column_List := Readonly_Column { ',' Readonly_Column }.
Readonly_Column := Column_Identifier [Output_Column_Dependency_Alias].
Output_Column_Dependency_Alias := '(' Quoted_or_Unquoted_Identifier ')'.
  • Required_Clause
    The optional REQUIRED clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.

    The optional REQUIRED clause specifies that either all columns are required on input for the reducer (if specified with *) or the specified columns are required. If a specified column is followed by a list of columns in parenthesis, then the input column is only required if the columns in that list are referenced from the output.

Syntax

  Required_Clause :=                                                                                  
      'REQUIRED' Star_Or_Required_Column_List.
Star_Or_Required_Column_List := '*' | Required_Column_List.
Required_Column_List := Required_Column { ',' Required_Column}.
Required_Column := Column_Identifier [Required_Output_Column_Dependency_List].
Required_Output_Column_Dependency_List := '(' Identifier_List ')'.
  • Using_Clause
    The USING clause specifies which reducer should be used to transform the input rowset.

Syntax

  USING_Clause :=                                                                                     
      'USING' udo_expression.

The USING clause takes a C# expression that returns an instance of IReducer. Users can write their own by implementing an IReducer (see U-SQL Programmability Guide: User-Defined Reducer for more detail on how to write your own reducer). Most commonly, the UDO expression is either the instantiation of a reducer class of the form

USING new MyNameSpace.MyReducer(parameter:"value")

or the invocation of a factory method

USING MyNameSpace.MyReducerFactory(parameter:"value")

where parameter is a parameter of the reducer.

Examples

User-Defined Reducer - RangeReducer
Example is a slightly modified version of the example given at How do I combine overlapping ranges using U-SQL? Introducing U-SQL Reducer UDOs and usql/Examples/RangeReducer/RangeReducer/. Please review the reducer article for details.
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ReduceSample
{
    [SqlUserDefinedReducer(IsRecursive = true)]                                                                        // not sure if it can run recursive yet. Need to test with large data sets.
    public class RangeReducer : IReducer
    {
        public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
        {
            // Init aggregation values
            bool first_row_processed = false;
            var begin = DateTime.MaxValue; // Dummy value to make compiler happy
            var end = DateTime.MinValue; // Dummy value to make compiler happy

            // requires that the reducer is PRESORTED on begin and READONLY on the reduce key.
            foreach (var row in input.Rows)
            {
                // Initialize the first interval with the first row if i is 0
                if (!first_row_processed)
                {
                    first_row_processed = true; // mark that we handled the first row
                    begin = row.Get<DateTime>("begin");
                    end = row.Get<DateTime>("end");
                    // If the end is just a time and not a date, it can be earlier than the begin, indicating it is on the next day.
                    // This let's fix up the end to the next day in that case
                    if (end < begin) { end = end.AddDays(1); }
                }
                else
                {
                    var b = row.Get<DateTime>("begin");
                    var e = row.Get<DateTime>("end");
                    // fix up the date if end is earlier than begin
                    if (e < b) { e = e.AddDays(1); }

                    // if the begin is still inside the interval, increase the interval if it is longer
                    if (b <= end)
                    {
                        // if the new end time is later than the current, extend the interval
                        if (e > end) { end = e; }
                    }
                    else // output the previous interval and start a new one
                    {
                        output.Set<DateTime>("begin", begin);
                        output.Set<DateTime>("end", end);
                        yield return output.AsReadOnly();
                        begin = b; end = e;
                    } // if
                } // if
            } // foreach

            // now output the last interval
            output.Set<DateTime>("begin", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
        } // Reduce

    } // RangeReducer
} // ReduceSample

Using User-Defined Reducer - RangeReducer
Using Code-Behind from previous section, above.

// Dataset
@aLog = 
    SELECT * FROM 
        ( VALUES
        ("ABC", new DateTime(2017,01,01, 05, 00, 00),    new DateTime(2017,01,01, 06, 00, 00)),
        ("XYZ", new DateTime(2017,01,01, 05, 00, 00),    new DateTime(2017,01,01, 06, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 08, 00, 00),    new DateTime(2017,01,01, 09, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 08, 00, 00),    new DateTime(2017,01,01, 10, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 10, 00, 00),    new DateTime(2017,01,01, 14, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 07, 00, 00),    new DateTime(2017,01,01, 11, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 09, 00, 00),    new DateTime(2017,01,01, 11, 00, 00)),
        ("ABC", new DateTime(2017,01,01, 11, 00, 00),    new DateTime(2017,01,01, 11, 30, 00)),
        ("FOO", new DateTime(2017,01,01, 23, 40, 00),    new DateTime(2017,01,01, 23, 59, 00)),
        ("FOO", new DateTime(2017,01,01, 23, 50, 00),    new DateTime(2017,01,02, 00, 40, 00))
        ) AS T(user, begin, end);

// Data as is
@results =
    SELECT user,
           String.Format("{0:t}", begin) AS StartTime,
           String.Format("{0:t}", end) AS EndTime
    FROM @aLog;

OUTPUT @results
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/RangeReducerA.csv"  
USING Outputters.Csv();


// Using Reducer
@results =
    REDUCE @aLog
    PRESORT begin
    ON user
    PRODUCE user string,
            begin DateTime,
            end  DateTime
    READONLY user
    REQUIRED begin, end
    USING new ReduceSample.RangeReducer();

// Some formatting
@results =
    SELECT user,
           String.Format("{0:t}", begin) AS StartTime,
           String.Format("{0:t}", end) AS EndTime
    FROM @results
    ORDER BY begin OFFSET 0 ROWS;

OUTPUT @results
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/RangeReducerB.csv"
USING Outputters.Csv();

User-Defined Reducer - SalesReducer
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ReferenceGuide_Examples
{
    [SqlUserDefinedReducer]
    public class SalesReducer : IReducer
    {
        public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
        {
            decimal SalesAmount;
            decimal TaxAmt;

            foreach (IRow row in input.Rows)
            {
                SalesAmount = row.Get<decimal>("SalesAmount");
                TaxAmt = row.Get<decimal>("TaxAmt");

                if (SalesAmount + TaxAmt > 1000m)
                {
                    yield return output.AsReadOnly();
                }
            }
        }
    }
}

Using User-Defined Reducer - SalesReducer
Using Code-Behind from previous section, above.

// Dataset
@sales = 
    SELECT * FROM 
        ( VALUES
        ("SO43659", 349, 5, 2024.99m, 162.00m),
        ("SO43660", 326, 5, 419.46m, 33.56m),
        ("SO43661", 300, 6, 809.76m, 64.78m),
        ("SO43662", 330, 6, 1258.38m, 100.67m),
        ("SO43663", 322, 4, 419.46m, 33.56m),
        ("SO43664", 345, 1, 2039.99m, 163.20m),
        ("SO43665", 220, 1, 40.37m, 3.23m),
        ("SO43666", 330, 4, 419.46m, 33.56m),
        ("SO43667", 219, 3, 17.10m, 1.37m),
        ("SO43668", 317, 6, 2624.38m, 209.95m)
        ) AS T(SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt);

// Using reducer
@reducer =
    REDUCE @sales 
    ON SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt
    PRODUCE SalesOrderNumber, 
            ProductKey, 
            SalesTerritoryKey,
            SalesAmount,
            TaxAmt
    READONLY * 
    REQUIRED SalesAmount, TaxAmt
    USING new ReferenceGuide_Examples.SalesReducer();

OUTPUT @reducer
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducerC.csv"
USING Outputters.Csv(outputHeader: true);

// NOT using reducer
@result = 
    SELECT *
    FROM @sales 
    WHERE SalesAmount + TaxAmt > 1000m;

OUTPUT @result
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducerD.csv"
USING Outputters.Csv(outputHeader: true);

Reducer with ORDER BY and FETCH
The ORDER BY clause with FETCH allows the selection of a limited number of rows based on the specified order. This examples continues to use SalesReducer defined earlier.

// Same as previous example but only returns top 3 records ordered by SalesAmount
@reducer =
    REDUCE @sales
    PRESORT SalesOrderNumber
    ON SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt
    PRODUCE SalesOrderNumber, 
            ProductKey, 
            SalesTerritoryKey,
            SalesAmount,
            TaxAmt
    READONLY * 
    REQUIRED SalesAmount, TaxAmt
    USING new ReferenceGuide_Examples.SalesReducer()
    ORDER BY SalesAmount DESC FETCH 3 ROWS;

OUTPUT @reducer
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducer_fetch3.txt"
USING Outputters.Tsv(outputHeader: true);

See Also