Interagir com os clusters do Apache Kafka no Azure HDInsight por meio de um proxy REST
O Proxy REST do Kafka permite interagir com o cluster Kafka por meio de uma API REST via HTTPS. Dessa maneira, os clientes do Kafka podem ficar fora da rede virtual. Os clientes podem fazer chamadas HTTPS simples e seguras para o cluster Kafka, em vez de usar as bibliotecas Kafka. Este artigo mostra como criar um proxy REST habilitado para o cluster do Kafka. Além disso, verá um exemplo de código que mostra como fazer chamadas ao proxy REST.
Referência da API REST
Para ver as operações compatíveis com a API REST do Kafka, confira Referência de API do Proxy REST do Kafka no HDInsight.
Tela de fundo
Para ver a especificação completa das operações compatíveis com a API, confira API do Proxy REST do Apache Kafka.
Ponto de extremidade de proxy REST
Quando você cria um cluster do Kafka no HDInsight com o proxy REST, um novo ponto de extremidade público é gerado para o cluster. É possível encontrá-lo no cluster Propriedades do HDInsight no portal do Azure.
Segurança
Acesso ao proxy REST do Kafka gerenciado com os grupos de segurança do Microsoft Entra. Ao criar o cluster do Kafka, forneça o grupo de segurança do Microsoft Entra com o acesso de ponto de extremidade REST. Os clientes do Kafka que precisam de acesso ao proxy REST devem ser registrados neste grupo pelo proprietário do grupo. O proprietário do grupo pode registrar por meio do Portal ou do PowerShell.
Para solicitações de ponto de extremidade de proxy REST, os aplicativos clientes precisam obter um token OAuth. O token é usado para verificar a associação ao grupo de segurança. Encontre uma Amostra de aplicativo cliente que mostre como obter um token OAuth. O aplicativo cliente passa o token OAuth na solicitação HTTPS para o proxy REST.
Observação
Consulte Gerenciar o acesso a aplicativos e recursos usando grupos do Microsoft Entra para saber mais sobre os grupos de segurança do Microsoft Entra. Para saber mais sobre como os tokens OAuth funcionam, confira Autorizar o acesso aos aplicativos web do Microsoft Entra por meio do fluxo de concessão de código do OAuth 2.0.
Proxy REST do Kafka com os Grupos de Segurança de Rede
Se você colocar a VNet e controlar o tráfego de rede nos grupos de segurança de rede, permita o tráfego de entrada na porta 9400 além de na porta 443. Isso assegura que o servidor proxy REST do Kafka fique acessível.
Pré-requisitos
Registrar um aplicativo com o Microsoft Entra ID. Os aplicativos cliente que você escreve para interagir com o proxy REST do Kafka usam a ID e o segredo desse aplicativo para autenticação no Azure.
Crie um grupo de segurança do Microsoft Entra. Adicione o aplicativo que você registrou com o Microsoft Entra ID ao grupo de segurança como membro do grupo. Esse grupo de segurança será usado para controlar quais aplicativos têm permissão para interagir com o proxy REST. Para obter mais informações sobre como criar grupos do Microsoft Entra, confira Criar um grupo básico e adicionar membros usando o Microsoft Entra ID.
Valide se o grupo é do tipo Segurança.
Valide se o aplicativo é membro do grupo.
Criar um cluster do Kafka com o proxy REST habilitado
As etapas usam o portal do Azure. Para ver um exemplo de como usar a CLI do Azure, confira Criar cluster do proxy REST do Apache Kafka com a CLI do Azure.
Durante o fluxo de trabalho de criação do cluster do Kafka, na guia Segurança + rede, verifique a opção Habilitar proxy REST do Kafka.
Clique em Selecionar Grupo de Segurança. Na lista de grupos de segurança, selecione aqueles que você quer que tenham acesso ao proxy REST. Use a caixa de pesquisa para encontrar o grupo de segurança apropriado. Clique no botão Selecionar na parte inferior.
Conclua as etapas restantes para criar o cluster, conforme descrito em Criar um cluster do Apache Kafka no Azure HDInsight usando o portal do Azure.
Após a criação do cluster, acesse as propriedades dele para registrar a URL do proxy REST do Kafka.
Exemplo de aplicativo cliente
Você pode usar o código Python para interagir com o proxy REST no seu cluster do Kafka. Para usar o exemplo de código, siga estas etapas:
Salve o exemplo de código em um computador com o Python instalado.
Instale as dependências necessárias do Python executando
pip3 install msal
.Modifique a seção de código Configurar estas propriedades e atualize as seguintes propriedades do seu ambiente:
Propriedade Descrição ID do locatário O locatário do Azure em que sua assinatura está. ID do Cliente A ID do aplicativo que você registrou no grupo de segurança. Segredo do cliente O segredo para o aplicativo que você registrou no grupo de segurança. Kafkarest_endpoint Obtenha esse valor na guia Propriedades da visão geral do cluster, conforme descrito na seção de implantação. Ele deve estar no seguinte formato: https://<clustername>-kafkarest.azurehdinsight.net
Na linha de comando, execute o arquivo Python com
sudo python3 <filename.py>
Esse código faz o seguinte:
- Busca um token OAuth do Microsoft Entra ID.
- Mostra como fazer uma solicitação para o proxy REST do Kafka.
Para saber mais sobre como obter tokens OAuth no Python, veja Classe AuthenticationContext do Python. Talvez haja um atraso enquanto topics
que não foram criados ou excluídos por meio do proxy REST do Kafka se reflitam aqui. Esse atraso ocorre por causa da atualização do cache. O campo valor da API do produtor foi aprimorado. Agora, ele aceita objetos JSON e qualquer formulário serializado.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Veja abaixo outro exemplo de como obter um token do Azure para o proxy REST usando um comando curl. Observe que o scope=https://hib.azurehdinsight.net/.default
especificado é necessário para obter o token.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'