Geospatial Queries Using Hive

During one recent engagement, I was helping my customer align ETL activities that were originally developed using SQL Server and T-SQL with the capabilities available in Hadoop and Hive. Most of the translations were straightforward, but when asked about geospatial support in Hive I was admittedly stumped. After full disclosure to the customer that I am not a geospatial expert, I began digging for an answer. Knowing that most, if not all, of the query functionality available in Hive is implemented through user-defined functions (UDFs), I began my research by looking to see if someone else had already skinned the proverbial cat, and I found a gold mine. ESRI, a provider of GIS software, has not only already solved the problem but has also provided its toolkit of Hive functions to the open-source community via GitHub. For the remainder of this post, I will review the steps necessary to set up and use this toolkit and then walk through a real-world demo.

Set-Up

To begin, there are two JAR files needed:

  • esri-geometry-api.jar – contains all the base/foundational classes
  • spatial-sdk-hadoop.jar – contains the UDF and SerDe implementations

Since you can download these jar files already compiled from the ESRI GitHub site, I’ll forgo the particulars on building each from its source and get right into the steps required to run these on an HDInsight cluster.

  1. Upload both jar files to a container in your blob storage account. Make note of whatever folder/directory structure you use.

  2. Use the following format for the paths to the jar files:

    • wasb://<container>@<account>.blob.core.windows.net/<directory>/esri-geometry-api.jar
    • wasb://<container>@<account>.blob.core.windows.net/<directory>/spatial-sdk-hadoop.jar

  3. Using the Hive CLI or the HDInsight Query Console, declare both jar files. Alternatively, this can be done at the time you spin up your cluster.

    add jar wasb://<container>@<account>.blob.core.windows.net/<directory>/esri-geometry-api.jar;
    add jar wasb://<container>@<account>.blob.core.windows.net/<directory>/spatial-sdk-hadoop.jar;

  4. For each function that you intend to use, declare a temporary function as seen in the example. Note that Hive v0.13 and later support permanent functions if you will be making widespread use of these functions.

    create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
    create temporary function ST_Intersects as 'com.esri.hadoop.hive.ST_Intersects';
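
For the permanent-function route mentioned in step 4, a minimal sketch might look like the following. It assumes Hive 0.13 or later and reuses the placeholder jar paths from step 2. The database name gis is a hypothetical choice for illustration and is not something the toolkit requires.

```sql
-- Hypothetical database to hold the geospatial functions (the name is an assumption).
CREATE DATABASE IF NOT EXISTS gis;

-- Permanent functions are registered in the metastore, so they outlive the session.
-- USING JAR tells Hive where to load the classes from; replace the placeholders with your own paths.
CREATE FUNCTION gis.ST_Point AS 'com.esri.hadoop.hive.ST_Point'
  USING JAR 'wasb://<container>@<account>.blob.core.windows.net/<directory>/esri-geometry-api.jar',
        JAR 'wasb://<container>@<account>.blob.core.windows.net/<directory>/spatial-sdk-hadoop.jar';

CREATE FUNCTION gis.ST_Contains AS 'com.esri.hadoop.hive.ST_Contains'
  USING JAR 'wasb://<container>@<account>.blob.core.windows.net/<directory>/esri-geometry-api.jar',
        JAR 'wasb://<container>@<account>.blob.core.windows.net/<directory>/spatial-sdk-hadoop.jar';
```

Later sessions would then refer to the functions by their qualified names, for example gis.ST_Point, instead of repeating the add jar and create temporary function statements.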
    

Now you’re ready to test. To test these functions without a spatial data set, we can use two fixed points defined by their latitude and longitude. Since I spend an inordinate amount of time on an airplane, I’ve picked a point near La Guardia International (LGA) airport in New York and a second point near Los Angeles International (LAX) in California. From these two points, we can build a line string, which then allows us to measure the geodesic distance between them. Note that the ESRI functions take coordinates in x, y order, which means longitude first and then latitude. ST_GeodesicLengthWGS84 returns meters, so the result is divided by 1000 and multiplied by 0.62137119 to convert it to miles. The query, including the function definitions discussed above, looks like this:

set hive.execution.engine=tez;

add jar wasb://gis@bluewaterdata.blob.core.windows.net/lib/esri-geometry-api.jar;
add jar wasb://gis@bluewaterdata.blob.core.windows.net/lib/spatial-sdk-hadoop.jar;

create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
create temporary function ST_LineString as 'com.esri.hadoop.hive.ST_LineString';
create temporary function ST_GeodesicLengthWGS84 as 'com.esri.hadoop.hive.ST_GeodesicLengthWGS84';
create temporary function ST_SetSRID as 'com.esri.hadoop.hive.ST_SetSRID';

-- Coordinates are passed as x, y pairs: longitude first, then latitude (LGA, then LAX).
SELECT
    ST_GeodesicLengthWGS84(
        ST_SetSRID(
            ST_LineString(-73.872307, 40.773773, -118.40377, 33.944115),
            4326)) / 1000.0 * 0.62137119 AS DistanceInMiles;
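
Since the distance is only meaningful if the coordinates go into the right slots, a quick way to see how the toolkit interprets its arguments is to turn a point back into text. The sketch below uses placeholder jar paths and assumes ST_AsText from the same package is available in your build of the toolkit. It should print the point as WKT with the x value (longitude) listed first.

```sql
add jar wasb://<container>@<account>.blob.core.windows.net/<directory>/esri-geometry-api.jar;
add jar wasb://<container>@<account>.blob.core.windows.net/<directory>/spatial-sdk-hadoop.jar;

create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
create temporary function ST_AsText as 'com.esri.hadoop.hive.ST_AsText';

-- ST_Point takes x then y, so longitude goes first and latitude second.
SELECT ST_AsText(ST_Point(-73.872307, 40.773773)) AS lga_point;
```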


Building a Hive Geospatial Demo

Now that I’ve whetted your appetite for the subject matter, let’s turn our attention to building a more real-world demo, beginning with the data. There are a number of examples floating around the interwebs that use a mythical set of Uber data. I say mythical because there are Bigfoot-like sightings of it, but it appears the data set has been removed from the public domain and is no longer available. That being said, a suitable substitute is the NYC Taxi Trip data made available by the NYC Taxi and Limousine Commission and Chris Whong via a FOIL request. This data set consists of two parts: one with trip information (pick-up/drop-off date, time, and locations) and a second with fare and payment information. To build and run the demo below, you will need to download the data from one of its many free sources and then upload it to a blob storage account connected to your HDInsight cluster. Now spin up an appropriately sized cluster and let’s go. First, let’s create two external tables over the data files:

DROP TABLE IF EXISTS fares;
CREATE EXTERNAL TABLE fares (
medallion STRING,
hack_license STRING,
vendor_id STRING,
pickup_datetime STRING,
payment_type STRING,
fare_amount FLOAT,
surcharge FLOAT,
mta_tax FLOAT,
tip_amount FLOAT,
tolls_amount FLOAT,
total_amount FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'wasb://<container>@<account>.blob.core.windows.net/fares/'
TBLPROPERTIES ("skip.header.line.count"="1");

DROP TABLE IF EXISTS trips;
CREATE EXTERNAL TABLE trips (
medallion STRING,
hack_license STRING,
vendor_id STRING,
rate_code STRING,
store_and_fwd_flag STRING,
pickup_datetime STRING,
dropoff_datetime STRING,
passenger_count INT,
trip_time_in_secs INT,
trip_distance FLOAT,
pickup_longitude STRING,
pickup_latitude STRING,
dropoff_longitude STRING,
dropoff_latitude STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'wasb://<container>@<account>.blob.core.windows.net/trips/'
TBLPROPERTIES ("skip.header.line.count"="1");
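
A quick sanity check on the two external tables could look like the sketch below. It assumes only the trips and fares tables defined above. The expectation that the two row counts are broadly similar is an assumption based on both files covering the same period.

```sql
-- Peek at a few rows from each table; the CSV header line should not show up as data.
SELECT * FROM trips LIMIT 5;
SELECT * FROM fares LIMIT 5;

-- Compare row counts; a large mismatch would suggest a missing or misplaced file.
SELECT COUNT(*) AS trip_rows FROM trips;
SELECT COUNT(*) AS fare_rows FROM fares;
```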

Once the tables are in place, issue a few select statements using the LIMIT clause to verify that everything is primed and ready to go. With the set-up out of the way, let’s turn our attention to a use case. The narrative for this demo is straightforward: the Contoso Taxi company is exploring new business opportunities and would like to analyze trips that either originated or terminated at La Guardia International Airport during the last year. The primary challenge, as far as this post is concerned, is identifying which trips are in scope for the analysis.

To start, the data for this time period is moderately substantial, with about 2 GB generated each month, which makes Hive a good fit since we can leverage its distributed nature for processing. Complicating matters, however, the trip data contains only a latitude and longitude for the pickup and drop-off locations. So how can we go about solving this problem in Hive using the ESRI Geospatial UDFs? First, I used one of the many free mapping tools found on the interwebs to define a polygon that encapsulates La Guardia. This polygon is defined by a series of points and is represented in well-known text (WKT) format. WKT lists each point as x y, so the longitude comes first and the latitude second, as seen below.

POLYGON ((-73.889508 40.773506, -73.884028 40.774081, -73.884919 40.779855, 
-73.879043 40.780605, -73.879373 40.781755, -73.878614 40.782805, 
-73.87528 40.78143, -73.871153 40.786004, -73.869668 40.785379, 
-73.869734 40.785079, -73.869304 40.783705, -73.87046 40.78233, 
-73.871648 40.782255, -73.872804 40.78063, -73.856991 40.772881, 
-73.856661 40.772355, -73.858081 40.770381, -73.852666 40.76753, 
-73.851214 40.765155, -73.851676 40.764905, -73.853062 40.767105, 
-73.856694 40.768781, -73.858707 40.76703, -73.862768 40.766805, 
-73.867753 40.770356, -73.870228 40.771481, -73.872143 40.771931, 
-73.873893 40.772031, -73.876236 40.771755, -73.877887 40.771256, 
-73.87967 40.770505, -73.881321 40.769606, -73.8834 40.76853, 
-73.885513 40.767756, -73.887428 40.76723, -73.888055 40.767105, 
-73.889508 40.773506))

Using the shape data, I can then create a polygon object using the ST_Polygon UDF. Next, I use the ST_Point UDF to create a point for each trip’s pickup and drop-off location. Finally, I use ST_Contains as a predicate to find trips that either begin or end within the polygon. The resulting abbreviated query looks as follows:

SELECT *
FROM trips
WHERE 
    ST_Contains(ST_Polygon("<OMITTED FOR BREVITY>"), ST_Point(pickup_longitude, pickup_latitude))
OR 
    ST_Contains(ST_Polygon("<OMITTED FOR BREVITY>"), ST_Point(dropoff_longitude, dropoff_latitude));
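
Because ST_Contains has to be evaluated for every row, it can help to first gauge how many trips are even near the airport. The sketch below is an optional pre-check that is not part of the original demo. It counts trips whose pickup or drop-off falls inside the polygon’s bounding box, using only built-in comparisons. The box limits are simply the minimum and maximum coordinates of the polygon above, and the CAST calls assume the coordinate columns hold numeric text.

```sql
-- Bounding box of the La Guardia polygon (min/max of its vertices):
--   longitude between -73.889508 and -73.851214
--   latitude  between  40.764905 and  40.786004
SELECT COUNT(*) AS candidate_trips
FROM trips
WHERE
    (CAST(pickup_longitude AS DOUBLE) BETWEEN -73.889508 AND -73.851214
     AND CAST(pickup_latitude AS DOUBLE) BETWEEN 40.764905 AND 40.786004)
OR
    (CAST(dropoff_longitude AS DOUBLE) BETWEEN -73.889508 AND -73.851214
     AND CAST(dropoff_latitude AS DOUBLE) BETWEEN 40.764905 AND 40.786004);
```

The same range conditions could also be combined with each ST_Contains call, which may spare most rows the more expensive geometry test. Whether that pays off depends on the data and the execution plan.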

Since we are interested in analyzing only the trips identified, we would most likely load these into a new table, making whatever data transformations are needed in the process. The full example, including a new internal table stored as ORC and the function declarations, is provided below.

DROP TABLE IF EXISTS nyc_taxi_analysis;

CREATE TABLE nyc_taxi_analysis (
    medallion STRING,
    hack_license STRING,
    vendor_id STRING,
    rate_code STRING,
    store_and_fwd_flag STRING,
    passenger_count INT,
    trip_time_in_secs INT,
    trip_distance FLOAT,
    pickup_longitude STRING,
    pickup_latitude STRING,
    dropoff_longitude STRING,
    dropoff_latitude STRING,
    trip_year INT,
    trip_month INT,
    trip_day INT,
    trip_hour INT,
    trip_week INT,
    payment_type STRING,
    fare_amount FLOAT,
    surcharge FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    total_amount FLOAT
)
STORED AS ORC;

set hive.execution.engine=tez;

add jar wasb://gis@bluewaterdata.blob.core.windows.net/lib/esri-geometry-api.jar;
add jar wasb://gis@bluewaterdata.blob.core.windows.net/lib/spatial-sdk-hadoop.jar;

create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains';
create temporary function ST_Polygon as 'com.esri.hadoop.hive.ST_Polygon';

-- The polygon vertices and the ST_Point arguments are both in x, y order (longitude, then latitude).
INSERT OVERWRITE TABLE nyc_taxi_analysis
SELECT
    t.medallion,
    t.hack_license,
    t.vendor_id,
    t.rate_code,
    t.store_and_fwd_flag,
    t.passenger_count,
    t.trip_time_in_secs,
    t.trip_distance,
    t.pickup_longitude,
    t.pickup_latitude,
    t.dropoff_longitude,
    t.dropoff_latitude,
    YEAR(t.pickup_datetime) AS trip_year,
    MONTH(t.pickup_datetime) AS trip_month,
    DAY(t.pickup_datetime) AS trip_day,
    HOUR(t.pickup_datetime) AS trip_hour,
    WEEKOFYEAR(t.pickup_datetime) AS trip_week,
    f.payment_type,
    f.fare_amount,
    f.surcharge,
    f.mta_tax,
    f.tip_amount,
    f.tolls_amount,
    f.total_amount
FROM trips t
JOIN fares f ON t.medallion = f.medallion
AND t.hack_license = f.hack_license
AND t.vendor_id = f.vendor_id
AND t.pickup_datetime = f.pickup_datetime
WHERE 
    ST_Contains(ST_Polygon("POLYGON ((-73.889508 40.773506, -73.884028 40.774081, -73.884919 40.779855, -73.879043 40.780605, -73.879373 40.781755, -73.878614 40.782805, -73.87528 40.78143, -73.871153 40.786004, -73.869668 40.785379, -73.869734 40.785079, -73.869304 40.783705, -73.87046 40.78233, -73.871648 40.782255, -73.872804 40.78063, -73.856991 40.772881, -73.856661 40.772355, -73.858081 40.770381, -73.852666 40.76753, -73.851214 40.765155, -73.851676 40.764905, -73.853062 40.767105, -73.856694 40.768781, -73.858707 40.76703, -73.862768 40.766805, -73.867753 40.770356, -73.870228 40.771481, -73.872143 40.771931, -73.873893 40.772031, -73.876236 40.771755, -73.877887 40.771256, -73.87967 40.770505, -73.881321 40.769606, -73.8834 40.76853, -73.885513 40.767756, -73.887428 40.76723, -73.888055 40.767105, -73.889508 40.773506))"), 
ST_Point(t.pickup_longitude, t.pickup_latitude))
OR 
    ST_Contains(ST_Polygon("POLYGON ((-73.889508 40.773506, -73.884028 40.774081, -73.884919 40.779855, -73.879043 40.780605, -73.879373 40.781755, -73.878614 40.782805, -73.87528 40.78143, -73.871153 40.786004, -73.869668 40.785379, -73.869734 40.785079, -73.869304 40.783705, -73.87046 40.78233, -73.871648 40.782255, -73.872804 40.78063, -73.856991 40.772881, -73.856661 40.772355, -73.858081 40.770381, -73.852666 40.76753, -73.851214 40.765155, -73.851676 40.764905, -73.853062 40.767105, -73.856694 40.768781, -73.858707 40.76703, -73.862768 40.766805, -73.867753 40.770356, -73.870228 40.771481, -73.872143 40.771931, -73.873893 40.772031, -73.876236 40.771755, -73.877887 40.771256, -73.87967 40.770505, -73.881321 40.769606, -73.8834 40.76853, -73.885513 40.767756, -73.887428 40.76723, -73.888055 40.767105, -73.889508 40.773506))"), 
ST_Point(t.dropoff_longitude, t.dropoff_latitude));

I ran the above script on a 16-node cluster, and it completed in 638 seconds (your results may vary). Query the results to sample/review and ensure everything is in order.
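
One way to sample and review the loaded data is sketched below. It assumes the nyc_taxi_analysis table and column names from the script above. The monthly roll-up is just one possible plausibility check, not a required step.

```sql
-- Peek at a handful of rows to confirm the columns landed where expected.
SELECT * FROM nyc_taxi_analysis LIMIT 10;

-- Trip counts and average total fare per month as a rough plausibility check.
SELECT
    trip_month,
    COUNT(*) AS trip_count,
    AVG(total_amount) AS avg_total_amount
FROM nyc_taxi_analysis
GROUP BY trip_month
ORDER BY trip_month;
```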

Wrap-Up

In this post I introduced the ESRI Geospatial UDF library and the steps needed to run it on an HDInsight cluster. I discussed building a demo on real-world data to highlight a portion of the capabilities these functions provide. I hope you were able to take away at least a high-level overview of the geospatial capabilities available within Hive and that, should the need arise in a future project, you can leverage these capabilities as appropriate. Till next time!

Chris