Using Snowflake on Azure for Querying Azure Event Hubs Capture Avro Files


In this video, we look at how to use Snowflake on Azure to query Avro files generated by the Azure Event Hubs Capture feature. In our example, we create an Azure Blob Storage account, configure Azure Event Grid to send blob creation and deletion events to an Azure Event Hub and an Azure Storage Queue simultaneously, and then use Snowflake on Azure to parse and query the Avro files generated by the capture.

Video Walkthrough

Tip: Play the video full screen.

Table of Contents

00:00 Beginning of video
01:58 Create resource group, storage account for files, and storage queue
03:15 View storage account in Microsoft Azure Storage Explorer
04:15 Create Azure Event Hubs Namespace and Event Hub
05:50 Create Event Grid subscription blob2queue using Azure CLI
06:45 Create Event Grid subscription blob2eventhub using Azure Portal
08:40 Uploading files to Azure Blob Storage using Azure CLI upload-batch
10:40 Viewing messages in the queue
12:10 Using Snowflake on Azure worksheet
13:28 Create Snowflake stage pointing to the Azure Blob Storage container
17:20 Querying number of records in Avro files
20:40 Decoding message body
23:00 Inserting decoded data into Snowflake table
24:50 Querying captured events using Snowflake JSON parsing capability
27:45 Comparing events captured in queue with ones captured in Event Hub
29:30 Grouping query to view events by minute
30:25 Deleting files and querying deletion events
34:20 Comparing total number of events in queue vs Event Hub after deleting all files

Create Storage Account For Files and Queue

# Create resource group
az group create -n avehc1 -l eastus2

# Create storage account for upload files and for queue
az storage account create -g avehc1 -n avmyfiles1 --sku Standard_LRS -l eastus2 --kind StorageV2

# Create container
az storage container create -n myfiles --account-name avmyfiles1

# Create queue for events
az storage queue create -n queue1 --account-name avmyfiles1

Create Azure Event Hub

# Create event hub namespace
az eventhubs namespace create -g avehc1 -n avehc1ns -l eastus2 --sku Standard

# Create event hub
az eventhubs eventhub create -g avehc1 -n avehc1 --namespace-name avehc1ns

# Create storage account and container for event hub capture files
az storage account create -g avehc1 -n avmycapture1 --sku Standard_LRS -l eastus2 --kind StorageV2
az storage container create -n mycapture --account-name avmycapture1

Create Event Grid Subscription

az eventgrid event-subscription create --resource-id /subscriptions/SUBSCRIPTION_ID/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1 --name blob2queue --endpoint-type storagequeue --endpoint /subscriptions/SUBSCRIPTION_ID/resourceGroups/avehc1/providers/Microsoft.Storage/storageAccounts/avmyfiles1/queueservices/default/queues/queue1
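For Blob Storage events, the Event Grid `subject` field has the form `/blobServices/default/containers/<container>/blobs/<blob path>`. The following is a minimal sketch, assuming that subject format, that extracts the container and blob name from a delivered event. The sample event and the helper `split_blob_subject` are hypothetical, and real events carry more fields (`topic`, `data`, `dataVersion`, and so on).

```python
# Hypothetical, trimmed Event Grid event for a blob upload.
SAMPLE_EVENT = {
    "id": "00000000-0000-0000-0000-000000000000",
    "eventType": "Microsoft.Storage.BlobCreated",
    "subject": "/blobServices/default/containers/myfiles/blobs/LICENSE.txt",
    "eventTime": "2018-12-27T01:38:20Z",
}

BLOB_SUBJECT_PREFIX = "/blobServices/default/containers/"


def split_blob_subject(subject: str) -> tuple[str, str]:
    """Return (container, blob_name) from a Blob Storage event subject."""
    if not subject.startswith(BLOB_SUBJECT_PREFIX):
        raise ValueError(f"not a blob subject: {subject}")
    # Everything before "/blobs/" is the container; everything after is the blob path.
    container, sep, blob = subject[len(BLOB_SUBJECT_PREFIX):].partition("/blobs/")
    if not sep:
        raise ValueError(f"no blob segment in: {subject}")
    return container, blob


container, blob = split_blob_subject(SAMPLE_EVENT["subject"])
print(SAMPLE_EVENT["eventType"], container, blob)
```

Blob names uploaded with `upload-batch` can contain slashes (virtual folders). For that reason, the sketch splits only on the first `/blobs/` segment and keeps the rest of the path intact.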

Upload Batch of Files to Generate Events

az storage blob upload-batch --account-name avmyfiles1 --destination myfiles --source /mnt/c/Python36 --pattern "*.*"

Snowflake Queries

use database TEST_DB;

create or replace file format av_avro_format
  type = 'AVRO'
  compression = 'NONE';
show file formats;

-- Create Snowflake stage pointing to the container with the captured Avro files
create or replace stage aveventgrid_capture
  url='azure://avmycapture1.blob.core.windows.net/mycapture'
  credentials=(azure_sas_token='?st=xxxxxxxxxxxxxxxxxxxxxxx')
  file_format = av_avro_format;

-- List all Avro files
list @aveventgrid_capture;

-- Count records in all Avro files
select count(*) from @aveventgrid_capture;

-- Look at raw data in one Avro file
select * from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;
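The path in the query above follows the Event Hubs Capture naming layout `{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}`. The following sketch assumes that default archive name format is in effect (it is configurable per event hub). It shows how such a path can be built for a given partition and capture window. The helper name `capture_blob_path` is hypothetical.

```python
from datetime import datetime


def capture_blob_path(namespace: str, event_hub: str, partition_id: int, window_start: datetime) -> str:
    """Build a capture blob path using the assumed default archive name format."""
    # Month, day, hour, minute and second are zero-padded, matching paths such as .../2018/12/27/01/38/26.avro
    return f"{namespace}/{event_hub}/{partition_id}/{window_start:%Y/%m/%d/%H/%M/%S}.avro"


print(capture_blob_path("avehc1ns", "avehc1", 0, datetime(2018, 12, 27, 1, 38, 26)))
```

A helper like this can be handy for generating stage paths for a specific partition or time window, rather than scanning the whole `@aveventgrid_capture` stage.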

-- Decode the body
select HEX_DECODE_STRING($1:Body) from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;
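In the Event Hubs Capture Avro schema, `Body` is a bytes field. The use of `HEX_DECODE_STRING` above suggests that Snowflake surfaces it as hex text, which must be decoded back into the original UTF-8 payload. The following is a rough Python equivalent of that decoding step, for illustration only. The function name `hex_decode_string` is hypothetical.

```python
def hex_decode_string(hex_text: str) -> str:
    """Decode hex text (upper- or lowercase) into a UTF-8 string."""
    return bytes.fromhex(hex_text).decode("utf-8")


# Simulate a hex-encoded body holding an Event Grid JSON array.
body_hex = '[{"eventType":"Microsoft.Storage.BlobCreated"}]'.encode("utf-8").hex().upper()
print(hex_decode_string(body_hex))
```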

-- Parse other fields of the Avro file
select
    HEX_DECODE_STRING($1:Body),
    TO_TIMESTAMP(REPLACE($1:EnqueuedTimeUtc,'""',''),'MM/DD/YYYY HH:MI:SS AM'),
    TO_NUMBER($1:Offset),
    $1:Properties,
    TO_NUMBER($1:SequenceNumber),
    $1:SystemProperties
from @aveventgrid_capture/avehc1ns/avehc1/0/2018/12/27/01/38/26.avro;
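`EnqueuedTimeUtc` is stored as text in the capture file, which is why the query strips quotes and applies a format mask before converting it to a timestamp. The following sketch parses a value in the same shape with Python's `strptime`. It assumes values look like `12/27/2018 1:38:26 AM`, as implied by the `'MM/DD/YYYY HH:MI:SS AM'` mask. The sample string and the helper `parse_enqueued_time` are hypothetical.

```python
from datetime import datetime


def parse_enqueued_time(raw: str) -> datetime:
    """Parse an assumed 'MM/DD/YYYY h:mm:ss AM' value, removing stray double quotes first."""
    # %I is the 12-hour clock and %p the AM/PM marker; strptime accepts single-digit hours.
    return datetime.strptime(raw.replace('"', ""), "%m/%d/%Y %I:%M:%S %p")


print(parse_enqueued_time('"12/27/2018 1:38:26 AM"'))
```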

-- Create table to store parsed Avro capture files
create or replace table aveventgrid_capture (
  jsontext variant,
  eh_enqueued_time_utc timestamp_ntz,
  eh_offset int,
  eh_properties variant,
  eh_sequence_number int,
  eh_system_properties variant
);

-- Review the table, which is initially empty
select * from aveventgrid_capture;

-- Load data from Avro files into the created Snowflake table
copy into aveventgrid_capture (jsontext, eh_enqueued_time_utc, eh_offset, eh_properties, eh_sequence_number, eh_system_properties)
from (
  select
    HEX_DECODE_STRING($1:Body),
    TO_TIMESTAMP(REPLACE($1:EnqueuedTimeUtc,'""',''),'MM/DD/YYYY HH:MI:SS AM'),
    TO_NUMBER($1:Offset),
    $1:Properties,
    TO_NUMBER($1:SequenceNumber),
    $1:SystemProperties
  from @aveventgrid_capture
);

-- Review how lateral flatten works to break up a JSON array into individual records
select value from aveventgrid_capture, lateral flatten ( input => jsontext );
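`LATERAL FLATTEN` turns each element of the JSON array in `jsontext` into its own output row. The following is a simplified Python analogue, assuming each decoded body is a JSON array of Event Grid events. Unlike Snowflake's `FLATTEN`, which also expands objects into key/value rows, this sketch handles arrays only and skips anything else. The name `flatten_arrays` is hypothetical.

```python
import json
from typing import Any, Iterable, Iterator


def flatten_arrays(rows: Iterable[str]) -> Iterator[Any]:
    """Yield every element of each JSON array, one decoded message body per input row."""
    for jsontext in rows:
        parsed = json.loads(jsontext)
        if isinstance(parsed, list):
            # One output "row" per array element, like LATERAL FLATTEN's value column.
            yield from parsed


rows = ['[{"id": "a"}, {"id": "b"}]', '[{"id": "c"}]']
print([value["id"] for value in flatten_arrays(rows)])
```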

-- Query event grid blob storage events by parsing JSON using Snowflake’s built-in functions
select
    value:eventType::string as eventType,
    value:eventTime::timestamp as eventTime,
    value:subject::string as subject,
    value:id::string as id
from aveventgrid_capture, lateral flatten ( input => jsontext )
where value:eventType::string is not null;
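The query above projects four fields from each flattened element and drops elements without an `eventType`. The following sketch shows the same projection and filter over already-flattened Python dictionaries. The record type `BlobEvent`, the helper `project_events`, and the sample values are all hypothetical, and `eventTime` is kept as a raw string rather than cast to a timestamp.

```python
from typing import Any, Iterable, Iterator, NamedTuple, Optional


class BlobEvent(NamedTuple):
    """Hypothetical record mirroring the four projected columns."""
    event_type: str
    event_time: Optional[str]  # raw ISO-8601 text in this sketch
    subject: Optional[str]
    id: Optional[str]


def project_events(values: Iterable[Any]) -> Iterator[BlobEvent]:
    """Keep only elements that have an eventType, then pick the four fields."""
    for value in values:
        if not isinstance(value, dict) or value.get("eventType") is None:
            continue
        yield BlobEvent(value["eventType"], value.get("eventTime"), value.get("subject"), value.get("id"))


values = [
    {"eventType": "Microsoft.Storage.BlobCreated", "eventTime": "2018-12-27T01:38:20Z",
     "subject": "/blobServices/default/containers/myfiles/blobs/a.txt", "id": "e1"},
    {"topic": "element without an eventType"},
]
for event in project_events(values):
    print(event.event_type, event.id)
```

Filtering by a specific event ID, as in the next query, amounts to adding `event.id == ...` to the same loop.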

-- Look for a specific event ID
select
    value:eventType::string as eventType,
    value:eventTime::timestamp as eventTime,
    value:subject::string as subject,
    value:id::string as id
from aveventgrid_capture, lateral flatten ( input => jsontext )
where value:id::string = '50eac4d4-e01e-00b5-5584-9da1d9063d9d';

-- Group events by minute and event type
select
    date_trunc('MINUTE',value:eventTime::timestamp) as eventTimeWindow,
    value:eventType::string as eventType,
    count(*) as eventCount
from aveventgrid_capture, lateral flatten ( input => jsontext )
where value:eventType::string is not null
group by eventTimeWindow, eventType
order by eventTimeWindow desc;
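The grouping query truncates each `eventTime` to the minute and counts events per (minute, event type), newest window first. The following is a Python sketch of the same aggregation, assuming `eventTime` values look like `2018-12-27T01:38:20.1234567Z`. Fractional seconds beyond microseconds are truncated because `datetime` cannot hold them. The helpers `parse_event_time` and `events_per_minute` and the sample events are hypothetical.

```python
import re
from collections import Counter
from datetime import datetime
from typing import Iterable

_EVENT_TIME = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z?")


def parse_event_time(text: str) -> datetime:
    """Parse an assumed ISO-8601 eventTime, keeping at most 6 fractional digits."""
    match = _EVENT_TIME.fullmatch(text)
    if match is None:
        raise ValueError(f"unexpected eventTime: {text}")
    base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    fraction = (match.group(2) or "0")[:6].ljust(6, "0")
    return base.replace(microsecond=int(fraction))


def events_per_minute(events: Iterable[dict]) -> list[tuple[tuple[datetime, str], int]]:
    """Count events per (minute window, eventType), newest window first."""
    counts = Counter(
        # Equivalent of date_trunc('MINUTE', ...): zero out seconds and microseconds.
        (parse_event_time(e["eventTime"]).replace(second=0, microsecond=0), e["eventType"])
        for e in events
        if e.get("eventType") is not None
    )
    return sorted(counts.items(), key=lambda item: item[0][0], reverse=True)


sample = [
    {"eventType": "Microsoft.Storage.BlobCreated", "eventTime": "2018-12-27T01:38:20.1234567Z"},
    {"eventType": "Microsoft.Storage.BlobCreated", "eventTime": "2018-12-27T01:38:50Z"},
    {"eventType": "Microsoft.Storage.BlobDeleted", "eventTime": "2018-12-27T01:39:05Z"},
]
for (window, event_type), count in events_per_minute(sample):
    print(window, event_type, count)
```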


Thank you!

Please leave feedback and questions below or on Twitter https://twitter.com/ArsenVlad
