Komunikacja z centrum IoT przy użyciu protokołu AMQP
Usługa Azure IoT Hub obsługuje protokół 1.0 Advanced Message Queuing Protocol (AMQP) o nazwie OASIS Advanced Message Queuing Protocol w wersji 1.0 w celu dostarczania różnych funkcji za pośrednictwem punktów końcowych dostępnych dla urządzeń i usług. W tym dokumencie opisano używanie klientów amQP do nawiązywania połączenia z centrum IoT w celu korzystania z funkcji usługi IoT Hub.
Klient usługi
Połączenie i uwierzytelnianie w centrum IoT (klient usługi)
Aby nawiązać połączenie z centrum IoT przy użyciu protokołu AMQP, klient może użyć uwierzytelniania opartego na oświadczeniach (CBS) lub prostego uwierzytelniania i warstwy zabezpieczeń (SASL).
Następujące informacje są wymagane dla klienta usługi:
Informacja | Wartość |
---|---|
Nazwa hosta centrum IoT | <iot-hub-name>.azure-devices.net |
Nazwa klucza | service |
Klucz dostępu | Klucz podstawowy lub pomocniczy skojarzony z usługą |
Sygnatura dostępu współdzielonego | Krótkotrwały sygnatura dostępu współdzielonego w następującym formacie: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Aby uzyskać kod generowania tego podpisu, zobacz Kontrola dostępu do usługi IoT Hub. |
Poniższy fragment kodu używa biblioteki uAMQP w języku Python do nawiązywania połączenia z centrum IoT Hub za pośrednictwem linku nadawcy.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Wywoływanie komunikatów chmura-urządzenie (klient usługi)
Aby dowiedzieć się więcej na temat wymiany komunikatów z chmury na urządzenie między usługą a centrum IoT i między urządzeniem a centrum IoT, zobacz Wysyłanie komunikatów z chmury do urządzenia z centrum IoT. Klient usługi używa dwóch linków do wysyłania komunikatów i odbierania opinii dotyczących wcześniej wysyłanych komunikatów z urządzeń, zgodnie z opisem w poniższej tabeli:
Utworzone przez | Typ linku | Ścieżka łącza | opis |
---|---|---|---|
Usługa | Link nadawcy | /messages/devicebound |
Komunikaty z chmury do urządzenia przeznaczone dla urządzeń są wysyłane do tego linku przez usługę. Komunikaty wysyłane za pośrednictwem tego linku mają właściwość To ustawioną na ścieżkę łącza odbiorcy urządzenia docelowego, /devices/<deviceID>/messages/devicebound . |
Usługa | Link odbiorcy | /messages/serviceBound/feedback |
Komunikaty zwrotne dotyczące uzupełniania, odrzucania i porzucania, które pochodzą z urządzeń odebranych na tym linku przez usługę. Aby uzyskać więcej informacji na temat komunikatów opinii, zobacz Wysyłanie komunikatów z chmury do urządzenia z centrum IoT Hub. |
Poniższy fragment kodu przedstawia sposób tworzenia komunikatu z chmury do urządzenia i wysyłania go do urządzenia przy użyciu biblioteki uAMQP w języku Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Aby otrzymywać opinie, klient usługi tworzy link odbiorcy. Poniższy fragment kodu przedstawia sposób tworzenia linku przy użyciu biblioteki uAMQP w języku Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Jak pokazano w poprzednim kodzie, komunikat opinii z chmury do urządzenia ma typ zawartości aplikacji/vnd.microsoft.iothub.feedback.json. Właściwości w treści JSON wiadomości można użyć, aby wywnioskować stan dostarczania oryginalnej wiadomości:
Klucz
statusCode
w treści opinii ma jedną z następujących wartości: Powodzenie, Wygaśnięcie, DeliveryCountExceeded, Odrzucone lub Przeczyszczone.Klucz
deviceId
w treści opinii ma identyfikator urządzenia docelowego.Klucz
originalMessageId
w treści opinii ma identyfikator oryginalnego komunikatu z chmury do urządzenia, który został wysłany przez usługę. Ten stan dostarczania umożliwia skorelowanie opinii z komunikatami z chmury do urządzenia.
Odbieranie komunikatów telemetrycznych (klient usługi)
Domyślnie centrum IoT przechowuje pozyskane komunikaty telemetryczne urządzenia w wbudowanym centrum zdarzeń. Klient usługi może odbierać przechowywane zdarzenia przy użyciu protokołu AMQP.
W tym celu klient usługi musi najpierw nawiązać połączenie z punktem końcowym centrum IoT Hub i otrzymać adres przekierowania do wbudowanych centrów zdarzeń. Następnie klient usługi używa podanego adresu, aby nawiązać połączenie z wbudowanym centrum zdarzeń.
W każdym kroku klient musi przedstawić następujące informacje:
Prawidłowe poświadczenia usługi (token sygnatury dostępu współdzielonego usługi).
Dobrze sformatowana ścieżka do partycji grupy odbiorców, z którą zamierza pobierać komunikaty. W przypadku danej grupy odbiorców i identyfikatora partycji ścieżka ma następujący format:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(domyślna grupa odbiorców to$Default
).Opcjonalny predykat filtrowania w celu wyznaczenia punktu początkowego w partycji. Ten predykat może być w postaci numeru sekwencji, przesunięcia lub w kolejce znacznika czasu.
Poniższy fragment kodu używa biblioteki uAMQP w języku Python do zademonstrowania powyższych kroków:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
W przypadku danego identyfikatora urządzenia centrum IoT używa skrótu identyfikatora urządzenia, aby określić, w której partycji mają być przechowywane komunikaty. Powyższy fragment kodu pokazuje, jak zdarzenia są odbierane z jednej takiej partycji. Należy jednak pamiętać, że typowa aplikacja często musi pobierać zdarzenia przechowywane we wszystkich partycjach centrum zdarzeń.
Klient urządzenia
Połączenie i uwierzytelnianie w centrum IoT (klient urządzenia)
Aby nawiązać połączenie z centrum IoT przy użyciu protokołu AMQP, urządzenie może używać oświadczeń opartych na zabezpieczeniach (CBS) lub uwierzytelniania Simple Authentication and Security Layer (SASL).
Następujące informacje są wymagane dla klienta urządzenia:
Informacja | Wartość |
---|---|
Nazwa hosta centrum IoT | <iot-hub-name>.azure-devices.net |
Klucz dostępu | Klucz podstawowy lub pomocniczy skojarzony z urządzeniem |
Sygnatura dostępu współdzielonego | Krótkotrwały sygnatura dostępu współdzielonego w następującym formacie: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Aby uzyskać kod generowania tego podpisu, zobacz Kontrola dostępu do usługi IoT Hub. |
Poniższy fragment kodu używa biblioteki uAMQP w języku Python do nawiązywania połączenia z centrum IoT Hub za pośrednictwem linku nadawcy.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Następujące ścieżki linków są obsługiwane jako operacje urządzeń:
Utworzone przez | Typ linku | Ścieżka łącza | opis |
---|---|---|---|
Urządzenia | Link odbiorcy | /devices/<deviceID>/messages/devicebound |
Komunikaty z chmury do urządzenia przeznaczone dla urządzeń są odbierane na tym linku przez każde urządzenie docelowe. |
Urządzenia | Link nadawcy | /devices/<deviceID>/messages/events |
Komunikaty przesyłane z urządzenia do chmury wysyłane z urządzenia są wysyłane za pośrednictwem tego linku. |
Urządzenia | Link nadawcy | /messages/serviceBound/feedback |
Opinie dotyczące komunikatów z chmury do urządzenia wysyłane do usługi za pośrednictwem tego linku przez urządzenia. |
Odbieranie poleceń z chmury do urządzenia (klient urządzenia)
Polecenia z chmury do urządzenia wysyłane do urządzeń są dostarczane za pomocą linku /devices/<deviceID>/messages/devicebound
. Urządzenia mogą odbierać te komunikaty w partiach i używać ładunku danych komunikatu, właściwości komunikatów, adnotacji lub właściwości aplikacji w wiadomości zgodnie z potrzebami.
Poniższy fragment kodu używa biblioteki uAMQP w języku Python do odbierania komunikatów z chmury do urządzenia.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Wysyłanie komunikatów telemetrycznych (klient urządzenia)
Możesz również wysyłać komunikaty telemetryczne z urządzenia przy użyciu protokołu AMQP. Urządzenie może opcjonalnie podać słownik właściwości aplikacji lub różne właściwości komunikatu, takie jak identyfikator komunikatu.
Aby kierować komunikaty na podstawie treści komunikatu content_type
, należy ustawić właściwość na application/json;charset=utf-8
. Aby dowiedzieć się więcej na temat routingu komunikatów na podstawie właściwości komunikatów lub treści komunikatów, zobacz dokumentację składni zapytania routingu komunikatów usługi IoT Hub.
Poniższy fragment kodu używa biblioteki uAMQP w języku Python do wysyłania komunikatów z urządzenia do chmury z urządzenia.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Dodatkowe uwagi
Połączenia AMQP mogą być zakłócane z powodu usterki sieci lub wygaśnięcia tokenu uwierzytelniania (wygenerowanego w kodzie). Klient usługi musi obsługiwać te okoliczności i w razie potrzeby ponownie opublikować połączenie i łącza. Jeśli token uwierzytelniania wygaśnie, klient może uniknąć porzucania połączenia, proaktywnie odnawiając token przed jego wygaśnięciem.
Klient musi od czasu do czasu mieć możliwość poprawnego obsługi przekierowań linków. Aby zrozumieć taką operację, zapoznaj się z dokumentacją klienta protokołu AMQP.
Następne kroki
Aby dowiedzieć się więcej o protokole AMQP, zobacz specyfikację protokołu AMQP w wersji 1.0.
Aby dowiedzieć się więcej o komunikatach usługi IoT Hub, zobacz: