Udostępnij za pośrednictwem


An Introduction to U-SQL in Azure Data Lake - Part 2

There has been a certain amount of speculation on what the U in U-SQL stands for. Some have suggested it is because U comes after T in the alphabet. This would suggest U-SQL is a successor to T-SQL which is not the case, you will not be writing your SQL Server stored procedures in U-SQL any time soon, if ever. Some have suggested U stands for ubiquitous (“present, appearing, or found everywhere“ - Oxford Dictionaries) which for the moment at least is not the case. The best explanation for the U is that it stands for Unterseeboot, more commonly abbreviated to U-boat. A U-Boat being a German submarine. Why is this relevant? Because as a U-Boat can dive deep into water so U-SQL lets you dive deep into your data lake.

Within this post some more examples of how U-SQL may be used.

This first example is looking for the keyword XBOX from a twitter stream. It performs aggregations on that data to provide a list of topics where XBOX is mentioned along with a sentiment for the tweet.

Note that the C# regular expression is returning a bool so we have to convert to a Int32 before we can perform a SUM. Also note the use of the “quoting: true” command. As this twitter data is free text we add a quote identifier to the string data which allows us to read in the data even if the text has commas in it. (If we had this set to ‘false’ any commas in the text would cause an error.)

 /*Read tweets that have already been serialized to JSON and saved to Azure Data Lake Store*/
@Twitter =
    EXTRACT CreatedAt DateTime,
    Topic String,
    SentimentScore float?,
    Text String,
    filename String
    FROM "/datalake/output/Twitter/{filename:*}"
    USING Extractors.Csv(quoting: true);

/*Group By topic*/
@XBOX = SELECT Topic,
       AVG(SentimentScore) AS AvgSentimentScore,
       SUM(RegExResult) AS XBox
    FROM
    (
    SELECT Topic.ToUpper() AS Topic,
    SentimentScore,
    Convert.ToInt32(System.Text.RegularExpressions.Regex.IsMatch(Text, "XBOX", System.Text.RegularExpressions.RegexOptions.IgnoreCase)) AS RegExResult
    FROM @Twitter) AS A
GROUP BY Topic;

/*Ouput the result*/
OUTPUT @XBOX
    TO "/SQLBits/TwitterXBOX.csv"
USING Outputters.Csv();

 

This example demonstrates how to select the top records from a query. In SQL Server it is common to use TOP n to return a limited set of rows. In Hive LIMIT is used. In U-SQL FETCH as part of an order statement is the standard used to limit a return from a record set.

 DECLARE @From string = "/DemoFolder/2014-05-cumbria-street.csv";
DECLARE @To string = "/DemoFolderResult/outputtop10.csv";

@policefiles =
EXTRACT CrimeID String,
Month String,
Reportedby String,
Fallswithin String,
Longitude String,
Latitude String,
Location String,
LSOAcode String,
LSOAname String,
Crimetype String,
Lastoutcomecategory String,
Context String
    FROM @From
    USING Extractors.Csv()
    ORDER BY Crimetype FETCH 10 ROWS;

OUTPUT @policefiles
    TO @To
USING Outputters.Csv();

 

As with Hive, complex data types may be used to manipulate the data in U-SQL. In this example we are looking for the keyword ‘Violence’ in our police crime data. We could do this with a regular expression, or a simple LIKE predicate, but to demonstrate the use of an array we will split the text data into individual words and then explode the array into a table so we may perform an equality select. We will then group by a location to find the locations where most violent crimes have taken place.

 DECLARE @From string = "/DemoFolder/2014-05-cumbria-street.csv";
DECLARE @To string = "/DemoFolderResult/outputsplit.csv";

/*Return the files and tidy up the data by removing "On or near" from the location*/
@policefiles =
    SELECT *,
           Location.ToString().Replace("On or near ", "") AS TrimLocation
    FROM
    (
    EXTRACT CrimeID String,
            Month String,
            Reportedby String,
            Fallswithin String,
            Longitude String,
            Latitude String,
            Location String,
            LSOAcode String,
            LSOAname String,
            Crimetype String,
            Lastoutcomecategory String,
            Context String
    FROM @From
    USING Extractors.Csv()
ORDER BY Crimetype
FETCH 1000000 ROWS
    ) AS A;

//Turn the crime type into a SQL string array 
@SplitCrimeTypes =
    SELECT TrimLocation,
           new SQL.ARRAY(Crimetype.Split(' ')) AS split
    FROM @policefiles;

/*Search though the split out crime type to just look for crimes that have the word Violent in them
Then return the number of locations where a violent crime took place more than once*/
@Res =
    SELECT TrimLocation,
           b,
           COUNT( * ) AS Incidents
    FROM @SplitCrimeTypes
         CROSS APPLY
             EXPLODE(split) AS a(b)
    WHERE b == "Violence"
    GROUP BY TrimLocation,
             b
    HAVING COUNT( * ) > 1;

OUTPUT @Res
TO @To
USING Outputters.Csv();

 

Within Azure Data Lake tables may be created to allow data to be stored in a manner for faster data retrieval. Streaming data will generally enter the system in many small files, these can give poor performance when querying. With Hive data might be placed into an ORC file for faster data retrieval and in Spark Parquet files are generally used. In Azure Data Lake data may be transformed with U-SQL and placed into a table for faster data retrieval and better usage of parallel threads.

Tables in Azure Data Lake are clustered indexes that may be partitioned in one of several ways:

RANGE distribution will keep ranges of values together

ROUND ROBIN distribution will equally distribute the data. This should be used where other distributions skew the data too much. Skew can cause issues in Azure Data Lake as a vertex could end up with too much work to do and time out (vertexes time out after 5 hours)

HASH distribution use an automatic hash function to separate the data

DIRECT HASH distribution is for times when you want more control over how the data is hashed.

The larger the table the more extents it will be placed into. The more extents a file have the greater the ability Data Lake will have to run in parallel. This will improve performance and in addition queries will be able to use partition elimination where the table is partitioned by a relevant key.

The following demonstrates the different ways of hashing data:

 //Create a new table with an index for storing police data 
DROP TABLE IF EXISTS PoliceTab;
CREATE TABLE PoliceTab(
CrimeID String,
Month DateTime?,
Reportedby String,
Fallswithin String,
Longitude double?,
Latitude double?,
Location String,
LSOAcode String,
LSOAname String,
Crimetype String,
Lastoutcomecategory String,
Context String,
DirectHASH long
//, INDEX idxLocation CLUSTERED (Longitude ASC, Latitude ASC) PARTITIONED BY RANGE(Longitude, Latitude) INTO 10);
//, INDEX idxLocation CLUSTERED (Longitude ASC, Latitude ASC) PARTITIONED BY ROUND ROBIN INTO 10);
//, INDEX idxLocation CLUSTERED (Longitude ASC, Latitude ASC) PARTITIONED BY HASH(CrimeID) INTO 10);
, INDEX idxLocation CLUSTERED (Longitude ASC, Latitude ASC) PARTITIONED BY DIRECT HASH(DirectHASH) INTO 10);

//Insert our police data into it
INSERT INTO PoliceTab
SELECT  CrimeID,
        Month,
        Reportedby,
        Fallswithin,
        Longitude,
        Latitude,
        Location,
        LSOAcode,
        LSOAname,
        Crimetype,
        Lastoutcomecategory,
        Context,
        Convert.ToInt64(DENSE_RANK() OVER (ORDER BY Crimetype)) AS DirectHASH
FROM (EXTRACT CrimeID String,
    Month DateTime?,
    Reportedby String,
    Fallswithin String,
    Longitude double?,
    Latitude double?,
    Location String,
    LSOAcode String,
    LSOAname String,
    Crimetype String,
    Lastoutcomecategory String,
    Context String
    FROM "/DemoFolder/2014-05-cumbria-street.csv"
    USING Extractors.Csv(silent: true)) AS A;

 

In this final example we are taking a table and inserting adhoc data into it. This method may be used for creating lookup tables.

 DROP TABLE IF EXISTS LookupTable;
CREATE TABLE LookupTable
(
    id int,
    Description String,
INDEX ixLookuptable CLUSTERED (id) PARTITIONED BY ROUND ROBIN);

INSERT INTO LookupTable
    SELECT * FROM 
        (VALUES
            (1,    "Red"),
            (2,    "Green"), 
            (3,    "Blue"),
            (4,    "Yellow")
        ) AS 
              A(id, Description);

 

(In the last post on U-SQL it was noted that running multiple files in local mode “Run Local Plan” did not work – this limitation has now been removed)