Mikrodáta do Azure (4.časť) - Ako najjednoduchšie spracovať dáta v IoT Hub-e ( krok za krokom )

Dáta prichádzajúce do IoT Hub-u môžete preposielať do analytickej služby pre spracovanie v reálnom čase (napr. do Azure Stream Analytics), alebo ich môžete spracovať priamo v IoT Hub-e. Častým a najjednoduchším scenárom je ukladanie dát prijatých IoT Hub-om do lacného úložiska pre archiváciu a pravidelné analyzovanie na konci dňa (napr. Hadoop-om). Čaro Azure Stream Analytics si necháme na ďalší článok, na nasledujúcich riadkoch vytvoríme archív prijatých dát v úložisku Azure Storage, a aby sme dáta zo zdroja iba nekopírovali, rozšírime ich o časovú dimenziu.
Postspracovanie mikrodát v IoT Hub-e zabezpečuje „event processor“ , t.j. kód, ktorý ihneď po prijatí dáta spracuje, akumuluje ich v vyrovnávacej pamäti a potom ich v momente, keď nastane checkpoint (napr. ak bafer dát v pamäti dosiahol 4MB, alebo prešiel nejaký čas od posledného checkpointu), zapíše do súboru v úložisku.
Poznámka: Kód „event procesora“ by mal vždy urobiť len rýchle spracovanie dát. Ak bude príliš komplexný, časovo náročný, prichádzajúce dáta sa budú hromadiť v fronte čakajúcej na spracovanie, čo môže viesť k neresponzívnosti IoT Hub-u. Náš príklad nižšie je jednoduchý a nerieši možný problém, kedy zdroj dát pošle tú istú správu do IoT Hub-u dva razy. Deduplikáciu je dobré zveriť Azure Service Bus-u, ktorý by skôr, ako sa dáta zapíšu do úložiska, vyhodil z fronty správy s rovnakým „id“ správy („MessageId“).

1. Skôr ako sa pustíte do kódovania aplikácie zapisujúcej dáta prijaté IoT Hub-om do úložiska, musíte si pripraviť samotné úložisko – Azure Storage. Prihláste sa na Azure portál na https://portal.azure.com a cez menu „New -> Data + Storage -> Storage Account” prejdite k vytvoreniu účtu úložiska.
2. Vyberte model nasadenia ( Deployment model) „Resource Manager“ a potvrďte tlačidlo „Create“ .
Poznámka: Azure podporuje dva modely nasadenia. Starší Classic (označovaný aj ako Azure Service Management) a nový „Resource Manager“ postavený na tom, že každá služba v Azure sa dá popísať JSON šablónou.
3. Zadajte názov „storage“ účtu, vyberte si typ úložiska (napr. lacnejší Standard-LRS). Diagnostiku pre nekritický projekt môžete prepnúť na „Disabled“. Ak chcete mať všetky služby týkajúce sa IoT Hub-u ľahko dohľadateľné, vyberte rovnakú „Resource Group“ , v ktorej máte IoT Hub. Pre úložisko vyberte rovnaké dátové centrum, v ktorom je aj IoT Hub. Nakoniec potvrďte tlačidlo „Create“ .

2_storagearchiv
4. Po vytvorení úložiska zobrazte „master“ kľúče cez menu „Access Keys“ a prekopírujte do „clipboardu“ hodnotu prvého „connection string-u“ úložiska. Budete ho potrebovať pre aplikáciu, ktorá bude zapisovať dáta do úložiska.

3_storagearchivconnstring
Poznámka: „master“ kľúče si chráňte. My sme ich použili priamo v aplikácii len kvoli jednoduchosti. Bezpečnejšie je v aplikáciach používať prístup cez politikami riadený spôsob Shared Access Signature.
5. Vytvorte v Visual Studiu projekt typu “Console Application” v C# napr. s názvom „UlozenieDatDoUloziska“ .
6. V Solution Explorer-e vyberte kontextové menu nad projektom “UlozenieDatDoUloziska” a vyberte z neho “Manage NuGet Packages” .
7. V okne NuGet Package Manager do položky na vyhľadavanie podľa názvu zadajte “Microsoft Azure Service Bus Event Hub – EventProcessorHost” . Po vyhľadaní balíčka potvrďte tlačidlo “Install” a následne akceptujte inštaláciu naviazaných knižníc tlačidlom "OK" a podmienky použitia tejto knižnice tlačidlom “I accept” .

1_nuget
8. V Solution Explorer-e vyberte kontextové menu nad projektom “UlozenieDatDoUloziska” a vyberte z neho “Add” a potom “Class” . Ako názov novej triedy v projekte zadajte “SpracovanieDat.cs” a potvrďte "OK" .
9. Pridajte na začiatok kódu „SpracovanieDat.cs“ nasledujúce referencie potrebné pre komunikáciu s IoT Hub-om a úložiskom:

 using System.IO;
using System.Diagnostics;
using System.Security.Cryptography;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

10. Zameňte kód triedy „SpracovanieDat“ nasledujúcim kódom:

 class SpracovanieDat : IEventProcessor
{
private const int MAX_VELKOST_BLOKU = 4 * 1024 * 1024;
public static string StorageConnectionString;

private CloudBlobClient blobKlient;
private CloudBlobContainer blobKontajner;

private long aktualnyOffsetInitBloku;
private MemoryStream naPridanie = new MemoryStream(MAX_VELKOST_BLOKU);

private Stopwatch stopwatch;
//ak sa nedosiahne MAX_BLOCK_SIZE skor, zapisu sa data do Storage kazdych 5 minut
private TimeSpan MAX_CHECKPOINT_CAS = TimeSpan.FromMinutes (5);

public SpracovanieDat()
{
//pripojenie k ulozisku, vytvorenie klientskej instancie a kontajnera
var storageAccount = CloudStorageAccount.Parse(StorageConnectionString);
blobKlient = storageAccount.CreateCloudBlobClient();
blobKontajner = blobKlient.GetContainerReference("mojiotarchiv");
blobKontajner.CreateIfNotExists();
}
Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Procesor sa vypina. Particia '{0}', Pricina: '{1}'.", context.Lease.PartitionId, reason);
return Task.FromResult<object>(null);
}

Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("StoreEventProcessor inicializovany. Particia: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);

if (!long.TryParse(context.Lease.Offset, out aktualnyOffsetInitBloku))
{
aktualnyOffsetInitBloku = 0;
}
stopwatch = new Stopwatch();
stopwatch.Start();

return Task.FromResult<object>(null);
}

async Task IEventProcessor.ProcessEventsAsync(PartitionContext kontext, IEnumerable<EventData> spravy)
{
foreach (EventData eventData in spravy)
{
//do JSON objektu chceme pridat aj cas prijatia
//preto prijatu JSON strukturu deserializujeme a pridame novu vlastnost 'cas'
dynamic zdrojovyJson = Newtonsoft.Json.JsonConvert.DeserializeObject(System.Text.Encoding.ASCII.GetString(eventData.GetBytes()), typeof(object));
Newtonsoft.Json.Linq.JObject zdrojovyJsonObjekt = new Newtonsoft.Json.Linq.JObject(zdrojovyJson);
zdrojovyJsonObjekt.Add("cas", eventData.EnqueuedTimeUtc);
var upravenyJson = Newtonsoft.Json.JsonConvert.SerializeObject( zdrojovyJsonObjekt, Newtonsoft.Json.Formatting.Indented);

//konverzia na pole bajtov pre zapis do uloziska
byte[] data = System.Text.Encoding.ASCII.GetBytes(upravenyJson);

if (naPridanie.Length + data.Length > MAX_VELKOST_BLOKU || stopwatch.Elapsed > MAX_CHECKPOINT_CAS)
{
await PridajAVyvolajCheckpoint(kontext);
}
await naPridanie.WriteAsync(data, 0, data.Length);

Console.WriteLine(string.Format("Sprava prijata. Particia: '{0}', Data: '{1}'",
kontext.Lease.PartitionId, Encoding.UTF8.GetString(data)));
}
}

private async Task PridajAVyvolajCheckpoint(PartitionContext kontext)
{
var blokIdString = String.Format("startSeq:{0}", aktualnyOffsetInitBloku.ToString("0000000000000000000000000"));
var blokId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(blokIdString));
naPridanie.Seek(0, SeekOrigin.Begin);
byte[] md5 = MD5.Create().ComputeHash(naPridanie);
naPridanie.Seek(0, SeekOrigin.Begin);

var nazovBlobu = String.Format("iothub_{0}", kontext.Lease.PartitionId);
var aktualnyBlob = blobKontajner.GetBlockBlobReference(nazovBlobu);

if (await aktualnyBlob.ExistsAsync())
{
await aktualnyBlob.PutBlockAsync(blokId, naPridanie, Convert.ToBase64String(md5));
var blokList = await aktualnyBlob.DownloadBlockListAsync();
var novyBlokList = new List<string>(blokList.Select(b => b.Name));

if (novyBlokList.Count() > 0 && novyBlokList.Last() != blokId)
{
novyBlokList.Add(blokId);
VypisZvyraznenuSpravu(String.Format("Pridavam blok id: {0} do blobu: {1}", blokIdString, aktualnyBlob.Name));
}
else
{
VypisZvyraznenuSpravu(String.Format("Prepisujem blok id: {0}", blokIdString));
}
await aktualnyBlob.PutBlockListAsync(novyBlokList);
}
else
{
await aktualnyBlob.PutBlockAsync(blokId, naPridanie, Convert.ToBase64String(md5));
var novyBlokList = new List<string>();
novyBlokList.Add(blokId);
await aktualnyBlob.PutBlockListAsync(novyBlokList);
VypisZvyraznenuSpravu(String.Format("Vytvaram novy blob", aktualnyBlob.Name));
}
naPridanie.Dispose();
naPridanie = new MemoryStream(MAX_VELKOST_BLOKU);

// checkpoint
await kontext.CheckpointAsync();
VypisZvyraznenuSpravu(String.Format("Checkpoint particie: {0}", kontext.Lease.PartitionId));

aktualnyOffsetInitBloku = long.Parse(kontext.Lease.Offset);
stopwatch.Restart();
}

private void VypisZvyraznenuSpravu(string sprava)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(sprava);
Console.ResetColor();
}
}

Poznámka: Trieda, ktorá má pracovať ako postprocesor dát IoT Hub-u, musí implementovať interfejs „IEventProcessor„. V konštruktore sa napojí na úložisko, v „ProcessEventsAsync“ preberá dáta zo zdroja a upravuje ich, v „PridajAVyvolajCheckpoint“ po blokoch zapíše upravené dáta do vytvoreného súboru v úložisku. Aby súbor nenarastal do nekonečna, môžete kód upraviť napríklad tak, že vždy po polnoci sa vytvorí nový súbor.

11. Otvorte „Program.cs“ a pridajte na začiatok referenciu na ServiceBus:

 using Microsoft.ServiceBus.Messaging;

12. Zameňte metódu „Main“ nasledujúcim kódom:

 static void Main(string[] args)
{
string iotHubConnectionString = "sem_vlozte_connectionstring_na_iothub";
string iotHubEndpoint = "messages/events";
SpracovanieDat.StorageConnectionString = "sem_vlozte_connectionstring_na_ulozisko";

string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, iotHubEndpoint, EventHubConsumerGroup.DefaultGroupName, iotHubConnectionString, SpracovanieDat.StorageConnectionString, "messages-events");
Console.WriteLine("Registrovanie EventProcessor-a...");
eventProcessorHost.RegisterEventProcessorAsync<SpracovanieDat>().Wait();

Console.WriteLine("Prijimam. Potvrdte klavesu Enter na zastavenie.");
Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}

13. V vloženom kóde nastavte „SpracovanieDat.StorageConnectionString“ na hodnotu „connection string-u“ k úložisku, ktorý ste získali v kroku 4.
14. Prepnite sa na Azure portál, prejdite do nastavení IoT Hub-u cez „Settings-> Shared Access Policies” :

4_iothub_politiky
15. Vyberte politiku „service“ , ktorá umožní nášmu postprocesoru pripojiť sa do IoT Hub-u a čítať dáta. Prekopírujte „connection string“ tejto politiky do clipboardu:

5_iothub_politikydetail
16. Prepnite sa do Visual Studia a zmeňte hodnotu premennej “iotHubConnectionString“ za hodnotu načítanú do clipboardu v predošlom kroku.
17. Spustite aplikáciu “simulacia_zdroja” , ktorú ste vytvorili v 3.časti seriálu o IoT Hub-e.
18. Spustite práve vytvorenú aplikáciu “UlozenieDatDoUloziska” na postspracovanie dát prijatých IoT Hub-om.

6spustene_aplikacie
Poznámka: Náš postprocesor beží ako konzolová aplikácia na lokálnom počítači. V produkčnom prostredí sa doporučuje spúšťať kód postprocesora priamo v Azure, v virtuálnych serveroch alebo v aplikačných farmách Cloud Service Worker Role.
19. V “UlozenieDatDoUloziska” ste nastavili “checkpoint” na 5 minút. Počkajte teda 5 minút a potom prejdite na Azure portále do úložiska vytvoreného v kroku 3. Vstúpte do jeho “Blobs” panela zobrazujúceho kontajnery súborov a otvorte obsah kontajnera “mojiotarchiv“ .

7vytvorenyarchiv
20. Do kontajnera pribudol súbor “iothub...“. Kliknite na názov tohto súboru a cez menu „Download“ si ho stiahnite na lokálny počítač.

8downloadarchiv

21. Otvorte stiahnutý súbor. Obsahuje dáta prijaté IoT Hub-om z monitorovanej webovej aplikácie, ktoré sme navyše v posprocesore rozšírili aj o položku „čas“.

9obsaharchivu

V tejto časti sme vytvorili aplikáciu napojenú priamo na IoT Hub, ktorá slúži na úpravu a archivovanie správ prijatých IoT Hub-om. V ďaľšej časti sa zoznámime so službou Azure Stream Analytics, ktorá dokáže v reálnom čase analyzovať dáta prijaté IoT Hub-om. Práve takáto služba je potrebná na rýchle odchytenie anomálii v mikrodátach odoslaných zo zdrojov dát.

 

Miro