Dela via


Läsa indata i valfritt format med anpassade .NET-deserializers (förhandsversion)

Viktigt!

Anpassad .net deserializer för Azure Stream Analytics dras tillbaka den 30 september 2024. Efter det datumet går det inte att använda funktionen. Övergå till en inbyggd JSON-, AVRO- eller CSV-deserialiserare vid det datumet.

Med anpassade .NET-deserialiserare kan ditt Azure Stream Analytics-jobb läsa data från format utanför de tre inbyggda dataformaten. I den här artikeln beskrivs serialiseringsformatet och gränssnitten som definierar anpassade .NET-deserialiserare för Azure Stream Analytics-moln- och gränsjobb. Det finns också exempel på deserialiserare för protokollbuffert och CSV-format.

Anpassad deserialiserare för .NET

Följande kodexempel är de gränssnitt som definierar den anpassade deserialiseraren och implementerar StreamDeserializer<T>.

UserDefinedOperator är basklassen för alla anpassade strömningsoperatorer. Den initierar StreamingContext, vilket ger kontext, vilket omfattar en mekanism för att publicera diagnostik som du behöver felsöka eventuella problem med deserialiseraren för.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

Följande kodfragment är deserialiseringen för strömmande data.

Överhoppningsbara fel ska genereras med hjälp av IStreamingDiagnostics metoden UserDefinedOperatorInitialize (Initiera). Alla undantag behandlas som fel och deserialiseraren återskapas. Efter några fel kommer jobbet att gå till en misslyckad status.

StreamDeserializer<T> deserialiserar en ström till objekt av typen T. Följande villkor måste uppfyllas:

  1. T är en klass eller en struct.
  2. Alla offentliga fält i T är antingen
    1. En av [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] eller deras nullable equivalents.
    2. En annan struct eller klass som följer samma regler.
    3. Matris av typen T2 som följer samma regler.
    4. IListT2 där T2 följer samma regler.
    5. Har inga rekursiva typer.

Parametern stream är strömmen som innehåller det serialiserade objektet. Deserialize returnerar en samling T instanser.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext ger kontext, vilket inkluderar mekanism för att publicera diagnostik för användaroperator.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics är diagnostiken för användardefinierade operatorer, inklusive serialiserare, deserialiserare och användardefinierade funktioner.

WriteError skriver ett felmeddelande till resursloggarna och skickar felet till diagnostiken.

briefMessage är ett kort felmeddelande. Det här meddelandet visas i diagnostiken och används av produktteamet i felsökningssyfte. Ta inte med känslig information och behåll meddelandet färre än 200 tecken

detailedMessage är ett detaljerat felmeddelande som bara läggs till i dina resursloggar i lagringen. Det här meddelandet bör innehålla mindre än 2 000 tecken.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Deserializer-exempel

Det här avsnittet visar hur du skriver anpassade deserializers för Protobuf och CSV. Fler exempel, till exempel AVRO-format för Event Hubs Capture, finns i Azure Stream Analytics på GitHub.

Protokollbuffertformat (Protobuf)

Det här är ett exempel med hjälp av protokollbuffertformat.

Anta följande definition av protokollbufferten.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

När du kör protoc.exe från Google.Protobuf.Tools genererar NuGet en .cs fil med definitionen. Den genererade filen visas inte här. Du måste se till att den version av Protobuf NuGet som du använder i Stream Analytics-projektet matchar den Protobuf-version som användes för att generera indata.

Följande kodfragment är deserializer-implementeringen förutsatt att den genererade filen ingår i projektet. Den här implementeringen är bara en tunn omslutning över den genererade filen.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Följande kodfragment är en enkel CSV-deserialiserare som också visar spridningsfel.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Serialiseringsformat för REST-API:er

Varje Stream Analytics-indata har ett serialiseringsformat. Mer information om indataalternativ finns i dokumentationen för REST API för indata.

Följande JavaScript-kod är ett exempel på serialiseringsformatet .NET deserializer när du använder REST-API:et:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName ska vara en klass som implementerar StreamDeserializer<T>. Detta beskrivs i följande avsnitt.

Stöd för regioner

Den här funktionen är tillgänglig i följande regioner när du använder Standard SKU:

  • Västra centrala USA
  • Europa, norra
  • USA, östra
  • Västra USA
  • USA, östra 2
  • Västeuropa

Du kan begära stöd för fler regioner. Det finns dock ingen sådan regionbegränsning när du använder Stream Analytics-kluster.

Vanliga frågor och svar

När kommer den här funktionen att vara tillgänglig i alla Azure-regioner?

Den här funktionen är tillgänglig i 6 regioner. Om du är intresserad av att använda den här funktionen i en annan region kan du skicka en begäran. Stöd för alla Azure-regioner finns i översikten.

Kan jag komma åt MetadataPropertyValue från mina indata som liknar funktionen GetMetadataPropertyValue?

Den här funktionen stöds inte. Om du behöver den här funktionen kan du rösta på den här begäran på UserVoice.

Kan jag dela min deserializer-implementering med communityn så att andra kan dra nytta av det?

När du har implementerat din deserializer kan du hjälpa andra genom att dela den med communityn. Skicka din kod till GitHub-lagringsplatsen för Azure Stream Analytics.

Vilka är de andra begränsningarna med att använda anpassade deserialiserare i Stream Analytics?

Om dina indata är i Protobuf-format med ett schema som innehåller MapField typ kan du inte implementera en anpassad deserialiserare. Anpassade deserialiserare stöder inte heller exempeldata eller förhandsgranskningsdata.

Nästa steg