Delen via


Azure Event Hubs- en Kafka-gegevensstroomeindpunten configureren

Belangrijk

Deze pagina bevat instructies voor het beheren van Azure IoT Operations-onderdelen met behulp van Kubernetes-implementatiemanifesten, die in preview zijn. Deze functie is voorzien van verschillende beperkingen en mag niet worden gebruikt voor productieworkloads.

Raadpleeg de Aanvullende voorwaarden voor Microsoft Azure-previews voor juridische voorwaarden die van toepassing zijn op Azure-functies die in bèta of preview zijn of die anders nog niet algemeen beschikbaar zijn.

Als u bidirectionele communicatie tussen Azure IoT Operations en Apache Kafka-brokers wilt instellen, kunt u een gegevensstroomeindpunt configureren. Met deze configuratie kunt u het eindpunt, TLS (Transport Layer Security), verificatie en andere instellingen opgeven.

Vereisten

Azure Event Hubs

Azure Event Hubs is compatibel met het Kafka-protocol en kan worden gebruikt met gegevensstromen met enkele beperkingen.

Een Azure Event Hubs-naamruimte en Event Hub maken

Maak eerst een Azure Event Hubs-naamruimte waarvoor Kafka is ingeschakeld

Maak vervolgens een Event Hub in de naamruimte. Elke afzonderlijke Event Hub komt overeen met een Kafka-onderwerp. U kunt meerdere Event Hubs maken in dezelfde naamruimte om meerdere Kafka-onderwerpen weer te geven.

Machtiging toewijzen aan beheerde identiteit

Als u een gegevensstroomeindpunt voor Azure Event Hubs wilt configureren, raden we u aan een door de gebruiker toegewezen of door het systeem toegewezen beheerde identiteit te gebruiken. Deze methode is veilig en elimineert de noodzaak om referenties handmatig te beheren.

Nadat de Azure Event Hubs-naamruimte en event hub zijn gemaakt, moet u een rol toewijzen aan de door Azure IoT Operations beheerde identiteit die machtigingen verleent voor het verzenden of ontvangen van berichten naar de Event Hub.

Als u een door het systeem toegewezen beheerde identiteit gebruikt, gaat u in Azure Portal naar uw Azure IoT Operations-exemplaar en selecteert u Overzicht. Kopieer de naam van de extensie die wordt vermeld na de Azure IoT Operations Arc-extensie. Bijvoorbeeld azure-iot-operations-xxxx7. Uw door het systeem toegewezen beheerde identiteit vindt u met dezelfde naam van de Azure IoT Operations Arc-extensie.

Ga vervolgens naar de Event Hubs-naamruimte >Toegangsbeheer (IAM)>Roltoewijzing toevoegen.

  1. Selecteer op het tabblad Rol een geschikte rol, zoals Azure Event Hubs Data Sender of Azure Event Hubs Data Receiver. Hiermee krijgt de beheerde identiteit de benodigde machtigingen voor het verzenden of ontvangen van berichten voor alle Event Hubs in de naamruimte. Zie Een toepassing verifiëren met Microsoft Entra ID voor toegang tot Event Hubs-resources voor meer informatie.
  2. Op het tabblad Leden :
    1. Als u een door het systeem toegewezen beheerde identiteit gebruikt, selecteert u voor Toegang toewijzen aan, selecteert u de optie Gebruiker, groep of service-principal, selecteert u + Leden selecteren en zoekt u naar de naam van de Azure IoT Operations Arc-extensie.
    2. Als u een door de gebruiker toegewezen beheerde identiteit gebruikt, selecteert u de optie Beheerde identiteit toewijzen, selecteert u +Leden selecteren en zoekt u naar uw door de gebruiker toegewezen beheerde identiteit die is ingesteld voor cloudverbindingen.

Gegevensstroomeindpunt maken voor Azure Event Hubs

Zodra de Azure Event Hubs-naamruimte en Event Hub zijn geconfigureerd, kunt u een gegevensstroomeindpunt maken voor de Azure Event Hubs-naamruimte waarvoor Kafka is ingeschakeld.

  1. Selecteer in de bewerkingservaring het tabblad Gegevensstroomeindpunten .

  2. Selecteer onder Nieuw gegevensstroomeindpunt maken de optie Azure Event Hubs>New.

    Schermopname van bewerkingen om een Azure Event Hubs-gegevensstroomeindpunt te maken.

  3. Voer de volgende instellingen in voor het eindpunt:

    Instelling Omschrijving
    Naam De naam van het gegevensstroomeindpunt.
    Host De hostnaam van de Kafka-broker in de indeling <NAMESPACE>.servicebus.windows.net:9093. Neem het poortnummer 9093 op in de hostinstelling voor Event Hubs.
    Verificatiemethode De methode die wordt gebruikt voor verificatie. U wordt aangeraden door het systeem toegewezen beheerde identiteit of door de gebruiker toegewezen beheerde identiteit te kiezen.
  4. Selecteer Toepassen om het eindpunt in te richten.

Notitie

Het Kafka-onderwerp of afzonderlijke Event Hub wordt later geconfigureerd wanneer u de gegevensstroom maakt. Het Kafka-onderwerp is de bestemming voor de gegevensstroomberichten.

Verbindingsreeks gebruiken voor verificatie bij Event Hubs

Belangrijk

Als u de portal voor bewerkingservaring wilt gebruiken om geheimen te beheren, moet Azure IoT Operations eerst worden ingeschakeld met beveiligde instellingen door een Azure Key Vault te configureren en workloadidentiteiten in te schakelen. Zie Beveiligde instellingen inschakelen in azure IoT Operations-implementatie voor meer informatie.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Basis en kies VERVOLGENS SASL voor verificatiemethode>.

Voer de volgende instellingen in voor het eindpunt:

Instelling Beschrijving
SASL-type Kies Plain.
Gesynchroniseerde geheime naam Voer een naam in van het Kubernetes-geheim dat de verbindingsreeks bevat.
Gebruikersnaamreferentie of tokengeheim De verwijzing naar de gebruikersnaam of het tokengeheim dat wordt gebruikt voor SASL-verificatie. Kies deze in de key vault-lijst of maak een nieuwe. De waarde moet zijn $ConnectionString.
Wachtwoordreferentie van tokengeheim De verwijzing naar het wachtwoord of tokengeheim dat wordt gebruikt voor SASL-verificatie. Kies deze in de key vault-lijst of maak een nieuwe. De waarde moet de notatie hebben van Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>.

Nadat u Verwijzing toevoegen hebt geselecteerd, voert u de volgende instellingen in als u Nieuwe maken selecteert:

Instelling Beschrijving
Geheime naam De naam van het geheim in Azure Key Vault. Kies een naam die u gemakkelijk kunt onthouden om het geheim later in de lijst te selecteren.
Geheime waarde Voer de gebruikersnaam in $ConnectionString. Voer voor het wachtwoord de verbindingsreeks in de notatie Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>in.
Activeringsdatum instellen Als dit is ingeschakeld, wordt de datum waarop het geheim actief wordt.
Vervaldatum instellen Als dit is ingeschakeld, wordt de datum waarop het geheim verloopt.

Zie Geheimen maken en beheren in Azure IoT Operations voor meer informatie over geheimen.

Beperkingen

Azure Event Hubs biedt geen ondersteuning voor alle compressietypen die Kafka ondersteunt. Momenteel wordt alleen GZIP-compressie ondersteund in Azure Event Hubs Premium- en toegewezen lagen. Het gebruik van andere compressietypen kan leiden tot fouten.

Aangepaste Kafka-brokers

Als u een gegevensstroomeindpunt wilt configureren voor niet-Event Hub Kafka-brokers, stelt u indien nodig de host, TLS, verificatie en andere instellingen in.

  1. Selecteer in de bewerkingservaring het tabblad Gegevensstroomeindpunten .

  2. Selecteer onder Nieuw gegevensstroomeindpunt maken de optie Aangepaste Kafka Broker>New.

    Schermopname van bewerkingen om een Kafka-gegevensstroomeindpunt te maken.

  3. Voer de volgende instellingen in voor het eindpunt:

    Instelling Omschrijving
    Naam De naam van het gegevensstroomeindpunt.
    Host De hostnaam van de Kafka-broker in de indeling <Kafka-broker-host>:xxxx. Voeg het poortnummer toe aan de hostinstelling.
    Verificatiemethode De methode die wordt gebruikt voor verificatie. Kies SASL.
    SASL-type Het type SASL-verificatie. Kies Plain, ScramSha256 of ScramSha512. Vereist als u SASL gebruikt.
    Gesynchroniseerde geheime naam De naam van het geheim. Vereist als u SASL gebruikt.
    Gebruikersnaamreferentie van tokengeheim De verwijzing naar de gebruikersnaam in het SASL-tokengeheim. Vereist als u SASL gebruikt.
  4. Selecteer Toepassen om het eindpunt in te richten.

Notitie

Momenteel biedt de bewerkingservaring geen ondersteuning voor het gebruik van een Kafka-gegevensstroomeindpunt als bron. U kunt een gegevensstroom maken met een kafka-gegevensstroomeindpunt van een bron met behulp van Kubernetes of Bicep.

Als u de eindpuntinstellingen wilt aanpassen, gebruikt u de volgende secties voor meer informatie.

Beschikbare verificatiemethoden

De volgende verificatiemethoden zijn beschikbaar voor Kafka-brokergegevensstroomeindpunten.

Door het systeem toegewezen beheerde identiteit

Voordat u het gegevensstroomeindpunt configureert, wijst u een rol toe aan de beheerde identiteit van Azure IoT Operations die toestemming verleent om verbinding te maken met de Kafka-broker:

  1. Ga in Azure Portal naar uw Azure IoT Operations-exemplaar en selecteer Overzicht.
  2. Kopieer de naam van de extensie die wordt vermeld na de Azure IoT Operations Arc-extensie. Bijvoorbeeld azure-iot-operations-xxxx7.
  3. Ga naar de cloudresource die u machtigingen moet verlenen. Ga bijvoorbeeld naar de Event Hubs-naamruimte >Toegangsbeheer (IAM)>Roltoewijzing toevoegen.
  4. Selecteer op het tabblad Rol een geschikte rol.
  5. Selecteer op het tabblad Leden voor Toegang toewijzen aan, selecteer de optie Gebruiker, groep of service-principal en selecteer vervolgens + Leden selecteren en zoek naar de beheerde identiteit van Azure IoT Operations. Bijvoorbeeld azure-iot-operations-xxxx7.

Configureer vervolgens het gegevensstroomeindpunt met door het systeem toegewezen beheerde identiteitsinstellingen.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Basis en kies vervolgens de door het systeem toegewezen beheerde identiteit van de verificatiemethode>.

Met deze configuratie maakt u een beheerde identiteit met de standaarddoelgroep, die gelijk is aan de hostwaarde van de Event Hubs-naamruimte in de vorm van https://<NAMESPACE>.servicebus.windows.net. Als u de standaard doelgroep echter wilt overschrijven, kunt u het audience veld instellen op de gewenste waarde.

Niet ondersteund in de bewerkingservaring.

Door de gebruiker toegewezen beheerde identiteit

Als u door de gebruiker toegewezen beheerde identiteit wilt gebruiken voor verificatie, moet u Eerst Azure IoT Operations implementeren met beveiligde instellingen ingeschakeld. Vervolgens moet u een door de gebruiker toegewezen beheerde identiteit instellen voor cloudverbindingen. Zie Beveiligde instellingen inschakelen in azure IoT Operations-implementatie voor meer informatie.

Voordat u het gegevensstroomeindpunt configureert, wijst u een rol toe aan de door de gebruiker toegewezen beheerde identiteit die toestemming verleent om verbinding te maken met de Kafka-broker:

  1. Ga in Azure Portal naar de cloudresource die u moet verlenen. Ga bijvoorbeeld naar de Event Grid-naamruimte >Toegangsbeheer (IAM)>Roltoewijzing toevoegen.
  2. Selecteer op het tabblad Rol een geschikte rol.
  3. Selecteer op het tabblad Leden, voor Toegang toewijzen, de optie Beheerde identiteit toewijzen, selecteer vervolgens + Leden selecteren en zoek naar uw door de gebruiker toegewezen beheerde identiteit.

Configureer vervolgens het gegevensstroomeindpunt met door de gebruiker toegewezen beheerde identiteitsinstellingen.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Basic en kies vervolgens verificatiemethode>door de gebruiker toegewezen beheerde identiteit.

Hier is het bereik de doelgroep van de beheerde identiteit. De standaardwaarde is hetzelfde als de hostwaarde van de Event Hubs-naamruimte in de vorm van https://<NAMESPACE>.servicebus.windows.net. Als u de standaard doelgroep echter wilt overschrijven, kunt u het bereikveld instellen op de gewenste waarde met Bicep of Kubernetes.

SASL

Als u SASL wilt gebruiken voor verificatie, geeft u de SASL-verificatiemethode op en configureert u het SASL-type en een geheime verwijzing met de naam van het geheim dat het SASL-token bevat.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Basis en kies VERVOLGENS SASL voor verificatiemethode>.

Voer de volgende instellingen in voor het eindpunt:

Instelling Beschrijving
SASL-type Het type SASL-verificatie dat moet worden gebruikt. Ondersteunde typen zijn Plain, ScramSha256 en ScramSha512.
Gesynchroniseerde geheime naam De naam van het Kubernetes-geheim dat het SASL-token bevat.
Gebruikersnaamreferentie of tokengeheim De verwijzing naar de gebruikersnaam of het tokengeheim dat wordt gebruikt voor SASL-verificatie.
Wachtwoordreferentie van tokengeheim De verwijzing naar het wachtwoord of tokengeheim dat wordt gebruikt voor SASL-verificatie.

De ondersteunde SASL-typen zijn:

  • Plain
  • ScramSha256
  • ScramSha512

Het geheim moet zich in dezelfde naamruimte bevinden als het Kafka-gegevensstroomeindpunt. Het geheim moet het SASL-token hebben als sleutel-waardepaar.

Anoniem

Als u anonieme verificatie wilt gebruiken, werkt u de verificatiesectie van de Kafka-instellingen bij om de anonieme methode te gebruiken.

Selecteer op de pagina eindpuntinstellingen voor de gegevensstroomervaring het tabblad Basis en kies vervolgens Verificatiemethode>Geen.

Geavanceerde instellingen

U kunt geavanceerde instellingen instellen voor het Kafka-gegevensstroomeindpunt, zoals TLS, vertrouwd CA-certificaat, Kafka-berichteninstellingen, batchverwerking en CloudEvents. U kunt deze instellingen instellen op het tabblad Advanced Portal van het gegevensstroomeindpunt of in de gegevensstroomeindpuntresource.

Selecteer in de bewerkingservaring het tabblad Geavanceerd voor het gegevensstroomeindpunt.

Schermopname van bewerkingen om geavanceerde instellingen voor Kafka-gegevensstroomeindpunten in te stellen.

TLS-instellingen

TLS-modus

Als u TLS voor het Kafka-eindpunt wilt in- of uitschakelen, werkt u de mode instelling bij in de TLS-instellingen.

Schakel op de pagina eindpuntinstellingen voor gegevensstroomgegevensstromen het tabblad Geavanceerd in en gebruik vervolgens het selectievakje naast de TLS-modus ingeschakeld.

De TLS-modus kan worden ingesteld op Enabled of Disabled. Als de modus is ingesteld op Enabled, gebruikt de gegevensstroom een beveiligde verbinding met de Kafka-broker. Als de modus is ingesteld op Disabled, gebruikt de gegevensstroom een onveilige verbinding met de Kafka-broker.

Vertrouwd CA-certificaat

Configureer het vertrouwde CA-certificaat voor het Kafka-eindpunt om een beveiligde verbinding met de Kafka-broker tot stand te brengen. Deze instelling is belangrijk als de Kafka-broker gebruikmaakt van een zelfondertekend certificaat of een certificaat dat is ondertekend door een aangepaste CA die niet standaard wordt vertrouwd.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het veld Configuratietoewijzing voor vertrouwde CA-certificaten om de ConfigMap met het vertrouwde CA-certificaat op te geven.

Deze ConfigMap moet het CA-certificaat in PEM-indeling bevatten. De ConfigMap moet zich in dezelfde naamruimte bevinden als de Kafka-gegevensstroomresource. Voorbeeld:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

Tip

Wanneer u verbinding maakt met Azure Event Hubs, is het CA-certificaat niet vereist omdat de Event Hubs-service gebruikmaakt van een certificaat dat is ondertekend door een openbare CERTIFICERINGsinstantie die standaard wordt vertrouwd.

Consumentengroep-id

De consumentengroep-id wordt gebruikt om de consumentengroep te identificeren die door de gegevensstroom wordt gebruikt om berichten uit het Kafka-onderwerp te lezen. De consumentengroep-id moet uniek zijn binnen de Kafka-broker.

Belangrijk

Wanneer het Kafka-eindpunt wordt gebruikt als bron, is de id van de consumentengroep vereist. Anders kan de gegevensstroom geen berichten lezen uit het Kafka-onderwerp en krijgt u de foutmelding 'Kafka-type broneindpunten moeten een consumerGroupId hebben gedefinieerd'.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het veld Consumentengroep-id om de id van de consumentengroep op te geven.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als bron (de gegevensstroom is een consument).

Compressie

Het compressieveld maakt compressie mogelijk voor de berichten die naar Kafka-onderwerpen worden verzonden. Compressie helpt de netwerkbandbreedte en opslagruimte te verminderen die nodig is voor gegevensoverdracht. Compressie voegt echter ook enige overhead en latentie toe aan het proces. De ondersteunde compressietypen worden vermeld in de volgende tabel.

Weergegeven als Beschrijving
None Er wordt geen compressie of batchverwerking toegepast. Geen is de standaardwaarde als er geen compressie is opgegeven.
Gzip GZIP-compressie en batchverwerking worden toegepast. GZIP is een algoritme voor algemene compressie dat een goede balans biedt tussen de compressieverhouding en snelheid. Momenteel wordt alleen GZIP-compressie ondersteund in Azure Event Hubs Premium- en toegewezen lagen .
Snappy Snappy-compressie en batchverwerking worden toegepast. Snappy is een snel compressiealgoritme dat gemiddelde compressieverhouding en snelheid biedt. Deze compressiemodus wordt niet ondersteund door Azure Event Hubs.
Lz4 LZ4-compressie en batchverwerking worden toegepast. LZ4 is een snel compressiealgoritme dat lage compressieverhouding en hoge snelheid biedt. Deze compressiemodus wordt niet ondersteund door Azure Event Hubs.

Compressie configureren:

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het veld Compressie om het compressietype op te geven.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als bestemming waar de gegevensstroom een producent is.

Batching

Naast compressie kunt u ook batchverwerking configureren voor berichten voordat u ze naar Kafka-onderwerpen verzendt. Met batchverwerking kunt u meerdere berichten groeperen en comprimeren als één eenheid, waardoor de compressieefficiëntie kan worden verbeterd en de netwerkoverhead wordt verminderd.

Veld Beschrijving Vereist
mode Deze waarde kan Enabled of Disabled zijn. De standaardwaarde is Enabled omdat Kafka geen notie van niet-verwerkte berichten heeft. Als deze optie is ingesteld Disabled, wordt de batchverwerking geminimaliseerd om telkens één bericht te maken met één bericht. Nee
latencyMs Het maximale tijdsinterval in milliseconden dat berichten kunnen worden gebufferd voordat ze worden verzonden. Als dit interval is bereikt, worden alle gebufferde berichten verzonden als een batch, ongeacht hoeveel of hoe groot ze zijn. Als deze niet is ingesteld, is de standaardwaarde 5. Nee
maxMessages Het maximum aantal berichten dat kan worden gebufferd voordat deze wordt verzonden. Als dit aantal is bereikt, worden alle gebufferde berichten verzonden als een batch, ongeacht hoe groot of hoe lang ze worden gebufferd. Als deze niet is ingesteld, is de standaardwaarde 100000. Nee
maxBytes De maximale grootte in bytes die kunnen worden gebufferd voordat ze worden verzonden. Als deze grootte is bereikt, worden alle gebufferde berichten verzonden als een batch, ongeacht hoeveel of hoe lang ze worden gebufferd. De standaardwaarde is 10000000 (1 MB). Nee

Als u bijvoorbeeld latentiem's instelt op 1000, maxMessages op 100 en maxBytes op 1024, worden berichten verzonden wanneer er 100 berichten in de buffer staan of wanneer er 1024 bytes in de buffer staan, of wanneer er 1000 milliseconden zijn verstreken sinds de laatste verzending, afhankelijk van wat het eerst gebeurt.

Batchverwerking configureren:

Selecteer op de pagina eindpuntinstellingen van de bewerkingservaring het tabblad Geavanceerd en gebruik vervolgens het veld Batching ingeschakeld om batchverwerking in te schakelen. Gebruik de velden Batching-latentie, Maximum aantal bytes en Aantal berichten om de batchinstellingen op te geven.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als bestemming waar de gegevensstroom een producent is.

Strategie voor het afhandelen van partities

De strategie voor het afhandelen van partities bepaalt hoe berichten worden toegewezen aan Kafka-partities wanneer ze naar Kafka-onderwerpen worden verzonden. Kafka-partities zijn logische segmenten van een Kafka-onderwerp dat parallelle verwerking en fouttolerantie mogelijk maakt. Elk bericht in een Kafka-onderwerp heeft een partitie en een offset, die worden gebruikt om de berichten te identificeren en te rangschikken.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als bestemming waar de gegevensstroom een producent is.

Standaard wijst een gegevensstroom berichten toe aan willekeurige partities met behulp van een round robin-algoritme. U kunt echter verschillende strategieën gebruiken om berichten toe te wijzen aan partities op basis van bepaalde criteria, zoals de MQTT-onderwerpnaam of een MQTT-berichteigenschap. Dit kan u helpen om een betere taakverdeling, gegevenslocatie of berichtvolgorde te bereiken.

Weergegeven als Beschrijving
Default Hiermee worden berichten toegewezen aan willekeurige partities met behulp van een round robin-algoritme. Dit is de standaardwaarde als er geen strategie is opgegeven.
Static Hiermee worden berichten toegewezen aan een vast partitienummer dat is afgeleid van de exemplaar-id van de gegevensstroom. Dit betekent dat elk gegevensstroomexemplaren berichten naar een andere partitie verzenden. Dit kan helpen om betere taakverdeling en gegevenslocatie te bereiken.
Topic Gebruikt de MQTT-onderwerpnaam uit de gegevensstroombron als de sleutel voor partitionering. Dit betekent dat berichten met dezelfde MQTT-onderwerpnaam naar dezelfde partitie worden verzonden. Dit kan helpen om een betere volgorde van berichten en gegevenslocatie te bereiken.
Property Gebruikt een MQTT-berichteigenschap van de gegevensstroombron als sleutel voor partitionering. Geef de naam van de eigenschap in het partitionKeyProperty veld op. Dit betekent dat berichten met dezelfde eigenschapswaarde naar dezelfde partitie worden verzonden. Dit kan helpen om een betere volgorde van berichten en gegevenslocatie te bereiken op basis van een aangepast criterium.

Als u bijvoorbeeld de partitieverwerkingsstrategie Property instelt op en de eigenschap device-idpartitiesleutel op, worden berichten met dezelfde device-id eigenschap naar dezelfde partitie verzonden.

De strategie voor het afhandelen van partities configureren:

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het veld Strategie voor afhandeling van partities om de strategie voor het afhandelen van partities op te geven. Gebruik het eigenschapsveld Partitiesleutel om de eigenschap op te geven die wordt gebruikt voor partitioneren als de strategie is ingesteld op Property.

Kafka-bevestigingen

Kafka-bevestigingen (acks) worden gebruikt voor het beheren van de duurzaamheid en consistentie van berichten die naar Kafka-onderwerpen worden verzonden. Wanneer een producent een bericht naar een Kafka-onderwerp verzendt, kan het verschillende niveaus van bevestigingen van de Kafka-broker aanvragen om ervoor te zorgen dat het bericht naar het onderwerp is geschreven en wordt gerepliceerd in het Kafka-cluster.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als een bestemming (dat wil gezegd, de gegevensstroom is een producent).

Weergegeven als Beschrijving
None De gegevensstroom wacht niet op eventuele bevestigingen van de Kafka-broker. Deze instelling is de snelste maar minst duurzame optie.
All De gegevensstroom wacht totdat het bericht naar de leiderpartitie en alle volgerpartities is geschreven. Deze instelling is de langzaamste maar meest duurzame optie. Deze instelling is ook de standaardoptie
One De gegevensstroom wacht totdat het bericht naar de leiderpartitie en ten minste één volgpartitie wordt geschreven.
Zero De gegevensstroom wacht totdat het bericht naar de leiderpartitie wordt geschreven, maar wacht niet op eventuele bevestigingen van de volgers. Dit is sneller dan One maar minder duurzaam.

Als u bijvoorbeeld de Kafka-bevestiging Allinstelt, wacht de gegevensstroom totdat het bericht naar de leiderpartitie en alle volgerpartities wordt geschreven voordat het volgende bericht wordt verzonden.

De Kafka-bevestigingen configureren:

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het kafka-bevestigingsveld om het bevestigingsniveau kafka op te geven.

Deze instelling wordt alleen van kracht als het eindpunt wordt gebruikt als een bestemming waar de gegevensstroom een producent is.

MQTT-eigenschappen kopiëren

De instelling voor het kopiëren van MQTT-eigenschappen is standaard ingeschakeld. Deze gebruikerseigenschappen bevatten waarden zoals subject die de naam van de asset die het bericht verzendt, opslaat.

Schakel op de pagina eindpuntinstellingen voor de gegevensstroomervaring het tabblad Geavanceerd in en gebruik vervolgens het selectievakje naast het veld MQTT-eigenschappen kopiëren om het kopiëren van MQTT-eigenschappen in of uit te schakelen.

In de volgende secties wordt beschreven hoe MQTT-eigenschappen worden vertaald naar Kafka-gebruikersheaders en omgekeerd wanneer de instelling is ingeschakeld.

Kafka-eindpunt is een bestemming

Wanneer een Kafka-eindpunt een gegevensstroombestemming is, worden alle gedefinieerde eigenschappen van de MQTT v5-gebruikers vertaald. Een MQTT v5-bericht met 'Inhoudstype' dat wordt doorgestuurd naar Kafka, wordt bijvoorbeeld omgezet in de Kafka-gebruikerskoptekst "Content Type":{specifiedValue}. Vergelijkbare regels zijn van toepassing op andere ingebouwde MQTT-eigenschappen, gedefinieerd in de volgende tabel.

MQTT-eigenschap Vertaald gedrag
Indicator voor nettoladingnotatie Sleutel: "Indicator voor nettoladingnotatie"
Waarde: "0" (Payload is bytes) of "1" (Payload is UTF-8)
Antwoordonderwerp Sleutel: 'Antwoordonderwerp'
Waarde: Kopie van antwoordonderwerp uit het oorspronkelijke bericht.
Verloopinterval van bericht Sleutel: "Interval voor verlopen bericht"
Waarde: UTF-8 weergave van het aantal seconden voordat het bericht verloopt. Zie de eigenschap Verloopinterval van bericht voor meer informatie.
Correlatiegegevens: Sleutel: "Correlatiegegevens"
Waarde: Kopie van correlatiegegevens uit het oorspronkelijke bericht. In tegenstelling tot veel MQTT v5-eigenschappen die UTF-8 zijn gecodeerd, kunnen correlatiegegevens willekeurige gegevens zijn.
Inhoudstype: Sleutel: 'Inhoudstype'
Waarde: Kopie van inhoudstype van oorspronkelijk bericht.

Sleutel-waardeparen voor MQTT v5-gebruikerseigenschapssleutels worden rechtstreeks vertaald naar Kafka-gebruikersheaders. Als een gebruikerskoptekst in een bericht dezelfde naam heeft als een ingebouwde MQTT-eigenschap (bijvoorbeeld een gebruikersheader met de naam Correlatiegegevens), wordt de eigenschapswaarde van de MQTT v5-specificatie doorsturen of de gebruikerseigenschap niet gedefinieerd.

Gegevensstromen ontvangen deze eigenschappen nooit van een MQTT Broker. Een gegevensstroom stuurt ze dus nooit door:

  • Onderwerpalias
  • Abonnements-id's
De eigenschap Verloopinterval van bericht

Het interval voor het verlopen van het bericht geeft aan hoe lang een bericht in een MQTT-broker kan blijven voordat het wordt verwijderd.

Wanneer een gegevensstroom een MQTT-bericht ontvangt met het opgegeven interval voor het verlopen van het bericht, wordt het volgende weergegeven:

  • Registreert de tijd waarop het bericht is ontvangen.
  • Voordat het bericht naar de bestemming wordt verzonden, wordt de tijd afgetrokken van het bericht in de wachtrij van de oorspronkelijke verlooptijd.
  • Als het bericht niet is verlopen (de bovenstaande bewerking is > 0), wordt het bericht verzonden naar de bestemming en bevat de bijgewerkte verlooptijd van het bericht.
  • Als het bericht is verlopen (de bovenstaande bewerking is <= 0), wordt het bericht niet verzonden door het doel.

Voorbeelden:

  • Een gegevensstroom ontvangt een MQTT-bericht met het interval voor het verlopen van berichten = 3600 seconden. De bijbehorende bestemming is tijdelijk verbroken, maar kan opnieuw verbinding maken. 1000 seconden verstreken voordat dit MQTT-bericht naar het doel wordt verzonden. In dit geval heeft het bericht van de bestemming het interval voor het verlopen van het bericht ingesteld op 2600 (3600 - 1000) seconden.
  • De gegevensstroom ontvangt een MQTT-bericht met het interval voor het verlopen van berichten = 3600 seconden. De bijbehorende bestemming is tijdelijk verbroken, maar kan opnieuw verbinding maken. In dit geval duurt het echter 4000 seconden om opnieuw verbinding te maken. Het bericht is verlopen en de gegevensstroom stuurt dit bericht niet door naar de bestemming.

Kafka-eindpunt is een gegevensstroombron

Notitie

Er is een bekend probleem bij het gebruik van Event Hubs-eindpunt als een gegevensstroombron waarbij kafka-header beschadigd raakt als vertaald naar MQTT. Dit gebeurt alleen als u Event Hub gebruikt via de Event Hub-client die AMQP onder de dekking gebruikt. Bijvoorbeeld "foo"="bar", de "foo" wordt vertaald, maar de waarde wordt"\xa1\x03bar".

Wanneer een Kafka-eindpunt een gegevensstroombron is, worden Kafka-gebruikersheaders vertaald naar MQTT v5-eigenschappen. In de volgende tabel wordt beschreven hoe Kafka-gebruikersheaders worden vertaald naar MQTT v5-eigenschappen.

Kafka-header Vertaald gedrag
Sleutel Sleutel: "Sleutel"
Waarde: Kopieer de sleutel van het oorspronkelijke bericht.
Tijdstempel Sleutel: "Timestamp"
Waarde: UTF-8-codering van Kafka Timestamp, een aantal milliseconden sinds unix-epoch.

Kafka-headersleutel/waardeparen voor kafka-gebruikers, mits ze allemaal zijn gecodeerd in UTF-8, worden rechtstreeks omgezet in eigenschappen van MQTT-gebruikerssleutel/-waarde.

UTF-8/Binaire niet-overeenkomende items

MQTT v5 kan alleen op UTF-8 gebaseerde eigenschappen ondersteunen. Als de gegevensstroom een Kafka-bericht ontvangt dat een of meer niet-UTF-8-headers bevat, gebeurt het volgende:

  • Verwijder de offending-eigenschap of eigenschappen.
  • Stuur de rest van het bericht door volgens de vorige regels.

Toepassingen die binaire overdracht in Kafka-bronheaders => MQTT-doeleigenschappen vereisen, moeten ze eerst coderen via UTF-8, bijvoorbeeld via Base64.

>=64 kB-eigenschap komt niet overeen

MQTT v5-eigenschappen moeten kleiner zijn dan 64 kB. Als de gegevensstroom een Kafka-bericht ontvangt dat een of meer headers bevat die = 64 kB zijn >, zal de gegevensstroom het volgende doen:

  • Verwijder de offending-eigenschap of eigenschappen.
  • Stuur de rest van het bericht door volgens de vorige regels.
Eigenschapsomzetting bij gebruik van Event Hubs en producenten die AMQP gebruiken

Als u een client hebt die berichten doorstuurt via een Kafka-gegevensstroombroneindpunt, voert u een van de volgende acties uit:

  • Berichten verzenden naar Event Hubs met behulp van clientbibliotheken zoals Azure.Messaging.EventHubs
  • AMQP rechtstreeks gebruiken

Er zijn nuances voor het vertalen van eigenschappen waar u rekening mee moet houden.

U moet een van de volgende handelingen uitvoeren:

  • Vermijd het verzenden van eigenschappen
  • Als u eigenschappen moet verzenden, verzendt u waarden die zijn gecodeerd als UTF-8.

Wanneer Event Hubs eigenschappen van AMQP naar Kafka vertaalt, bevat deze de onderliggende met AMQP gecodeerde typen in het bericht. Zie Gebeurtenissen uitwisselen tussen consumenten en producenten met behulp van verschillende protocollen voor meer informatie over het gedrag.

In het volgende codevoorbeeld wanneer het gegevensstroomeindpunt de waarde "foo":"bar"ontvangt, ontvangt het de eigenschap als <0xA1 0x03 "bar">.

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

Het eindpunt van de gegevensstroom kan de eigenschap <0xA1 0x03 "bar"> nettolading niet doorsturen naar een MQTT-bericht omdat de gegevens niet UTF-8 zijn. Als u echter een UTF-8-tekenreeks opgeeft, vertaalt het gegevensstroomeindpunt de tekenreeks voordat deze naar MQTT wordt verzonden. Als u een UTF-8-tekenreeks gebruikt, zou het MQTT-bericht als gebruikerseigenschappen hebben "foo":"bar" .

Alleen UTF-8-headers worden vertaald. Bijvoorbeeld, in het volgende scenario waarin de eigenschap is ingesteld als een float:

Properties = 
{
  {"float-value", 11.9 },
}

Het gegevensstroomeindpunt verwijdert pakketten die het "float-value" veld bevatten.

Niet alle eigenschappen van gebeurtenisgegevens, waaronder propertyEventData.correlationId, worden doorgestuurd. Zie Eigenschappen van gebeurtenisgebruikers voor meer informatie,

CloudEvents

CloudEvents zijn een manier om gebeurtenisgegevens op een gemeenschappelijke manier te beschrijven. De CloudEvents-instellingen worden gebruikt voor het verzenden of ontvangen van berichten in de CloudEvents-indeling. U kunt CloudEvents gebruiken voor gebeurtenisgestuurde architecturen waarbij verschillende services met elkaar moeten communiceren in dezelfde of verschillende cloudproviders.

De CloudEventAttributes opties zijn Propagate ofCreateOrRemap.

Selecteer op de pagina eindpuntinstellingen voor gegevensstroombewerkingen het tabblad Geavanceerd en gebruik vervolgens het veld Kenmerken van cloud-gebeurtenissen om de instelling CloudEvents op te geven.

In de volgende secties wordt beschreven hoe CloudEvent-eigenschappen worden doorgegeven of gemaakt en opnieuw worden toegewezen.

Instelling doorgeven

CloudEvent-eigenschappen worden doorgegeven voor berichten die de vereiste eigenschappen bevatten. Als het bericht niet de vereiste eigenschappen bevat, wordt het bericht als zodanig doorgegeven. Als de vereiste eigenschappen aanwezig zijn, wordt er een ce_ voorvoegsel toegevoegd aan de naam van de cloudEvent-eigenschap.

Naam Vereist Voorbeeldwaarde Uitvoernaam Uitvoerwaarde
specversion Ja 1.0 ce-specversion Doorgegeven zoals is
type Ja ms.aio.telemetry ce-type Doorgegeven zoals is
source Ja aio://mycluster/myoven ce-source Doorgegeven zoals is
id Ja A234-1234-1234 ce-id Doorgegeven zoals is
subject Nee aio/myoven/telemetry/temperature ce-subject Doorgegeven zoals is
time Nee 2018-04-05T17:31:00Z ce-time Doorgegeven zoals is. Het is niet herampt.
datacontenttype Nee application/json ce-datacontenttype Gewijzigd in het inhoudstype uitvoergegevens na de optionele transformatiefase.
dataschema Nee sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema Als er een uitvoergegevenstransformatieschema wordt gegeven in de transformatieconfiguratie, dataschema wordt dit gewijzigd in het uitvoerschema.

Instelling CreateOrRemap

CloudEvent-eigenschappen worden doorgegeven voor berichten die de vereiste eigenschappen bevatten. Als het bericht niet de vereiste eigenschappen bevat, worden de eigenschappen gegenereerd.

Naam Vereist Uitvoernaam Gegenereerde waarde als deze ontbreekt
specversion Ja ce-specversion 1.0
type Ja ce-type ms.aio-dataflow.telemetry
source Ja ce-source aio://<target-name>
id Ja ce-id Gegenereerde UUID in de doelclient
subject Nee ce-subject Het uitvoeronderwerp waarin het bericht wordt verzonden
time Nee ce-time Gegenereerd als RFC 3339 in de doelclient
datacontenttype Nee ce-datacontenttype Gewijzigd in het inhoudstype uitvoergegevens na de optionele transformatiefase
dataschema Nee ce-dataschema Schema dat is gedefinieerd in het schemaregister

Volgende stappen

Zie Een gegevensstroom maken voor meer informatie over gegevensstromen.