How to find absence of signal in a Stream Analytics job
Introduction
Recently, I worked on an IoT solution where the customer had to build a Stream Analytics query to detect when no data has arrived from a given device within a configurable time window. To solve this problem, the idea is to correlate the data stream containing real-time events (e.g. sensor readings) with device reference data. In this context, real-time data comes through a data stream input that uses the Event Hub or IoT Hub provider for telemetry ingestion, whereas the reference data is defined as a blob that contains the metadata of all the devices. In the sample below, JSON is used by both the data stream and the reference data, but you can use any other format supported by the Stream Analytics providers.
Now, in order to find all the devices that didn't send a message within a given time window, the reference data input should appear in the Stream Analytics query as the left element of a LEFT JOIN clause. However, at least at the time of this writing, Stream Analytics does not support reference data as the left element of a JOIN clause, so I had to tweak the code to bring the reference data to the left side. This article describes this technique. Basically, you have to create a multi-step query as follows:
- The first step in the query below simply creates a single event every N seconds (60 in my sample).
- The second step joins reference data with the result of the first step to turn reference data into a temporal data stream. This way I can bring reference data on the left of a JOIN clause. Please note the use of the ON clause: this is a sort of magic trick to correlate the event produced by the first step with the reference data. Both sides of the condition always evaluate to zero, so the event is paired with every record in the reference data.
- The third step calculates the event count for each device in the given time window. Note: the duration of the time window needs to be equal to the duration of the time window used by the first step.
- The final SELECT applies a LEFT JOIN to the output of the second step and the output of the third step. Since the second step was obtained from reference data, which contains all the devices, and appears as the left element of the LEFT JOIN, the query will analyze all the devices, not only those that sent at least one message in the current time window.
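The four steps can be mimicked outside Stream Analytics to see how they fit together. The following Python sketch is only an illustration, not the query itself: the function names, the integer timestamps (seconds) and the three-device data set are all hypothetical, and it assumes that a tumbling window includes its end time but not its start time.

```python
from collections import Counter
from itertools import product

WINDOW = 60  # window length in seconds, the same value in every step


def window_end(ts: int) -> int:
    """End of the tumbling window holding ts; windows are (start, end]."""
    return -(-ts // WINDOW) * WINDOW  # round up to a multiple of WINDOW


def device_counts(events, reference_devices):
    """events: iterable of (device_id, timestamp_in_seconds) pairs."""
    # Step 1 (OneEvent): one marker per window that contains any event.
    one_event = sorted({window_end(ts) for _, ts in events})
    # Step 2 (AllDevices): pair every marker with every reference record.
    all_devices = list(product(one_event, reference_devices))
    # Step 3 (ActualCounts): number of events per (window, device).
    actual = Counter((window_end(ts), dev) for dev, ts in events)
    # Final step: left join, where a missing count becomes 0.
    return [(end, dev, actual.get((end, dev), 0)) for end, dev in all_devices]


# Hypothetical data: three known devices, "c" never reports,
# and nothing at all arrives in the window ending at 120.
events = [("a", 10), ("b", 20), ("a", 130)]
rows = device_counts(events, ["a", "b", "c"])
silent = [(end, dev) for end, dev, n in rows if n == 0]
print(silent)  # [(60, 'c'), (180, 'b'), (180, 'c')]
```

Note that the sketch returns no rows for the window ending at 120: step 1 only produces a marker when at least one event arrives, so a window in which every device is silent goes unreported. If Stream Analytics likewise emits nothing for an empty window, the query would share this behaviour, which is worth checking before relying on it.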
Test Files
This section contains the test files used on the QUERY tab on the Azure Management Portal to test the query.
DeviceEvents.json
[
{
"deviceId":1,
"name":"device001",
"value": 5,
"status": "active",
"timestamp": "2015-12-16T10:00:00.0000000Z"
},
{
"deviceId":3,
"name":"device003",
"value":23,
"status": "active",
"timestamp": "2015-12-16T10:00:05.0000000Z"
},
{
"deviceId":5,
"name":"device005",
"value":17,
"status": "active",
"timestamp": "2015-12-16T10:00:20.0000000Z"
},
{
"deviceId":1,
"name":"device001",
"value":8,
"status": "active",
"timestamp": "2015-12-16T10:00:30.0000000Z"
},
{
"deviceId":3,
"name":"device003",
"value":22,
"status": "active",
"timestamp": "2015-12-16T10:00:35.0000000Z"
},
{
"deviceId":4,
"name":"device004",
"value": 18,
"status": "active",
"timestamp": "2015-12-16T10:00:36.0000000Z"
},
{
"deviceId":1,
"name":"device001",
"value":65,
"status": "active",
"timestamp": "2015-12-16T10:01:00.0000000Z"
},
{
"deviceId":3,
"name":"device003",
"value":28,
"status": "active",
"timestamp": "2015-12-16T10:01:05.0000000Z"
},
{
"deviceId":5,
"name":"device005",
"value":3,
"status": "active",
"timestamp": "2015-12-16T10:01:15.0000000Z"
},
{
"deviceId":1,
"name":"device001",
"value":54,
"status": "active",
"timestamp": "2015-12-16T10:01:30.0000000Z"
},
{
"deviceId":3,
"name":"device003",
"value":43,
"status": "active",
"timestamp": "2015-12-16T10:01:35.0000000Z"
}
]
Remarks:
- Please look at the timestamps of the events: they span a time range of about 2 minutes. Hence, they are segmented into 3 consecutive time windows of 60 seconds each.
- The device with deviceId = 2 does not produce any events.
- The device with deviceId = 4 produces a single event, in the second time window.
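As a rough check of these remarks, the Python sketch below assigns each test event to a 60-second tumbling window. It assumes that windows are aligned to whole minutes and include their end time but not their start time; this is an assumption about Stream Analytics behaviour that should be verified against the documentation. The names `window_end` and `devices_by_window` are made up for this illustration, and the fractional seconds (all zero in the test file) are left out.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=60)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# (deviceId, time of day) pairs copied from DeviceEvents.json (all on 2015-12-16).
EVENTS = [(1, "10:00:00"), (3, "10:00:05"), (5, "10:00:20"), (1, "10:00:30"),
          (3, "10:00:35"), (4, "10:00:36"), (1, "10:01:00"), (3, "10:01:05"),
          (5, "10:01:15"), (1, "10:01:30"), (3, "10:01:35")]


def window_end(ts):
    """End of the 60-second window holding ts, assuming (start, end] windows."""
    _, rest = divmod(ts - EPOCH, WINDOW)
    return ts if not rest else ts + (WINDOW - rest)


devices_by_window = defaultdict(list)
for device_id, time_of_day in EVENTS:
    ts = datetime.strptime(f"2015-12-16T{time_of_day}", "%Y-%m-%dT%H:%M:%S")
    end = window_end(ts.replace(tzinfo=timezone.utc))
    devices_by_window[end.strftime("%H:%M:%S")].append(device_id)

for end, devices in sorted(devices_by_window.items()):
    print(end, devices)
```

Under those assumptions, the window ending at 10:00:00 holds only the first event from device 1, device 4 appears only in the window ending at 10:01:00, and device 2 appears nowhere.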
DeviceReferenceData.json
This file needs to be copied to blob storage and used as reference data. It contains a record for each device.
[
{
"deviceId": 1,
"location": "Milan",
"building": "A01",
"minThreshold": 20.0,
"maxThreshold": 50.0
},
{
"deviceId": 2,
"location": "Milan",
"building": "A01",
"minThreshold": 20.0,
"maxThreshold": 50.0
},
{
"deviceId": 3,
"location": "Milan",
"building": "A01",
"minThreshold": 20.0,
"maxThreshold": 50.0
},
{
"deviceId": 4,
"location": "Milan",
"building": "A01",
"minThreshold": 20.0,
"maxThreshold": 50.0
},
{
"deviceId": 5,
"location": "Milan",
"building": "A02",
"minThreshold": 20.0,
"maxThreshold": 40.0
}
]
First Query
In the first query, I calculate how many events have been received from each device in a given time window.
-- Find devices that have not sent a message for at least 1 minute
WITH
OneEvent AS /* generate one event per period, any event */
(
SELECT COUNT(*) As eventCount
FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
GROUP BY TumblingWindow(s, 60)
),
AllDevices AS /* generate one event per deviceId per period */
(
SELECT ReferenceData.deviceId
FROM OneEvent JOIN ReferenceData
ON OneEvent.eventCount - OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId
),
ActualCounts AS /* compute how many events have been received in the time window from each device */
(
SELECT deviceId,
COUNT(*) AS eventCount
FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
GROUP BY TumblingWindow(s, 60), deviceId
)
/* left join AllDevices with ActualCounts to find devices with zero events */
SELECT
AllDevices.deviceId,
CASE WHEN ActualCounts.eventCount IS NULL THEN 0
ELSE ActualCounts.eventCount
END AS eventCount,
System.Timestamp AS [timestamp]
FROM
AllDevices LEFT JOIN ActualCounts
ON
ActualCounts.deviceId = AllDevices.deviceId
AND DATEDIFF(ms, ActualCounts, AllDevices) = 0
Test First Query
If you test the first query using DeviceEvents.json and DeviceReferenceData.json as test files, you will get the following results. Please note that in the figure below, the results are contained in 3 consecutive time windows that are highlighted in 3 different colors.
Second Query
In the second query, it's sufficient to add a WHERE clause to return only the devices that didn't produce any event in the given time window.
-- Find devices that have not sent a message for at least 1 minute
WITH
OneEvent AS /* generate one event per period, any event */
(
SELECT COUNT(*) As eventCount
FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
GROUP BY TumblingWindow(s, 60)
),
AllDevices AS /* generate one event per deviceId per period */
(
SELECT ReferenceData.deviceId
FROM OneEvent JOIN ReferenceData
ON OneEvent.eventCount - OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId
),
ActualCounts AS /* compute how many events have been received in the time window from each device */
(
SELECT deviceId,
COUNT(*) AS eventCount
FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
GROUP BY TumblingWindow(s, 60), deviceId
)
/* left join AllDevices with ActualCounts to find devices with zero events */
SELECT
AllDevices.deviceId,
CASE WHEN ActualCounts.eventCount IS NULL THEN 0
ELSE ActualCounts.eventCount
END AS eventCount,
System.Timestamp AS [timestamp]
FROM
AllDevices LEFT JOIN ActualCounts
ON
ActualCounts.deviceId = AllDevices.deviceId
AND DATEDIFF(ms, ActualCounts, AllDevices) = 0
WHERE ActualCounts.eventCount IS NULL
Test Second Query
If you test the second query using DeviceEvents.json and DeviceReferenceData.json as test files, you will get the following results. Please note that in the figure below, the results are contained in 3 consecutive time windows that are highlighted in 3 different colors.
Architecture Design
The following picture shows the architecture design of the demo. You can use my tool, Service Bus Explorer, to send device events to the input Event Hub and receive results from the output Event Hub.
Inputs:
- DeviceEvents: Event Hub Data Stream
- ReferenceData: Blob Reference Data
Outputs:
- Output: Event Hub Output
Provisioning
You can use the following ARM template and parameters file to create the Stream Analytics job. Note: this script creates only the Stream Analytics job and not the Event Hubs and Storage Account hosting the blob used as reference data. You can extend the ARM template to add the creation of the Event Hubs, storage account and container hosting the blob file. Make sure to copy the DeviceReferenceData.json to the container at the end of the provisioning process.
azuredeploy.json
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json",
"contentVersion": "1.0.0.0",
"parameters": {
"jobName": {
"type": "string",
"defaultValue": "IoTDeviceDemo",
"metadata": {
"description": "Name of the Stream Analytics job"
}
},
"jobLocation": {
"type": "string",
"defaultValue": "West Europe",
"metadata": {
"description": "Location of the Stream Analytics job"
}
},
"referenceStorageAccountName": {
"type": "string",
"metadata": {
"description": "Name of the Storage Account containing the reference data blob"
}
},
"referenceStorageAccountKey": {
"type": "string",
"metadata": {
"description": "Key of the Storage Account containing the reference data blob"
}
},
"referenceContainerName": {
"type": "string",
"defaultValue": "devices",
"metadata": {
"description": "Name of the container of the reference data blob"
}
},
"referenceBlobName": {
"type": "string",
"defaultValue": "DeviceReferenceData.json",
"metadata": {
"description": "Name of the reference data blob"
}
},
"inputServiceBusNamespace": {
"type": "string",
"metadata": {
"description": "Name of the Service Bus namespace containing the input event hub"
}
},
"inputEventHubName": {
"type": "string",
"metadata": {
"description": "Name of the input event hub"
}
},
"inputEventHubConsumerGroupName": {
"type": "string",
"defaultValue": "$Default",
"metadata": {
"description": "Name of the consumer group used by the job to read data out of the input event hub"
}
},
"inputEventHubSharedAccessPolicyName": {
"type": "string",
"metadata": {
"description": "Name of the Shared Access Policy of the input event hub"
}
},
"inputEventHubSharedAccessPolicyKey": {
"type": "string",
"metadata": {
"description": "Key of the Shared Access Policy of the input event hub"
}
},
"outputServiceBusNamespace": {
"type": "string",
"metadata": {
"description": "Name of the Service Bus namespace containing the output event hub"
}
},
"outputEventHubName": {
"type": "string",
"metadata": {
"description": "Name of the output event hub"
}
},
"outputEventHubPartitionKey": {
"type": "string",
"metadata": {
"description": "Name of the payload property used as partition key"
}
},
"outputEventHubSharedAccessPolicyName": {
"type": "string",
"metadata": {
"description": "Name of the Shared Access Policy of the output event hub"
}
},
"outputEventHubSharedAccessPolicyKey": {
"type": "string",
"metadata": {
"description": "Key of the Shared Access Policy of the output event hub"
}
}
},
"resources": [
{
"name": "[parameters('jobName')]",
"type": "Microsoft.StreamAnalytics/streamingjobs",
"apiVersion": "2015-04-01",
"location": "[parameters('jobLocation')]",
"properties": {
"sku": {
"name": "Standard"
},
"eventsOutOfOrderPolicy": "Adjust",
"eventsOutOfOrderMaxDelayInSeconds": 0,
"eventsLateArrivalMaxDelayInSeconds": 5,
"dataLocale": "en-US",
"inputs": [
{
"name": "DeviceEvents",
"type": "Microsoft.StreamAnalytics/streamingjobs/inputs",
"properties": {
"type": "Stream",
"datasource": {
"type": "Microsoft.ServiceBus/EventHub",
"properties": {
"eventHubName": "[parameters('inputEventHubName')]",
"consumerGroupName": "[parameters('inputEventHubConsumerGroupName')]",
"serviceBusNamespace": "[parameters('inputServiceBusNamespace')]",
"sharedAccessPolicyName": "[parameters('inputEventHubSharedAccessPolicyName')]",
"sharedAccessPolicyKey": "[parameters('inputEventHubSharedAccessPolicyKey')]"
}
},
"serialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
}
},
{
"name": "ReferenceData",
"type": "Microsoft.StreamAnalytics/streamingjobs/inputs",
"properties": {
"type": "Reference",
"datasource": {
"type": "Microsoft.Storage/Blob",
"properties": {
"blobName": "[parameters('referenceBlobName')]",
"storageAccounts": [
{
"accountName": "[parameters('referenceStorageAccountName')]",
"accountKey": "[parameters('referenceStorageAccountKey')]"
}
],
"container": "[parameters('referenceContainerName')]",
"pathPattern": "[parameters('referenceBlobName')]"
}
},
"serialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
}
}
],
"transformation": {
"name": "DeviceDemo",
"type": "Microsoft.StreamAnalytics/streamingjobs/transformations",
"properties": {
"streamingUnits": 6,
"query": "-- Find devices that have not sent a message for at least 1 minute\r\nWITH\r\nOneEvent AS /* generate one event per period, any event */\r\n(\r\n SELECT COUNT(*) As eventCount\r\n FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]\r\n GROUP BY TumblingWindow(s, 60)\r\n),\r\nAllDevices AS /* generate one event per deviceId per period */\r\n(\r\n SELECT ReferenceData.deviceId\r\n FROM OneEvent JOIN ReferenceData\r\n ON OneEvent.eventCount - OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId\r\n),\r\nActualCounts AS /* compute how many events have been received in the time window from each device */\r\n(\r\n SELECT deviceId,\r\n COUNT(*) AS eventCount\r\n FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]\r\n GROUP BY TumblingWindow(s, 60), deviceId\r\n)\r\n/* left join AllDevices with ActualCounts to find devices with zero events */\r\nSELECT\r\n AllDevices.deviceId,\r\n CASE WHEN ActualCounts.eventCount IS NULL THEN 0\r\n ELSE ActualCounts.eventCount\r\n END AS eventCount,\r\n System.Timestamp AS [timestamp]\r\nFROM\r\nAllDevices LEFT JOIN ActualCounts\r\nON\r\nActualCounts.deviceId = AllDevices.deviceId\r\nAND DATEDIFF(ms, ActualCounts, AllDevices) = 0\r\nWHERE ActualCounts.eventCount IS NULL"
}
},
"outputs": [
{
"name": "Output",
"type": "Microsoft.StreamAnalytics/streamingjobs/outputs",
"properties": {
"datasource": {
"type": "Microsoft.ServiceBus/EventHub",
"properties": {
"eventHubName": "[parameters('outputEventHubName')]",
"partitionKey": "[parameters('outputEventHubPartitionKey')]",
"serviceBusNamespace": "[parameters('outputServiceBusNamespace')]",
"sharedAccessPolicyName": "[parameters('outputEventHubSharedAccessPolicyName')]",
"sharedAccessPolicyKey": "[parameters('outputEventHubSharedAccessPolicyKey')]"
}
},
"serialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
}
}
]
}
}
]
}
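JSON does not allow literal line breaks inside a string, so the query has to be embedded in the template as one string with `\r\n` escapes. If you keep the query in a separate file, one possible way to produce that string is sketched below in Python; the shortened query and the variable names are placeholders for illustration, not part of the original solution.

```python
import json

# Query text as plain lines (a shortened, hypothetical stand-in for the full query).
query_lines = [
    "WITH OneEvent AS",
    "(",
    "    SELECT COUNT(*) AS eventCount",
    "    FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]",
    "    GROUP BY TumblingWindow(s, 60)",
    ")",
    "SELECT eventCount FROM OneEvent",
]
query = "\r\n".join(query_lines)

# json.dumps escapes CR and LF, so the value stays on one physical line.
encoded = json.dumps(query)
print(f'"query": {encoded}')
```

The printed line can be pasted into the properties of the transformation; `json.loads(encoded)` returns the original multi-line text, which is a quick way to confirm that nothing was lost.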
azuredeploy-parameters.json
{
"jobName": {
"value": "AbsenceOfSignal"
},
"jobLocation": {
"value": "<job-location>"
},
"referenceStorageAccountName": {
"value": "<storage-account-name>"
},
"referenceStorageAccountKey": {
"value": "<storage-account-key>"
},
"referenceContainerName": {
"value": "<container-name>"
},
"referenceBlobName": {
"value": "DeviceReferenceData.json"
},
"inputServiceBusNamespace": {
"value": "<input-event-hub-servicebus-namespace>"
},
"inputEventHubName": {
"value": "<input-event-hub-name>"
},
"inputEventHubConsumerGroupName": {
"value": "<input-event-hub-consumergroup-name>"
},
"inputEventHubSharedAccessPolicyName": {
"value": "<input-event-hub-shared-access-policy-name>"
},
"inputEventHubSharedAccessPolicyKey": {
"value": "<input-event-hub-shared-access-policy-key>"
},
"outputServiceBusNamespace": {
"value": "<output-event-hub-servicebus-namespace>"
},
"outputEventHubName": {
"value": "<output-event-hub-name>"
},
"outputEventHubPartitionKey": {
"value": "deviceId"
},
"outputEventHubSharedAccessPolicyName": {
"value": "<output-event-hub-shared-access-policy-name>"
},
"outputEventHubSharedAccessPolicyKey": {
"value": "<output-event-hub-shared-access-policy-key>"
}
}
Note: make sure that all the resources used by the solution are located in the same datacenter.
PowerShell Script
Use the following PowerShell Script to create the Stream Analytics job.
# To login to Azure Resource Manager
Login-AzureRmAccount
# Select a default subscription for your current session in case your account has multiple Azure subscriptions
Get-AzureRmSubscription -SubscriptionName "Paolo's Azure Account" | Select-AzureRmSubscription
# Initialize Variables
$resourceGroupName = 'StreamAnalyticsWestEuropeResourceGroup'
$deploymentName = 'StreamAnalyticsIoTDemo'
$location = "West Europe"
$templateFolderName = 'C:\Projects\Azure\StreamAnalytics\TestFiles\ARM\'
$templateFile = $templateFolderName + 'azuredeploy.json'
$templateParameterFile = $templateFolderName + 'azuredeploy-parameters.json'
# Check if the resource group for the Stream Analytics job already exists
$resourceGroup = Get-AzureRmResourceGroup -Name $resourceGroupName `
-ErrorAction Ignore
if (!$resourceGroup)
{
# Create the resource group if it does not exist
Write-Host 'Creating' $resourceGroupName 'resource group...'
$stopWatch = [Diagnostics.Stopwatch]::StartNew()
New-AzureRmResourceGroup -Name $resourceGroupName `
-Location $location `
-ErrorAction Stop
$stopWatch.Stop()
Write-Host 'The resource group' $resourceGroupName ' has been successfully created in' $stopWatch.Elapsed.TotalSeconds 'seconds'
}
else
{
Write-Host 'The resource group' $resourceGroupName 'already exists'
}
# Check if a deployment with the specified name already exists
$deployment = Get-AzureRmResourceGroupDeployment -Name $deploymentName `
-ResourceGroupName $resourceGroupName `
-ErrorAction Ignore
if ($deployment)
{
# Delete the deployment if it already exists
Write-Host 'The' $deploymentName 'deployment already exists. Deleting the deployment...'
$stopWatch = [Diagnostics.Stopwatch]::StartNew()
Remove-AzureRmResourceGroupDeployment -Name $deploymentName `
-ResourceGroupName $resourceGroupName `
-Force `
-ErrorAction Stop
$stopWatch.Stop()
Write-Host 'The deployment' $deploymentName ' has been successfully deleted in ' $stopWatch.Elapsed.TotalSeconds 'seconds'
}
# Create the resource group deployment
Write-Host 'Adding deployment' $deploymentName 'to' $resourceGroupName 'resource group...'
$stopWatch = [Diagnostics.Stopwatch]::StartNew()
New-AzureRmResourceGroupDeployment -Name $deploymentName `
-ResourceGroupName $resourceGroupName `
-TemplateFile $templateFile `
-TemplateParameterFile $templateParameterFile `
-ErrorAction Stop
$stopWatch.Stop()
$deployment = Get-AzureRmResourceGroupDeployment -ResourceGroupName $resourceGroupName `
-Name $deploymentName `
-ErrorAction Stop
Write-Host 'Operation Complete'
Write-Host '=================='
Write-Host 'Provisioning State :' $deployment[0].ProvisioningState
Write-Host 'Time elapsed :' $stopWatch.Elapsed.TotalSeconds 'seconds'
Comments
Anonymous
December 22, 2015
Thanks for sharing the PowerShell script and the well explained article.
Anonymous
January 26, 2016
"The second step joins reference data with the result of the first step to turn reference data into a temporal data stream. This way I can bring reference data on the left of a JOIN clause. Please note the use of the ON clause. This is sort of magic trick to correlate the event produced by the first query with the reference data." Could you elaborate on the magic trick? In my scenario deviceId is not a numeric value but a string, thus the - operation is not possible on such a data type. Would such an ON clause do the trick? ON OneEvent.eventCount - OneEvent.eventCount = len(ReferenceData.deviceId) - len(ReferenceData.deviceId)
Anonymous
January 26, 2016
Hi Tony, did you try to use the CAST operator (see msdn.microsoft.com/.../dn834995.aspx) to convert the type of the deviceId field? Ciao, Paolo
Anonymous
January 26, 2016
Hi Paolo, thanks for getting back to me. Unfortunately our deviceId contains both numbers and letters, so casting to a numeric value is not possible. Regards
Anonymous
January 26, 2016
Can you CAST and transform (using ASA functions) both the field from the lookup data and the property from the actual message to adapt them to the same format?