How to deal with files containing rows with different column counts in U-SQL: Introducing a Flexible Schema Extractor
If your input data is not rectangular in nature, but contains rows with different column counts, the built-in U-SQL extractors will not be able to parse the data. Instead, you should write a custom extractor.
In this blog post we will take a look at a flexible schema extractor that is able to read a row-oriented file that has different column counts by filling missing columns with null values. If you don't know the full schema for all rows a priori, it also allows you to fill a SqlMap column with the data that does not fit into the provided columns.
Example Scenario
Let's introduce a simple scenario with some sample data. Assume that the file /Samples/Blogs/MRys/FlexExtractor/OrderData.csv
contains the order of different products where only the first 4 columns are fixed as order ID, product type, ordered amount and per item price. All remaining columns both in their number and semantics depend upon the product type:
1,Shoes,2,99.99,10
1,Pants,3,59,34,32
2,Camera,1,999.00,Canon,70D
2,Lens,1,999.99,Canon,100mm.Macro,1.4
2,Lens,1,459.99,Sigma,28-85mm,Macro/Zoom,2.8
3,Camera,1,745,Sony,RX-100-II
3,Shoes,1,69.99,13
For example, the Camera product type has the camera make and model as additional columns, whereas the Lens product type provides additional columns providing the make, the focal length, lens type and the aperture setting.
Writing scripts to extract data
Let's write one script that does schematize all possible columns and then produces one file for each of the product types. A naive approach may create an extractor per product type (or a generic extractor that takes the product type as an argument), but that would mean that the file has to be read once for every product type. Obviously, you would want to avoid such costly reads if there is a lot of data with many product types. Instead, we will parse the file once and produce a single rowset that then will be filtered in subsequent SELECT statements.
Since the additional flexible columns are different across different product types, or not existing for some product types, the extract statement will handle the flexible columns in a generic way (as strings) and the "typing" SELECT statements will give the values their type specific semantics.
With this information, we can generate the following script:
DECLARE @input string = "/Samples/Blogs/MRys/FlexExtractor/OrderData.csv";
// Usage with all columns known
@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
c1 string, c2 string, c3 string, c4 string
FROM @input
USING new FlexibleSchemaExtractor.FlexExtractor();
// product type Camera
@cameras =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
c2 AS model
FROM @data
WHERE producttype == "Camera";
OUTPUT @cameras
TO "/output/cameras.csv"
USING Outputters.Csv();
// product type Lens
@lenses =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
c2 AS focallength,
c3 AS lenstype,
c4 == null ? (decimal?) null : Decimal.Parse(c4) AS aperture
FROM @data
WHERE producttype == "Lens";
OUTPUT @lenses
TO "/output/lenses.csv"
USING Outputters.Csv();
Note that the extract knows how to schematize the common columns, and then uses a generic naming and typing scheme for the rest, providing as many additional columns as the most complex product type provides.
Writing the Flexible Schema Extractor UDO
Now we need to implement the extractor:
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
namespace FlexibleSchemaExtractor
{
public class FlexExtractor : IExtractor
{
private Encoding _encoding;
private byte[] _row_delim;
private string _col_delim;
public FlexExtractor(Encoding encoding = null, string row_delim = "\r\n", string col_delim = ",")
{
this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
this._row_delim = this._encoding.GetBytes(row_delim);
this._col_delim = col_delim;
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
var colsInSchema = output.Schema.Count;
// let's check global assumptions
// - first 4 provided columns are int, string, int, decimal.
if ( output.Schema[0].Type != typeof(System.Int32)
|| output.Schema[1].Type != typeof(System.String)
|| output.Schema[2].Type != typeof(System.Int32)
|| output.Schema[3].Type != typeof(System.Decimal)
)
{
throw new Exception("First 4 columns are not of expected types int32, string, int32, decimal.");
}
foreach (Stream currentline in input.Split(this._row_delim))
{
using (StreamReader lineReader = new StreamReader(currentline, this._encoding))
{
string[] columns = lineReader.ReadToEnd().Split( new string[] { this._col_delim }
, StringSplitOptions.None);
var colsInData = columns.Length;
// let's check row level assumptions
// - if less columns are specified, then last column needs to be of type SqlMap<Int32, string>
if ( colsInData > colsInSchema
&& output.Schema[colsInSchema - 1].Type != typeof(SqlMap<Int32, string>))
{
throw new Exception(
"Too many columns detected and last column is not of type SqlMap<Int32,string>. "
+ "Add a final column of type SqlMap<Int32,string&ht; into your extract schema.");
}
// Set first 4 fixed columns
output.Set<Int32>(0, Int32.Parse(columns[0]));
output.Set<String>(1, columns[1]);
output.Set<Int32>(2, Int32.Parse(columns[2]));
output.Set<Decimal>(3, Decimal.Parse(columns[3]));
// Fill all remaining columns except the last which may be a map
for (int i = 4; i < Math.Min(colsInData, colsInSchema) - 1; i++)
{
output.Set<String>(i, columns[i]);
}
// Now handle last column: if it is a map
if ( colsInData >= colsInSchema
&& output.Schema[colsInSchema - 1].Type == typeof(SqlMap<Int32,string>))
{
var sqlmap = new Dictionary<Int32,string>();
for (int j = colsInSchema - 1; j < colsInData; j++)
{
sqlmap.Add(j - colsInSchema + 1, columns[j]);
}
output.Set<SqlMap<Int32,string>>(colsInSchema - 1, new SqlMap<Int32,string>(sqlmap));
}
// Now handle last column: if it is not a map
else if (colsInData == Math.Min(colsInData, colsInSchema))
{
output.Set<string>(colsInData - 1, columns[colsInData - 1]);
}
yield return output.AsReadOnly();
}
}
}
}
}
As you can see, the extractor code is fairly simple. The only noteworthy special processing handles the last column that may be a SqlMap<Int32,string>
if we do not know all the columns ahead of the query.
Extracting Flexible columns with a SqlMap
The extractor allows to use a final column of type SqlMap<Int32,string>
in the following way (@input
is defined as above):
@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
c1 string, map SqlMap<Int32,string>
FROM @input
USING new FlexibleSchemaExtractor.FlexExtractor();
// product type Camera
@cameras =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
map[0] AS model
FROM @data
WHERE producttype == "Camera";
OUTPUT @cameras
TO "/output/cameras2.csv"
USING Outputters.Csv();
// product type Lens
@lenses =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
map[0] AS focallength,
map[1] AS lenstype,
map[2] == null ? (decimal?) null : Decimal.Parse(map[2]) AS aperture
FROM @data
WHERE producttype == "Lens";
OUTPUT @lenses
TO "/output/lenses2.csv"
USING Outputters.Csv();
// remaining product types (serialize map generically)
@others =
SELECT orderid,
producttype,
orderamount,
itemprice,
c1,
map == null ? (string) null
: string.Join(" ; ",
from p in map
select string.Format("{0}{1}{2}", p.Key, " : ", p.Value)) AS map
FROM @data
WHERE producttype NOT IN ("Camera", "Lens");
OUTPUT @others
TO "/users/herring/others.csv"
USING Outputters.Csv();
Note that the @others
expression is serializing the map with a C# expression so we can use the built-in outputter.
Conclusion
In this post I showed how easy it is to handle more complex file formats with flexible schema formats. You can find the sample data and the sample project on the Azure U-SQL Github repository.
Please leave your feedback and questions in the comments below.
Comments
- Anonymous
August 15, 2016
i had this problem on [link removed] . but you solve it now - Anonymous
August 15, 2016
Great post :-) - Anonymous
August 17, 2016
Thanks for sharing this. It will come in very handy. - Anonymous
August 19, 2016
Hi Mike,Great post. If I have a Json string in a structured table, when I am querying the data from table, can I cast that string to sqlmap using function or something else? It's the same scenario you mentioned in the article. But i want to do that while querying than whike extracting from files?- Anonymous
August 21, 2016
We have several blog posts in the pipeline talking about extracting and processing JSON documents. Please stay tuned!
- Anonymous
- Anonymous
December 24, 2016
Really cool. I will implement a custom extractor the next time I have files with varying columns. Thanks.