Čtení vstupu v libovolném formátu pomocí vlastních deserializérů .NET (Preview)
Důležité
Vlastní deserializátor .net pro Azure Stream Analytics bude vyřazen 30. září 2024. Po tomto datu nebude možné tuto funkci použít. Do tohoto data přejděte na předdefinovaný deserializátor JSON, AVRO nebo CSV.
Vlastní deserializátory .NET umožňují vaší úloze Azure Stream Analytics číst data z formátů mimo tři předdefinované formáty dat. Tento článek vysvětluje formát serializace a rozhraní, která definují vlastní deserializátory .NET pro cloudové a hraniční úlohy Azure Stream Analytics. Existují také příklady deserializerů pro vyrovnávací paměť protokolu a formát CSV.
Vlastní deserializátor .NET
Následující ukázky kódu jsou rozhraní, která definují vlastní deserializátor a implementují StreamDeserializer<T>
.
UserDefinedOperator
je základní třída pro všechny vlastní operátory streamování. Inicializuje StreamingContext
, což poskytuje kontext, který zahrnuje mechanismus pro publikování diagnostiky, pro které budete muset ladit všechny problémy s deserializátorem.
public abstract class UserDefinedOperator
{
public abstract void Initialize(StreamingContext streamingContext);
}
Následující fragment kódu je deserializace pro streamovaná data.
Přeskočená chyba by se měla generovat pomocí IStreamingDiagnostics
metody UserDefinedOperator
Inicializace. Všechny výjimky budou považovány za chyby a deserializátor se znovu vytvoří. Po několika chybách přejde úloha do stavu selhání.
StreamDeserializer<T>
deserializuje datový proud do objektu typu T
. Musí být splněny následující podmínky:
- T je třída nebo struktura.
- Všechna veřejná pole v jazyce T jsou buď
- Jeden z [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] nebo jejich ekvivalenty s možnou hodnotou null.
- Jiná struktura nebo třída se stejnými pravidly.
- Pole typu
T2
, které se řídí stejnými pravidly. IListT2
kde T2 se řídí stejnými pravidly.- Nemá žádné rekurzivní typy.
stream
Parametr je datový proud obsahující serializovaný objekt. Deserialize
vrátí kolekci T
instancí.
public abstract class StreamDeserializer<T> : UserDefinedOperator
{
public abstract IEnumerable<T> Deserialize(Stream stream);
}
StreamingContext
poskytuje kontext, který zahrnuje mechanismus publikování diagnostiky pro operátora uživatele.
public abstract class StreamingContext
{
public abstract StreamingDiagnostics Diagnostics { get; }
}
StreamingDiagnostics
je diagnostika pro uživatelem definované operátory, včetně serializátoru, deserializátoru a uživatelem definovaných funkcí.
WriteError
zapíše chybovou zprávu do protokolů prostředků a odešle chybu do diagnostiky.
briefMessage
je krátká chybová zpráva. Tato zpráva se zobrazí v diagnostice a produktový tým ji používá k ladění. Nezahrnujte citlivé informace a neuchovávejte zprávu méně než 200 znaků.
detailedMessage
je podrobná chybová zpráva, která se přidá jenom do protokolů prostředků ve vašem úložišti. Tato zpráva by měla mít méně než 2 000 znaků.
public abstract class StreamingDiagnostics
{
public abstract void WriteError(string briefMessage, string detailedMessage);
}
Příklady deserializátoru
V této části se dozvíte, jak psát vlastní deserializátory pro Protobuf a CSV. Další příklady, jako je formát AVRO pro Event Hubs Capture, najdete v Azure Stream Analytics na GitHubu.
Formát vyrovnávací paměti protokolu (Protobuf)
Toto je příklad použití formátu vyrovnávací paměti protokolu.
Předpokládejme následující definici vyrovnávací paměti protokolu.
syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto
package SimulatedTemperatureSensor;
message MessageBodyProto {
message Ambient {
double temperature = 1;
int64 humidity = 2;
}
message Machine {
double temperature = 1;
double pressure = 2;
}
Machine machine = 1;
Ambient ambient = 2;
string timeCreated = 3;
}
Spuštění protoc.exe
z Webu NuGet Google.Protobuf.Tools vygeneruje soubor .cs s definicí. Vygenerovaný soubor se tady nezobrazuje. Musíte zajistit, aby verze Protobuf NuGet, kterou používáte v projektu Stream Analytics, odpovídala verzi Protobuf, která byla použita k vygenerování vstupu.
Následující fragment kódu je implementace deserializátoru za předpokladu, že vygenerovaný soubor je součástí projektu. Tato implementace je pouze tenký obálka nad vygenerovaným souborem.
public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
{
public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
{
while (stream.Position < stream.Length)
{
yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
}
}
public override void Initialize(StreamingContext streamingContext)
{
}
}
CSV
Následující fragment kódu je jednoduchý deserializátor CSV, který také ukazuje šíření chyb.
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;
namespace ExampleCustomCode.Serialization
{
public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
{
private StreamingDiagnostics streamingDiagnostics;
public override IEnumerable<CustomEvent> Deserialize(Stream stream)
{
using (var sr = new StreamReader(stream))
{
string line = sr.ReadLine();
while (line != null)
{
if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
{
string[] parts = line.Split(',');
if (parts.Length != 3)
{
streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
}
else
{
yield return new CustomEvent()
{
Column1 = parts[0],
Column2 = parts[1],
Column3 = parts[2]
};
}
}
line = sr.ReadLine();
}
}
}
public override void Initialize(StreamingContext streamingContext)
{
this.streamingDiagnostics = streamingContext.Diagnostics;
}
}
public class CustomEvent
{
public string Column1 { get; set; }
public string Column2 { get; set; }
public string Column3 { get; set; }
}
}
Formát serializace pro rozhraní REST API
Každý vstup Stream Analytics má formát serializace. Další informace o možnostech vstupu najdete v dokumentaci ke vstupnímu rozhraní REST API .
Následující kód JavaScriptu je příkladem formátu serializace deserializátoru .NET při použití rozhraní REST API:
{
"properties":{
"type":"stream",
"serialization":{
"type":"CustomCLR",
"properties":{
"serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>",
"serializationClassName":"<Full name of the deserializer class name>"
}
}
}
}
serializationClassName
by měla být třída, která implementuje StreamDeserializer<T>
. Toto je popsáno v následující části.
Podpora oblastí
Tato funkce je dostupná v následujících oblastech při použití skladové položky Standard:
- Středozápad USA
- Severní Evropa
- USA – východ
- USA – západ
- USA – východ 2
- Západní Evropa
Můžete požádat o podporu pro více oblastí. Při používání clusterů Stream Analytics ale neexistuje žádné takové omezení oblasti.
Nejčastější dotazy
Kdy bude tato funkce dostupná ve všech oblastech Azure?
Tato funkce je dostupná v 6 oblastech. Pokud vás zajímá použití této funkce v jiné oblasti, můžete odeslat žádost. Podpora všech oblastí Azure je v plánu.
Můžu získat přístup k MetadataPropertyValue z mých vstupů podobných GetMetadataPropertyValue funkce?
Tato funkce není podporovaná. Pokud tuto funkci potřebujete, můžete hlasovat pro tuto žádost na webu UserVoice.
Můžu sdílet svoji implementaci deserializátoru s komunitou, aby ostatní mohli těžit?
Po implementaci deserializátoru můžete ostatním pomoct tím, že ho nasdílíte s komunitou. Odešlete svůj kód do úložiště Azure Stream Analytics na GitHubu.
Jaká jsou další omezení používání vlastních deserializérů ve Stream Analytics?
Pokud je váš vstup ve formátu Protobuf se schématem obsahujícím MapField
typ, nebudete moct implementovat vlastní deserializátor. Vlastní deserializátory také nepodporují ukázková data ani náhledová data.