Processing compressed event streams with Azure Stream Analytics


Did you know that Azure Stream Analytics now supports input data in compressed formats such as GZIP and Deflate? This is especially useful in scenarios where bandwidth is limited or message sizes are large.

This blog shows you how to squeeze the most value out of your data with compressed input formats. Introduced last fall in response to a growing number of customer requests, Azure Stream Analytics now supports GZIP and Deflate streams.

Azure Stream Analytics is now a viable option for many customers who previously faced these bandwidth and message size limitations.

Putting the squeeze on your data with GZipStream

Compressed input can be used with most supported serialization formats, including JSON and CSV. Avro has its own compression options built in, so additional compression is not necessary when using Avro serialization. This blog post outlines how to spin up a job using an Event Hub input, JSON serialization, and GZip compression. Both the VSTS project shown and the sample data used can be found on our GitHub.

Step 1: Defining the Events

Let’s start by defining the events that will be sent to the input source:

[DataContract]
public class SampleEvent
{
    [DataMember]
    public int Id { get; set; }
}

We will be using the Newtonsoft.Json library for JSON serialization. You will need to add a NuGet reference for this library through Project -> Manage NuGet Packages.

I have added the WindowsAzure.ServiceBus package (which provides the Event Hub client) in the same manner.

Here is how the packages.config file looks after adding the NuGet references:

<?xml version="1.0" encoding="utf-8"?>
<packages>
  <package id="Newtonsoft.Json" version="11.0.2" targetFramework="netstandard2.0" />
  <package id="WindowsAzure.ServiceBus" version="4.1.10" targetFramework="net45" />
</packages>
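
With the packages in place, a quick sanity check (a sketch added here, not part of the original sample) shows what a single serialized event looks like before compression:

// Requires: using System; using Newtonsoft.Json;
var json = JsonConvert.SerializeObject(new SampleEvent { Id = 1 });
Console.WriteLine(json); // prints {"Id":1}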

Step 2: Serialize the Data

Now let’s look at the example serialization code. We will be using classes from the Newtonsoft.Json namespace. The sample file can be found on our GitHub.

// Requires: using System; using System.Collections.Generic; using System.IO;
// using System.IO.Compression; using Newtonsoft.Json;
private class CompressedEventsJsonSerializer
{
    public byte[] GetSerializedEvents<T>(IEnumerable<T> events)
    {
        if (events == null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        var serializer = new JsonSerializer();
        using (var memoryStream = new MemoryStream())
        {
            // The GZipStream and the writers on top of it must be disposed
            // before reading the buffer, so that the compressed data is fully
            // flushed to the underlying MemoryStream. leaveOpen keeps the
            // MemoryStream usable after the GZipStream is closed.
            using (var gzipStream = new GZipStream(memoryStream, CompressionLevel.Optimal, leaveOpen: true))
            using (var streamWriter = new StreamWriter(gzipStream))
            using (JsonWriter writer = new JsonTextWriter(streamWriter))
            {
                foreach (var current in events)
                {
                    serializer.Serialize(writer, current);
                }
            }

            return memoryStream.ToArray();
        }
    }
}

The code above serializes the events as JSON and compresses the entire payload with GZip. Note that a single payload contains multiple events; Azure Stream Analytics decompresses each message and reads every JSON object in it as a separate event.
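
To verify the payload end to end, here is a small round-trip check (a sketch added for this post, not part of the original sample) that decompresses the byte array and reads back every event. Because the payload is a stream of concatenated JSON objects rather than a single array, the reader needs SupportMultipleContent enabled:

// Requires: using System; using System.IO; using System.IO.Compression;
// using Newtonsoft.Json;
private static void PrintEvents(byte[] compressedPayload)
{
    using (var memoryStream = new MemoryStream(compressedPayload))
    using (var gzipStream = new GZipStream(memoryStream, CompressionMode.Decompress))
    using (var streamReader = new StreamReader(gzipStream))
    using (var jsonReader = new JsonTextReader(streamReader))
    {
        // The payload is {"Id":0}{"Id":1}... (multiple top-level objects).
        jsonReader.SupportMultipleContent = true;

        var serializer = new JsonSerializer();
        while (jsonReader.Read())
        {
            var sampleEvent = serializer.Deserialize<SampleEvent>(jsonReader);
            Console.WriteLine("Read event with Id = " + sampleEvent.Id);
        }
    }
}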

Step 3: Send the events to Event Hub

Finally, let’s send events to Event Hub:

private static void Main(string[] args)
{
    // Supply your Event Hub connection string and event hub path here.
    var eventHubClient =
        EventHubClient.CreateFromConnectionString(
            "",
            "");
    var compressedEventsJsonSerializer = new CompressedEventsJsonSerializer();
    while (true)
    {
        // Serialize and compress a batch of five events, then send the
        // whole payload as a single Event Hub message every ten seconds.
        var eventsPayload =
            compressedEventsJsonSerializer.GetSerializedEvents(
                Enumerable.Range(0, 5).Select(i => new SampleEvent() { Id = i }));
        eventHubClient.Send(new EventData(eventsPayload));
        Thread.Sleep(TimeSpan.FromSeconds(10));
    }
}
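
If you produce payloads faster than you want to call Send, the same WindowsAzure.ServiceBus client can also batch several messages into one call. The snippet below is a sketch layered on the sample above (SendBatch is the batching counterpart of Send); each EventData still carries one GZip-compressed payload:

// Build three compressed payloads of five events each and send them
// in a single batched call instead of three separate round trips.
var payloads = Enumerable.Range(0, 3)
    .Select(batch => compressedEventsJsonSerializer.GetSerializedEvents(
        Enumerable.Range(batch * 5, 5).Select(i => new SampleEvent() { Id = i })))
    .Select(bytes => new EventData(bytes))
    .ToList();
eventHubClient.SendBatch(payloads);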

We have now seen example code for sending GZip-compressed JSON events to Event Hub; these events can now be consumed by a Stream Analytics job by configuring an Event Hub input with the JSON serialization format and GZip compression.

Please keep the feedback coming! The Azure Stream Analytics team is highly committed to listening to your feedback and letting the user voice influence our future investments. We welcome you to join the conversation and make your voice heard on UserVoice, learn more about what you can do with Azure Stream Analytics, and connect with us on Twitter.
