Konfigurowanie punktów końcowych przepływu danych usługi Azure Event Hubs i platformy Kafka
Ważne
Ta strona zawiera instrukcje dotyczące zarządzania składnikami operacji usługi Azure IoT przy użyciu manifestów wdrażania platformy Kubernetes, które są w wersji zapoznawczej. Ta funkcja jest udostępniana z kilkoma ograniczeniami i nie powinna być używana w przypadku obciążeń produkcyjnych.
Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.
Aby skonfigurować dwukierunkową komunikację między operacjami usługi Azure IoT i brokerami platformy Apache Kafka, możesz skonfigurować punkt końcowy przepływu danych. Ta konfiguracja umożliwia określenie punktu końcowego, protokołu Transport Layer Security (TLS), uwierzytelniania i innych ustawień.
Wymagania wstępne
- Wystąpienie operacji usługi Azure IoT
Azure Event Hubs
Usługa Azure Event Hubs jest zgodna z protokołem Kafka i może być używana z przepływami danych z pewnymi ograniczeniami.
Tworzenie przestrzeni nazw i centrum zdarzeń usługi Azure Event Hubs
Najpierw utwórz przestrzeń nazw usługi Azure Event Hubs z obsługą platformy Kafka
Następnie utwórz centrum zdarzeń w przestrzeni nazw. Każde pojedyncze centrum zdarzeń odpowiada tematowi platformy Kafka. Możesz utworzyć wiele centrów zdarzeń w tej samej przestrzeni nazw, aby reprezentować wiele tematów platformy Kafka.
Przypisywanie uprawnień do tożsamości zarządzanej
Aby skonfigurować punkt końcowy przepływu danych dla usługi Azure Event Hubs, zalecamy użycie tożsamości zarządzanej przypisanej przez użytkownika lub przypisanej przez system. Takie podejście jest bezpieczne i eliminuje konieczność ręcznego zarządzania poświadczeniami.
Po utworzeniu przestrzeni nazw i centrum zdarzeń usługi Azure Event Hubs należy przypisać rolę do tożsamości zarządzanej operacji usługi Azure IoT, która udziela uprawnień do wysyłania lub odbierania komunikatów do centrum zdarzeń.
Jeśli korzystasz z tożsamości zarządzanej przypisanej przez system, w witrynie Azure Portal przejdź do wystąpienia operacji usługi Azure IoT i wybierz pozycję Przegląd. Skopiuj nazwę rozszerzenia wymienionego po rozszerzeniu Usługi Azure IoT Operations Arc. Na przykład azure-iot-operations-xxxx7. Tożsamość zarządzana przypisana przez system można znaleźć przy użyciu tej samej nazwy rozszerzenia usługi Azure IoT Operations Arc.
Następnie przejdź do przestrzeni nazw >usługi Event Hubs Access Control (IAM)>Dodaj przypisanie roli.
- Na karcie Rola wybierz odpowiednią rolę, na przykład
Azure Event Hubs Data Sender
lubAzure Event Hubs Data Receiver
. Daje to tożsamości zarządzanej niezbędne uprawnienia do wysyłania lub odbierania komunikatów dla wszystkich centrów zdarzeń w przestrzeni nazw. Aby dowiedzieć się więcej, zobacz Uwierzytelnianie aplikacji przy użyciu identyfikatora Entra firmy Microsoft w celu uzyskania dostępu do zasobów usługi Event Hubs. - Na karcie Członkowie:
- W przypadku korzystania z tożsamości zarządzanej przypisanej przez system w polu Przypisz dostęp do wybierz opcję Użytkownik, grupa lub jednostka usługi , a następnie wybierz pozycję + Wybierz członków i wyszukaj nazwę rozszerzenia Azure IoT Operations Arc.
- Jeśli używasz tożsamości zarządzanej przypisanej przez użytkownika, w obszarze Przypisz dostęp do wybierz opcję Tożsamość zarządzana, a następnie wybierz pozycję + Wybierz członków i wyszukaj tożsamość zarządzaną przypisaną przez użytkownika skonfigurowaną dla połączeń w chmurze.
Tworzenie punktu końcowego przepływu danych dla usługi Azure Event Hubs
Po skonfigurowaniu przestrzeni nazw i centrum zdarzeń usługi Azure Event Hubs można utworzyć punkt końcowy przepływu danych dla przestrzeni nazw usługi Azure Event Hubs z obsługą platformy Kafka.
W środowisku operacji wybierz kartę Punkty końcowe przepływu danych.
W obszarze Tworzenie nowego punktu końcowego przepływu danych wybierz pozycję Azure Event Hubs>Nowy.
Wprowadź następujące ustawienia punktu końcowego:
Ustawienie opis Nazwa/nazwisko Nazwa punktu końcowego przepływu danych. Gospodarz Nazwa hosta brokera platformy Kafka w formacie <NAMESPACE>.servicebus.windows.net:9093
. Dołącz numer9093
portu w ustawieniu hosta dla usługi Event Hubs.Metoda uwierzytelniania Metoda używana do uwierzytelniania. Zalecamy wybranie tożsamości zarządzanej przypisanej przez system lub tożsamości zarządzanej przypisanej przez użytkownika. Wybierz pozycję Zastosuj , aby aprowizować punkt końcowy.
Uwaga
Temat platformy Kafka lub pojedyncze centrum zdarzeń jest konfigurowany później podczas tworzenia przepływu danych. Temat platformy Kafka jest miejscem docelowym komunikatów przepływu danych.
Używanie parametry połączenia do uwierzytelniania w usłudze Event Hubs
Ważne
Aby można było zarządzać wpisami tajnymi za pomocą portalu obsługi operacji, najpierw należy włączyć operacje usługi Azure IoT z bezpiecznymi ustawieniami, konfigurując usługę Azure Key Vault i włączając tożsamości obciążeń. Aby dowiedzieć się więcej, zobacz Włączanie bezpiecznych ustawień we wdrożeniu operacji usługi Azure IoT.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Podstawowa, a następnie wybierz pozycję SasL metody>uwierzytelniania.
Wprowadź następujące ustawienia punktu końcowego:
Ustawienie | opis |
---|---|
Typ sygnatury dostępu współdzielonego | Wybierz plik Plain . |
Nazwa zsynchronizowanego wpisu tajnego | Wprowadź nazwę wpisu tajnego kubernetes zawierającego parametry połączenia. |
Odwołanie do nazwy użytkownika lub wpis tajny tokenu | Odwołanie do nazwy użytkownika lub klucza tajnego tokenu używanego do uwierzytelniania SASL. Wybierz ją z listy usługi Key Vault lub utwórz nową. Wartość musi mieć wartość $ConnectionString . |
Odwołanie do hasła wpisu tajnego tokenu | Odwołanie do hasła lub klucza tajnego tokenu używanego do uwierzytelniania SASL. Wybierz ją z listy usługi Key Vault lub utwórz nową. Wartość musi być w formacie Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> . |
Po wybraniu pozycji Dodaj odwołanie, jeśli wybierzesz pozycję Utwórz nową, wprowadź następujące ustawienia:
Ustawienie | opis |
---|---|
Nazwa wpisu tajnego | Nazwa wpisu tajnego w usłudze Azure Key Vault. Wybierz nazwę, która jest łatwa do zapamiętania, aby wybrać wpis tajny później z listy. |
Wartość wpisu tajnego | W polu nazwa użytkownika wprowadź .$ConnectionString Jako hasło wprowadź parametry połączenia w formacie Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> . |
Ustawianie daty aktywacji | Jeśli jest włączona, data, gdy wpis tajny stanie się aktywny. |
Ustawianie daty wygaśnięcia | Jeśli jest włączona, data wygaśnięcia wpisu tajnego. |
Aby dowiedzieć się więcej na temat wpisów tajnych, zobacz Tworzenie wpisów tajnych i zarządzanie nimi w operacjach usługi Azure IoT.
Ograniczenia
Usługa Azure Event Hubs nie obsługuje wszystkich typów kompresji, które obsługuje platforma Kafka. Obecnie tylko kompresja GZIP jest obsługiwana w warstwach Premium i dedykowanych usługi Azure Event Hubs. Użycie innych typów kompresji może spowodować błędy.
Niestandardowe brokery platformy Kafka
Aby skonfigurować punkt końcowy przepływu danych dla brokerów platformy Kafka spoza centrum zdarzeń, ustaw host, protokół TLS, uwierzytelnianie i inne ustawienia zgodnie z potrzebami.
W środowisku operacji wybierz kartę Punkty końcowe przepływu danych.
W obszarze Tworzenie nowego punktu końcowego przepływu danych wybierz pozycję Niestandardowy broker>platformy Kafka Nowy.
Wprowadź następujące ustawienia punktu końcowego:
Ustawienie opis Nazwa/nazwisko Nazwa punktu końcowego przepływu danych. Gospodarz Nazwa hosta brokera platformy Kafka w formacie <Kafka-broker-host>:xxxx
. Dołącz numer portu w ustawieniu hosta.Metoda uwierzytelniania Metoda używana do uwierzytelniania. Wybierz pozycję SASL. Typ sygnatury dostępu współdzielonego Typ uwierzytelniania SASL. Wybierz pozycję Zwykły, ScramSha256 lub ScramSha512. Wymagane w przypadku używania sygnatury dostępu współdzielonego( SASL). Nazwa zsynchronizowanego wpisu tajnego Nazwa wpisu tajnego. Wymagane w przypadku używania sygnatury dostępu współdzielonego( SASL). Odwołanie do nazwy użytkownika wpisu tajnego tokenu Odwołanie do nazwy użytkownika w kluczu tajnym tokenu SASL. Wymagane w przypadku używania sygnatury dostępu współdzielonego( SASL). Wybierz pozycję Zastosuj , aby aprowizować punkt końcowy.
Uwaga
Obecnie środowisko operacji nie obsługuje używania punktu końcowego przepływu danych platformy Kafka jako źródła. Przepływ danych można utworzyć za pomocą źródłowego punktu końcowego przepływu danych platformy Kafka przy użyciu platformy Kubernetes lub Bicep.
Aby dostosować ustawienia punktu końcowego, skorzystaj z poniższych sekcji, aby uzyskać więcej informacji.
Dostępne metody uwierzytelniania
Następujące metody uwierzytelniania są dostępne dla punktów końcowych przepływu danych brokera platformy Kafka.
Tożsamość zarządzana przypisana przez system
Przed skonfigurowaniem punktu końcowego przepływu danych przypisz rolę do tożsamości zarządzanej operacje usługi Azure IoT, która udziela uprawnień do nawiązywania połączenia z brokerem platformy Kafka:
- W witrynie Azure Portal przejdź do wystąpienia operacji usługi Azure IoT i wybierz pozycję Przegląd.
- Skopiuj nazwę rozszerzenia wymienionego po rozszerzeniu Usługi Azure IoT Operations Arc. Na przykład azure-iot-operations-xxxx7.
- Przejdź do zasobu w chmurze, który musisz udzielić uprawnień. Na przykład przejdź do obszaru nazw >Usługi Event Hubs Kontrola dostępu (IAM)>Dodaj przypisanie roli.
- Na karcie Rola wybierz odpowiednią rolę.
- Na karcie Członkowie w polu Przypisz dostęp do wybierz opcję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków i wyszukaj tożsamość zarządzaną operacji usługi Azure IoT Operations. Na przykład azure-iot-operations-xxxx7.
Następnie skonfiguruj punkt końcowy przepływu danych przy użyciu ustawień tożsamości zarządzanej przypisanej przez system.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Podstawowa, a następnie wybierz pozycję Metoda>uwierzytelniania Przypisana tożsamość zarządzana przez system.
Ta konfiguracja tworzy tożsamość zarządzaną z domyślnymi odbiorcami, która jest taka sama jak wartość hosta przestrzeni nazw usługi Event Hubs w postaci https://<NAMESPACE>.servicebus.windows.net
. Jeśli jednak chcesz zastąpić domyślną grupę odbiorców, możesz ustawić audience
pole na żądaną wartość.
Nieobsługiwane w środowisku operacji.
Tożsamość zarządzana przypisana przez użytkownika
Aby użyć tożsamości zarządzanej przypisanej przez użytkownika do uwierzytelniania, należy najpierw wdrożyć operacje usługi Azure IoT z włączonymi bezpiecznymi ustawieniami. Następnie należy skonfigurować tożsamość zarządzaną przypisaną przez użytkownika dla połączeń w chmurze. Aby dowiedzieć się więcej, zobacz Włączanie bezpiecznych ustawień we wdrożeniu operacji usługi Azure IoT.
Przed skonfigurowaniem punktu końcowego przepływu danych przypisz rolę do tożsamości zarządzanej przypisanej przez użytkownika, która przyznaje uprawnienia do nawiązywania połączenia z brokerem platformy Kafka:
- W witrynie Azure Portal przejdź do zasobu w chmurze, który musisz udzielić uprawnień. Na przykład przejdź do obszaru >nazw Usługi Event Grid Kontrola dostępu (IAM)>Dodaj przypisanie roli.
- Na karcie Rola wybierz odpowiednią rolę.
- Na karcie Członkowie w polu Przypisz dostęp do wybierz opcję Tożsamość zarządzana, a następnie wybierz pozycję + Wybierz członków i wyszukaj tożsamość zarządzaną przypisaną przez użytkownika.
Następnie skonfiguruj punkt końcowy przepływu danych przy użyciu ustawień tożsamości zarządzanej przypisanej przez użytkownika.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Podstawowa, a następnie wybierz pozycję Metoda>uwierzytelniania Przypisana przez użytkownika tożsamość zarządzana.
W tym miejscu zakresem jest grupa odbiorców tożsamości zarządzanej. Wartość domyślna jest taka sama jak wartość hosta przestrzeni nazw usługi Event Hubs w postaci https://<NAMESPACE>.servicebus.windows.net
. Jeśli jednak chcesz zastąpić domyślnych odbiorców, możesz ustawić pole zakresu na żądaną wartość przy użyciu platformy Bicep lub Kubernetes.
SASL
Aby użyć sygnatury dostępu współdzielonego na potrzeby uwierzytelniania, określ metodę uwierzytelniania SASL i skonfiguruj typ SASL oraz odwołanie do wpisu tajnego z nazwą wpisu tajnego zawierającego token SASL.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Podstawowa, a następnie wybierz pozycję SasL metody>uwierzytelniania.
Wprowadź następujące ustawienia punktu końcowego:
Ustawienie | opis |
---|---|
Typ sygnatury dostępu współdzielonego | Typ uwierzytelniania SASL do użycia. Obsługiwane typy to Plain , ScramSha256 i ScramSha512 . |
Nazwa zsynchronizowanego wpisu tajnego | Nazwa wpisu tajnego kubernetes zawierającego token SASL. |
Odwołanie do nazwy użytkownika lub wpis tajny tokenu | Odwołanie do nazwy użytkownika lub klucza tajnego tokenu używanego do uwierzytelniania SASL. |
Odwołanie do hasła wpisu tajnego tokenu | Odwołanie do hasła lub klucza tajnego tokenu używanego do uwierzytelniania SASL. |
Obsługiwane typy SASL to:
Plain
ScramSha256
ScramSha512
Wpis tajny musi znajdować się w tej samej przestrzeni nazw co punkt końcowy przepływu danych platformy Kafka. Wpis tajny musi mieć token SASL jako parę klucz-wartość.
Anonimowe
Aby użyć uwierzytelniania anonimowego, zaktualizuj sekcję uwierzytelniania ustawień platformy Kafka, aby użyć metody Anonimowe.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Podstawowa , a następnie wybierz pozycję Metoda>uwierzytelniania Brak.
Ustawienia zaawansowane
Możesz ustawić zaawansowane ustawienia punktu końcowego przepływu danych platformy Kafka, takie jak TLS, zaufany certyfikat urzędu certyfikacji, ustawienia obsługi komunikatów platformy Kafka, przetwarzanie wsadowe i cloudEvents. Te ustawienia można ustawić na karcie Zaawansowane portal punktu końcowego przepływu danych lub w zasobie punktu końcowego przepływu danych.
W środowisku operacji wybierz kartę Zaawansowane dla punktu końcowego przepływu danych.
Ustawienia protokołu TLS
Tryb TLS
Aby włączyć lub wyłączyć protokół TLS dla punktu końcowego platformy Kafka, zaktualizuj mode
ustawienie w ustawieniach protokołu TLS.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola wyboru obok włączonego trybu TLS.
Tryb TLS można ustawić na Enabled
lub Disabled
. Jeśli tryb jest ustawiony na Enabled
, przepływ danych używa bezpiecznego połączenia z brokerem platformy Kafka. Jeśli tryb jest ustawiony na Disabled
, przepływ danych używa niezabezpieczonego połączenia z brokerem platformy Kafka.
Zaufany certyfikat urzędu certyfikacji
Skonfiguruj zaufany certyfikat urzędu certyfikacji dla punktu końcowego platformy Kafka, aby nawiązać bezpieczne połączenie z brokerem platformy Kafka. To ustawienie jest ważne, jeśli broker platformy Kafka używa certyfikatu z podpisem własnym lub certyfikatu podpisanego przez niestandardowy urząd certyfikacji, który nie jest domyślnie zaufany.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane, a następnie użyj pola mapy konfiguracji certyfikatu zaufanego urzędu certyfikacji, aby określić ConfigMap zawierającą zaufany certyfikat urzędu certyfikacji.
Ta mapa konfiguracji powinna zawierać certyfikat urzędu certyfikacji w formacie PEM. Obiekt ConfigMap musi znajdować się w tej samej przestrzeni nazw co zasób przepływu danych platformy Kafka. Na przykład:
kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations
Napiwek
Podczas nawiązywania połączenia z usługą Azure Event Hubs certyfikat urzędu certyfikacji nie jest wymagany, ponieważ usługa Event Hubs używa certyfikatu podpisanego przez publiczny urząd certyfikacji, który jest domyślnie zaufany.
Identyfikator grupy odbiorców
Identyfikator grupy odbiorców służy do identyfikowania grupy odbiorców używanej przez przepływ danych do odczytywania komunikatów z tematu platformy Kafka. Identyfikator grupy odbiorców musi być unikatowy w obrębie brokera platformy Kafka.
Ważne
Gdy punkt końcowy platformy Kafka jest używany jako źródło, wymagany jest identyfikator grupy odbiorców. W przeciwnym razie przepływ danych nie może odczytać komunikatów z tematu platformy Kafka i zostanie wyświetlony błąd "Punkty końcowe źródła typu platformy Kafka muszą mieć zdefiniowany identyfikator consumerGroupId".
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola Identyfikator grupy odbiorców, aby określić identyfikator grupy odbiorców.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako źródło (czyli przepływ danych jest odbiorcą).
Kompresja
Pole kompresji umożliwia kompresję komunikatów wysyłanych do tematów platformy Kafka. Kompresja pomaga zmniejszyć przepustowość sieci i miejsce do magazynowania wymagane do transferu danych. Jednak kompresja dodaje również pewne obciążenie i opóźnienie do procesu. Obsługiwane typy kompresji są wymienione w poniższej tabeli.
Wartość | Opis |
---|---|
None |
Nie zastosowano kompresji ani dzielenia na partie. Brak jest wartością domyślną, jeśli nie określono kompresji. |
Gzip |
Zastosowano kompresję GZIP i przetwarzanie wsadowe. GZIP to algorytm kompresji ogólnego przeznaczenia, który zapewnia dobrą równowagę między współczynnikiem kompresji a szybkością. Obecnie tylko kompresja GZIP jest obsługiwana w warstwach Premium i dedykowanych usługi Azure Event Hubs. |
Snappy |
Zastosowano kompresję Snappy i przetwarzanie wsadowe. Snappy to szybki algorytm kompresji, który oferuje umiarkowany współczynnik kompresji i szybkość. Ten tryb kompresji nie jest obsługiwany przez usługę Azure Event Hubs. |
Lz4 |
Stosuje się kompresję LZ4 i przetwarzanie wsadowe. LZ4 to szybki algorytm kompresji, który oferuje niski współczynnik kompresji i dużą szybkość. Ten tryb kompresji nie jest obsługiwany przez usługę Azure Event Hubs. |
Aby skonfigurować kompresję:
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola Kompresja , aby określić typ kompresji.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako miejsce docelowe, w którym przepływ danych jest producentem.
Dzielenie na partie
Oprócz kompresji można również skonfigurować przetwarzanie wsadowe dla komunikatów przed wysłaniem ich do tematów platformy Kafka. Przetwarzanie wsadowe pozwala grupować wiele komunikatów i kompresować je jako pojedynczą jednostkę, co może zwiększyć wydajność kompresji i zmniejszyć obciążenie sieci.
Pole | opis | Wymagania |
---|---|---|
mode |
Możliwe wartości to Enabled i Disabled . Wartością domyślną jest Enabled to, że platforma Kafka nie ma pojęcia komunikatów bez szyfrowania . Jeśli ustawiono Disabled wartość , przetwarzanie wsadowe jest zminimalizowane w celu utworzenia partii z pojedynczym komunikatem za każdym razem. |
Nie. |
latencyMs |
Maksymalny przedział czasu w milisekundach, który można buforować przed wysłaniem komunikatów. Jeśli ten interwał zostanie osiągnięty, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od liczby lub wielkości tych komunikatów. Jeśli nie zostanie ustawiona, wartość domyślna to 5. | Nie. |
maxMessages |
Maksymalna liczba komunikatów, które można buforować przed wysłaniem. Jeśli ta liczba zostanie osiągnięta, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od tego, jak duże lub jak długo są buforowane. Jeśli nie zostanie ustawiona, wartość domyślna to 100000. | Nie. |
maxBytes |
Maksymalny rozmiar bajtów, który można buforować przed wysłaniem. Jeśli ten rozmiar zostanie osiągnięty, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od tego, ile lub ile czasu są buforowane. Wartość domyślna to 10000000 (1 MB). | Nie. |
Jeśli na przykład ustawisz wartość latencyMs na 1000, maxMessages na 100 i maxBytes na 1024, komunikaty są wysyłane, gdy w buforze znajduje się 100 komunikatów lub gdy w buforze przypada 1024 bajty lub gdy upłynie 1000 milisekund, w zależności od czasu ostatniego wysłania.
Aby skonfigurować przetwarzanie wsadowe:
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola Włączone wsadowe, aby włączyć przetwarzanie wsadowe . Użyj pól Opóźnienie wsadowe, Maksymalna liczba bajtów i Liczba komunikatów, aby określić ustawienia przetwarzania wsadowego.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako miejsce docelowe, w którym przepływ danych jest producentem.
Strategia obsługi partycji
Strategia obsługi partycji określa, jak komunikaty są przypisywane do partycji platformy Kafka podczas wysyłania ich do tematów platformy Kafka. Partycje platformy Kafka to logiczne segmenty tematu platformy Kafka, które umożliwiają równoległe przetwarzanie i odporność na uszkodzenia. Każdy komunikat w temacie platformy Kafka ma partycję i przesunięcie, które są używane do identyfikowania i porządkowenia komunikatów.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako miejsce docelowe, w którym przepływ danych jest producentem.
Domyślnie przepływ danych przypisuje komunikaty do losowych partycji przy użyciu algorytmu działania okrężnego. Można jednak użyć różnych strategii, aby przypisać komunikaty do partycji na podstawie niektórych kryteriów, takich jak nazwa tematu MQTT lub właściwość komunikatu MQTT. Może to pomóc w lepszej równoważeniu obciążenia, lokalności danych lub porządkoweniu komunikatów.
Wartość | Opis |
---|---|
Default |
Przypisuje komunikaty do losowych partycji przy użyciu algorytmu działania okrężnego. Jest to wartość domyślna, jeśli nie określono żadnej strategii. |
Static |
Przypisuje komunikaty do stałego numeru partycji pochodzącego z identyfikatora wystąpienia przepływu danych. Oznacza to, że każde wystąpienie przepływu danych wysyła komunikaty do innej partycji. Może to pomóc w osiągnięciu lepszego równoważenia obciążenia i lokalizacji danych. |
Topic |
Używa nazwy tematu MQTT ze źródła przepływu danych jako klucza do partycjonowania. Oznacza to, że komunikaty o tej samej nazwie tematu MQTT są wysyłane do tej samej partycji. Może to pomóc w osiągnięciu lepszej kolejności komunikatów i lokalizacji danych. |
Property |
Używa właściwości komunikatu MQTT ze źródła przepływu danych jako klucza do partycjonowania. Określ nazwę właściwości w partitionKeyProperty polu. Oznacza to, że komunikaty o tej samej wartości właściwości są wysyłane do tej samej partycji. Może to pomóc w lepszej kolejności komunikatów i lokalności danych na podstawie kryterium niestandardowego. |
Jeśli na przykład ustawisz strategię obsługi partycji na Property
, a właściwość klucza partycji na device-id
, komunikaty z tą samą device-id
właściwością są wysyłane do tej samej partycji.
Aby skonfigurować strategię obsługi partycji:
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola Strategia obsługi partycji, aby określić strategię obsługi partycji. Użyj pola właściwości Klucz partycji, aby określić właściwość używaną do partycjonowania, jeśli strategia jest ustawiona na Property
wartość .
Potwierdzenia platformy Kafka
Potwierdzenia platformy Kafka (acks) służą do kontrolowania trwałości i spójności komunikatów wysyłanych do tematów platformy Kafka. Gdy producent wysyła komunikat do tematu platformy Kafka, może zażądać różnych poziomów potwierdzenia od brokera platformy Kafka, aby upewnić się, że wiadomość została pomyślnie zapisana w temacie i zreplikowana w klastrze platformy Kafka.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako miejsce docelowe (czyli przepływ danych jest producentem).
Wartość | Opis |
---|---|
None |
Przepływ danych nie czeka na żadne potwierdzenia od brokera platformy Kafka. To ustawienie jest najszybszą, ale najmniej trwałą opcją. |
All |
Przepływ danych czeka na zapisanie komunikatu w partycji lidera i wszystkich partycji obserwowanych. To ustawienie jest najwolniejsze, ale najbardziej trwałe. To ustawienie jest również opcją domyślną |
One |
Przepływ danych czeka na zapisanie komunikatu w partycji lidera i co najmniej jednej partycji obserwowanej. |
Zero |
Przepływ danych czeka na zapisanie wiadomości w partycji lidera, ale nie czeka na potwierdzenie od obserwatorów. Jest to szybsze niż One ale mniej trwałe. |
Jeśli na przykład ustawisz potwierdzenie platformy Kafka na All
wartość , przepływ danych czeka na zapisanie komunikatu w partycji lidera i wszystkich partycji obserwowanych przed wysłaniem następnej wiadomości.
Aby skonfigurować potwierdzenia platformy Kafka:
Na stronie Ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola potwierdzenia platformy Kafka, aby określić poziom potwierdzenia platformy Kafka.
To ustawienie ma zastosowanie tylko wtedy, gdy punkt końcowy jest używany jako miejsce docelowe, w którym przepływ danych jest producentem.
Kopiowanie właściwości MQTT
Domyślnie ustawienie właściwości kopiowania MQTT jest włączone. Te właściwości użytkownika obejmują wartości, takie jak subject
, które przechowują nazwę zasobu wysyłającego komunikat.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane , a następnie użyj pola wyboru obok pola Kopiuj właściwości MQTT, aby włączyć lub wyłączyć kopiowanie właściwości MQTT.
W poniższych sekcjach opisano sposób tłumaczenia właściwości MQTT na nagłówki użytkownika platformy Kafka i na odwrót po włączeniu ustawienia.
Punkt końcowy platformy Kafka jest miejscem docelowym
Gdy punkt końcowy platformy Kafka jest miejscem docelowym przepływu danych, wszystkie zdefiniowane właściwości specyfikacji MQTT v5 są tłumaczone nagłówki użytkownika platformy Kafka. Na przykład komunikat MQTT v5 z komunikatem "Typ zawartości" przekazywany do platformy Kafka przekłada się na nagłówek "Content Type":{specifiedValue}
użytkownika platformy Kafka . Podobne reguły dotyczą innych wbudowanych właściwości MQTT zdefiniowanych w poniższej tabeli.
Właściwość MQTT | Przetłumaczone zachowanie |
---|---|
Wskaźnik formatu ładunku | Klucz: "Wskaźnik formatu ładunku" Wartość: "0" (Ładunek jest bajtami) lub "1" (ładunek to UTF-8) |
Temat odpowiedzi | Klucz: "Temat odpowiedzi" Wartość: kopia tematu odpowiedzi z oryginalnej wiadomości. |
Interwał wygaśnięcia komunikatu | Klucz: "Interwał wygaśnięcia komunikatu" Wartość: reprezentacja UTF-8 liczby sekund przed wygaśnięciem komunikatu. Aby uzyskać więcej informacji, zobacz właściwość Interwał wygasania komunikatów. |
Dane korelacji: | Klucz: "Dane korelacji" Wartość: kopia danych korelacji z oryginalnego komunikatu. W przeciwieństwie do wielu właściwości MQTT v5, które są zakodowane w formacie UTF-8, dane korelacji mogą być dowolnymi danymi. |
Typ zawartości: | Klucz: "Typ zawartości" Wartość: kopia typu zawartości z oryginalnej wiadomości. |
Pary wartości klucza właściwości użytkownika MQTT v5 są bezpośrednio tłumaczone na nagłówki użytkownika platformy Kafka. Jeśli nagłówek użytkownika w komunikacie ma taką samą nazwę jak wbudowana właściwość MQTT (na przykład nagłówek użytkownika o nazwie "Dane korelacji"), czy przekazywanie wartości właściwości specyfikacji MQTT v5 lub właściwość użytkownika jest niezdefiniowana.
Przepływy danych nigdy nie otrzymują tych właściwości z brokera MQTT. W związku z tym przepływ danych nigdy nie przekazuje ich dalej:
- Alias tematu
- Identyfikatory subskrypcji
Właściwość Interwał wygaśnięcia komunikatu
Interwał wygaśnięcia komunikatu określa, jak długo komunikat może pozostać w brokerze MQTT przed odrzuceniem.
Gdy przepływ danych odbiera komunikat MQTT z określonym interwałem wygaśnięcia komunikatu, to:
- Rejestruje czas odebrania wiadomości.
- Zanim komunikat zostanie wyemitowany do miejsca docelowego, czas odejmowany od komunikatu został odejmowany z oryginalnego interwału wygaśnięcia.
- Jeśli komunikat nie wygasł (operacja powyżej wynosi > 0), komunikat jest emitowany do miejsca docelowego i zawiera zaktualizowany czas wygaśnięcia komunikatu.
- Jeśli komunikat wygasł (operacja powyżej wynosi <= 0), komunikat nie jest emitowany przez element docelowy.
Przykłady:
- Przepływ danych odbiera komunikat MQTT z interwałem wygaśnięcia komunikatu = 3600 sekund. Odpowiednie miejsce docelowe jest tymczasowo odłączone, ale można ponownie nawiązać połączenie. Po upływie 1000 sekund przed wysłaniem tego komunikatu MQTT do elementu docelowego. W takim przypadku komunikat miejsca docelowego ma ustawiony interwał wygaśnięcia komunikatu na 2600 (3600– 1000) sekund.
- Przepływ danych odbiera komunikat MQTT z interwałem wygaśnięcia komunikatu = 3600 sekund. Odpowiednie miejsce docelowe jest tymczasowo odłączone, ale można ponownie nawiązać połączenie. W tym przypadku jednak ponowne nawiązanie połączenia zajmuje 4000 sekund. Komunikat wygasł, a przepływ danych nie przekazuje tej wiadomości do miejsca docelowego.
Punkt końcowy platformy Kafka to źródło przepływu danych
Uwaga
Istnieje znany problem podczas korzystania z punktu końcowego usługi Event Hubs jako źródła przepływu danych, w którym nagłówek platformy Kafka jest uszkodzony jako przetłumaczony na MQTT. Dzieje się tak tylko w przypadku korzystania z centrum zdarzeń, choć klient centrum zdarzeń, który korzysta z protokołu AMQP w ramach okładek. Na przykład "foo"="bar" jest tłumaczona wartość "foo", ale wartość staje się "\xa1\x03bar".
Gdy punkt końcowy platformy Kafka jest źródłem przepływu danych, nagłówki użytkowników platformy Kafka są tłumaczone na właściwości MQTT v5. W poniższej tabeli opisano sposób tłumaczenia nagłówków użytkownika platformy Kafka na właściwości MQTT v5.
Nagłówek platformy Kafka | Przetłumaczone zachowanie |
---|---|
Klucz | Klucz: "Klucz" Wartość: kopia klucza z oryginalnej wiadomości. |
Sygnatura czasowa | Klucz: "Sygnatura czasowa" Wartość: kodowanie UTF-8 znacznika czasu platformy Kafka, który jest liczbą milisekund od epoki Unix. |
Pary klucz/wartość nagłówka użytkownika platformy Kafka — pod warunkiem, że wszystkie są kodowane w formacie UTF-8 — są bezpośrednio tłumaczone na właściwości klucza użytkownika/wartości MQTT.
UTF-8 / Niezgodność binarna
Protokół MQTT v5 może obsługiwać tylko właściwości oparte na protokole UTF-8. Jeśli przepływ danych otrzyma komunikat platformy Kafka zawierający co najmniej jeden nagłówek inny niż UTF-8, przepływ danych:
- Usuń właściwości lub właściwości, które obrażają.
- Prześlij dalej pozostałą część komunikatu, postępując zgodnie z poprzednimi regułami.
Aplikacje, które wymagają transferu binarnego w nagłówkach źródłowych platformy Kafka => właściwości obiektu docelowego MQTT, muszą najpierw kodować je za pomocą protokołu UTF-8 — na przykład za pośrednictwem base64.
>=64 KB niezgodności właściwości
Właściwości MQTT v5 muszą być mniejsze niż 64 KB. Jeśli przepływ danych odbiera komunikat platformy Kafka zawierający co najmniej jeden nagłówek >= 64 KB, przepływ danych:
- Usuń właściwości lub właściwości, które obrażają.
- Prześlij dalej pozostałą część komunikatu, postępując zgodnie z poprzednimi regułami.
Tłumaczenie właściwości w przypadku korzystania z usługi Event Hubs i producentów korzystających z protokołu AMQP
Jeśli masz klienta przekazujący komunikaty, punkt końcowy źródła przepływu danych platformy Kafka wykonuje dowolną z następujących akcji:
- Wysyłanie komunikatów do usługi Event Hubs przy użyciu bibliotek klienckich, takich jak Azure.Messaging.EventHubs
- Bezpośrednie używanie protokołu AMQP
Istnieją niuanse tłumaczenia właściwości, o których należy pamiętać.
Należy wykonać jedną z następujących czynności:
- Unikaj wysyłania właściwości
- Jeśli musisz wysłać właściwości, wyślij wartości zakodowane jako UTF-8.
Gdy usługa Event Hubs tłumaczy właściwości z protokołu AMQP na platformę Kafka, zawiera podstawowe typy zakodowane przez protokół AMQP w komunikacie. Aby uzyskać więcej informacji na temat zachowania, zobacz Wymiana zdarzeń między konsumentami i producentami przy użyciu różnych protokołów.
W poniższym przykładzie kodu, gdy punkt końcowy przepływu danych odbiera wartość "foo":"bar"
, otrzymuje właściwość jako <0xA1 0x03 "bar">
.
using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;
var propertyEventBody = new BinaryData("payload");
var propertyEventData = new EventData(propertyEventBody)
{
Properties =
{
{"foo", "bar"},
}
};
var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);
Punkt końcowy przepływu danych nie może przekazać właściwości <0xA1 0x03 "bar">
ładunku do komunikatu MQTT, ponieważ dane nie są utF-8. Jeśli jednak określisz ciąg UTF-8, punkt końcowy przepływu danych tłumaczy ciąg przed wysłaniem na MQTT. Jeśli używasz ciągu UTF-8, komunikat MQTT będzie miał "foo":"bar"
jako właściwości użytkownika.
Tłumaczone są tylko nagłówki UTF-8. Na przykład w przypadku następującego scenariusza, w którym właściwość jest ustawiana jako zmiennoprzecinkowa:
Properties =
{
{"float-value", 11.9 },
}
Punkt końcowy przepływu danych odrzuca pakiety zawierające "float-value"
pole.
Nie wszystkie właściwości danych zdarzenia, w tym propertyEventData.correlationId, są przekazywane. Aby uzyskać więcej informacji, zobacz Właściwości użytkownika zdarzenia,
CloudEvents
CloudEvents to sposób opisywania danych zdarzeń w typowy sposób. Ustawienia CloudEvents służą do wysyłania lub odbierania komunikatów w formacie CloudEvents. Usługi CloudEvents można używać w przypadku architektur opartych na zdarzeniach, w których różne usługi muszą komunikować się ze sobą u tych samych lub różnych dostawców chmury.
Dostępne CloudEventAttributes
opcje to Propagate
lubCreateOrRemap
.
Na stronie ustawienia punktu końcowego przepływu danych środowiska operacji wybierz kartę Zaawansowane, a następnie użyj pola Atrybuty zdarzeń w chmurze, aby określić ustawienie CloudEvents.
W poniższych sekcjach opisano, jak właściwości CloudEvent są propagowane lub tworzone i ponownie mapowane.
Propagacja ustawienia
Właściwości CloudEvent są przekazywane dla komunikatów zawierających wymagane właściwości. Jeśli komunikat nie zawiera wymaganych właściwości, komunikat jest przekazywany w następujący sposób. Jeśli są obecne wymagane właściwości, ce_
do nazwy właściwości CloudEvent zostanie dodany prefiks.
Nazwisko | Wymagania | Przykładowa wartość | Nazwa danych wyjściowych | Wartość wyjściowa |
---|---|---|---|---|
specversion |
Tak | 1.0 |
ce-specversion |
Przekazywane w formacie jest |
type |
Tak | ms.aio.telemetry |
ce-type |
Przekazywane w formacie jest |
source |
Tak | aio://mycluster/myoven |
ce-source |
Przekazywane w formacie jest |
id |
Tak | A234-1234-1234 |
ce-id |
Przekazywane w formacie jest |
subject |
Nie. | aio/myoven/telemetry/temperature |
ce-subject |
Przekazywane w formacie jest |
time |
Nie. | 2018-04-05T17:31:00Z |
ce-time |
Przekazywane tak, jak to jest. Nie jest wypoczęty. |
datacontenttype |
Nie. | application/json |
ce-datacontenttype |
Zmieniono typ zawartości danych wyjściowych po opcjonalnym etapie przekształcania. |
dataschema |
Nie. | sr://fabrikam-schemas/123123123234234234234234#1.0.0 |
ce-dataschema |
Jeśli schemat przekształcania danych wyjściowych zostanie podany w konfiguracji przekształcania, dataschema zostanie zmieniony na schemat wyjściowy. |
Ustawienie CreateOrRemap
Właściwości CloudEvent są przekazywane dla komunikatów zawierających wymagane właściwości. Jeśli komunikat nie zawiera wymaganych właściwości, zostaną wygenerowane właściwości.
Nazwisko | Wymagania | Nazwa danych wyjściowych | Wygenerowana wartość, jeśli brakuje |
---|---|---|---|
specversion |
Tak | ce-specversion |
1.0 |
type |
Tak | ce-type |
ms.aio-dataflow.telemetry |
source |
Tak | ce-source |
aio://<target-name> |
id |
Tak | ce-id |
Wygenerowany identyfikator UUID w kliencie docelowym |
subject |
Nie. | ce-subject |
Temat wyjściowy, w którym jest wysyłany komunikat |
time |
Nie. | ce-time |
Wygenerowany jako RFC 3339 w kliencie docelowym |
datacontenttype |
Nie. | ce-datacontenttype |
Zmieniono na typ zawartości danych wyjściowych po opcjonalnym etapie przekształcania |
dataschema |
Nie. | ce-dataschema |
Schemat zdefiniowany w rejestrze schematów |
Następne kroki
Aby dowiedzieć się więcej na temat przepływów danych, zobacz Tworzenie przepływu danych.