Interactie met Apache Kafka-clusters in Azure HDInsight met behulp van een REST-proxy
Met de Kafka REST-proxy kunt u communiceren met uw Kafka-cluster via een REST API via HTTPS. Deze actie betekent dat uw Kafka-clients zich buiten uw virtuele netwerk kunnen bevinden. Clients kunnen eenvoudige, beveiligde HTTPS-aanroepen naar het Kafka-cluster maken in plaats van te vertrouwen op Kafka-bibliotheken. In dit artikel leest u hoe u een Kafka-cluster met REST-proxy maakt. Biedt ook een voorbeeldcode die laat zien hoe u aanroepen naar DE REST-proxy uitvoert.
Naslaginformatie over REST API
Zie voor bewerkingen die worden ondersteund door de Kafka REST API HDInsight Kafka REST Proxy-API.
Achtergrond
Zie de Apache Kafka REST Proxy-API voor de volledige specificatie van bewerkingen die worden ondersteund door de API.
REST Proxy-eindpunt
Als u een HDInsight Kafka-cluster met REST-proxy maakt, maakt u een nieuw openbaar eindpunt voor uw cluster, dat u kunt vinden in de eigenschappen van uw HDInsight-cluster in Azure Portal.
Beveiliging
Toegang tot de Kafka REST-proxy die wordt beheerd met Microsoft Entra-beveiligingsgroepen. Geef bij het maken van het Kafka-cluster de Microsoft Entra-beveiligingsgroep op met REST-eindpunttoegang. Kafka-clients die toegang nodig hebben tot de REST-proxy, moeten worden geregistreerd bij deze groep door de groepseigenaar. De groepseigenaar kan zich registreren via de portal of via PowerShell.
Voor REST-proxy-eindpuntaanvragen moeten clienttoepassingen een OAuth-token ophalen. Het token gebruikt om het lidmaatschap van de beveiligingsgroep te controleren. Een voorbeeld van een clienttoepassing zoeken laat zien hoe u een OAuth-token kunt ophalen. De clienttoepassing geeft het OAuth-token in de HTTPS-aanvraag door aan de REST-proxy.
Notitie
Zie App - en resourcetoegang beheren met Behulp van Microsoft Entra-groepen voor meer informatie over Microsoft Entra-beveiligingsgroepen. Zie Toegang tot Microsoft Entra-webtoepassingen autoriseren met behulp van de OAuth 2.0-codetoekenningenstroom voor meer informatie over de werking van OAuth-tokens.
Kafka REST-proxy met netwerkbeveiligingsgroepen
Als u uw eigen VNet gebruikt en netwerkverkeer met netwerkbeveiligingsgroepen controleert, staat u inkomend verkeer op poort 9400 toe naast poort 443. Dit zorgt ervoor dat de Kafka REST-proxyserver bereikbaar is.
Vereisten
Een toepassing registreren bij Microsoft Entra ID. De clienttoepassingen die u schrijft om te communiceren met de Kafka REST-proxy, gebruiken de id en het geheim van deze toepassing om te verifiëren bij Azure.
Maak een Microsoft Entra-beveiligingsgroep. Voeg de toepassing die u hebt geregistreerd bij Microsoft Entra ID toe aan de beveiligingsgroep als lid van de groep. Deze beveiligingsgroep wordt gebruikt om te bepalen welke toepassingen interactie met de REST-proxy toestaan. Zie Een basisgroep maken en leden toevoegen met behulp van Microsoft Entra-id voor meer informatie over het maken van Microsoft Entra-groepen.
Controleer of de groep van het type Beveiliging is.
Controleer of de toepassing lid is van de groep.
Een Kafka-cluster maken waarvoor REST-proxy is ingeschakeld
In de stappen wordt Azure Portal gebruikt. Zie Apache Kafka REST-proxycluster maken met behulp van Azure CLI voor een voorbeeld van het gebruik van Azure CLI.
Schakel tijdens het maken van het Kafka-cluster op het tabblad Beveiliging en netwerken de optie Kafka REST-proxy inschakelen in.
Klik op Beveiligingsgroep selecteren. Selecteer in de lijst met beveiligingsgroepen de beveiligingsgroep waartoe u toegang wilt hebben tot de REST-proxy. U kunt het zoekvak gebruiken om de juiste beveiligingsgroep te vinden. Klik op de knop Selecteren onderaan.
Voer de resterende stappen uit om uw cluster te maken, zoals beschreven in Een Apache Kafka-cluster maken in Azure HDInsight met behulp van Azure Portal.
Zodra het cluster is gemaakt, gaat u naar de clustereigenschappen om de Kafka REST-proxy-URL vast te leggen.
Voorbeeld van clienttoepassing
U kunt de Python-code gebruiken om te communiceren met de REST-proxy in uw Kafka-cluster. Voer de volgende stappen uit om het codevoorbeeld te gebruiken:
Sla de voorbeeldcode op een computer op waarop Python is geïnstalleerd.
Installeer vereiste Python-afhankelijkheden door uit te
pip3 install msal
voeren.Wijzig de codesectie Configureer deze eigenschappen en werk de volgende eigenschappen voor uw omgeving bij:
Eigenschappen Beschrijving Tenant-id De Azure-tenant waar uw abonnement zich bevindt. Client ID De id voor de toepassing die u hebt geregistreerd in de beveiligingsgroep. Clientgeheim Het geheim voor de toepassing die u hebt geregistreerd in de beveiligingsgroep. Kafkarest_endpoint Haal deze waarde op via het tabblad Eigenschappen in het clusteroverzicht, zoals beschreven in de implementatiesectie. Deze moet de volgende indeling hebben: https://<clustername>-kafkarest.azurehdinsight.net
Voer vanaf de opdrachtregel het Python-bestand uit door het uit te voeren
sudo python3 <filename.py>
Deze code voert de volgende actie uit:
- Hiermee wordt een OAuth-token opgehaald uit Microsoft Entra-id.
- Laat zien hoe u een aanvraag indient bij de Kafka REST-proxy.
Zie de python AuthenticationContext-klasse voor meer informatie over het ophalen van OAuth-tokens in Python. Mogelijk ziet u een vertraging terwijl topics
deze niet is gemaakt of verwijderd via de Kafka REST-proxy daar worden weergegeven. Deze vertraging wordt veroorzaakt door het vernieuwen van de cache. Het waardeveld van de Producer-API is uitgebreid. Nu accepteert het JSON-objecten en een geserialiseerd formulier.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Hieronder vindt u een ander voorbeeld over het ophalen van een token uit Azure voor REST-proxy met behulp van een curl-opdracht. U ziet dat we de scope=https://hib.azurehdinsight.net/.default
opgegeven gegevens nodig hebben tijdens het ophalen van een token.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'