How do I combine overlapping ranges using U-SQL? Introducing U-SQL Reducer UDOs

The problem statement

A few weeks ago, a customer on Stack Overflow asked for a solution to the following problem:

Given a log file that contains time ranges in the form (begin, end) for a user-name, I would like to merge overlapping ranges for each user.

At the time I promised to provide a solution using a custom reducer, but got busy with a lot of other work items and some vacation. But now I will fulfill the promise.

First, let's look at the sample data provided in the Stack Overflow post, with some augmentation:

 Start Time - End Time - User Name
5:00 AM - 6:00 AM - ABC
5:00 AM - 6:00 AM - XYZ
8:00 AM - 9:00 AM - ABC
8:00 AM - 10:00 AM - ABC
10:00 AM - 2:00 PM - ABC
7:00 AM - 11:00 AM - ABC
9:00 AM - 11:00 AM - ABC
11:00 AM - 11:30 AM - ABC
11:40 PM - 11:59 PM - FOO
11:50 PM - 0:40 AM - FOO

After combining the ranges, the expected result should look similar to:

 Start Time - End Time - User Name
5:00 AM - 6:00 AM - ABC
5:00 AM - 6:00 AM - XYZ
7:00 AM - 2:00 PM - ABC
11:40 PM - 0:40 AM - FOO

For the purpose of this post, I placed a sample input file /Samples/Blogs/MRys/Ranges/ranges.txt in our U-SQL GitHub repository.

Analysis of the problem and possible solution

At first glance, the problem seems to call for something like a user-defined aggregation that combines the overlapping time intervals. However, the input data is not ordered. You therefore either have to maintain the state for all possible intervals and merge disjoint intervals whenever a bridging interval appears, or you have to preorder the intervals for each user name, which makes merging them much easier.
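To make the preordering idea concrete, here is a minimal sketch in plain C#, outside of U-SQL. The class `RangeMergeSketch` and its `Merge` method are hypothetical names introduced only for this illustration, and the calendar date is arbitrary. The sketch assumes all ranges belong to a single user and simply sorts them in memory, which is fine for a handful of rows but is not how the scalable solution works.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RangeMergeSketch
{
    // Hypothetical helper (not a U-SQL API): merges overlapping ranges for ONE user.
    public static List<(DateTime Begin, DateTime End)> Merge(
        IEnumerable<(DateTime Begin, DateTime End)> ranges)
    {
        var merged = new List<(DateTime Begin, DateTime End)>();

        // After sorting by Begin, a range can only overlap the last merged range.
        foreach (var r in ranges.OrderBy(x => x.Begin))
        {
            int last = merged.Count - 1;
            if (last >= 0 && r.Begin <= merged[last].End)
            {
                // Overlapping or touching: extend the last range if this one ends later.
                if (r.End > merged[last].End)
                {
                    merged[last] = (merged[last].Begin, r.End);
                }
            }
            else
            {
                merged.Add(r); // Disjoint: start a new range.
            }
        }
        return merged;
    }

    public static void Main()
    {
        var day = new DateTime(2016, 1, 1); // arbitrary date, only the times matter here
        Func<double, DateTime> at = h => day.AddHours(h);

        // User ABC's rows from the sample data, in file order.
        var abc = new[]
        {
            (at(5), at(6)), (at(8), at(9)), (at(8), at(10)), (at(10), at(14)),
            (at(7), at(11)), (at(9), at(11)), (at(11), at(11.5))
        };

        foreach (var r in Merge(abc))
        {
            Console.WriteLine($"{r.Begin:HH:mm} - {r.End:HH:mm}");
        }
    }
}
```

For user ABC, the seven input rows should collapse into the two ranges shown in the expected result: 5:00 AM to 6:00 AM and 7:00 AM to 2:00 PM.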

The ordered aggregation is simpler to scale out, but U-SQL does not provide ordered user-defined aggregators (UDAGGs) yet. In addition, UDAGGs normally produce one row per group, while in this case, I may have multiple rows per group if the ranges are disjoint.

Luckily, U-SQL provides a scalable user-defined operator called a reducer which gives us the ability to aggregate a set of rows based on a grouping key set using custom code.

So my first script outline would look something like the following code where ReduceSample.RangeReducer is my user-defined reducer (Reducer UDO):

 @in = EXTRACT begin DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');

@r = REDUCE @in ON user
PRODUCE begin DateTime, end DateTime, user string
USING new ReduceSample.RangeReducer();

OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

So now the questions are: how can I sort the rows in each group to perform the range reduction, and what does my reducer look like?

First attempt at writing the range reducer

The way to write a reducer is to implement a class that derives from Microsoft.Analytics.Interfaces.IReducer. In this case, since we do not need to provide any parameters, we only need to override the abstract Reduce method as follows:

 using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ReduceSample
{
    public class RangeReducer : IReducer
    {
        public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
        {
            // Insert your code to reduce
        } // Reduce
    } // RangeReducer
} // ReduceSample

The U-SQL REDUCE expression will apply the Reduce method once for each group in parallel. The input parameter thus will only contain the rows for a given group and the implementation can return zero to N rows as output.
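As a rough mental model of this per-group behavior, the following plain C# sketch groups rows by a key and calls a reduce function once per group. It is an assumption-laden illustration and not the U-SQL runtime: `ReduceModelSketch`, `ReduceOn`, and `CountIfRepeated` are hypothetical names, and the groups are processed sequentially here rather than in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReduceModelSketch
{
    // Hypothetical, sequential model of REDUCE ... ON key: the reduce function is
    // called once per group and may yield zero, one, or many output rows.
    public static IEnumerable<TOut> ReduceOn<TIn, TKey, TOut>(
        IEnumerable<TIn> rows,
        Func<TIn, TKey> key,
        Func<TKey, IEnumerable<TIn>, IEnumerable<TOut>> reduce)
    {
        return rows.GroupBy(key).SelectMany(g => reduce(g.Key, g));
    }

    // Example reduce function: emits one row per group, but nothing for single-row groups.
    public static IEnumerable<string> CountIfRepeated(string user, IEnumerable<int> hours)
    {
        int n = hours.Count();
        if (n > 1)
        {
            yield return user + ":" + n;
        }
    }

    public static void Main()
    {
        // (user, begin hour) pairs loosely based on the sample data.
        var rows = new[] { ("ABC", 5), ("XYZ", 5), ("ABC", 8), ("ABC", 7) };

        var result = ReduceOn(rows, r => r.Item1,
            (user, group) => CountIfRepeated(user, group.Select(r => r.Item2)));

        Console.WriteLine(string.Join(", ", result)); // only ABC has more than one row
    }
}
```

The point of the model is that the reduce function never sees rows from another group, and that the number of output rows per group is up to the function itself.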

Therefore, the implementation of the method can concentrate on just doing the reduction for each group. So how can we get each group sorted? The first thought is to simply take the input rowset, sort it, and then iterate over it:

 public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
{
  foreach (var row in input.Rows.OrderBy(x => x.Get<DateTime>("begin")))
  {
    // perform the range merging
  } // foreach
} // Reduce

However, this approach will run into the following error message:

 E_RUNTIME_USER_UNHANDLED_EXCEPTION_FROM_USER_CODE
An unhandled exception from user code has been reported.
Unhandled exception from user code: "Access an expired row using Get method, column: begin, index: 0. Please clone row when iterating row set"

This fails because the UDO's IRowset is implemented as a forward-only streaming API: a row can no longer be accessed once the next row has been read, and LINQ's OrderBy needs to see the values of at least two rows to be able to sort them.
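The effect can be reproduced outside of U-SQL with a small stand-in. The sketch below is only an analogy under stated assumptions: `StreamedRow` and `Stream` are hypothetical types that mimic a row expiring when the stream advances, and they are not the actual IRowset implementation. It shows that copying each value while its row is current works, whereas sorting the rows themselves fails, because LINQ to Objects buffers the whole sequence before it reads the sort keys.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ForwardOnlySketch
{
    // Hypothetical stand-in for a streamed row: readable only while it is the current row.
    public sealed class StreamedRow
    {
        private readonly int[] _cursor; // shared position of the stream
        private readonly int _index;
        private readonly int _value;

        public StreamedRow(int[] cursor, int index, int value)
        {
            _cursor = cursor;
            _index = index;
            _value = value;
        }

        public int Get()
        {
            if (_cursor[0] != _index)
            {
                throw new InvalidOperationException("Access an expired row, index: " + _index);
            }
            return _value;
        }
    }

    // Forward-only stream: advancing to the next row expires the previous one.
    public static IEnumerable<StreamedRow> Stream(params int[] values)
    {
        var cursor = new int[1];
        for (int i = 0; i < values.Length; i++)
        {
            cursor[0] = i;
            yield return new StreamedRow(cursor, i, values[i]);
        }
    }

    public static void Main()
    {
        // Works: copy each value while its row is still current, then sort the copies.
        var copies = Stream(3, 1, 2).Select(r => r.Get()).OrderBy(v => v).ToList();
        Console.WriteLine(string.Join(",", copies));

        // Fails: OrderBy buffers all rows first and reads the sort keys afterwards.
        try
        {
            Stream(3, 1, 2).OrderBy(r => r.Get()).ToList();
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Copying every row sidesteps the exception, but it also gives up the streaming behavior and requires the whole group to fit in memory, which is a concern for very large groups.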

The final version of the range reducer

Fortunately, U-SQL's REDUCE expression provides an option to pre-sort the rows before they are passed to the Reduce method: the PRESORT clause (which was released only recently and, at the time of writing this blog post, has not yet been documented in the reference documentation). With the PRESORT clause added, the rows in the input rowset are guaranteed to be ordered.

So now we can focus on implementing the actual range merge logic (refer to the comments in the code for the explanation of the code):

 public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
{
  // Init aggregation values
  int i = 0;
  var begin = DateTime.MaxValue; // Dummy value to make compiler happy
  var end = DateTime.MinValue; // Dummy value to make compiler happy

  // requires that the reducer is PRESORTED on begin and READONLY on the reduce key.
  foreach (var row in input.Rows)
  {
    // Initialize the first interval with the first row if i is 0
    if (i == 0)
    {
      i++; // mark that we handled the first row
      begin = row.Get<DateTime>("begin");
      end = row.Get<DateTime>("end");
      // If the end is just a time and not a date, it can be earlier than the begin, indicating that it is on the next day.
      // In that case, fix up the end by moving it to the next day.
      if (end < begin) { end = end.AddDays(1); }
    }
    else // handle the remaining rows
    {
      var b = row.Get<DateTime>("begin");
      var e = row.Get<DateTime>("end");
      // fix up the date if end is earlier than begin
      if (e < b) { e = e.AddDays(1); }

      // if the begin is still inside the interval, increase the interval if it is longer
      if (b <= end)
      {
        // if the new end time is later than the current, extend the interval
        if (e > end) { end = e; }
      }
      else // output the previous interval and start a new one
      {
        output.Set<DateTime>("begin", begin);
        output.Set<DateTime>("end", end);
        yield return output.AsReadOnly();
        begin = b; end = e;
      } // if
    } // if
  } // foreach

  // now output the last interval
  output.Set<DateTime>("begin", begin);
  output.Set<DateTime>("end", end);
  yield return output.AsReadOnly();
} // Reduce
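The end-of-day fix-up used in the reducer can be tried in isolation. The following sketch is a hypothetical helper in plain C# (`MidnightFixupSketch.Normalize` is not part of the original project), and it assumes that time-only values such as 11:50 PM and 0:40 AM end up on the same calendar date after parsing, which is why the end can appear to be earlier than the begin.

```csharp
using System;

public static class MidnightFixupSketch
{
    // Hypothetical helper mirroring the reducer's fix-up: if End is earlier than Begin,
    // assume the range crosses midnight and move End to the next day.
    public static (DateTime Begin, DateTime End) Normalize(DateTime begin, DateTime end)
    {
        return end < begin ? (begin, end.AddDays(1)) : (begin, end);
    }

    public static void Main()
    {
        var day = new DateTime(2016, 1, 1); // arbitrary date standing in for "today"
        var begin = day.AddHours(23).AddMinutes(50); // 11:50 PM
        var end = day.AddMinutes(40);                // 0:40 AM, placed on the same date

        var fixedUp = Normalize(begin, end);
        Console.WriteLine((fixedUp.End - fixedUp.Begin).TotalMinutes); // 50
    }
}
```

Note that this heuristic only handles ranges shorter than one day. Input that carries full dates would not need it.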

You may notice that the code is not doing anything with the grouping key. One of the features of the UDO model is that if you specify in the U-SQL UDO expression that a column is READONLY, the column is passed through automatically. This lets you write your UDO code more generically by focusing only on the columns you want to transform.

Now that we have written the reducer, we have to go back to our U-SQL script and add the PRESORT and READONLY clauses:

 @in = EXTRACT begin DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');

@r = REDUCE @in PRESORT begin ON user
PRODUCE begin DateTime, end DateTime, user string
READONLY user
USING new ReduceSample.RangeReducer();

OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

Final note: Recursive vs non-recursive reducers

If you now apply the above reducer to a large set of data (the customer mentioned that his CSV files were in the GB range), and some of your grouping keys appear much more frequently than others, you will encounter what is normally called data skew. In the best case, such data skew leads to some reducers taking much longer than others; in the worst case, it leads to some reducers running out of the available memory and time resources (a vertex will time out after running for about 5 hours). If the reducer semantics are associative and commutative and its output schema is the same as its input schema, then the reducer can be marked as recursive. This allows the query engine to split large groups into smaller subgroups and recursively apply the reducer to these subgroups to calculate the final result. A reducer is marked as recursive by using the following property annotation:

 namespace ReduceSample
{
    [SqlUserDefinedReducer(IsRecursive = true)]
    public class RangeReducer : IReducer
    {
        public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
        {
         // Insert your code to reduce
        } // Reduce
    } // RangeReducer
} // ReduceSample

In our case, assuming the processing preserves the sort order among the rows in each recursive invocation, the range reducer can be marked as recursive to improve scalability and performance.
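Why range merging qualifies can be checked with a small experiment in plain C#. This is a sketch under stated assumptions and not how the query engine actually partitions the work: `RecursiveReduceSketch`, `MergeSorted`, and `MergeInChunks` are hypothetical names, the ranges are simplified to whole hours, and the intermediate results are explicitly re-sorted on the begin value before the second pass.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RecursiveReduceSketch
{
    // Same merge idea as the reducer, on (Begin, End) hour pairs; expects input sorted by Begin.
    public static List<(int Begin, int End)> MergeSorted(IEnumerable<(int Begin, int End)> sorted)
    {
        var result = new List<(int Begin, int End)>();
        foreach (var r in sorted)
        {
            int last = result.Count - 1;
            if (last >= 0 && r.Begin <= result[last].End)
            {
                result[last] = (result[last].Begin, Math.Max(result[last].End, r.End));
            }
            else
            {
                result.Add(r);
            }
        }
        return result;
    }

    // Hypothetical two-level evaluation: reduce sub-groups, then reduce the partial results.
    public static List<(int Begin, int End)> MergeInChunks(
        IReadOnlyList<(int Begin, int End)> sorted, int chunkSize)
    {
        var partial = new List<(int Begin, int End)>();
        for (int i = 0; i < sorted.Count; i += chunkSize)
        {
            partial.AddRange(MergeSorted(sorted.Skip(i).Take(chunkSize)));
        }
        // Re-establish the order on Begin before the second pass (the presort assumption).
        return MergeSorted(partial.OrderBy(r => r.Begin));
    }

    public static void Main()
    {
        // User ABC's ranges in whole hours, already sorted by Begin.
        var abc = new List<(int Begin, int End)>
        {
            (5, 6), (7, 11), (8, 9), (8, 10), (9, 11), (10, 14), (11, 12)
        };

        var direct = MergeSorted(abc);
        var chunked = MergeInChunks(abc, 3);
        Console.WriteLine(direct.SequenceEqual(chunked)); // True
    }
}
```

The output rows have the same shape as the input rows, and merging already merged ranges again yields the same union, which is what allows the partial results to be fed back into the same reducer.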

You can find a Visual Studio project of the example in our GitHub repository.