Komunikace s centrem IoT pomocí protokolu AMQP
Azure IoT Hub podporuje PROTOKOL AMQP (Advanced Message Queuing Protocol) OASIS verze 1.0 , aby poskytoval celou řadu funkcí prostřednictvím koncových bodů směřujících k zařízením a službám. Tento dokument popisuje použití klientů AMQP pro připojení k centru IoT k používání funkcí ioT Hubu.
Klient služby
Připojení a ověření ve službě IoT Hub (klient služby)
Pokud se chcete připojit k centru IoT pomocí AMQP, klient může použít ověřování ZALOŽENÉ na deklaracích identit (CBS) nebo ověřování SASL (Simple Authentication and Security Layer).
Klient služby vyžaduje následující informace:
Informační | Hodnota |
---|---|
Název hostitele služby IoT Hub | <iot-hub-name>.azure-devices.net |
Název klíče | service |
Přístupový klíč | Primární nebo sekundární klíč přidružený ke službě |
Sdílený přístupový podpis | Krátkodobý sdílený přístupový podpis v následujícím formátu: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Pokud chcete získat kód pro generování tohoto podpisu, přečtěte si téma Řízení přístupu ke službě IoT Hub. |
Následující fragment kódu používá knihovnu uAMQP v Pythonu k připojení k centru IoT prostřednictvím odkazu odesílatele.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Vyvolání zpráv z cloudu na zařízení (klient služby)
Další informace o výměně zpráv typu cloud-zařízení mezi službou a centrem IoT a mezi zařízením a centrem IoT najdete v tématu Odesílání zpráv typu cloud-zařízení ze služby IoT Hub. Klient služby používá dva odkazy k odesílání zpráv a přijímání zpětné vazby pro dříve odeslané zprávy ze zařízení, jak je popsáno v následující tabulce:
Vytvořil(a) | Typ odkazu | Cesta odkazu | Popis |
---|---|---|---|
Služba | Odkaz odesílatele | /messages/devicebound |
Zprávy typu cloud-zařízení určené pro zařízení se odesílají na tento odkaz službou. Zprávy odeslané přes tento odkaz mají vlastnost To nastavena na cestu odkazu příjemce cílového zařízení, /devices/<deviceID>/messages/devicebound . |
Služba | Odkaz přijímače | /messages/serviceBound/feedback |
Zprávy o ukončení, zamítnutí a opuštění zpětné vazby, které pocházejí ze zařízení přijatých na tomto odkazu službou. Další informace o zprávách zpětné vazby najdete v tématu Odesílání zpráv typu cloud-zařízení z centra IoT. |
Následující fragment kódu ukazuje, jak vytvořit zprávu typu cloud-zařízení a odeslat ji do zařízení pomocí knihovny uAMQP v Pythonu.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Aby klient služby získal zpětnou vazbu, vytvoří odkaz příjemce. Následující fragment kódu ukazuje, jak vytvořit odkaz pomocí knihovny uAMQP v Pythonu:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Jak je znázorněno v předchozím kódu, zpráva zpětné vazby typu cloud-zařízení má typ obsahu aplikace/vnd.microsoft.iothub.feedback.json. Pomocí vlastností v textu JSON zprávy můžete odvodit stav doručení původní zprávy:
Klíč
statusCode
v textu zpětné vazby má jednu z následujících hodnot: Success, Expired, DeliveryCountExceeded, Rejected nebo Purged.Klíč
deviceId
v textu zpětné vazby má ID cílového zařízení.Klíč
originalMessageId
v textu zpětné vazby má ID původní zprávy typu cloud-zařízení, která byla odeslána službou. Tento stav doručení můžete použít ke korelaci zpětné vazby se zprávami typu cloud-zařízení.
Příjem zpráv telemetrie (klient služby)
Ve výchozím nastavení vaše centrum IoT ukládá ingestované zprávy telemetrie zařízení do integrovaného centra událostí. Klient služby může k příjmu uložených událostí použít protokol AMQP.
Pro tento účel se klient služby musí nejprve připojit ke koncovému bodu služby IoT Hub a přijmout adresu přesměrování do integrovaných center událostí. Klient služby pak použije zadanou adresu pro připojení k integrovanému centru událostí.
V každém kroku musí klient předložit následující informace:
Platné přihlašovací údaje služby (token sdíleného přístupového podpisu služby).
Dobře naformátovaná cesta k oddílu skupiny příjemců, ze kterého hodlá načíst zprávy. Pro danou skupinu příjemců a ID oddílu má cesta následující formát:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(výchozí skupina příjemců je$Default
).Volitelný predikát filtrování pro určení výchozího bodu v oddílu. Tento predikát může být ve formě pořadového čísla, posunu nebo výčtu časového razítka.
Následující fragment kódu používá knihovnu uAMQP v Pythonu k předvedení předchozích kroků:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Pro dané ID zařízení používá Centrum IoT hodnotu hash ID zařízení k určení, do kterého oddílu se mají ukládat zprávy. Předchozí fragment kódu ukazuje, jak se události přijímají z jednoho takového oddílu. Upozorňujeme však, že typická aplikace často potřebuje načíst události, které jsou uložené ve všech oddílech centra událostí.
Klient zařízení
Připojení a ověření ve službě IoT Hub (klient zařízení)
Pokud se chcete připojit k centru IoT pomocí AMQP, může zařízení používat ověřování ZALOŽENÉ na deklaracích identit (CBS) nebo ověřování SASL (Simple Authentication and Security Layer).
Klient zařízení vyžaduje následující informace:
Informační | Hodnota |
---|---|
Název hostitele služby IoT Hub | <iot-hub-name>.azure-devices.net |
Přístupový klíč | Primární nebo sekundární klíč přidružený k zařízení |
Sdílený přístupový podpis | Krátkodobý sdílený přístupový podpis v následujícím formátu: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Pokud chcete získat kód pro generování tohoto podpisu, přečtěte si téma Řízení přístupu ke službě IoT Hub. |
Následující fragment kódu používá knihovnu uAMQP v Pythonu k připojení k centru IoT prostřednictvím odkazu odesílatele.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Jako operace zařízení se podporují následující cesty propojení:
Vytvořil(a) | Typ odkazu | Cesta odkazu | Popis |
---|---|---|---|
Zařízení | Odkaz přijímače | /devices/<deviceID>/messages/devicebound |
Zprávy typu cloud-zařízení určené pro zařízení se na tomto odkazu přijímají jednotlivými cílovými zařízeními. |
Zařízení | Odkaz odesílatele | /devices/<deviceID>/messages/events |
Zprávy ze zařízení do cloudu, které se odesílají ze zařízení, se posílají přes tento odkaz. |
Zařízení | Odkaz odesílatele | /messages/serviceBound/feedback |
Zpětná vazba ke zprávě typu cloud-zařízení odeslaná službě prostřednictvím tohoto odkazu zařízeními. |
Příjem příkazů cloud-zařízení (klient zařízení)
Příkazy typu cloud-zařízení, které se odesílají do zařízení, přicházejí na /devices/<deviceID>/messages/devicebound
odkaz. Zařízení mohou tyto zprávy přijímat v dávkách a podle potřeby používat datovou část dat zprávy, vlastnosti zprávy, poznámky nebo vlastnosti aplikace.
Následující fragment kódu používá knihovnu uAMQP v Pythonu) k příjmu zpráv typu cloud-zařízení zařízením.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Odesílání zpráv telemetrie (klient zařízení)
Pomocí AMQP můžete také odesílat telemetrické zprávy ze zařízení. Zařízení může volitelně poskytnout slovník vlastností aplikace nebo různé vlastnosti zprávy, například ID zprávy.
Chcete-li směrovat zprávy na základě textu zprávy, je nutné nastavit content_type
vlastnost na application/json;charset=utf-8
hodnotu . Další informace o směrování zpráv na základě vlastností zprávy nebo textu zprávy najdete v dokumentaci k syntaxi dotazů směrování zpráv služby IoT Hub.
Následující fragment kódu používá knihovnu uAMQP v Pythonu k odesílání zpráv typu zařízení-cloud ze zařízení.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Další poznámky
Připojení AMQP můžou být narušena kvůli výpadku sítě nebo vypršení platnosti ověřovacího tokenu (vygenerovaného v kódu). Klient služby musí tyto okolnosti zpracovat a v případě potřeby znovu obnovit připojení a propojení. Pokud vyprší platnost ověřovacího tokenu, klient se může vyhnout poklesu připojení tím, že token proaktivně obnoví před vypršením jeho platnosti.
Klient musí občas umět správně zpracovat přesměrování odkazů. Informace o takové operaci najdete v dokumentaci klienta AMQP.
Další kroky
Další informace o protokolu AMQP najdete ve specifikaci AMQP v1.0.
Další informace o zasílání zpráv ve službě IoT Hub najdete tady: