Freigeben über


Something's Brewing with Azure Data Factory Part 2

In my last post (HERE), I started hacking my way through the new Azure Data Factory service to automate my beer recommendation demo. The first post was all about setting up the necessary scaffolding and then building that first pipeline to move data from the Azure SQL Database into Azure Blob Storage. In this post, we will do something interesting with that data, by using the Mahout library to apply collaborative filtering to generate user-based recommendations. Without further delay, lets get started.

Getting Ready to Ride

Mahoutis a scalable machine learning library that runs as MapReduce job within an HDInsight cluster. While a discussion of Mahout particulars is outside the scope of this blog, in order to leverage this library we simply need to execute a MapReduce job with the correct configuration. Recalling from the prior post, we defined linked services which allowed for access to Azure SQL and Blob data source, then datasets which act as both inputs and outputs for pipelines which are responsible for the heavy lifting. Before worrying about the running any Mahout job we first need to define a linked service for a HDInsight cluster Two options exists for creating a HDInsight cluster. You can either bring your own HDInsight cluster or have one created on-demand by Azure Data Factory. For this demo, we will use the on-demand variety. To create an on-demand cluster, we first must create and configure a LinkedService which access to the Azure Blob Storage account that will serve as a container for HDInsight as seen in the JSON below.

 {
    "name": "HDIStorageLinkedService",
    "properties":
    {
        "type": "AzureStorageLinkedService",
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<ACCOUNT />;AccountKey=<ACCOUNT_KEY />"
    }
}

Next, we use JSON to configure the actual on-demand HDInsight service.

 {
    "name": "HDILinkedService",
    "properties": 
    {
    "type": "HDInsightOnDemandLinkedService",
    "clusterSize": "4",
    "jobsContainer": "DataFactoryJobs",
    "timeToLive": "00:05:00",
    "linkedServiceName": "HDIStorageLinkedService"
    }
}

The final step is to create both linked services using the following PowerShell command.

 New-AzureDataFactoryLinkedService -ResourceGroupName $ns -DataFactoryName $df -File .\HDIStorageLinkedService.json
New-AzureDataFactoryLinkedService -ResourceGroupName $ns -DataFactoryName $df -File .\HDILinkedService.json

With the required Linked Services in place, we can turn our attention to inputs and outputs. The input we will use for our new Pipeline is the output from the Ingress step (the UserRatingsBlob dataset) we defined in the prior post. That means that it is only necessary to define one new dataset. In this step our output dataset is only used as scaffolding to wire our pipelines together. This makes it unnecessary to define a structure or schema. To define the new Mahout output data set we can use the following JSON:

 {
    "name": "MahoutRecommendations",
    "properties":
    {
        "location": 
        {
            "type": "AzureBlobLocation",
            "folderPath": "<CONTAINER />/mahout/recommendations/{Year}_{Month}_{Day}/",
            "linkedServiceName": "StorageLinkedService",
            "partitionedBy":
            [
                { name: "Year", value: { type: "DateTime", date: "SliceStart", format: "yyyy" } },
                { name: "Month", value: { type: "DateTime", date: "SliceStart", format: "%M" } },
                { name: "Day", value: { type: "DateTime", date: "SliceStart", format: "%d" } }
            ]
        },
        "availability": 
        {
            "frequency": "day",
            "interval": 1
        }
    }
}

After the JSON file has been created on your local file system, the PowerShell command below will create the new dataset.

 New-AzureDataFactoryTable -ResourceGroupName $ns -DataFactoryName $df –File .\MahoutRecommendations.json

After all the required Linked Services and datasets are created, we can turn our attention to the Pipeline that will ultimately generate the recommendations.

Ride the Elephant

The Azure Data Factory contains a number of built in transformation types that allow you to manipulate data using Hive, Pig, MapReduce or even a bit of custom C# code. Since Mahout runs as a MapReduce job, we will leverage the MapReduce type using the JSON below.

 {  
       "name": "MahoutPipeline",       
       "properties":{  
          "description":"Pipeline uses collaborative filtering to generate recommendations",
          "activities":[  
             {  
                "name":"MahoutActivity",
                "description":"Map Reduce job to generate Mahout recommendations",
                "type":"HDInsightActivity",
                "inputs":[  
                   {  
                      "Name":"UserRatingsBlob"
                   }
                ],
                "outputs":[  
                   {  
                      "Name":"MahoutRecommendations"
                   }
                ],
                "linkedServiceName":"HDILinkedService",
                "transformation":{  
                   "type":"MapReduce",
                   "className":"org.apache.mahout.cf.taste.hadoop.item.RecommenderJob",
                   "jarFilePath":"dfdemo/mahout/jars/mahout-examples-0.9.0.2.1.9.0-2196-job.jar",
                   "jarLinkedService":"StorageLinkedService",
                   "arguments":[  
                      "-s",
                      "SIMILARITY_PEARSON_CORRELATION",
                      "--input",
                      "wasb://@.blob.core.windows.net/reviews/",
                      "--output",
                      "$$Text.Format('wasb://@.blob.core.windows.net/mahout/recommendations/{0:yyyy_MM_dd}/', Time.AddHours(SliceStart, 0))",    
                      "--maxSimilaritiesPerItem",
                      "10",
                      "--tempDir",
                      "$$Text.Format('wasb://@.blob.core.windows.net/mahout/temp/{0:yyyy_MM_dd}/', Time.AddHours(SliceStart, 0))"                        
                   ]
                },
                "policy":{  
                   "concurrency":1,
                   "executionPriorityOrder":"OldestFirst",
                   "retry":1,
                   "timeout":"01:00:00"
                }
             }
          ]
       }
    }

A couple of things to point out in the configuration. Beyond the the input, outputs and linked services I want to call your attention to the MapReduce specific items. Note that the Mahout jar is stored in a blob storage container (dfdemo/mahout/jars) and a linked service is provided to allow for access to the library. Next, note that all the command-line or runtime arguments required by the Mahout RecommenderJob class are define in the arguments configuration section. These include the similarity calculation method, the maximum number of recommendations to generate for each user, the input, output and temp directories that will be used when the job runs. Finally, Mahout presents an interesting challenge since it neither cleans-up after itself nor allows for overwriting of files. This means that both the Temp and less-so the Output directory pose a potential issue in a system where Mahout will be run daily or on whatever cyclical schedule you define. To avoid errors, it is necessary to use the slice to make these folders unique on each run.

 New-AzureDataFactoryPipeline -ResourceGroupName $ns -DataFactoryName $df –File .\PipeLines\MahoutPipeline.json
Set-AzureDataFactoryPipelineActivePeriod -ResourceGroupName $ns -DataFactoryName $df –Name MahoutPipeline -StartDateTime 2015-01-24Z -EndDateTime 2015-01-24Z -Force

Lastly, after the JSON config file is in place, create the pipeline and set the activity period to create the slices and run the pipeline as seen in the preceding code snippet.

Wrap-Up

Using the Azure Portal UI, verify the Data Factory is in order and that the diagram for the workflow looks similar to the screenshot below.

image

image

When the job runs, the Data Factory will spin up a HDInsight cluster on-demand using the configuration you specified. This step will take longer than anything else in the workflow. You can monitor the workflow by observing the event log or by verifying that that the slice is running as expected by drilling into the MahoutRecommendations dataset.

image

image

After the job completes, you can browse the recommendation results from blob storage.

image

In the next post, we will wrap-up this demo as we use the Hive transformation to un-pivot the Mahout recommendation results and then the CopyActivity to load or egress the data back to our Azure SQL Database.

Till next time!

Chris