Microsoft Hadoop Distribution (MHD): Fluent Queries on the Interactive JavaScript Console

 The interactive JavaScript console that comes as part of the Microsoft Hadoop Distribution (MHD) provides the ability to query data stored in HDFS and do interesting things with it. This is done by layering a fluent API on top of Apache Pig to provide a more direct and streamlined experience to the user.

Querying Data

Data stored in HDFS can be queried through the interactive console with the help of special objects that provide a fluent API to filter, project, and group the data. Each query is held by one of these objects, which are created with the from function:

from("my_input_folder/input1.txt")

The function takes as a required parameter the path to the file stored in HDFS that will be used as the data set on which the query operates. The function can also accept a directory as input. In this case, all the files inside the directory are read as the data set on which the query will work.

from("my_input_folder")

Wildcards are also accepted:

from("my_input_dir/sales*")

This call restricts the query’s data set to the files whose names start with sales.

Query objects provide a set of different operators to do interesting transformations to the data. It can be projected, filtered, and grouped into refined data sets that answer a particular question or for further processing. All the query operators return a new query object on which a different operator can be applied. This chaining of operators allows the creation of complex queries in the console that are executed as a single job.

The following operators are available:

  • select
  • where
  • orderBy
  • groupBy
  • take
  • mapReduce
  • sum
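
The chaining behaviour described above can be pictured with a small stand-alone sketch. It is not the console's implementation: makeQuery and the steps array are hypothetical names, used only to show how every operator can return a new query object while leaving the original one untouched.

```javascript
// Hypothetical sketch of a fluent, immutable query object.
// It is not the console's code; it only records the operators that were chained.
function makeQuery(source, steps) {
    steps = steps || [];
    // Build an operator that returns a NEW query with one more recorded step.
    function operator(name) {
        return function (arg) {
            return makeQuery(source, steps.concat([{ op: name, arg: arg }]));
        };
    }
    return {
        source: source,
        steps: steps,
        select: operator("select"),
        where: operator("where"),
        orderBy: operator("orderBy"),
        take: operator("take"),
        // Describe the whole chain as a single unit of work.
        describe: function () {
            return [source].concat(steps.map(function (s) {
                return s.op + "(" + JSON.stringify(s.arg) + ")";
            })).join(" -> ");
        }
    };
}

var base = makeQuery("input.txt");
var refined = base.where("$3 == 'Pencil'").take(10);

console.log(base.describe());    // input.txt
console.log(refined.describe()); // input.txt -> where("$3 == 'Pencil'") -> take(10)
```

Because each operator copies the recorded steps instead of modifying them, base can still be reused to build other queries after refined has been created.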

Applying Schemas

Queries in the console follow the same rules as Apache Pig for referencing data from a data source. Data is divided into records, and records into columns. Usually a column is referenced by its position (starting from 0) in the record. For example, $0 refers to the first column, $2 to the third, and so on. This referencing scheme works well enough with simple queries and small data sets; however, it can become confusing fairly quickly, especially when it is not easy to determine whether $1 represents the region or the price, for instance.

To ease this pain, the console allows hinting the query about the structure of the data by using a schema. To apply a schema, a second string argument is passed to the from function:

from("input.txt", "date, region, name, product, qty, price, total")

Such a function call instructs the console that in the data set read from the “input.txt” file, the first field is the date, the second the region, and so forth. Once a schema is applied to the query, the columns of a record can now be referenced by the names from the schema instead of their position. For example, the following two queries yield the same results:

from("input.txt").select("$1, $3, $6").run()

from("input.txt", "date, region, name, product, qty, price, total")
.select("region, product, total").run()

Schema strings use the same syntax as Apache Pig and can be stored in variables in the console for later use:

salesSchema = "date, region, name, product, qty, price, total"

from("input.txt", salesSchema).select("region, product, total").run()

from("input.txt", salesSchema).select("name, product, qty, total").run()
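
To make the relationship between names and positions concrete, here is a small stand-alone sketch that turns a schema string into a lookup table of positional references. schemaToPositions is a hypothetical helper written for this illustration; it is not part of the console, and the only thing it does with Pig type annotations is strip them.

```javascript
// Hypothetical helper: map each field name in a schema string to its
// zero-based positional reference ($0, $1, ...).
function schemaToPositions(schema) {
    var positions = {};
    schema.split(",").forEach(function (field, index) {
        // Drop an optional Pig type annotation such as "count:long".
        var name = field.split(":")[0].trim();
        positions[name] = "$" + index;
    });
    return positions;
}

var salesSchema = "date, region, name, product, qty, price, total";
var positions = schemaToPositions(salesSchema);

console.log(positions.region);  // $1
console.log(positions.product); // $3
console.log(positions.total);   // $6
```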

Input Data Format

The console, because of its use of Apache Pig, expects the data sources to be tab-delimited files; however, the from function allows the creation of a query that can operate on files of different formats. By using a third argument in the function, it is possible to instruct the console that the query will split the tuples in the input data set using a particular character combination:

from("input.csv", "", "," ).run()

This function call will yield a query that will read all the tuples from “input.csv” and split them into attributes using “,” as the attribute separator.

Extra care should be taken so that the selected column delimiter matches the input file format. If they do not match, the query will yield incorrect data.

 

[Figure: Query with a delimiter matching the input data format.]

[Figure: Query using the wrong delimiter.]
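
The effect of a mismatched delimiter can be reproduced outside the console. The sketch below splits one made-up record (the values are invented for illustration) with the right and the wrong separator. It only imitates the splitting step; it does not show how Pig actually loads the file, and splitRecord is a hypothetical name.

```javascript
// Hypothetical illustration of splitting one input line into columns.
function splitRecord(line, delimiter) {
    return line.split(delimiter);
}

// Invented sample record in the order: date, region, name, product, qty, price, total.
var csvLine = "2012-01-05,Quebec,Jones,Pencil,95,1.99,189.05";

var matching = splitRecord(csvLine, ",");
var mismatched = splitRecord(csvLine, "\t");

console.log(matching.length);   // 7
console.log(matching[3]);       // Pencil
console.log(mismatched.length); // 1
```

With the wrong delimiter the whole line ends up in the first column, so a reference such as $3 would have nothing to point at.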

Executing Queries

Now that the console has provided a query object, it is necessary to know what to do with it. Queries in the console execute asynchronously and will not start until instructed to do so. The console supports two ways of submitting queries:

from("input.txt").run()

from("input.txt").to("my_output_path")

The former will execute the query in place and load the result set into the console automatically; however, the output files of the Pig job will be stored in a temporary location and eventually removed. The latter will save the output files of the query to the specified directory (the directory MUST NOT exist); however, it will not load the results back into the console. More information on how to read a file’s content into the console can be found here.

 

[Figure: In place execution of a query.]

[Figure: Normal execution of a query.]

Once the data has been loaded into the console, interesting things can be done with it. For example, it can be graphed with the help of the console’s data visualization capabilities, or it can be manipulated using standard JavaScript statements and jQuery (through the $ function).

Storing and Reusing Queries

Queries in the console are JavaScript objects, thus they can be stored in any regular variable and used several times.

query1 = from("input.txt")

This statement creates a new query object and stores it in the query1 variable. Now we can execute this query or use it to create more refined queries by filtering, grouping, and projecting the data.

query1.run()

query2 = query1.where("$3 == 'Pencil'")

query2.to("q2_output")

JavaScript Predicates

Several query operators allow the use of a JavaScript predicate function instead of a string as their input argument. This function is not executed in the browser; rather, it is sent over the wire to the server, where it is compiled into the appropriate Apache Pig expression. Because of this, the following rules apply to the predicates:

  • The function must contain only a return statement with the expression used as the predicate.
  • All identifiers in the return statement will be bound to the query’s data set columns.
  • All JavaScript relational, arithmetic, and logical operators can be used to build expressions that operate on the data set columns.
  • If a schema is not used with the query, then only positional references ($1, $3, ...) can be used.
  • Apache Pig functions such as SUM can be invoked as standard JavaScript function calls.

For example, these are valid JavaScript predicates:

function() { return $3 === "pencil"; }

function() { return region === "Quebec" && qty >= 10; }

function() { return [region, product, qty * price]; }

function() { return {region: $2, sales: SUM($6) }; }
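
Because the predicate is shipped as text rather than executed locally, only its source matters. The following sketch shows one way this could work, assuming the function's source is obtained with the standard Function.prototype.toString. extractReturnExpression is a hypothetical helper, and the console's real translation into Pig is not shown in this article.

```javascript
// Hypothetical helper: pull the expression out of a predicate whose body is
// a single return statement. The predicate itself is never called, so its
// identifiers (region, qty, $2, ...) do not need to exist locally.
function extractReturnExpression(predicate) {
    var source = predicate.toString();
    var match = /\{\s*return\s+([\s\S]*?);?\s*\}\s*$/.exec(source);
    if (!match) {
        throw new Error("The predicate must contain only a return statement.");
    }
    return match[1].trim();
}

var expression = extractReturnExpression(function () {
    return region === "Quebec" && qty >= 10;
});

console.log(expression); // region === "Quebec" && qty >= 10
```

A function with any statement before the return is rejected by this sketch, which mirrors the first rule in the list above.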

Data Projection

So far, the majority of the queries shown only read data from a file or a folder and returned it just as it is. This is the moral equivalent of selecting all the columns of all the rows stored in a database table. Depending on the needs at hand, it is not always necessary (nor desirable) to bring over all the columns found in the data set. The query object allows projecting the input data to a different data set with only the wanted columns via the select operator:

from("input.txt").select("$1, $3, $4").run()

This command will query only the fields at positions 1, 3, and 4 (the second, fourth, and fifth columns, since positions start at 0) in the file “input.txt” and load the result set into the console.

Computed columns and column aliases can be used as well:

from("input.txt")
.select("$2 AS region, $3 AS product, $4 * $5 AS total").run()

When using aliases, these will become the schema for the query object returned by the select operator. This allows an operator chained to the query above to now use friendly names to refer to the fields instead of a positional reference:

from("input.txt")
.select("$2 AS region, $3 AS product, $4 * $5 AS total")
.select("region, total").run()

A JavaScript predicate can be used with this operator and it should return the projected fields either as an array literal or an object literal. If an object literal is used, then the names of the properties in the literal become the field aliases. Computed fields can be used either with object literals or array literals. For example, the first query below uses a predicate returning an array literal while the second uses an object literal instead:

from("input.txt").select(function(){return [$2, $3, $4 * $5];}).run()

from("input.txt")
.select(function(){return {region: $2, product: $3, total: $4 * $5};}).run()
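
For readers more familiar with JavaScript arrays than with Pig, the object-literal projection computes roughly what Array.prototype.map does on in-memory rows. The sketch below is only an analogy: the rows are invented sample data, fields are referenced by name instead of by position, and nothing here runs on Hadoop.

```javascript
// In-memory analogy of select(): keep some columns and add a computed one.
// The rows are invented sample data, not taken from a real input file.
var rows = [
    { region: "Quebec",  product: "Pencil", qty: 10, price: 2 },
    { region: "Ontario", product: "Binder", qty: 3,  price: 5 }
];

var projected = rows.map(function (row) {
    // The property names of the returned object act as the field aliases.
    return { region: row.region, product: row.product, total: row.qty * row.price };
});

console.log(JSON.stringify(projected[0])); // {"region":"Quebec","product":"Pencil","total":20}
```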

Filtering

A query can filter the data set it operates on with the where operator. For example, the following query can be used to retrieve only the data about pencils outside Quebec that is stored in the file “input.txt”:

from("input.txt").where("$2 != 'Quebec' AND $3 == 'Pencil'").run()

The operator takes as an argument a string containing an Apache Pig expression that evaluates to a boolean value. A JavaScript predicate can also be used, but unlike with the select operator, the predicate must have a single return statement that yields a true or false value:

from("input.txt").where(function() { return $2 !== "Quebec" && $3 === "Pencil"; })
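
The two filters above differ only in operator spelling: Pig writes ==, !=, and AND where JavaScript writes ===, !==, and &&. The sketch below makes that correspondence explicit with a deliberately naive text replacement. toPigExpression is a hypothetical function; the article does not describe how the console actually compiles predicates.

```javascript
// Naive, hypothetical translation of a JavaScript boolean expression into
// Pig syntax. A real compiler would parse the expression instead of using
// text replacement (this version would also rewrite operators that happen
// to appear inside string literals).
function toPigExpression(jsExpression) {
    return jsExpression
        .replace(/!==/g, "!=")
        .replace(/===/g, "==")
        .replace(/&&/g, "AND")
        .replace(/\|\|/g, "OR")
        .replace(/"/g, "'");
}

console.log(toPigExpression('$2 !== "Quebec" && $3 === "Pencil"'));
// $2 != 'Quebec' AND $3 == 'Pencil'
```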

Sorting

Data can also be sorted by a query with the help of the orderBy operator:

from("input.txt", "date,region,name,product,qty,price,total")
.orderBy("region, product DESC").run()

This query will return all the data in the input.txt file sorted first by region in ascending order and then by product in descending order. Like other operators, orderBy takes as its only argument a string containing an Apache Pig expression indicating the sorting criteria.

A JavaScript predicate function can also be used:

from("input.txt").orderBy(function() {return [$1, $3];}).run()

A caveat with this operator is that, currently, it only supports ascending order (which is the default) when using a JavaScript predicate.
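
As an in-memory analogy of the "region, product DESC" criteria, the sketch below sorts a few invented rows with a comparator that orders by region ascending and breaks ties by product descending. It illustrates the ordering only; it is not how the console or Pig performs the sort, and compareText is a hypothetical helper.

```javascript
// In-memory analogy of orderBy("region, product DESC").
// The rows are invented sample data.
var sales = [
    { region: "Quebec",  product: "Binder" },
    { region: "Ontario", product: "Binder" },
    { region: "Quebec",  product: "Pencil" },
    { region: "Ontario", product: "Pencil" }
];

function compareText(a, b) {
    return a < b ? -1 : (a > b ? 1 : 0);
}

var sorted = sales.slice().sort(function (a, b) {
    // Region ascending first; ties are broken by product descending.
    return compareText(a.region, b.region) || compareText(b.product, a.product);
});

console.log(sorted.map(function (r) { return r.region + "/" + r.product; }).join(", "));
// Ontario/Pencil, Ontario/Binder, Quebec/Pencil, Quebec/Binder
```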

Aggregation

The console can work with summarized data sets by aggregating the data in a query with the help of the groupBy operator:

from("input.txt").groupBy("$1, $3").run()

from("input.txt", "date,region,name,product,qty,price,total")
.groupBy("region, product").run()

The operator takes as its argument an Apache Pig expression indicating the fields or computed values that will be used as the grouping keys of the result data set. Also, like other operators a JavaScript predicate function can be used:

from("input.txt").groupBy(function() { return [$1, $3]; }).run() 

Special care has to be taken when dealing with the result set of a groupBy operator. It will consist of a set of records with two fields: group and $1. The former is either the key field indicated in the group by or a record of its own if more than one grouping key was used. The latter is a nested data set containing the aggregated data for a particular key set. The operator chained immediately after the groupBy has to be aware of this structure if an Apache Pig expression string was used as the argument for the groupBy. This complexity can be avoided if a schema is applied to the query and JavaScript function predicates are used on both operators. This way, all the referenced fields will be automatically bound to the right field of the grouped result set.

from("input.txt").groupBy("$1")
.select("group, $1.$3, $1.$6").run()

from("input.txt").groupBy("$1")
.select(function(){return [group, $1.$3, $1.$6];}).run()

from("input.txt","date,region,name,product,qty,price,total")
.groupBy(function() { return region; })
.select(function() { return [region, product, total]; }).run()

The three queries above are equivalent and will yield the product and total by region. Notice how the queries differ in how they reference the fields after the groupBy. Both the first and the second queries need precise knowledge of the result set from the group by in their respective select operators. The third query, on the other hand, does not need that knowledge, which makes it simpler, although longer to type.
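
The shape of a grouped result set is easier to see on in-memory data. The sketch below groups a few invented rows by a single key and produces one record per key, holding the key (group) and the nested rows for that key. groupRows is a hypothetical helper, and its rows property only stands in for the nested field that the console exposes as $1.

```javascript
// In-memory analogy of the result of groupBy on a single key: one record
// per key, holding the key ("group") and the nested rows for that key.
function groupRows(rows, keyName) {
    var byKey = {};
    var result = [];
    rows.forEach(function (row) {
        var key = row[keyName];
        if (!Object.prototype.hasOwnProperty.call(byKey, key)) {
            byKey[key] = { group: key, rows: [] };
            result.push(byKey[key]);
        }
        byKey[key].rows.push(row);
    });
    return result;
}

// Invented sample data.
var grouped = groupRows([
    { region: "Quebec",  product: "Pencil", total: 20 },
    { region: "Ontario", product: "Binder", total: 15 },
    { region: "Quebec",  product: "Binder", total: 30 }
], "region");

console.log(grouped.length);         // 2
console.log(grouped[0].group);       // Quebec
console.log(grouped[0].rows.length); // 2
```

Reaching a column of the original data now takes two steps (first the nested field, then the column), which is what references such as $1.$3 express.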

Once a data set has been aggregated, it can be summarized in the next operator of the chain by using Apache Pig eval functions:

from("input.txt","date,region,name,product,qty,price,total")
.groupBy("region")
.select("group, SUM($1.total)").run()

from("input.txt","date,region,name,product,qty,price,total")
.groupBy(function() { return region; })
.select(function() { return [region, SUM(total)]; }).run()

Another way of summarizing the data is to use the eval operators exposed by the query object directly. Currently, only the SUM eval function is supported and it can be used either directly or after a group by:

from("input.txt").groupBy("$1").sum("$1.$6").run()

from("input.txt").sum("$1.$6").run()

from("input.txt","date,region,name,product,qty,price,total")
.sum("$1.total").run()

The sum operator takes as its only parameter either a string or a JavaScript predicate indicating the field on which the sum should be computed. The same restrictions for referencing a field in any operator immediately following a group by also apply to sum, even if it is chained after a different operator. This is because the data set will be implicitly aggregated during the query execution if it hasn’t been already.
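
The two ways of using sum can be compared on in-memory data. In the sketch below, the ungrouped case treats the whole data set as one implicit group, which is presumably what the implicit aggregation mentioned above amounts to. The rows are invented and sumField is a hypothetical helper.

```javascript
// In-memory analogy of sum(): add up one field, either over the whole data
// set or separately for each group.
function sumField(rows, fieldName) {
    return rows.reduce(function (acc, row) { return acc + row[fieldName]; }, 0);
}

// Invented sample data.
var allRows = [
    { region: "Quebec",  total: 20 },
    { region: "Ontario", total: 15 },
    { region: "Quebec",  total: 30 }
];

// Without grouping, every row falls into one implicit group.
console.log(sumField(allRows, "total")); // 65

// With grouping by region, each group gets its own sum.
var totalsByRegion = {};
allRows.forEach(function (row) {
    totalsByRegion[row.region] = (totalsByRegion[row.region] || 0) + row.total;
});
console.log(JSON.stringify(totalsByRegion)); // {"Quebec":50,"Ontario":15}
```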

Limiting the Result Set

The result set from a particular query can be limited to an arbitrary number of records with the help of the take operator. This operator takes as its single parameter an integer value indicating how many records to return:

from("input.txt").take(10).run()

Map Reduce Jobs as Queries

It's not always possible to express the desired transformations to a data set with the available query operators. To enable more advanced scenarios, the console allows executing a JavaScript map reduce job as a query via the mapReduce operator.

from("gutenberg").mapReduce("AcmeWordCount.js", "word, count:long").run()

The operator takes two string arguments: the path to the script containing the map reduce job and the schema of the output produced by the job. For example, the query above will execute the “AcmeWordCount.js” job using the files inside the “gutenberg” directory. Keep in mind that the MapReduce script has to be uploaded to HDFS before running such a query.
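
The article does not show what such a script contains. As a rough sketch only, a word-count job could be organized as a map and a reduce function that emit pairs through a context.write call. This shape, the hasNext/next iterator, and the small local harness (runLocally) are assumptions made for illustration and should be checked against the MHD documentation before use.

```javascript
// Assumed shape of a word-count script: map emits (word, 1) pairs and
// reduce adds up the counts for each word.
var map = function (key, value, context) {
    var words = value.split(/[^a-zA-Z]+/);
    for (var i = 0; i < words.length; i++) {
        if (words[i] !== "") {
            context.write(words[i].toLowerCase(), 1);
        }
    }
};

var reduce = function (key, values, context) {
    var sum = 0;
    while (values.hasNext()) {
        sum += parseInt(values.next(), 10);
    }
    context.write(key, sum);
};

// Hypothetical local harness that imitates the shuffle step in memory so the
// two functions can be tried without a cluster.
function runLocally(lines) {
    var shuffled = Object.create(null);
    lines.forEach(function (line, offset) {
        map(offset, line, { write: function (k, v) {
            (shuffled[k] = shuffled[k] || []).push(v);
        } });
    });
    var output = Object.create(null);
    Object.keys(shuffled).forEach(function (k) {
        var index = 0;
        var values = {
            hasNext: function () { return index < shuffled[k].length; },
            next: function () { return shuffled[k][index++]; }
        };
        reduce(k, values, { write: function (key, v) { output[key] = v; } });
    });
    return output;
}

console.log(JSON.stringify(runLocally(["the quick fox", "The lazy dog"])));
// {"the":2,"quick":1,"fox":1,"lazy":1,"dog":1}
```

The output pairs (a word and a number) line up with the "word, count:long" schema passed as the second argument of mapReduce.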