Something's Brewing with Azure Data Factory

A while back I put together a presentation to show off HDInsight using Mahout and most everyone’s favorite: beer. The concept was simple. A spartan website allowed users to create an account and rate beers. The ratings were then fed into Mahout to generate recommendations using collaborative filtering. Looking back, the process I used was good enough for demos but pretty rough. Oozie was not fully supported at the time, so I rolled my own solution. I used Sqoop to move data from an Azure SQL database to Azure Blob Storage, ran one Mahout (MapReduce) job to generate recommendations, and ran another MapReduce job to unpivot the results before loading them back into the Azure SQL database, again using Sqoop. Your head is probably spinning after reading that. It was messy, but it worked, and at the time it was what we had to work with. With Azure Data Factory there is a better and more efficient way. The next couple of posts will walk through rebuilding that demo using a Data Factory to orchestrate both data movement and job execution.

The Set-Up

Before diving in, let’s set the stage with a little background. The base of this demo is a website in which users can browse and rate beers and subsequently receive intelligent recommendations. Beers are rated on a scale from 1 to 5. Users and their ratings data are stored in an Azure SQL database called Beer. The review data is run through Mahout using collaborative filtering.

Before you get started, make sure you have a couple of things. You will need an Azure account, access to the new Azure Portal, and Azure PowerShell installed and configured on your machine. The example I will present uses an Azure SQL database, Azure Blob storage and of course Azure Data Factory. For brevity, I will not discuss setting up Azure resources outside of the Data Factory.

Step 1: Creating the Data Factory

The Data Factory is the container for all subsequent activities we will discuss. There are currently two ways to create the Data Factory: the Azure Portal UI and PowerShell. Using the Azure Portal to create the Data Factory is a trivial task. Click on the ‘+ New’ button and select Azure Data Factory. Within the dialog you will need to specify a globally unique name, a resource group, the subscription (if you have multiple) and the geographical region.

You can likewise perform the same operation using Azure PowerShell. Before you begin executing commands, make sure that you have switched to AzureResourceManager mode, that you are authenticated, and that your subscription is selected.

 Switch-AzureMode AzureResourceManager

Add-AzureAccount 

Select-AzureSubscription -SubscriptionName "Visual Studio Ultimate with MSDN"

To create the Data Factory through PowerShell the following command is used. Please note that I am using PowerShell variables to simplify my script and will not repeat the declarations subsequently.

 $df = "Brewhaus"
$ns = "BluewaterDataFactory"
$loc = "West US"

#Create the Data Factory
New-AzureDataFactory -ResourceGroupName $ns -Name $df -Location $loc -Force -ErrorAction Stop

After PowerShell has reported that the Data Factory was created successfully, you can browse the Data Factory dashboard in the Azure Portal. Tip: pin the dashboard to your Startboard to avoid having to constantly browse for it.

Step 2: Defining Linked Services

With the Data Factory in place, the next step is to define the Linked Services required for our workflow. Linked Services can be defined as one of five types: AzureStorageLinkedService, AzureSqlLinkedService, OnPremiseSqlLinkedService, HDInsightOnDemandLinkedService and HDInsightBYOCLinkedService. For our purposes we are only concerned with connecting to an Azure SQL database and Azure Blob Storage. This requires two Linked Services, one for each. While Linked Services can be defined through the portal, I will define these using a JSON definition and PowerShell. To define the Linked Service for an Azure SQL database, we construct the required JSON containing a connection string to the appropriate database as seen below.

 {
    "name": "AzureSqlLinkedService",
    "properties":
    {
        "type": "AzureSqlLinkedService",
        "connectionString": " Server=tcp:<SERVER>.database.windows.net,1433; Database=Beer; User ID=<USER_ID>; Password=<PASSWORD>; Trusted_Connection=False; Encrypt=True; Connection Timeout=30"
    }
}

For a storage account Linked Service, the JSON contains the blob storage account name and key.

 {
    "name": "StorageLinkedService",
    "properties":
    {
        "type": "AzureStorageLinkedService",
        "connectionString": " DefaultEndpointsProtocol=https; AccountName=<ACCOUNT_NAME>; AccountKey=<ACCOUNT_KEY>"
    }
}
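If you would rather not paste credentials into files by hand, the two definitions above can also be generated by a script. The following is a minimal Python sketch and not part of the original demo: the helper names (`sql_linked_service`, `storage_linked_service`, `write_definition`) are hypothetical, and the JSON shape simply mirrors the two definitions shown above.

```python
import json
import os
import tempfile


def sql_linked_service(server, user_id, password, database="Beer"):
    """Build the Azure SQL Linked Service definition (same shape as above)."""
    connection_string = (
        f"Server=tcp:{server}.database.windows.net,1433; "
        f"Database={database}; User ID={user_id}; Password={password}; "
        "Trusted_Connection=False; Encrypt=True; Connection Timeout=30"
    )
    return {
        "name": "AzureSqlLinkedService",
        "properties": {
            "type": "AzureSqlLinkedService",
            "connectionString": connection_string,
        },
    }


def storage_linked_service(account_name, account_key):
    """Build the storage Linked Service definition (same shape as above)."""
    connection_string = (
        "DefaultEndpointsProtocol=https; "
        f"AccountName={account_name}; AccountKey={account_key}"
    )
    return {
        "name": "StorageLinkedService",
        "properties": {
            "type": "AzureStorageLinkedService",
            "connectionString": connection_string,
        },
    }


def write_definition(definition, directory="."):
    """Write <name>.json into the directory and return the file path."""
    path = os.path.join(directory, definition["name"] + ".json")
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(definition, handle, indent=4)
    return path


# Demonstration with placeholder values, written to a throwaway folder.
with tempfile.TemporaryDirectory() as folder:
    written = write_definition(
        storage_linked_service("<ACCOUNT_NAME>", "<ACCOUNT_KEY>"), folder
    )
    with open(written, encoding="utf-8") as handle:
        round_trip = json.load(handle)
    print(os.path.basename(written), round_trip["properties"]["type"])
```

In practice the real values would come from wherever you keep secrets (environment variables, for example), and the resulting files would be passed to the -File parameter.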

To create the Linked Services, use the following PowerShell commands, taking care to use the correct path to the JSON configuration files on your machine.

 #Create Linked Services
New-AzureDataFactoryLinkedService -ResourceGroupName $ns -DataFactoryName $df -File .\AzureSqlLinkedService.json
New-AzureDataFactoryLinkedService -ResourceGroupName $ns -DataFactoryName $df -File .\StorageLinkedService.json

These Linked Services function as connection managers for your Data Factory. In the subsequent steps, we will define datasets and pipelines which reference these connections for the purpose of either accessing or processing data.

Step 3: Defining Datasets

The datasets, or more accurately tables, that we define provide the input and output schemas needed for this workflow. These schemas are where we define the column structure (name and data type), the location and the availability of the data. As with Linked Services, we must define two tables using JSON, one for the source and one for the destination. Please note that as of this writing, creating datasets through the portal UI is not supported. Starting with the source data located in the Azure SQL database, it is necessary to pull only three columns of data (UserID, BeerID and OverallRating) for this exercise. Since the data is coming from an Azure SQL database, it is not necessary to specify a structure, as it will be inferred from the database schema. This table points to our Azure SQL Linked Service and references the BeerReview table, which contains the needed data. This demo uses a daily pull and does not have any external dependencies, as reflected in the availability element below.

 {
    "name": "UserRatings",
    "properties":
    {
        "location":
        {
            "type": "AzureSqlTableLocation",
            "tableName": "BeerReview",
            "linkedServiceName": "AzureSqlLinkedService"
        },
        "availability": 
        {
            "frequency": "Day",
            "interval": 1,
            "waitonexternal": {}           
        }
    }
}

The blob data is an exact mirror of our source data. The big difference between the two definitions is how the location is specified. With blob storage we are not only writing data files to the configured account but also partitioning the data into monthly folders. This is also where we could define any formatting options such as column and row delimiters or even the serializer used (currently Text or Avro). For our demo, we will accept the defaults.

 {
    "name": "UserRatingsBlob",
    "properties":
    {
        "location": 
        {
            "type": "AzureBlobLocation",
            "folderPath": "dfdemo/userratings/year={Year}/month={Month}/",
            "fileName": "ratings{Year}{Month}{Day}",
            "partitionedBy":
            [
                { "name": "Year", "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyy" } },
                { "name": "Month", "value": { "type": "DateTime", "date": "SliceStart", "format": "%M" } },
                { "name": "Day", "value": { "type": "DateTime", "date": "SliceStart", "format": "%d" } }
            ],
            "format":
            {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
            "linkedServiceName": "StorageLinkedService"
        },
        "availability": 
        {
            "frequency": "Day",
            "interval": 1            
        }
    }
}
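To make the partitioning concrete, here is a small Python sketch of how the folderPath and fileName placeholders above would resolve for a single slice. It is an illustration only, not Data Factory code: the `resolve` helper and the lookup table are hypothetical, and the mapping assumes the format values follow .NET custom date format rules, where "%M" and "%d" yield the month and day without a leading zero.

```python
from datetime import datetime

# Assumed equivalents of the .NET custom format strings used in the
# "partitionedBy" section: "yyyy" is the four-digit year, while "%M" and "%d"
# are the single-specifier forms (month and day with no leading zero).
DOTNET_FORMATS = {
    "yyyy": lambda d: f"{d.year:04d}",
    "%M": lambda d: str(d.month),
    "%d": lambda d: str(d.day),
}

# (placeholder name, format) pairs copied from the dataset definition.
PARTITIONED_BY = [("Year", "yyyy"), ("Month", "%M"), ("Day", "%d")]


def resolve(template, slice_start):
    """Substitute {Year}, {Month} and {Day} using the slice start time."""
    values = {
        name: DOTNET_FORMATS[fmt](slice_start) for name, fmt in PARTITIONED_BY
    }
    return template.format(**values)


slice_start = datetime(2015, 1, 24)
print(resolve("dfdemo/userratings/year={Year}/month={Month}/", slice_start))
# dfdemo/userratings/year=2015/month=1/
print(resolve("ratings{Year}{Month}{Day}", slice_start))
# ratings2015124
```

If you prefer names that sort chronologically, the padded .NET forms "MM" and "dd" would give month=01 and ratings20150124 instead.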

With the JSON in hand we can create both datasets using the following PowerShell snippet.

 #Create Tables
New-AzureDataFactoryTable -ResourceGroupName $ns -DataFactoryName $df -File .\Tables\UserRatings.json
New-AzureDataFactoryTable -ResourceGroupName $ns -DataFactoryName $df -File .\Tables\UserRatingsBlob.json

Step 4: Creating a Pipeline

The final piece in the little puzzle we have going is the pipeline. The data pipeline is where we tie together our input and output. Activities such as transforming or enriching the data are defined in the pipeline, as we will explore in the next post. For this post, we will define only a single activity, the built-in CopyActivity. As with datasets, there is currently no way to define a pipeline in the Portal UI, so JSON and PowerShell will be our go-to solution. In the JSON, we reference the two datasets and define the transformation. The transformation uses a SQL Data Reader as the source and filters based on the slice date. Since both input and output schemas are identical, it is not necessary to perform any mapping between the source and destination.

 {
    "name": "CopyRatingsPipeline",
    "properties":
    {
        "description" : "This pipeline moves User Ratings from an Azure SQL Database to Azure Blob Storage",
        "activities":
        [
            {
                "name": "CopyActivity",
                "description": "Data Mover", 
                "type": "CopyActivity",
                "inputs":  [ { "name": "UserRatings"  } ],
                "outputs":  [ { "name": "UserRatingsBlob" } ],
                "transformation":
                {
                    "source":
                    {
                        "type": "SqlSource",
                        "SqlReaderQuery": "$$Text.Format(' SELECT UserID, BeerID, Overall FROM dbo.BeerReview WHERE [ReviewDate] = \\'{0:MM/dd/yyyy}\\'', Time.AddHours(SliceStart, 0))"
                    },
                    "sink":
                    {
                        "type": "BlobSink"
                    }
                },
                "policy":
                {
                    "Concurrency": 1,
                    "ExecutionPriorityOrder": "OldestFirst",
                    "Retry": 1,
                    "Timeout": "01:00:00"
                }
            }
        ]
    }
}
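The SqlReaderQuery above is a template that is filled in once per slice. As a rough illustration of what the source ends up running, the Python sketch below renders the same query for one slice start. The `render_query` helper is hypothetical, and it assumes that `{0:MM/dd/yyyy}` produces a zero-padded month/day/year string and that `Time.AddHours(SliceStart, 0)` leaves the slice start unchanged.

```python
from datetime import datetime, timedelta

# Same statement as in the pipeline definition, with the date left open.
QUERY_TEMPLATE = (
    "SELECT UserID, BeerID, Overall FROM dbo.BeerReview "
    "WHERE [ReviewDate] = '{date}'"
)


def render_query(slice_start, offset_hours=0):
    """Mimic Text.Format(..., Time.AddHours(SliceStart, offset_hours))."""
    effective = slice_start + timedelta(hours=offset_hours)
    # %m/%d/%Y is the strftime counterpart of the MM/dd/yyyy format.
    return QUERY_TEMPLATE.format(date=effective.strftime("%m/%d/%Y"))


print(render_query(datetime(2015, 1, 24)))
# SELECT UserID, BeerID, Overall FROM dbo.BeerReview WHERE [ReviewDate] = '01/24/2015'
```

One thing to keep in mind with an equality filter like this: if ReviewDate stores a time of day as well as a date, only rows stamped exactly at midnight would match, so a range comparison may be the safer choice in that case.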

With the JSON definition we can use PowerShell to create the pipeline as seen below.

 #Create Pipelines
New-AzureDataFactoryPipeline -ResourceGroupName $ns -DataFactoryName $df -File .\PipeLines\CopyRatingsPipeline.json
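Because the pipeline, tables and Linked Services refer to each other by name, a typo in any one file only surfaces when a cmdlet rejects it. A quick local check can catch that earlier. The sketch below is a hypothetical helper, not a Data Factory feature, and the definitions are trimmed to just the fields it inspects.

```python
def check_references(linked_services, tables, pipeline):
    """Return a list of problems; an empty list means every name resolves."""
    problems = []
    service_names = {service["name"] for service in linked_services}
    table_names = {table["name"] for table in tables}

    # Every table must point at a Linked Service that is defined.
    for table in tables:
        service = table["properties"]["location"]["linkedServiceName"]
        if service not in service_names:
            problems.append(
                f"table {table['name']}: unknown linked service {service}"
            )

    # Every activity input and output must name a table that is defined.
    for activity in pipeline["properties"]["activities"]:
        for ref in activity["inputs"] + activity["outputs"]:
            if ref["name"] not in table_names:
                problems.append(
                    f"activity {activity['name']}: unknown table {ref['name']}"
                )
    return problems


# Trimmed copies of the definitions from this post.
linked_services = [
    {"name": "AzureSqlLinkedService"},
    {"name": "StorageLinkedService"},
]
tables = [
    {"name": "UserRatings",
     "properties": {"location": {"linkedServiceName": "AzureSqlLinkedService"}}},
    {"name": "UserRatingsBlob",
     "properties": {"location": {"linkedServiceName": "StorageLinkedService"}}},
]
pipeline = {
    "name": "CopyRatingsPipeline",
    "properties": {"activities": [
        {"name": "CopyActivity",
         "inputs": [{"name": "UserRatings"}],
         "outputs": [{"name": "UserRatingsBlob"}]},
    ]},
}

print(check_references(linked_services, tables, pipeline))
# []
```

In a real script the three arguments would be loaded from the JSON files with `json.load` rather than typed inline.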

After the PowerShell command completes successfully, you can browse your Data Factory in the Azure portal. You can see the Linked Services, Datasets and Pipeline that were all created in the preceding steps. From the summary or main dashboard, click on the ‘Diagram’ button to browse a visual representation of the orchestration or workflow.

Step 5: Setting the Pipeline Active Period

With all the pieces in place, it is only necessary to define a schedule for the pipeline. The PowerShell command takes the usual parameters as well as a start and optional end time.

 Set-AzureDataFactoryPipelineActivePeriod -ResourceGroupName $ns -DataFactoryName $df -Name CopyRatingsPipeline -StartDateTime 2015-01-24Z -EndDateTime 2015-02-02Z -Force

After the active period has been defined, we can browse the slices and their processing status within the output data set.
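To get a feel for how many slices to expect, the Python sketch below enumerates daily windows between the start and end times used in this step. It is only an approximation of the scheduling behavior: the `daily_slices` helper is hypothetical, and it assumes each slice covers one full day and that the end time is exclusive.

```python
from datetime import datetime, timedelta


def daily_slices(start, end, interval_days=1):
    """List (slice_start, slice_end) windows that fit fully inside the period."""
    step = timedelta(days=interval_days)
    slices = []
    cursor = start
    while cursor + step <= end:
        slices.append((cursor, cursor + step))
        cursor += step
    return slices


slices = daily_slices(datetime(2015, 1, 24), datetime(2015, 2, 2))
print(len(slices))
# 9
print(slices[0][0].date(), "to", slices[-1][1].date())
# 2015-01-24 to 2015-02-02
```

Under those assumptions the period yields nine daily slices, the first starting on January 24 and the last starting on February 1.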

Wrap-Up

In this post, we created a new Data Factory and then defined Linked Services, which act as connection managers within the Data Factory. Next, we defined datasets that become the inputs and outputs for our pipeline. The pipeline is responsible for moving the data needed for recommendation generation from an Azure SQL database to the configured blob storage account. In the next post, we will expand the demo by creating an On-Demand cluster running the Mahout MapReduce job to generate beer recommendations. Till Next Time!

Chris