Interakcja z klastrami platformy Apache Kafka w usłudze Azure HDInsight przy użyciu serwera proxy REST
Serwer proxy REST platformy Kafka umożliwia interakcję z klastrem Platformy Kafka za pośrednictwem interfejsu API REST za pośrednictwem protokołu HTTPS. Ta akcja oznacza, że klienci platformy Kafka mogą znajdować się poza siecią wirtualną. Klienci mogą tworzyć proste, bezpieczne wywołania HTTPS do klastra Kafka zamiast polegać na bibliotekach platformy Kafka. W tym artykule pokazano, jak utworzyć klaster platformy Kafka z włączonym serwerem proxy REST. Zawiera również przykładowy kod, który pokazuje, jak wykonywać wywołania serwera proxy REST.
Dokumentacja interfejsu API REST
Aby uzyskać informacje na temat operacji obsługiwanych przez interfejs API REST platformy Kafka, zobacz Dokumentacja interfejsu API serwera proxy REST platformy Kafka w usłudze HDInsight.
Tło
Aby uzyskać pełną specyfikację operacji obsługiwanych przez interfejs API, zobacz Interfejs API REST platformy Apache Kafka.
Punkt końcowy serwera proxy REST
Tworzenie klastra platformy Kafka usługi HDInsight za pomocą serwera proxy REST powoduje utworzenie nowego publicznego punktu końcowego dla klastra, który można znaleźć we właściwościach klastra usługi HDInsight w witrynie Azure Portal.
Zabezpieczenia
Dostęp do serwera proxy REST platformy Kafka zarządzanego za pomocą grup zabezpieczeń firmy Microsoft Entra. Podczas tworzenia klastra platformy Kafka podaj grupę zabezpieczeń Firmy Microsoft Entra z dostępem do punktu końcowego REST. Klienci platformy Kafka, którzy potrzebują dostępu do serwera proxy REST, powinni być zarejestrowani w tej grupie przez właściciela grupy. Właściciel grupy może zarejestrować się za pośrednictwem portalu lub za pośrednictwem programu PowerShell.
W przypadku żądań punktu końcowego serwera proxy REST aplikacje klienckie powinny uzyskać token OAuth. Token używa metody do weryfikowania członkostwa w grupie zabezpieczeń. W przykładzie aplikacji klienckiej pokazano, jak uzyskać token OAuth. Aplikacja kliencka przekazuje token OAuth w żądaniu HTTPS do serwera proxy REST.
Uwaga
Aby dowiedzieć się więcej o grupach zabezpieczeń firmy Microsoft Entra, zobacz Zarządzanie dostępem do aplikacji i zasobów przy użyciu grup zabezpieczeń firmy Microsoft. Aby uzyskać więcej informacji na temat sposobu działania tokenów OAuth, zobacz Autoryzowanie dostępu do aplikacji internetowych firmy Microsoft Entra przy użyciu przepływu udzielania kodu OAuth 2.0.
Serwer proxy REST platformy Kafka z sieciowymi grupami zabezpieczeń
Jeśli przenosisz własną sieć wirtualną i kontrolujesz ruch sieciowy z sieciowymi grupami zabezpieczeń, zezwalaj na ruch przychodzący na porcie 9400 oprócz portu 443. Dzięki temu serwer proxy REST platformy Kafka jest osiągalny.
Wymagania wstępne
Rejestrowanie aplikacji usługi Tożsamości Microsoft Entra. Aplikacje klienckie, które zapisujesz w celu interakcji z serwerem proxy REST platformy Kafka, używają identyfikatora i wpisu tajnego tej aplikacji do uwierzytelniania na platformie Azure.
Utwórz grupę zabezpieczeń Entra firmy Microsoft. Dodaj aplikację zarejestrowaną w usłudze Microsoft Entra ID do grupy zabezpieczeń jako członek grupy. Ta grupa zabezpieczeń będzie używana do kontrolowania, które aplikacje umożliwiają interakcję z serwerem proxy REST. Aby uzyskać więcej informacji na temat tworzenia grup entra firmy Microsoft, zobacz Tworzenie podstawowej grupy i dodawanie członków przy użyciu identyfikatora Entra firmy Microsoft.
Sprawdź, czy grupa jest typu Zabezpieczenia.
Sprawdź, czy aplikacja jest członkiem grupy.
Tworzenie klastra platformy Kafka z włączonym serwerem proxy REST
W krokach jest używana witryna Azure Portal. Przykład użycia interfejsu wiersza polecenia platformy Azure można znaleźć w temacie Create Apache Kafka REST proxy cluster using Azure CLI (Tworzenie klastra proxy REST platformy Apache Kafka przy użyciu interfejsu wiersza polecenia platformy Azure).
Podczas przepływu pracy tworzenia klastra platformy Kafka na karcie Zabezpieczenia i sieć zaznacz opcję Włącz serwer proxy REST platformy Kafka.
Kliknij pozycję Wybierz grupę zabezpieczeń. Z listy grup zabezpieczeń wybierz grupę zabezpieczeń, do której chcesz mieć dostęp do serwera proxy REST. Możesz użyć pola wyszukiwania, aby znaleźć odpowiednią grupę zabezpieczeń. Kliknij przycisk Wybierz u dołu.
Wykonaj pozostałe kroki, aby utworzyć klaster zgodnie z opisem w temacie Create Apache Kafka cluster in Azure HDInsight using Azure Portal (Tworzenie klastra platformy Apache Kafka w usłudze Azure HDInsight przy użyciu witryny Azure Portal).
Po utworzeniu klastra przejdź do właściwości klastra, aby zarejestrować adres URL serwera proxy REST platformy Kafka.
Przykład aplikacji klienckiej
Kod języka Python umożliwia interakcję z serwerem proxy REST w klastrze platformy Kafka. Aby użyć przykładowego kodu, wykonaj następujące kroki:
Zapisz przykładowy kod na maszynie z zainstalowanym językiem Python.
Zainstaluj wymagane zależności języka Python, wykonując polecenie
pip3 install msal
.Zmodyfikuj sekcję kodu Skonfiguruj te właściwości i zaktualizuj następujące właściwości środowiska:
Właściwości opis Identyfikator dzierżawy Dzierżawa platformy Azure, w której znajduje się Twoja subskrypcja. Client ID Identyfikator aplikacji zarejestrowanej w grupie zabezpieczeń. Klucz tajny klienta Wpis tajny aplikacji zarejestrowanej w grupie zabezpieczeń. Kafkarest_endpoint Pobierz tę wartość z karty Właściwości w przeglądzie klastra zgodnie z opisem w sekcji wdrażania. Powinien mieć następujący format: https://<clustername>-kafkarest.azurehdinsight.net
W wierszu polecenia wykonaj plik języka Python, wykonując polecenie
sudo python3 <filename.py>
Ten kod wykonuje następującą akcję:
- Pobiera token OAuth z identyfikatora Entra firmy Microsoft.
- Pokazuje, jak wysłać żądanie do serwera proxy REST platformy Kafka.
Aby uzyskać więcej informacji na temat pobierania tokenów OAuth w języku Python, zobacz Python AuthenticationContext class (Klasa Python AuthenticationContext). Może wystąpić opóźnienie, gdy topics
nie zostanie on utworzony lub usunięty za pośrednictwem serwera proxy REST platformy Kafka. To opóźnienie wynika z odświeżania pamięci podręcznej. Pole wartości interfejsu API producenta zostało ulepszone. Teraz akceptuje obiekty JSON i dowolną serializowaną formę.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Poniżej znajduje się kolejny przykład dotyczący pobierania tokenu z serwera proxy REST platformy Azure za pomocą polecenia curl. Zwróć uwagę, że podczas uzyskiwania tokenu scope=https://hib.azurehdinsight.net/.default
potrzebujemy określonego tokenu.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'