Comunicar com o Hub IoT usando o protocolo AMQP
O Hub IoT do Azure dá suporte ao OASIS AMQP (Advanced Message Queuing Protocol) versão 1.0 para fornecer uma variedade de funcionalidades por meio de pontos de extremidade voltados para o dispositivo e para o serviço. Este documento descreve o uso de clientes do AMQP para se conectar a um hub IoT para usar a funcionalidade do Hub IoT.
Cliente de serviço
Conectar e autenticar para um hub IoT (cliente de serviço)
Para se conectar a um hub IoT usando o AMQP, um cliente pode usar a autenticação CBS (segurança baseada em declarações) ou a SASL (Simple Authentication and Security Layer).
As informações a seguir são necessárias para o cliente de serviço:
Informações do | Valor |
---|---|
Nome do host do hub IoT | <iot-hub-name>.azure-devices.net |
Nome da chave | service |
Chave de acesso | Uma chave primária ou secundária associada ao serviço |
Assinatura de acesso compartilhado | Uma assinatura de acesso compartilhada de curta duração tem o formato a seguir:SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código e gerar essa assinatura, consulte Controlar acesso ao Hub IoT do Azure. |
O trecho de código a seguir usa a biblioteca uAMQP no Python para se conectar a um hub IOT por meio de um link de remetente.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Invocar mensagens da nuvem para o dispositivo (cliente de serviço)
Para saber mais sobre a troca de mensagens da nuvem para o dispositivo entre o serviço e o hub IoT e entre o dispositivo e o hub IoT, consulte Enviar mensagens da nuvem para o dispositivo a partir do hub IoT. O cliente de serviço usa dois links para enviar mensagens e receber comentários para mensagens enviadas anteriormente de dispositivos, conforme descrito na tabela a seguir:
Criado por | Tipo de vínculo | Caminho do link | Descrição |
---|---|---|---|
Serviço | Link do remetente | /messages/devicebound |
As mensagens da nuvem para o dispositivo destinadas para dispositivos são enviadas para esse link pelo serviço. As mensagens enviadas por esse link têm sua To propriedade definida como o caminho do link do destinatário do dispositivo de destino /devices/<deviceID>/messages/devicebound . |
Serviço | Link do destinatário | /messages/serviceBound/feedback |
As mensagens de comentários de conclusão, rejeição e desistências provenientes de dispositivos recebidos neste link pelo serviço. Para saber mais sobre mensagens de comentários, consulte Enviar mensagens da nuvem para o dispositivo de um hub IoT. |
O trecho de código a seguir demonstra como criar uma mensagem da nuvem para o dispositivo e enviá-la para um dispositivo usando a biblioteca uAMQP no Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Para receber comentários, o cliente de serviço cria um link de destinatário. O trecho de código a seguir demonstra como criar um link usando a biblioteca uAMQP no Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Conforme mostrado no código anterior, uma mensagem de comentários da nuvem para o dispositivo tem um tipo de conteúdo de application/vnd.microsoft.iothub.feedback.jsno. Você pode usar as propriedades no corpo JSON da mensagem para inferir o status de entrega da mensagem original:
A chave
statusCode
no corpo de comentários tem um dos valores a seguir: Success, Expired, DeliveryCountExceeded, Rejected ou Purged.A chave
deviceId
no corpo de comentários tem a ID do dispositivo de destino.A chave
originalMessageId
no corpo de comentários tem a ID da mensagem original da nuvem para o dispositivo enviada pelo serviço. Você pode usar esse status de entrega para correlacionar os comentários às mensagens da nuvem para o dispositivo.
Receber mensagens de telemetria (cliente de serviço)
Por padrão, o Hub IoT armazena mensagens de telemetria de dispositivo ingeridas em um hub de eventos interno. O cliente de serviço pode usar o protocolo AMQP para receber os eventos armazenados.
Para essa finalidade, o cliente de serviço primeiro precisa se conectar ao ponto de extremidade do hub IoT e receber um endereço de redirecionamento para os hubs de eventos internos. Em seguida, o cliente de serviço usa o endereço fornecido para se conectar ao hub de eventos interno.
Em cada etapa, o cliente precisa apresentar as informações a seguir:
Credenciais de serviço válidas (token de assinatura de acesso compartilhado do serviço).
Um caminho bem formatado para a partição do grupo de consumidores da qual ele pretende recuperar mensagens. Para um determinado grupo de consumidores e ID de partição, o caminho tem o seguinte formato:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(o grupo de consumidores padrão é$Default
).Um predicado de filtragem opcional para designar um ponto de partida na partição. Esse predicado pode ser um número sequencial, um deslocamento ou carimbo de data/hora enfileirado.
O trecho de código a seguir usa a biblioteca uAMQP no Python para demonstrar as etapas anteriores:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Para uma determinada ID de dispositivo, o hub IoT usa um hash da ID do dispositivo para determinar em qual partição armazenar suas mensagens. O trecho de código anterior demonstra como os eventos são recebidos de uma única partição do tipo. No entanto, observe que um aplicativo típico geralmente precisa recuperar eventos que são armazenados em todas as partições do hub de eventos.
Cliente de Dispositivos
Conectar e autenticar para um hub IoT (cliente de dispositivo)
Para se conectar a um hub IoT usando o AMQP, um dispositivo pode usar a autenticação CBS (segurança baseada em declarações) ou a SASL (Simple Authentication and Security Layer).
As informações a seguir são necessárias para o cliente de dispositivo:
Informações do | Valor |
---|---|
Nome do host do hub IoT | <iot-hub-name>.azure-devices.net |
Chave de acesso | Uma chave primária ou secundária associada ao dispositivo |
Assinatura de acesso compartilhado | Uma assinatura de acesso compartilhada de curta duração tem o formato a seguir:SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código e gerar essa assinatura, consulte Controlar acesso ao Hub IoT do Azure. |
O trecho de código a seguir usa a biblioteca uAMQP no Python para se conectar a um hub IOT por meio de um link de remetente.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Os caminhos de link a seguir têm suporte como operações de dispositivo:
Criado por | Tipo de vínculo | Caminho do link | Descrição |
---|---|---|---|
Dispositivos | Link do destinatário | /devices/<deviceID>/messages/devicebound |
As mensagens da nuvem para o dispositivo destinadas para dispositivos são recebidas neste link por cada dispositivo de destino. |
Dispositivos | Link do remetente | /devices/<deviceID>/messages/events |
As mensagens do dispositivo para a nuvem enviadas de um dispositivo são enviadas por esse link. |
Dispositivos | Link do remetente | /messages/serviceBound/feedback |
Comentários de mensagem da nuvem para o dispositivo enviados ao serviço por meio desse link pelos dispositivos. |
Receber comandos da nuvem para o dispositivo (cliente do dispositivo)
Os comandos da nuvem para o dispositivo enviados aos dispositivos chegam em um /devices/<deviceID>/messages/devicebound
link. Os dispositivos podem receber essas mensagens em lotes e usar o conteúdo de dados da mensagem, as propriedades da mensagem, as anotações ou as propriedades do aplicativo na mensagem, conforme necessário.
O trecho de código a seguir usa a biblioteca uAMQP no Python) para receber mensagens da nuvem para o dispositivo por um dispositivo.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Enviar mensagens de telemetria (cliente do dispositivo)
Você também pode enviar mensagens de telemetria de um dispositivo usando o AMQP. O dispositivo pode, opcionalmente, fornecer um dicionário de propriedades do aplicativo ou várias propriedades de mensagem, como a ID da mensagem.
Para encaminhar mensagens com base no corpo da mensagem, você precisa definir a propriedade content_type
como application/json;charset=utf-8
. Para saber mais sobre como encaminhar mensagens com base nas propriedades da mensagem ou no corpo da mensagem, confira a documentação da sintaxe de consulta de encaminhamento de mensagens do Hub IoT.
O trecho de código a seguir usa a biblioteca uAMQP no Python para enviar mensagens do dispositivo para a nuvem a partir de um dispositivo.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Observações adicionais
As conexões AMQP podem ser interrompidas devido a uma falha de rede ou à expiração do token de autenticação (gerado no código). O cliente de serviço deve lidar com essas circunstâncias e restabelecer a conexão e os links, caso necessário. Caso um token de autenticação expire, o cliente pode evitar uma queda de conexão de forma proativa renovando o token antes da expiração.
Algumas vezes o cliente deve ser capaz de lidar com redirecionamentos de link de forma correta. Para entender essa operação, consulte a documentação do cliente do AMQP.
Próximas etapas
Para saber mais sobre o protocolo AMQP, consulte a especificação AMQP v1.0.
Para saber mais sobre mensagens do Hub IoT do Azure, consulte: