How to find absence of signal in a Stream Analytics job

Introduction

Recently, I worked at an IoT solution where the customer had to build a Stream Analytics query to check when no data has arrived from a given device in a configurable time window. To solve this problem, the idea is to correlate the data stream containing real-time events (e.g. sensor readings) with device reference data. In this context, real-time data comes through a data stream input that uses the Event Hub or IoT Hub provider for telemetry data ingestion, whereas the reference data is defined as a blob which contains the metadata of all the devices In the sample below, JSON format is used both by the data stream and reference data, but you can use another format between those supported by Stream Analytics providers.. Now, in order to find all the devices that didn’t send a message within a given time window, reference data input should appear in the Stream Analytics query as the left element of the LEFT JOIN clause. But Stream Analytics does not support Reference Data as the left element in a JOIN clause L So I had to tweak the code to bring Reference Data on the left part. However, at least at the time of this writing, Stream Analytics does not support Reference Data as the left element in a JOIN clause. So I had to tweak the code to bring reference data on the left part. This article describes this technique. Basically, you have to create a multi-step query as follows:

  1. The first step in the query below simply creates a single event every N seconds (60 in my sample).
  2. The second step joins reference data with the result of the first step to turn reference data into a temporal data stream. This way I can bring reference data on the left of a JOIN clause. Please note the use of the ON clause. This is sort of magic trick to correlate the event produced by the first query with the reference data.
  3. The third step calculates the event count for each device in the given time window. Note: the duration of the time window needs to be equal to the duration of the time window used by the first step.
  4. The final SELECT applies a LEFT JOIN to the output of the second step and the output of third step. Since the second step was obtained from reference data, which contains all the devices, and the reference data appears as the left element of a LEFT JOIN, the query will analyze all the devices, not only those that sent at least a message in the current time window.

Test Files

This section contains the test files used on the QUERY tab on the Azure Management Portal to test the query.

DeviceEvents.Json

 [
    {
        "deviceId":1,
        "name":"device001",
        "value": 5,
        "status": "active",
        "timestamp": "2015-12-16T10:00:00.0000000Z"
    },
    {
        "deviceId":3,
        "name":"device003",
        "value":23,
        "status": "active",
        "timestamp": "2015-12-16T10:00:05.0000000Z"
    },
    {
        "deviceId":5,
        "name":"device005",
        "value":17,
        "status": "active",
        "timestamp": "2015-12-16T10:00:20.0000000Z"
    },
    {
        "deviceId":1,
        "name":"device001",
        "value":8,
        "status": "active",
        "timestamp": "2015-12-16T10:00:30.0000000Z"
    },
    {
        "deviceId":3,
        "name":"device003",
        "value":22,
        "status": "active",
        "timestamp": "2015-12-16T10:00:35.0000000Z"
    },
    {
        "deviceId":4,
        "name":"device004",
        "value": 18,
        "status": "active",
        "timestamp": "2015-12-16T10:00:36.0000000Z"
    },
    {
        "deviceId":1,
        "name":"device001",
        "value":65,
        "status": "active",
        "timestamp": "2015-12-16T10:01:00.0000000Z"
    },
    {
        "deviceId":3,
        "name":"device003",
        "value":28,
        "status": "active",
        "timestamp": "2015-12-16T10:01:05.0000000Z"
    },
    {
        "deviceId":5,
        "name":"device005",
        "value":3,
        "status": "active",
        "timestamp": "2015-12-16T10:01:15.0000000Z"
    },
    {
        "deviceId":1,
        "name":"device001",
        "value":54,
        "status": "active",
        "timestamp": "2015-12-16T10:01:30.0000000Z"
    },
    {
        "deviceId":3,
        "name":"device003",
        "value":43,
        "status": "active",
        "timestamp": "2015-12-16T10:01:35.0000000Z"
    }
]

Remarks:

  • Please look at the timestamp of the events: events are contained in about 2 minutes time range. Hence, they are segmented in 3 consecutive time windows of 60 seconds each.
  • Device with deviceId = 2 does not produce any events
  • Device with deviceId = 4 produces a single event in the second time window

DeviceReferenceData.json

This file needs to be copied to blob storage and used as reference data. It contains a record for each device.

 [
    {
        "deviceId": 1,
        "location": "Milan",
        "building": "A01",
        "minThreshold": 20.0,
        "maxThreshold": 50.0
    },
    {
        "deviceId": 2,
        "location": "Milan",
        "building": "A01",
        "minThreshold": 20.0,
        "maxThreshold": 50.0
    },
    {
        "deviceId": 3,
        "location": "Milan",
        "building": "A01",
        "minThreshold": 20.0,
        "maxThreshold": 50.0
    },
    {
        "deviceId": 4,
        "location": "Milan",
        "building": "A01",
        "minThreshold": 20.0,
        "maxThreshold": 50.0
    },
    {
        "deviceId": 5,
        "location": "Milan",
        "building": "A02",
        "minThreshold": 20.0,
        "maxThreshold": 40.0
    }
]

First Query

In the first query, I calculate how many events have been received from each device in a given time window.

 -- Find devices that it's at least 1 minute that don't send a message
WITH
OneEvent AS /* generate one event per period, any event */
(
                SELECT COUNT(*) As eventCount
                FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
                GROUP BY TumblingWindow(s, 60)
),
AllDevices AS /* generate one event per deviceId per period */
(
                SELECT ReferenceData.deviceId
                FROM OneEvent JOIN ReferenceData
                ON OneEvent.eventCount - OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId
),
ActualCounts AS /* compute how many events have been received in the time window from each device */
(
                SELECT deviceId,
                       COUNT(*) AS eventCount
                FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
                GROUP BY TumblingWindow(s, 60), deviceId
)
/* left join AllDevices with ActualCounts to find devices with zero events */
SELECT
      AllDevices.deviceId,
      CASE WHEN ActualCounts.eventCount IS NULL THEN 0
           ELSE ActualCounts.eventCount
      END AS eventCount,
      System.Timestamp AS [timestamp]
FROM
AllDevices LEFT JOIN ActualCounts
ON
ActualCounts.deviceId = AllDevices.deviceId
AND DATEDIFF(ms, ActualCounts, AllDevices) = 0

Test First Query

If you test the first query with using DeviceEvents.json and DeviceReferenceData.json as test files, you will get the following results. Please note, that in the figure below results are contained in 3 consecutive time windows that are highlighted in 3 different colors.

image1

Second Query

In the second query, it’s sufficient to add a WHERE clause to project only the devices that didn’t produce any event in the given time window.

 -- Find devices that it's at least 1 minute that don't send a message
WITH
OneEvent AS /* generate one event per period, any event */
(
                SELECT COUNT(*) As eventCount
                FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
                GROUP BY TumblingWindow(s, 60)
),
AllDevices AS /* generate one event per deviceId per period */
(
                SELECT ReferenceData.deviceId
                FROM OneEvent JOIN ReferenceData
                ON OneEvent.eventCount - OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId
),
ActualCounts AS /* compute how many events have been received in the time window from each device */
(
                SELECT deviceId,
                       COUNT(*) AS eventCount
                FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]
                GROUP BY TumblingWindow(s, 60), deviceId
)
/* left join AllDevices with ActualCounts to find devices with zero events */
SELECT
      AllDevices.deviceId,
      CASE WHEN ActualCounts.eventCount IS NULL THEN 0
           ELSE ActualCounts.eventCount
      END AS eventCount,
      System.Timestamp AS [timestamp]
FROM
AllDevices LEFT JOIN ActualCounts
ON
ActualCounts.deviceId = AllDevices.deviceId
AND DATEDIFF(ms, ActualCounts, AllDevices) = 0
WHERE ActualCounts.eventCount IS NULL

Test Second Query

If you test the first query with using DeviceEvents.json and DeviceReferenceData.json as test files, you will get the following results. Please note, that in the figure below results are contained in 3 consecutive time windows that are highlighted in 3 different colors.

Image2

Architecture Design

The following picture shows the architecture design of the demo. You can use my tool, Service Bus Explorer, to send device events to the input Event Hub and receive results from the output Event Hub.

Architecture

Inputs:

  • DeviceEvents: Event Hub Data Stream

  • ReferenceData: Blob Reference Data

 

Outputs:

  • Output: Event Hub Output

Provisoning

You can use the following ARM template and parameters file to create the Stream Analytics job. Note: this script creates only the Stream Analytics job and not the Event Hubs and Storage Account hosting the blob used as reference data. You can extend the ARM template to add the creation of the Event Hubs, storage account and container hosting the blob file. Make sure to copy the DeviceReferenceData.json to the container at the end of the provisioning process.

azuredeploy.json

 {
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json",
 
    "contentVersion": "1.0.0.0",
 
    "parameters": {
        "jobName": {
            "type": "string",
            "metadata": {
                "description": "Name of the Stream Analytics job",
                "defaultValue": "IoTDeviceDemo"
            }
        },
        "jobLocation": {
            "type": "string",
            "metadata": {
                "description": "Location of the Stream Analytics job",
                "defaultValue": "West Europe"
            }
        },
         "referenceStorageAccountName": {
            "type": "string",
            "metadata": {
                "description": "Name of the Storage Account containing the reference data blob"
            }
        },
        "referenceStorageAccountKey": {
            "type": "string",
            "metadata": {
                "description": "Key of the Storage Account containing the reference data blob"
            }
        },
        "referenceContainerName": {
            "type": "string",
            "metadata": {
                "description": "Name of the container of the reference data blob",
                "defaultValue": "devices"
            }
        },
        "referenceBlobName": {
            "type": "string",
            "metadata": {
                "description": "Name of the reference data blob",
                "defaultValue": "DeviceReferenceData.json"
            }
        },
        "inputServiceBusNamespace": {
            "type": "string",
            "metadata": {
                "description": "Name of the Service Bus namespace containing the input event hub"
            }
        },
        "inputEventHubName": {
            "type": "string",
            "metadata": {
                "description": "Name of the input event hub"
            }
        },
        "inputEventHubConsumerGroupName": {
            "type": "string",
            "metadata": {
                "description": "Name of the consumer group used by the job to read data out of the input event hub",
                "defaultValue": "$Default"
            }
        },
        "inputEventHubSharedAccessPolicyName": {
            "type": "string",
            "metadata": {
                "description": "Name of the Shared Access Policy of the input event hub"
            }
        },
        "inputEventHubSharedAccessPolicyKey": {
            "type": "string",
            "metadata": {
                "description": "Key of the Shared Access Policy of the input event hub"
            }
        },
        "outputServiceBusNamespace": {
            "type": "string",
            "metadata": {
                "description": "Name of the Service Bus namespace containing the output event hub"
            }
        },
        "outputEventHubName": {
            "type": "string",
            "metadata": {
                "description": "Name of the output event hub"
            }
        },
        "outputEventHubPartitionKey": {
            "type": "string",
            "metadata": {
                "description": "Name of the payload property used as partition key"
            }
        },
        "outputEventHubSharedAccessPolicyName": {
            "type": "string",
            "metadata": {
                "description": "Name of the Shared Access Policy of the output event hub"
            }
        },
        "outputEventHubSharedAccessPolicyKey": {
            "type": "string",
            "metadata": {
                "description": "Key of the Shared Access Policy of the output event hub"
            }
        }
    },
    "resources": [
        {
            "name": "[parameters('jobName')]",
            "type": "Microsoft.StreamAnalytics/streamingjobs",
            "apiVersion": "2015-04-01",
            "location": "[parameters('jobLocation')]",
            "properties": {
                "sku": {
                    "name": "Standard"
                },
                "eventsOutOfOrderPolicy": "Adjust",
                "eventsOutOfOrderMaxDelayInSeconds": 0,
                "eventsLateArrivalMaxDelayInSeconds": 5,
                "dataLocale": "en-US",
                "inputs": [
                    {
                        "name": "DeviceEvents",
                        "type": "Microsoft.StreamAnalytics/streamingjobs/inputs",
                        "properties": {
                            "type": "Stream",
                            "datasource": {
                                "type": "Microsoft.ServiceBus/EventHub",
                                "properties": {
                                    "eventHubName": "[parameters('inputEventHubName')]",
                                    "consumerGroupName": "[parameters('inputEventHubConsumerGroupName')]",
                                    "serviceBusNamespace": "[parameters('inputServiceBusNamespace')]",
                                    "sharedAccessPolicyName": "[parameters('inputEventHubSharedAccessPolicyName')]",
                                    "sharedAccessPolicyKey": "[parameters('inputEventHubSharedAccessPolicyKey')]"
                                }
                            },
                            "serialization": {
                                "type": "Json",
                                "properties": {
                                    "encoding": "UTF8"
                                }
                            }
                        }
                    },
                    {
                        "name": "ReferenceData",
                        "type": "Microsoft.StreamAnalytics/streamingjobs/inputs",
                        "properties": {
                            "type": "Reference",
                            "datasource": {
                                "type": "Microsoft.Storage/Blob",
                                "properties": {
                                    "blobName": "[parameters('referenceBlobName')]",
                                    "storageAccounts": [
                                        {
                                            "accountName": "[parameters('referenceStorageAccountName')]",
                                            "accountKey": "[parameters('referenceStorageAccountKey')]"
                                        }
                                    ],
                                    "container": "[parameters('referenceContainerName')]",
                                    "pathPattern": "[parameters('referenceBlobName')]"
                                }
                            },
                            "serialization": {
                                "type": "Json",
                                "properties": {
                                    "encoding": "UTF8"
                                }
                            }
                        }
                    }
                ],
                "transformation": {
                    "name": "DeviceDemo",
                    "type": "Microsoft.StreamAnalytics/streamingjobs/transformations",
                    "properties": {
                        "streamingUnits": 6,
                        "query": "-- Find devices that it's at least 1 minute that don't send a message\r\nWITH\r\n
                                  OneEvent AS /* generate one event per period, any event */\r\n(\r\n                
                                  SELECT COUNT(*) As eventCount\r\n                FROM DeviceEvents TIMESTAMP BY 
                                  DeviceEvents.[timestamp]\r\n                GROUP BY TumblingWindow(s, 60)\r\n),
                                  \r\nAllDevices AS /* generate one event per deviceId per period */\r\n(\r\n
                                                  SELECT ReferenceData.deviceId\r\n                
                                  FROM OneEvent JOIN ReferenceData\r\n                ON OneEvent.eventCount - 
                                  OneEvent.eventCount = ReferenceData.deviceId - ReferenceData.deviceId\r\n),
                                  \r\nActualCounts AS /* compute how many events have been received in the time 
                                  window from each device */\r\n(\r\n                SELECT deviceId,
                                  \r\n                       COUNT(*) AS eventCount\r\n                
                                  FROM DeviceEvents TIMESTAMP BY DeviceEvents.[timestamp]\r\n                
                                  GROUP BY TumblingWindow(s, 60), deviceId\r\n)\r\n/* left join AllDevices with 
                                  ActualCounts to find devices with zero events */\r\nSELECT\r\n      
                                  AllDevices.deviceId,\r\n      CASE WHEN ActualCounts.eventCount IS NULL THEN 0
                                  \r\n           ELSE ActualCounts.eventCount\r\n      END AS eventCount,
                                  \r\n      System.Timestamp AS [timestamp]\r\nFROM\r\nAllDevices LEFT JOIN 
                                  ActualCounts\r\nON\r\nActualCounts.deviceId = AllDevices.deviceId\r\nAND 
                                  DATEDIFF(ms, ActualCounts, AllDevices) = 0\r\nWHERE ActualCounts.eventCount IS NULL"
                    }
                },
                "outputs": [
                    {
                        "name": "Output",
                        "type": "Microsoft.StreamAnalytics/streamingjobs/outputs",
                        "properties": {
                            "datasource": {
                                "type": "Microsoft.ServiceBus/EventHub",
                                "properties": {
                                    "eventHubName": "[parameters('outputEventHubName')]",
                                    "partitionKey": "[parameters('outputEventHubPartitionKey')]",
                                    "serviceBusNamespace": "[parameters('outputServiceBusNamespace')]",
                                    "sharedAccessPolicyName": "[parameters('outputEventHubSharedAccessPolicyName')]",
                                    "sharedAccessPolicyKey": "[parameters('outputEventHubSharedAccessPolicyKey')]"
                                }
                            },
                            "serialization": {
                                "type": "Json",
                                "properties": {
                                    "encoding": "UTF8"
                                }
                            }
                        }
                    }
                ]
            }
        }
    ]
}

azuredeploy-parameters.json

 {
    "jobName": {
        "value": "AbsenceOfSignal"
    },
    "jobLocation": {
        "value": "<job-location>"
    },
     "referenceStorageAccountName": {
        "value": "<storage-account-name>"
    },
    "referenceStorageAccountKey": {
        "value": "<storage-account-key>"
    },
    "referenceContainerName": {
        "value": "<container-name>"
    },
    "referenceBlobName": {
        "value": "DeviceReferenceData.json"
    },
    "inputServiceBusNamespace": {
        "value": "<input-event-hub-servicebus-namespace>"
    },
    "inputEventHubName": {
        "value": "<input-event-hub-name>"
    },
    "inputEventHubConsumerGroupName": {
        "value": "<input-event-hub-consumergroup-name>"
    },
    "inputEventHubSharedAccessPolicyName": {
        "value": "<input-event-hub-shared-access-policy-name>"
    },
    "inputEventHubSharedAccessPolicyKey": {
        "value": "<input-event-hub-shared-access-policy-key>"
    },
    "outputServiceBusNamespace": {
        "value": "<output-event-hub-servicebus-namespace>"
    },
    "outputEventHubName": {
        "value": "<output-event-hub-name>"
    },
    "outputEventHubPartitionKey": {
        "value": "deviceId"
    },
    "outputEventHubSharedAccessPolicyName": {
        "value": "<output-event-hub-shared-access-policy-name>"
    },
    "outputEventHubSharedAccessPolicyKey": {
        "value": "<output-event-hub-shared-access-policy-key>"
    }
}

Note: make sure that all the resources used by the solution are located in the same datacenter.

PowerShell Script

Use the following PowerShell Script to create the Stream Analytics job.

 # To login to Azure Resource Manager
Login-AzureRmAccount

# Select a default subscription for your current session in case your account has multiple Azure subscriptions
Get-AzureRmSubscription –SubscriptionName “Paolo's Azure Account” | Select-AzureRmSubscription

# Initialize Variables
$resourceGroupName = 'StreamAnalyticsWestEuropeResourceGroup'
$deploymentName = 'StreamAnalyticsIoTDemo'
$location = "West Europe"
$templateFolderName = 'C:\Projects\Azure\StreamAnalytics\TestFiles\ARM\'
$templateFile = $templateFolderName + 'azuredeploy.json'
$templateParameterFile = $templateFolderName + 'azuredeploy-parameters.json'

# Check if the resource group for the Stream Analytics job already exists
$resourceGroup = Get-AzureRmResourceGroup -Name $resourceGroupName `
                                          -ErrorAction Ignore

if (!$resourceGroup)
{
    # Create the resource group if it does not exists
    Write-Host 'Creating' $resourceGroupName 'resource group...'
    $stopWatch = [Diagnostics.Stopwatch]::StartNew()
    New-AzureRmResourceGroup -Name $resourceGroupName `
                             -Location $location `
                             -ErrorAction Stop
    $stopWatch.Stop()
    Write-Host 'The resource group' $resourceGroupName ' has been successfully created in' $stopWatch.Elapsed.TotalSeconds 'seconds'
}
else
{
    Write-Host 'The resource group' $resourceGroupName 'already exists'
}

# Check if a deployment with the specified name already exists
$deployment = Get-AzureRmResourceGroupDeployment -Name $deploymentName `
                                                 -ResourceGroupName $resourceGroupName

if ($deployment)
{
    # Delete the resource group if it already exists
    Write-Host 'The' $deploymentName 'deployment already exists. Deleting the deployment...'
    $stopWatch = [Diagnostics.Stopwatch]::StartNew()
    Remove-AzureRmResourceGroupDeployment -Name $deploymentName `
                                          -ResourceGroupName $resourceGroupName `
                                          -Force `
                                          -ErrorAction Stop
    $stopWatch.Stop()
    Write-Host 'The deployment' $deploymentName ' has been successfully deleted in ' $stopWatch.Elapsed.TotalSeconds 'seconds'
}

# Create the resource group deployment
Write-Host 'Adding deployment' $deploymentName 'to' $resourceGroupName 'resource group...'
$stopWatch = [Diagnostics.Stopwatch]::StartNew()
New-AzureRmResourceGroupDeployment -Name $deploymentName `
                                   -ResourceGroupName $resourceGroupName `
                                   -TemplateFile $templateFile `
                                   -TemplateParameterFile $templateParameterFile `
                                   -ErrorAction Stop
$stopWatch.Stop()
$deployment = Get-AzureRmResourceGroupDeployment -ResourceGroupName $resourceGroupName `
                                                 -Name $deploymentName `
                                                 -ErrorAction Stop
Write-Host 'Operation Complete'
Write-Host '=================='
Write-Host 'Provisioning State :' $deployment[0].ProvisioningState
Write-Host 'Time elapsed :' $stopWatch.Elapsed.TotalSeconds 'seconds'