Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Dáta prichádzajúce do IoT Hub-u môžete preposielať do analytickej služby pre spracovanie v reálnom čase (napr. do Azure Stream Analytics), alebo ich môžete spracovať priamo v IoT Hub-e. Častým a najjednoduchším scenárom je ukladanie dát prijatých IoT Hub-om do lacného úložiska pre archiváciu a pravidelné analyzovanie na konci dňa (napr. Hadoop-om). Čaro Azure Stream Analytics si necháme na ďalší článok, na nasledujúcich riadkoch vytvoríme archív prijatých dát v úložisku Azure Storage, a aby sme dáta zo zdroja iba nekopírovali, rozšírime ich o časovú dimenziu.
Postspracovanie mikrodát v IoT Hub-e zabezpečuje „event processor“ , t.j. kód, ktorý ihneď po prijatí dáta spracuje, akumuluje ich v vyrovnávacej pamäti a potom ich v momente, keď nastane checkpoint (napr. ak bafer dát v pamäti dosiahol 4MB, alebo prešiel nejaký čas od posledného checkpointu), zapíše do súboru v úložisku.
Poznámka: Kód „event procesora“ by mal vždy urobiť len rýchle spracovanie dát. Ak bude príliš komplexný, časovo náročný, prichádzajúce dáta sa budú hromadiť v fronte čakajúcej na spracovanie, čo môže viesť k neresponzívnosti IoT Hub-u. Náš príklad nižšie je jednoduchý a nerieši možný problém, kedy zdroj dát pošle tú istú správu do IoT Hub-u dva razy. Deduplikáciu je dobré zveriť Azure Service Bus-u, ktorý by skôr, ako sa dáta zapíšu do úložiska, vyhodil z fronty správy s rovnakým „id“ správy („MessageId“).
1. Skôr ako sa pustíte do kódovania aplikácie zapisujúcej dáta prijaté IoT Hub-om do úložiska, musíte si pripraviť samotné úložisko – Azure Storage. Prihláste sa na Azure portál na https://portal.azure.com a cez menu „New -> Data + Storage -> Storage Account” prejdite k vytvoreniu účtu úložiska.
2. Vyberte model nasadenia ( Deployment model) „Resource Manager“ a potvrďte tlačidlo „Create“ .
Poznámka: Azure podporuje dva modely nasadenia. Starší Classic (označovaný aj ako Azure Service Management) a nový „Resource Manager“ postavený na tom, že každá služba v Azure sa dá popísať JSON šablónou.
3. Zadajte názov „storage“ účtu, vyberte si typ úložiska (napr. lacnejší Standard-LRS). Diagnostiku pre nekritický projekt môžete prepnúť na „Disabled“. Ak chcete mať všetky služby týkajúce sa IoT Hub-u ľahko dohľadateľné, vyberte rovnakú „Resource Group“ , v ktorej máte IoT Hub. Pre úložisko vyberte rovnaké dátové centrum, v ktorom je aj IoT Hub. Nakoniec potvrďte tlačidlo „Create“ .
4. Po vytvorení úložiska zobrazte „master“ kľúče cez menu „Access Keys“ a prekopírujte do „clipboardu“ hodnotu prvého „connection string-u“ úložiska. Budete ho potrebovať pre aplikáciu, ktorá bude zapisovať dáta do úložiska.
Poznámka: „master“ kľúče si chráňte. My sme ich použili priamo v aplikácii len kvoli jednoduchosti. Bezpečnejšie je v aplikáciach používať prístup cez politikami riadený spôsob Shared Access Signature.
5. Vytvorte v Visual Studiu projekt typu “Console Application” v C# napr. s názvom „UlozenieDatDoUloziska“ .
6. V Solution Explorer-e vyberte kontextové menu nad projektom “UlozenieDatDoUloziska” a vyberte z neho “Manage NuGet Packages” .
7. V okne NuGet Package Manager do položky na vyhľadavanie podľa názvu zadajte “Microsoft Azure Service Bus Event Hub – EventProcessorHost” . Po vyhľadaní balíčka potvrďte tlačidlo “Install” a následne akceptujte inštaláciu naviazaných knižníc tlačidlom "OK" a podmienky použitia tejto knižnice tlačidlom “I accept” .
8. V Solution Explorer-e vyberte kontextové menu nad projektom “UlozenieDatDoUloziska” a vyberte z neho “Add” a potom “Class” . Ako názov novej triedy v projekte zadajte “SpracovanieDat.cs” a potvrďte "OK" .
9. Pridajte na začiatok kódu „SpracovanieDat.cs“ nasledujúce referencie potrebné pre komunikáciu s IoT Hub-om a úložiskom:
using System.IO;
using System.Diagnostics;
using System.Security.Cryptography;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
10. Zameňte kód triedy „SpracovanieDat“ nasledujúcim kódom:
class SpracovanieDat : IEventProcessor
{
private const int MAX_VELKOST_BLOKU = 4 * 1024 * 1024;
public static string StorageConnectionString;
private CloudBlobClient blobKlient;
private CloudBlobContainer blobKontajner;
private long aktualnyOffsetInitBloku;
private MemoryStream naPridanie = new MemoryStream(MAX_VELKOST_BLOKU);
private Stopwatch stopwatch;
//ak sa nedosiahne MAX_BLOCK_SIZE skor, zapisu sa data do Storage kazdych 5 minut
private TimeSpan MAX_CHECKPOINT_CAS = TimeSpan.FromMinutes (5);
public SpracovanieDat()
{
//pripojenie k ulozisku, vytvorenie klientskej instancie a kontajnera
var storageAccount = CloudStorageAccount.Parse(StorageConnectionString);
blobKlient = storageAccount.CreateCloudBlobClient();
blobKontajner = blobKlient.GetContainerReference("mojiotarchiv");
blobKontajner.CreateIfNotExists();
}
Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Procesor sa vypina. Particia '{0}', Pricina: '{1}'.", context.Lease.PartitionId, reason);
return Task.FromResult<object>(null);
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("StoreEventProcessor inicializovany. Particia: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
if (!long.TryParse(context.Lease.Offset, out aktualnyOffsetInitBloku))
{
aktualnyOffsetInitBloku = 0;
}
stopwatch = new Stopwatch();
stopwatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext kontext, IEnumerable<EventData> spravy)
{
foreach (EventData eventData in spravy)
{
//do JSON objektu chceme pridat aj cas prijatia
//preto prijatu JSON strukturu deserializujeme a pridame novu vlastnost 'cas'
dynamic zdrojovyJson = Newtonsoft.Json.JsonConvert.DeserializeObject(System.Text.Encoding.ASCII.GetString(eventData.GetBytes()), typeof(object));
Newtonsoft.Json.Linq.JObject zdrojovyJsonObjekt = new Newtonsoft.Json.Linq.JObject(zdrojovyJson);
zdrojovyJsonObjekt.Add("cas", eventData.EnqueuedTimeUtc);
var upravenyJson = Newtonsoft.Json.JsonConvert.SerializeObject( zdrojovyJsonObjekt, Newtonsoft.Json.Formatting.Indented);
//konverzia na pole bajtov pre zapis do uloziska
byte[] data = System.Text.Encoding.ASCII.GetBytes(upravenyJson);
if (naPridanie.Length + data.Length > MAX_VELKOST_BLOKU || stopwatch.Elapsed > MAX_CHECKPOINT_CAS)
{
await PridajAVyvolajCheckpoint(kontext);
}
await naPridanie.WriteAsync(data, 0, data.Length);
Console.WriteLine(string.Format("Sprava prijata. Particia: '{0}', Data: '{1}'",
kontext.Lease.PartitionId, Encoding.UTF8.GetString(data)));
}
}
private async Task PridajAVyvolajCheckpoint(PartitionContext kontext)
{
var blokIdString = String.Format("startSeq:{0}", aktualnyOffsetInitBloku.ToString("0000000000000000000000000"));
var blokId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(blokIdString));
naPridanie.Seek(0, SeekOrigin.Begin);
byte[] md5 = MD5.Create().ComputeHash(naPridanie);
naPridanie.Seek(0, SeekOrigin.Begin);
var nazovBlobu = String.Format("iothub_{0}", kontext.Lease.PartitionId);
var aktualnyBlob = blobKontajner.GetBlockBlobReference(nazovBlobu);
if (await aktualnyBlob.ExistsAsync())
{
await aktualnyBlob.PutBlockAsync(blokId, naPridanie, Convert.ToBase64String(md5));
var blokList = await aktualnyBlob.DownloadBlockListAsync();
var novyBlokList = new List<string>(blokList.Select(b => b.Name));
if (novyBlokList.Count() > 0 && novyBlokList.Last() != blokId)
{
novyBlokList.Add(blokId);
VypisZvyraznenuSpravu(String.Format("Pridavam blok id: {0} do blobu: {1}", blokIdString, aktualnyBlob.Name));
}
else
{
VypisZvyraznenuSpravu(String.Format("Prepisujem blok id: {0}", blokIdString));
}
await aktualnyBlob.PutBlockListAsync(novyBlokList);
}
else
{
await aktualnyBlob.PutBlockAsync(blokId, naPridanie, Convert.ToBase64String(md5));
var novyBlokList = new List<string>();
novyBlokList.Add(blokId);
await aktualnyBlob.PutBlockListAsync(novyBlokList);
VypisZvyraznenuSpravu(String.Format("Vytvaram novy blob", aktualnyBlob.Name));
}
naPridanie.Dispose();
naPridanie = new MemoryStream(MAX_VELKOST_BLOKU);
// checkpoint
await kontext.CheckpointAsync();
VypisZvyraznenuSpravu(String.Format("Checkpoint particie: {0}", kontext.Lease.PartitionId));
aktualnyOffsetInitBloku = long.Parse(kontext.Lease.Offset);
stopwatch.Restart();
}
private void VypisZvyraznenuSpravu(string sprava)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(sprava);
Console.ResetColor();
}
}
Poznámka: Trieda, ktorá má pracovať ako postprocesor dát IoT Hub-u, musí implementovať interfejs „IEventProcessor„. V konštruktore sa napojí na úložisko, v „ProcessEventsAsync“ preberá dáta zo zdroja a upravuje ich, v „PridajAVyvolajCheckpoint“ po blokoch zapíše upravené dáta do vytvoreného súboru v úložisku. Aby súbor nenarastal do nekonečna, môžete kód upraviť napríklad tak, že vždy po polnoci sa vytvorí nový súbor.
11. Otvorte „Program.cs“ a pridajte na začiatok referenciu na ServiceBus:
using Microsoft.ServiceBus.Messaging;
12. Zameňte metódu „Main“ nasledujúcim kódom:
static void Main(string[] args)
{
string iotHubConnectionString = "sem_vlozte_connectionstring_na_iothub";
string iotHubEndpoint = "messages/events";
SpracovanieDat.StorageConnectionString = "sem_vlozte_connectionstring_na_ulozisko";
string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, iotHubEndpoint, EventHubConsumerGroup.DefaultGroupName, iotHubConnectionString, SpracovanieDat.StorageConnectionString, "messages-events");
Console.WriteLine("Registrovanie EventProcessor-a...");
eventProcessorHost.RegisterEventProcessorAsync<SpracovanieDat>().Wait();
Console.WriteLine("Prijimam. Potvrdte klavesu Enter na zastavenie.");
Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
13. V vloženom kóde nastavte „SpracovanieDat.StorageConnectionString“ na hodnotu „connection string-u“ k úložisku, ktorý ste získali v kroku 4.
14. Prepnite sa na Azure portál, prejdite do nastavení IoT Hub-u cez „Settings-> Shared Access Policies” :
15. Vyberte politiku „service“ , ktorá umožní nášmu postprocesoru pripojiť sa do IoT Hub-u a čítať dáta. Prekopírujte „connection string“ tejto politiky do clipboardu:
16. Prepnite sa do Visual Studia a zmeňte hodnotu premennej “iotHubConnectionString“ za hodnotu načítanú do clipboardu v predošlom kroku.
17. Spustite aplikáciu “simulacia_zdroja” , ktorú ste vytvorili v 3.časti seriálu o IoT Hub-e.
18. Spustite práve vytvorenú aplikáciu “UlozenieDatDoUloziska” na postspracovanie dát prijatých IoT Hub-om.
Poznámka: Náš postprocesor beží ako konzolová aplikácia na lokálnom počítači. V produkčnom prostredí sa doporučuje spúšťať kód postprocesora priamo v Azure, v virtuálnych serveroch alebo v aplikačných farmách Cloud Service Worker Role.
19. V “UlozenieDatDoUloziska” ste nastavili “checkpoint” na 5 minút. Počkajte teda 5 minút a potom prejdite na Azure portále do úložiska vytvoreného v kroku 3. Vstúpte do jeho “Blobs” panela zobrazujúceho kontajnery súborov a otvorte obsah kontajnera “mojiotarchiv“ .
20. Do kontajnera pribudol súbor “iothub...“. Kliknite na názov tohto súboru a cez menu „Download“ si ho stiahnite na lokálny počítač.
21. Otvorte stiahnutý súbor. Obsahuje dáta prijaté IoT Hub-om z monitorovanej webovej aplikácie, ktoré sme navyše v posprocesore rozšírili aj o položku „čas“.
V tejto časti sme vytvorili aplikáciu napojenú priamo na IoT Hub, ktorá slúži na úpravu a archivovanie správ prijatých IoT Hub-om. V ďaľšej časti sa zoznámime so službou Azure Stream Analytics, ktorá dokáže v reálnom čase analyzovať dáta prijaté IoT Hub-om. Práve takáto služba je potrebná na rýchle odchytenie anomálii v mikrodátach odoslaných zo zdrojov dát.
Miro