Sdílet prostřednictvím


Interakce s clustery Apache Kafka ve službě Azure HDInsight pomocí proxy REST

Proxy server REST Kafka umožňuje interakci s clusterem Kafka prostřednictvím rozhraní REST API přes HTTPS. Tato akce znamená, že klienti Kafka můžou být mimo vaši virtuální síť. Klienti můžou usnadnit a zabezpečit volání HTTPS clusteru Kafka místo toho, aby se museli spoléhat na knihovny Kafka. V tomto článku se dozvíte, jak vytvořit cluster Kafka s povoleným proxy serverem REST. Poskytuje také vzorový kód, který ukazuje, jak provádět volání proxy serveru REST.

REST API – referenční informace

Informace o operacích podporovaných rozhraním Kafka REST API najdete v referenčních informacích k rozhraní REST API služby HDInsight Kafka.

Pozadí

Kafka REST proxy design.

Úplnou specifikaci operací podporovaných rozhraním API najdete v tématu Apache Kafka REST Proxy API.

Koncový bod proxy serveru REST

Vytvoření clusteru HDInsight Kafka s proxy serverem REST vytvoří pro váš cluster nový veřejný koncový bod, který najdete ve vlastnostech clusteru HDInsight na webu Azure Portal.

Zabezpečení

Přístup k proxy serveru Kafka REST spravovanému pomocí skupin zabezpečení Microsoft Entra Při vytváření clusteru Kafka poskytněte skupině zabezpečení Microsoft Entra přístup ke koncovému bodu REST. Klienti Kafka, kteří potřebují přístup k proxy serveru REST, by měli být zaregistrovaní v této skupině vlastníkem skupiny. Vlastník skupiny se může zaregistrovat prostřednictvím portálu nebo přes PowerShell.

U požadavků na koncový bod proxy serveru REST by klientské aplikace měly získat token OAuth. Token slouží k ověření členství ve skupině zabezpečení. Ukázka vyhledání klientské aplikace ukazuje, jak získat token OAuth. Klientská aplikace předá token OAuth v požadavku HTTPS na proxy server REST.

Poznámka:

Další informace o skupinách zabezpečení Microsoft Entra najdete v tématu Správa přístupu k aplikacím a prostředkům pomocí skupin Microsoft Entra. Další informace o tom, jak fungují tokeny OAuth, najdete v tématu Autorizace přístupu k webovým aplikacím Microsoft Entra pomocí toku udělení kódu OAuth 2.0.

Proxy server REST Kafka se skupinami zabezpečení sítě

Pokud používáte vlastní virtuální síť a řídíte síťový provoz pomocí skupin zabezpečení sítě, povolte příchozí provoz na portu 9400 kromě portu 443. Tím se zajistí, že proxy server KAFKA REST je dostupný.

Požadavky

  1. Registrace aplikace pomocí Microsoft Entra ID. Klientské aplikace, které zapisujete pro interakci s proxy serverem REST Kafka, používají k ověření v Azure ID a tajný klíč této aplikace.

  2. Vytvořte skupinu zabezpečení Microsoft Entra. Přidejte aplikaci, kterou jste zaregistrovali s ID Microsoft Entra, do skupiny zabezpečení jako člena skupiny. Tato skupina zabezpečení se použije k řízení aplikací, které umožňují interakci s proxy serverem REST. Další informace o vytváření skupin Microsoft Entra naleznete v tématu Vytvoření základní skupiny a přidání členů pomocí Microsoft Entra ID.

    Ověřte, že je skupina typu Zabezpečení. Security Group.

    Ověřte, že je aplikace členem skupiny. Check Membership.

Vytvoření clusteru Kafka s povoleným proxy serverem REST

Postup se používá na webu Azure Portal. Příklad použití Azure CLI najdete v tématu Vytvoření proxy clusteru Apache Kafka REST pomocí Azure CLI.

  1. Během pracovního postupu vytváření clusteru Kafka na kartě Zabezpečení a sítě zaškrtněte možnost Povolit proxy server REST Kafka.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Klikněte na Vybrat skupinu zabezpečení. V seznamu skupin zabezpečení vyberte skupinu zabezpečení, ke které chcete mít přístup k proxy serveru REST. Pomocí vyhledávacího pole můžete najít příslušnou skupinu zabezpečení. Klikněte na tlačítko Vybrat v dolní části.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Dokončete zbývající kroky a vytvořte cluster, jak je popsáno v tématu Vytvoření clusteru Apache Kafka ve službě Azure HDInsight pomocí webu Azure Portal.

  4. Po vytvoření clusteru přejděte do vlastností clusteru a zaznamenejte adresu URL proxy proxy serveru Kafka.

    view REST proxy URL.

Ukázka klientské aplikace

Kód Pythonu můžete použít k interakci s proxy serverem REST ve vašem clusteru Kafka. Pokud chcete použít ukázku kódu, postupujte takto:

  1. Uložte ukázkový kód na počítač s nainstalovaným Pythonem.

  2. Nainstalujte požadované závislosti Pythonu pip3 install msalspuštěním příkazu .

  3. Upravte oddíl kódu: Nakonfigurujte tyto vlastnosti a aktualizujte následující vlastnosti pro vaše prostředí:

    Vlastnost Popis
    ID tenanta Tenant Azure, ve kterém je vaše předplatné.
    Client ID ID aplikace, kterou jste zaregistrovali ve skupině zabezpečení.
    Tajný klíč klienta Tajný kód pro aplikaci, kterou jste zaregistrovali ve skupině zabezpečení.
    Kafkarest_endpoint Tuto hodnotu získáte z karty Vlastnosti v přehledu clusteru, jak je popsáno v části nasazení. Měl by být v následujícím formátu – https://<clustername>-kafkarest.azurehdinsight.net
  4. Z příkazového řádku spusťte soubor Pythonu spuštěním příkazu< a1/>. sudo python3 <filename.py>

Tento kód provede následující akci:

  1. Načte token OAuth z ID Microsoft Entra.
  2. Ukazuje, jak vytvořit požadavek na proxy server REST Kafka.

Další informace o získání tokenů OAuth v Pythonu najdete v tématu Třída Python AuthenticationContext. Může se zobrazit prodleva topics , která se nevytvořila nebo neodstranila prostřednictvím proxy serveru REST Kafka. Důvodem tohoto zpoždění je aktualizace mezipaměti. Pole hodnoty rozhraní API pro producenty bylo vylepšeno. Teď přijímá objekty JSON a všechny serializované formuláře.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Níže najdete další ukázku, jak získat token z Azure pro proxy REST pomocí příkazu curl. Všimněte si, že při získávání tokenu scope=https://hib.azurehdinsight.net/.default potřebujeme zadaný token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Další kroky