Communiceren met uw IoT-hub met behulp van het AMQP-protocol
Azure IoT Hub ondersteunt OASIS Advanced Message Queuing Protocol (AMQP) versie 1.0 om verschillende functies te leveren via apparaatgerichte en servicegerichte eindpunten. In dit document wordt het gebruik van AMQP-clients beschreven om verbinding te maken met een IoT-hub om ioT Hub-functionaliteit te gebruiken.
Serviceclient
Verbinding maken en verifiëren bij een IoT-hub (serviceclient)
Als u verbinding wilt maken met een IoT-hub met AMQP, kan een client gebruikmaken van de op claims gebaseerde beveiliging (CBS) of SASL-verificatie (Simple Authentication and Security Layer).
De volgende informatie is vereist voor de serviceclient:
Gegevens | Weergegeven als |
---|---|
Hostnaam van IoT Hub | <iot-hub-name>.azure-devices.net |
Sleutelnaam | service |
Toegangssleutel | Een primaire of secundaire sleutel die is gekoppeld aan de service |
Shared Access Signature | Een handtekening voor gedeelde toegang met een korte levensduur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Zie Toegang tot IoT Hub beheren om de code voor het genereren van deze handtekening op te halen. |
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om via een afzenderkoppeling verbinding te maken met een IoT-hub.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Cloud-naar-apparaat-berichten aanroepen (serviceclient)
Zie Cloud-naar-Apparaat-berichten verzenden vanuit uw IoT-hub voor meer informatie over de uitwisseling van cloud-naar-apparaatberichten tussen de service en de IoT-hub en tussen het apparaat en de IoT-hub. De serviceclient gebruikt twee koppelingen om berichten te verzenden en feedback te ontvangen voor eerder verzonden berichten van apparaten, zoals beschreven in de volgende tabel:
Gemaakt door | Type koppeling | Koppelingspad | Beschrijving |
---|---|---|---|
Service | Koppeling naar afzender | /messages/devicebound |
Cloud-naar-apparaat-berichten die zijn bestemd voor apparaten, worden door de service naar deze koppeling verzonden. Berichten die via deze koppeling worden verzonden, hebben hun To eigenschap ingesteld op het pad van de ontvanger van het doelapparaat, /devices/<deviceID>/messages/devicebound . |
Service | Ontvangerkoppeling | /messages/serviceBound/feedback |
Voltooiings-, afwijzings- en afbrekingsfeedbackberichten die afkomstig zijn van apparaten die op deze koppeling door de service zijn ontvangen. Zie Cloud-naar-apparaat-berichten verzenden vanuit een IoT-hub voor meer informatie over feedbackberichten. |
Het volgende codefragment laat zien hoe u een cloud-naar-apparaat-bericht maakt en verzendt naar een apparaat met behulp van de uAMQP-bibliotheek in Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Om feedback te ontvangen, maakt de serviceclient een ontvangerkoppeling. Het volgende codefragment laat zien hoe u een koppeling maakt met behulp van de uAMQP-bibliotheek in Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Zoals in de voorgaande code wordt weergegeven, heeft een feedbackbericht van de cloud naar het apparaat een inhoudstype van de toepassing/vnd.microsoft.iothub.feedback.json. U kunt de eigenschappen in de JSON-hoofdtekst van het bericht gebruiken om de bezorgingsstatus van het oorspronkelijke bericht af te stellen:
De sleutel
statusCode
in de feedbacktekst heeft een van de volgende waarden: Success, Expired, DeliveryCountExceededed, Rejected of Purged.De sleutel
deviceId
in de hoofdtekst van de feedback heeft de id van het doelapparaat.De sleutel
originalMessageId
in de hoofdtekst van de feedback heeft de id van het oorspronkelijke cloud-naar-apparaat-bericht dat door de service is verzonden. U kunt deze bezorgingsstatus gebruiken om feedback te correleren met cloud-naar-apparaat-berichten.
Telemetrieberichten ontvangen (serviceclient)
Standaard worden in uw IoT-hub opgenomen telemetrieberichten van apparaten opgeslagen in een ingebouwde Event Hub. Uw serviceclient kan het AMQP-protocol gebruiken om de opgeslagen gebeurtenissen te ontvangen.
Hiervoor moet de serviceclient eerst verbinding maken met het IoT Hub-eindpunt en een omleidingsadres ontvangen naar de ingebouwde Event Hubs. De serviceclient gebruikt vervolgens het opgegeven adres om verbinding te maken met de ingebouwde Event Hub.
In elke stap moet de client de volgende stukjes informatie presenteren:
Geldige servicereferenties (shared access signature-token van de service).
Een goed opgemaakt pad naar de partitie van de consumentengroep waaruit berichten moeten worden opgehaald. Voor een bepaalde consumentengroep en partitie-id heeft het pad de volgende indeling:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(de standaardconsumergroep is$Default
).Een optioneel filterpredicaat om een beginpunt in de partitie aan te wijzen. Dit predicaat kan de vorm hebben van een volgnummer, verschuiving of enqueued tijdstempel.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om de voorgaande stappen te demonstreren:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Voor een bepaalde apparaat-id gebruikt de IoT-hub een hash van de apparaat-id om te bepalen in welke partitie de berichten moeten worden opgeslagen. Het voorgaande codefragment laat zien hoe gebeurtenissen worden ontvangen van één dergelijke partitie. Houd er echter rekening mee dat een typische toepassing vaak gebeurtenissen moet ophalen die zijn opgeslagen in alle Event Hub-partities.
Apparaatclient
Verbinding maken en verifiëren bij een IoT-hub (apparaatclient)
Als u via AMQP verbinding wilt maken met een IoT-hub, kan een apparaat gebruikmaken van op claims gebaseerde beveiliging (CBS) of SASL-verificatie (Simple Authentication and Security Layer).
De volgende informatie is vereist voor de apparaatclient:
Gegevens | Weergegeven als |
---|---|
Hostnaam van IoT Hub | <iot-hub-name>.azure-devices.net |
Toegangssleutel | Een primaire of secundaire sleutel die is gekoppeld aan het apparaat |
Shared Access Signature | Een handtekening voor gedeelde toegang met een korte levensduur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Zie Toegang tot IoT Hub beheren om de code voor het genereren van deze handtekening op te halen. |
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om via een afzenderkoppeling verbinding te maken met een IoT-hub.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
De volgende koppelingspaden worden ondersteund als apparaatbewerkingen:
Gemaakt door | Type koppeling | Koppelingspad | Beschrijving |
---|---|---|---|
Apparaten | Ontvangerkoppeling | /devices/<deviceID>/messages/devicebound |
Cloud-naar-apparaat-berichten die zijn bestemd voor apparaten, worden via deze koppeling ontvangen door elk doelapparaat. |
Apparaten | Koppeling naar afzender | /devices/<deviceID>/messages/events |
Apparaat-naar-cloud-berichten die vanaf een apparaat worden verzonden, worden via deze koppeling verzonden. |
Apparaten | Koppeling naar afzender | /messages/serviceBound/feedback |
Feedback over cloud-naar-apparaatberichten die via deze koppeling door apparaten naar de service zijn verzonden. |
Cloud-naar-apparaat-opdrachten ontvangen (apparaatclient)
Cloud-naar-apparaat-opdrachten die naar apparaten worden verzonden, komen binnen via een /devices/<deviceID>/messages/devicebound
koppeling. Apparaten kunnen deze berichten in batches ontvangen en de nettolading van berichtgegevens, berichteigenschappen, aantekeningen of toepassingseigenschappen in het bericht gebruiken, indien nodig.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om cloud-naar-apparaat-berichten te ontvangen door een apparaat.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Telemetrieberichten verzenden (apparaatclient)
U kunt ook telemetrieberichten vanaf een apparaat verzenden met AMQP. Het apparaat kan desgewenst een woordenlijst met toepassingseigenschappen of verschillende berichteigenschappen opgeven, zoals bericht-id.
Als u berichten wilt routeren op basis van de berichttekst, moet u de content_type
eigenschap instellen op application/json;charset=utf-8
. Raadpleeg de syntaxisdocumentatie voor ioT Hub-berichtroutering voor meer informatie over routeringsberichten op basis van berichteigenschappen of berichttekst.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om apparaat-naar-cloud-berichten van een apparaat te verzenden.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Aanvullende opmerkingen
De AMQP-verbindingen kunnen worden onderbroken vanwege een netwerkstoring of het verlopen van het verificatietoken (gegenereerd in de code). De serviceclient moet deze omstandigheden afhandelen en de verbinding en koppelingen herstellen, indien nodig. Als een verificatietoken verloopt, kan de client een verbinding stoppen door het token proactief te vernieuwen voordat het is verlopen.
Uw client moet af en toe koppelingsomleidingen correct kunnen afhandelen. Als u een dergelijke bewerking wilt begrijpen, raadpleegt u de documentatie van uw AMQP-client.
Volgende stappen
Zie de AMQP v1.0-specificatie voor meer informatie over het AMQP-protocol.
Zie voor meer informatie over IoT Hub-berichten: