你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
收发云到设备消息
Azure IoT 中心是一项完全托管的服务,可实现双向通信,包括从解决方案后端到数百万台设备的云到设备 (C2D) 消息。
本文介绍如何使用 Azure IoT SDK 生成以下类型的应用程序:
接收和处理来自 IoT 中心消息队列的云到设备消息的设备应用程序。
通过 IoT 中心消息队列将云到设备消息发送到单个设备的后端应用程序。
本文旨在补充本文中引用的可运行 SDK 示例。
注意
本文所述的功能只能用于 IoT 中心的标准层。 有关 IoT 中心基本层和标准/免费层的详细信息,请参阅选择适合你的解决方案的 IoT 中心层。
概述
要使设备应用程序接收云到设备消息,它必须连接到 IoT 中心,然后设置消息处理程序来处理传入消息。 Azure IoT 中心设备 SDK 提供类和方法,设备可以使用这些类和方法接收和处理来自服务的消息。 本文讨论接收消息的任何设备应用程序的关键元素,包括:
- 声明设备客户端对象
- 连接到 IoT 中心
- 从 IoT 中心消息队列检索消息
- 处理消息并将确认发送回 IoT 中心
- 配置接收消息重试策略
若要使后端应用程序发送云到设备消息,它必须连接到 IoT 中心并通过 IoT 中心消息队列发送消息。 Azure IoT 中心服务 SDK 提供类和方法,应用程序可使用这些类和方法将消息发送到设备。 本文讨论向设备发送消息的任何应用程序的关键元素,包括:
- 声明服务客户端对象
- 连接到 IoT 中心
- 生成并发送消息
- 接收送达反馈
- 配置发送消息重试策略
了解消息队列
若要了解云到设备消息传送,请务必了解有关 IoT 中心设备消息队列工作原理的一些基础知识。
系统通过 IoT 中心路由从解决方案后端应用程序发送到 IoT 设备的云到设备消息。 解决方案后端应用程序和目标设备之间没有直接对等消息传送通信。 IoT 中心将传入消息放入其消息队列中,可供目标 IoT 设备下载。
为了保证至少一次消息传递,IoT 中心将云到设备消息保存在每个设备的队列中。 在 IoT 中心从队列中删除消息之前,设备必须显式确认消息的相关操作已完成。 此方法保证了连接失败和设备故障时能够恢复。
当 IoT 中心将消息放入设备消息队列时,它将消息状态设置为“已排队”。 当设备线程从队列获取消息时,IoT 中心通过将消息状态设置为“不可见”来锁定消息。 此状态可防止设备上的其他线程处理同一消息。 当设备线程成功完成消息处理时,它会通知 IoT 中心,然后 IoT 中心会将消息状态设置为“已完成”。
如果设备应用程序已成功接收和处理消息,则称为已完成消息。 但是,如有必要,设备还可以:
- 拒绝消息,这会使 IoT 中心将此消息设置为“死信”状态。 通过消息队列遥测传输 (MQTT) 协议进行连接的设备无法拒绝云到设备的消息。
- 放弃消息,这会使 IoT 中心将消息放回队列,并将消息状态设置为“已排队”。 通过 MQTT 协议连接的设备无法放弃云到设备的消息。
若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息。
创建设备应用程序
本部分介绍如何接收云到设备的消息。
设备客户端应用程序可以使用两个选项来接收消息:
- 回调:设备应用程序设置一个异步消息处理程序方法,系统在消息到达时立即调用该方法。
- 轮询:设备应用程序使用代码循环(例如,
while
或for
循环)检查新的 IoT 中心消息。 循环持续执行,检查消息。
所需设备 NuGet 包
使用 C# 编写的设备客户端应用程序需要 NuGet 包“Microsoft.Azure.Devices.Client”。
添加这些 using
语句以使用设备库。
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Shared;
将设备连接到 IoT 中心
设备应用可以使用以下方法向 IoT 中心进行身份验证:
- 共享访问密钥
- X.509 证书
重要
本文包括使用共享访问签名(也称为对称密钥身份验证)连接设备的步骤。 此身份验证方法便于测试和评估,但使用 X.509 证书对设备进行身份验证是一种更安全的方法。 若要了解详细信息,请参阅“安全最佳做法 > 连接安全性”。
使用共享访问密钥进行身份验证
DeviceClient 类会公开在设备上接收消息所需的所有方法。
使用 CreateFromConnectionString 方法向 DeviceClient
提供 IoT 中心主连接字符串和设备 ID。 除了所需的 IoT 中心主连接字符串外,还可以重载 CreateFromConnectionString
方法以包括以下可选参数:
transportType
- 传输协议:HTTP 版本 1、AMQP 或 MQTT 的变体。AMQP
是默认值。 若要查看所有可用值,请参阅 TransportType 枚举。transportSettings
- 用于定义DeviceClient
和ModuleClient
的各种传输特定设置的接口。 有关详细信息,请参阅 ITransportSettings 接口。ClientOptions
- 允许在初始化期间配置设备或模块客户端实例的选项。
此示例使用 Mqtt
传输协议连接到设备。
static string DeviceConnectionString = "{IoT hub device connection string}";
static deviceClient = null;
deviceClient = DeviceClient.CreateFromConnectionString(DeviceConnectionString,
TransportType.Mqtt);
使用 X.509 证书进行身份验证
使用 X.509 证书将设备连接到 IoT 中心:
使用 DeviceAuthenticationWithX509Certificate 创建包含设备和证书信息的对象。
DeviceAuthenticationWithX509Certificate
将作为第二个参数传递给DeviceClient.Create
(步骤 2)。使用 DeviceClient.Create 通过 X.509 证书将设备连接到 IoT 中心。
在此示例中,设备和证书信息将填充在传递给 DeviceClient.Create
的 auth
DeviceAuthenticationWithX509Certificate
对象中。
清楚起见,此示例以本地变量的形式显示证书输入参数值。 在生产系统中,请将敏感输入参数存储在环境变量中或其他更安全的存储位置。 例如,使用 Environment.GetEnvironmentVariable("HOSTNAME")
读取主机名环境变量。
RootCertPath = "~/certificates/certs/sensor-thl-001-device.cert.pem";
Intermediate1CertPath = "~/certificates/certs/sensor-thl-001-device.intermediate1.cert.pem";
Intermediate2CertPath = "~/certificates/certs/sensor-thl-001-device.intermediate2.cert.pem";
DevicePfxPath = "~/certificates/certs/sensor-thl-001-device.cert.pfx";
DevicePfxPassword = "1234";
DeviceName = "MyDevice";
HostName = "xxxxx.azure-devices.net";
var chainCerts = new X509Certificate2Collection();
chainCerts.Add(new X509Certificate2(RootCertPath));
chainCerts.Add(new X509Certificate2(Intermediate1CertPath));
chainCerts.Add(new X509Certificate2(Intermediate2CertPath));
using var deviceCert = new X509Certificate2(DevicePfxPath, DevicePfxPassword);
using var auth = new DeviceAuthenticationWithX509Certificate(DeviceName, deviceCert, chainCerts);
using var deviceClient = DeviceClient.Create(
HostName,
auth,
TransportType.Amqp);
有关证书身份验证的详细信息,请参阅:
代码示例
有关设备 X.509 证书身份验证的工作示例,请参阅:
回调
若要在设备应用程序中接收回调云到设备消息,应用程序必须连接到 IoT 中心并设置回调侦听器来处理传入消息。 系统从 IoT 中心消息队列接收设备的传入消息。
通过回调,设备应用程序可使用 SetReceiveMessageHandlerAsync 设置消息处理程序方法。 调用消息处理程序,然后接收消息。 创建回调方法来接收消息,无需不断轮询接收的消息。
回调仅适用于以下协议:
Mqtt
Mqtt_WebSocket_Only
Mqtt_Tcp_Only
Amqp
Amqp_WebSocket_Only
Amqp_Tcp_only
Http1
协议选项不支持回调,因为 SDK 方法无论如何都需要轮询接收到的消息,这违反了回调原则。
在此示例中,SetReceiveMessageHandlerAsync
设置了一个名为 OnC2dMessageReceivedAsync
的回调处理程序方法,每次收到消息时都会调用该方法。
// Subscribe to receive C2D messages through a callback (which isn't supported over HTTP).
await deviceClient.SetReceiveMessageHandlerAsync(OnC2dMessageReceivedAsync, deviceClient);
Console.WriteLine($"\n{DateTime.Now}> Subscribed to receive C2D messages over callback.");
轮询
轮询使用 ReceiveAsync 检查消息。
对 ReceiveAsync
的调用可以采用以下形式:
ReceiveAsync()
- 等待消息的默认超时期限,然后再继续。ReceiveAsync (Timespan)
- 使用特定超时从设备队列接收消息。ReceiveAsync (CancellationToken)
- 使用取消令牌从设备队列接收消息。 使用取消令牌时,不使用默认超时期限。
当使用传输类型 HTTP 1(而不是 MQTT 或 AMQP)时,ReceiveAsync
方法会立即返回。 对于使用 HTTP 1 的云到设备消息,其支持模式是间歇连接到设备,且不常检查消息(至少每 25 分钟检查一次)。 发出更多 HTTP 1 接收会导致 IoT 中心限制请求。 有关 MQTT、AMQP 和 HTTP 1 支持之间的差异的详细信息,请参阅云到设备通信指南和选择通信协议。
CompleteAsync 方法
设备收到消息后,设备应用程序调用 CompleteAsync 方法以通知 IoT 中心该消息已成功处理,并且可以安全地从 IoT 中心设备队列中删除该消息。 无论设备使用何种传输协议,其都应在处理成功完成后调用此方法。
放弃、拒绝消息或消息超时
使用 AMQP 和 HTTP 版本 1 协议(但不是 MQTT 协议),设备还可以:
- 通过调用 AbandonAsync 放弃消息。 这会使 IoT 中心将消息保留在设备队列中供将来使用。
- 通过调用 RejectAsync 拒绝消息。 这会永久删除设备队列中的消息。
如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。
若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息。
轮询循环
使用轮询,应用程序使用代码循环反复调用 ReceiveAsync
方法来检查新消息,直到停止为止。
如果使用带有超时值或默认超时的 ReceiveAsync
,则在循环中对 ReceiveAsync
的每次调用都会等待指定的超时期限。 如果 ReceiveAsync
超时,则返回 null
值并继续循环。
收到消息时,ReceiveAsync
将返回一个 Task 对象,该对象应传递给 CompleteAsync。 调用 CompleteAsync
会根据 Task
参数通知 IoT 中心从消息队列中删除指定的消息。
在此示例中,循环会在收到消息或轮询循环停止之前调用 ReceiveAsync
。
static bool stopPolling = false;
while (!stopPolling)
{
// Check for a message. Wait for the default DeviceClient timeout period.
using Message receivedMessage = await _deviceClient.ReceiveAsync();
// Continue if no message was received
if (receivedMessage == null)
{
continue;
}
else // A message was received
{
// Print the message received
Console.WriteLine($"{DateTime.Now}> Polling using ReceiveAsync() - received message with Id={receivedMessage.MessageId}");
PrintMessage(receivedMessage);
// Notify IoT Hub that the message was received. IoT Hub will delete the message from the message queue.
await _deviceClient.CompleteAsync(receivedMessage);
Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");
}
// Check to see if polling loop should end
stopPolling = ShouldPollingstop ();
}
接收消息重试策略
可以使用 DeviceClient.SetRetryPolicy 定义设备客户端消息重试策略。
消息重试超时存储在 DeviceClient.OperationTimeoutInMilliseconds 属性中。
SDK 接收消息示例
.NET/C# SDK 包含一个消息接收示例,其中包含本节中描述的接收消息方法。
创建后端应用程序
本部分介绍使用适用于 .NET 的 Azure IoT SDK 中的 ServiceClient 类将消息从解决方案后端应用程序发送到 IoT 设备的基本代码。 如前所述,解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。
解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。
添加服务 NuGet 包
后端服务应用程序需要 NuGet 包“Microsoft.Azure.Devices”。
连接到 IoT 中心
可使用以下方法将后端服务连接到 IoT 中心:
- 共享访问策略
- Microsoft Entra
重要
本文介绍使用共享访问签名连接到服务的步骤。 虽然可使用此身份验证方法进行测试和评估,但使用 Microsoft Entra ID 或托管标识对设备进行身份验证是一种更安全的方法。 有关详细信息,请参阅安全最佳做法和云安全。
使用共享访问策略进行连接
使用 CreateFromConnectionString 将后端应用程序连接到设备。 除了所需的 IoT 中心主连接字符串外,还可以重载 CreateFromConnectionString
方法以包括以下可选参数:
transportType
-Amqp
或Amqp_WebSocket_Only
。transportSettings
- 服务客户端的 AMQP 和 HTTP 代理设置。ServiceClientOptions
- 允许在初始化期间配置服务客户端实例的选项。 有关详细信息,请参阅 ServiceClientOptions。
此示例使用 IoT 中心连接字符串和默认 Amqp
传输创建 ServiceClient
对象。
static string connectionString = "{your IoT hub connection string}";
serviceClient = ServiceClient.CreateFromConnectionString(connectionString);
使用 Microsoft Entra 进行连接
使用 Microsoft Entra 的后端应用必须在连接到 IoT 中心之前成功完成身份验证并获取安全令牌凭据。 此令牌将传递给 IoT 中心连接方法。 有关为 IoT 中心设置和使用 Microsoft Entra 的一般信息,请参阅使用 Microsoft Entra ID 控制对 IoT 中心的访问。
配置 Microsoft Entra 应用
必须设置一个针对首选身份验证凭据进行配置的 Microsoft Entra 应用。 该应用包含由后端应用程序用来进行身份验证的参数,例如客户端机密。 可用的应用身份验证配置如下:
- 客户端机密
- 证书
- 联合标识凭据
根据正在执行的操作,Microsoft Entra 应用可能需要特定的角色权限。 例如,需要 IoT 中心孪生参与者角色才能启用对 IoT 中心设备和模块孪生的读写访问权限。 有关详细信息,请参阅使用 Azure RBAC 角色分配管理对 IoT 中心的访问。
有关设置 Microsoft Entra 应用的详细信息,请参阅快速入门:将应用程序注册到 Microsoft 标识平台。
使用 DefaultAzureCredential 进行身份验证
使用 Microsoft Entra 对后端应用程序进行身份验证的最简单方法是使用 DefaultAzureCredential,但建议在生产环境中使用其他方法,包括特定的 TokenCredential
或精简的 ChainedTokenCredential
。 为简单起见,本部分介绍如何使用 DefaultAzureCredential
和客户端机密进行身份验证。 有关使用 DefaultAzureCredential
的利弊的详细信息,请参阅 DefaultAzureCredential 的使用指南。
DefaultAzureCredential
支持不同的身份验证机制,并根据其执行环境确定相应的凭据类型。 它会尝试按顺序使用多种凭据类型,直到找到有效的凭据。
Microsoft Entra 需要这些 NuGet 包和相应的 using
语句:
- Azure.Core
- Azure.Identity
using Azure.Core;
using Azure.Identity;
在此示例中,Microsoft Entra 应用注册客户端机密、客户端 ID 和租户 ID 将添加到环境变量中。 DefaultAzureCredential
使用这些环境变量对应用程序进行身份验证。 成功 Microsoft Entra 身份验证的结果是传递给 IoT 中心连接方法的安全令牌凭据。
string clientSecretValue = "xxxxxxxxxxxxxxx";
string clientID = "xxxxxxxxxxxxxx";
string tenantID = "xxxxxxxxxxxxx";
Environment.SetEnvironmentVariable("AZURE_CLIENT_SECRET", clientSecretValue);
Environment.SetEnvironmentVariable("AZURE_CLIENT_ID", clientID);
Environment.SetEnvironmentVariable("AZURE_TENANT_ID", tenantID);
TokenCredential tokenCredential = new DefaultAzureCredential();
然后可以将生成的 TokenCredential 传递给任何接受 Microsoft Entra 凭据的 SDK 客户端的 IoT 中心连接方法:
在此示例中,TokenCredential
将传递给 ServiceClient.Create
,以创建 ServiceClient 连接对象。
string hostname = "xxxxxxxxxx.azure-devices.net";
using var serviceClient = ServiceClient.Create(hostname, tokenCredential, TransportType.Amqp);
在此示例中,TokenCredential
将传递给 RegistryManager.Create
,以创建 RegistryManager 对象。
string hostname = "xxxxxxxxxx.azure-devices.net";
registryManager = RegistryManager.Create(hostname, tokenCredential);
代码示例
有关 Microsoft Entra 服务身份验证的有效示例,请参阅基于角色的身份验证示例。
发送异步云到设备消息
在使用 sendAsync 的情况下,通过云(IoT 中心)将异步消息从应用程序发送到设备。 系统使用 AMQP 协议执行调用。
sendAsync
使用以下参数:
deviceID
- 目标设备的字符串标识符。message
- 云到设备消息。 该消息属于 Message 类型,可以进行相应的格式化。timeout
- 可选的超时值。 如果未指定,则默认值为 1 分钟。
此示例向目标设备发送一条具有 10 秒超时值的测试消息。
string targetDevice = "Device-1";
static readonly TimeSpan operationTimeout = TimeSpan.FromSeconds(10);
var commandMessage = new
Message(Encoding.ASCII.GetBytes("Cloud to device message."));
await serviceClient.SendAsync(targetDevice, commandMessage, operationTimeout);
接收送达反馈
发送程序可以向 IoT 中心请求每条云到设备消息的送达(或过期)确认。 此选项使发送程序能够使用通知、重试或补偿逻辑。 若要了解消息反馈操作和属性的完整描述,请参阅消息反馈。
若要接收邮件传递反馈,请执行以下操作:
- 创建
feedbackReceiver
对象 - 使用
Ack
参数发送消息 - 等待接收反馈
创建 feedbackReceiver 对象
调用 GetFeedbackReceiver 以创建 FeedbackReceiver 对象。 FeedbackReceiver
包含服务可以用来执行反馈接收操作的方法。
var feedbackReceiver = serviceClient.GetFeedbackReceiver();
使用 Ack 参数发送消息
每条消息都必须包含一个发送确认 Ack 属性的值,以便接收发送反馈。 Ack
属性可以是下列值之一:
none(默认):不生成反馈消息。
Positive
:如果消息已完成,则会收到反馈消息。Negative
:如果消息过期(或达到发送次数上限)且设备尚未完成,则接收反馈消息。Full
:对Positive
和Negative
结果的反馈。
在此示例中,已将 Ack
属性设置为 Full
,请求对一条消息的正面或负面的消息发送反馈。
var commandMessage = new
Message(Encoding.ASCII.GetBytes("Cloud to device message."));
commandMessage.Ack = DeliveryAcknowledgement.Full;
await serviceClient.SendAsync(targetDevice, commandMessage);
等待接收反馈
定义 CancellationToken
。 然后在循环中重复调用 ReceiveAsync,检查发送反馈消息。 每次调用 ReceiveAsync
都会等待为 ServiceClient
对象定义的超时期限。
- 如果
ReceiveAsync
超时已到期且未收到任何消息,则ReceiveAsync
会返回null
并且循环继续。 - 如果收到反馈消息,则
ReceiveAsync
将返回一个 Task 对象,该对象应与取消令牌一起传递给 CompleteAsync。 对CompleteAsync
的调用会根据Task
参数从消息队列中删除指定的已发送消息。 - 如有需要,接收代码可以调用 AbandonAsync 将发送消息放回队列。
var feedbackReceiver = serviceClient.GetFeedbackReceiver();
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
// Call ReceiveAsync, passing the token. Wait for the timeout period.
var feedbackBatch = await feedbackReceiver.ReceiveAsync(token);
if (feedbackBatch == null) continue;
此示例显示了包含这些步骤的方法。
private async static void ReceiveFeedbackAsync()
{
var feedbackReceiver = serviceClient.GetFeedbackReceiver();
Console.WriteLine("\nReceiving c2d feedback from service");
while (true)
{
// Check for messages, wait for the timeout period.
var feedbackBatch = await feedbackReceiver.ReceiveAsync();
// Continue the loop if null is received after a timeout.
if (feedbackBatch == null) continue;
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine("Received feedback: {0}",
string.Join(", ", feedbackBatch.Records.Select(f => f.StatusCode)));
Console.ResetColor();
await feedbackReceiver.CompleteAsync(feedbackBatch);
}
}
请注意,此反馈接收模式与设备应用程序中接收云到设备消息所使用的模式类似。
服务客户端重新连接
遇到异常时,服务客户端会将该信息传递给调用应用程序。 此时,建议检查异常详细信息并采取必要的操作。
例如:
- 如果是网络异常,可以重试该操作。
- 如果是安全异常(未经授权的异常),请检查凭据并确保它们是最新的。
- 如果是超出限制/配额异常,请监视和/或修改发送请求的频率,或更新中心实例缩放单元。 有关详细信息,请参阅 IoT 中心配额和限制。
发送消息重试策略
可以使用 ServiceClient.SetRetryPolicy 定义 ServiceClient
消息重试策略。
SDK 发送消息示例
.NET/C# SDK 包含一个服务客户端示例,其中包含本节中描述的发送消息方法。
创建设备应用程序
本部分介绍如何使用适用于 Java 的 Azure IoT SDK 中的 DeviceClient 类接收云到设备消息。
对于基于 Java 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调侦听器和消息处理程序来处理来自 IoT 中心的传入消息。
导入 Azure IoT Java SDK 库
本文引用的代码使用了这些 SDK 库。
import com.microsoft.azure.sdk.iot.device.*;
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
将设备连接到 IoT 中心
设备应用可以使用以下方法向 IoT 中心进行身份验证:
- 共享访问密钥
- X.509 证书
重要
本文包括使用共享访问签名(也称为对称密钥身份验证)连接设备的步骤。 此身份验证方法便于测试和评估,但使用 X.509 证书对设备进行身份验证是一种更安全的方法。 若要了解详细信息,请参阅“安全最佳做法 > 连接安全性”。
使用共享访问密钥进行身份验证
DeviceClient 对象实例化需要以下参数:
- connString - IoT 设备连接字符串。 连接字符串是一组以“;”分隔的键值对,其中键和值以“=”分隔。 它应包含这些键的值:
HostName, DeviceId, and SharedAccessKey
。 - 传输协议 -
DeviceClient
连接可以使用以下 IoTHubClientProtocol 传输协议之一。AMQP
功能最丰富,允许频繁检查消息,并允许拒绝和取消消息。 MQTT 不支持拒绝或放弃消息的方法:AMQPS
AMQPS_WS
HTTPS
MQTT
MQTT_WS
例如:
static string connectionString = "{IOT hub device connection string}";
static protocol = IotHubClientProtocol.AMQPS;
DeviceClient client = new DeviceClient(connectionString, protocol);
使用 X.509 证书进行身份验证
使用 X.509 证书将设备连接到 IoT 中心:
- 使用 buildSSLContext 生成 SSLContext 对象。
- 将
SSLContext
信息添加到 ClientOptions 对象。 - 使用
ClientOptions
信息调用 DeviceClient,以创建设备到 IoT 中心的连接。
清楚起见,此示例以本地变量的形式显示证书输入参数值。 在生产系统中,请将敏感输入参数存储在环境变量中或其他更安全的存储位置。 例如,使用 Environment.GetEnvironmentVariable("PUBLICKEY")
读取公钥证书字符串环境变量。
private static final String publicKeyCertificateString =
"-----BEGIN CERTIFICATE-----\n" +
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n" +
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n" +
"-----END CERTIFICATE-----\n";
//PEM encoded representation of the private key
private static final String privateKeyString =
"-----BEGIN EC PRIVATE KEY-----\n" +
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n" +
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n" +
"-----END EC PRIVATE KEY-----\n";
SSLContext sslContext = SSLContextBuilder.buildSSLContext(publicKeyCertificateString, privateKeyString);
ClientOptions clientOptions = ClientOptions.builder().sslContext(sslContext).build();
DeviceClient client = new DeviceClient(connString, protocol, clientOptions);
有关证书身份验证的详细信息,请参阅:
代码示例
有关设备 X.509 证书身份验证的工作示例,请参阅:
设置消息回调方法
使用 setMessageCallback 方法定义一个消息处理程序方法,通过使用该方法,可在从 IoT 中心收到消息时收到通知。
setMessageCallback
包括这些参数:
callback
- 回调方法名称。 可以为null
。context
- 类型为object
的可选上下文。 如果未指定,则使用null
。
在此示例中,名为 MessageCallback
且没有上下文参数的 callback
方法被传递给 setMessageCallback
。
client.setMessageCallback(new MessageCallback(), null);
创建消息回调处理程序
回调消息处理程序接收并处理从 IoT 中心消息队列传递的传入消息。
在此示例中,消息处理程序处理传入消息,然后返回 IotHubMessageResult.COMPLETE。 IotHubMessageResult.COMPLETE
返回值会通知 IoT 中心该消息已成功处理,并且可以安全地从设备队列中删除该消息。 当处理成功完成时,设备应该返回 IotHubMessageResult.COMPLETE
,通知 IoT 中心应该从消息队列中删除该消息,无论使用何种协议均应如此。
protected static class MessageCallback implements com.microsoft.azure.sdk.iot.device.MessageCallback
{
public IotHubMessageResult onCloudToDeviceMessageReceived(Message msg, Object context)
{
System.out.println(
"Received message with content: " + new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET));
// Notify IoT Hub that the message
return IotHubMessageResult.COMPLETE;
}
}
消息放弃和拒绝选项
尽管设备应该能够成功接收大量传入消息并触发 IotHubMessageResult.COMPLETE
,但可能需要放弃或拒绝某条消息。
- 通过使用 AMQP 和 HTTPS(而不是 MQTT),应用程序可以:
IotHubMessageResult.ABANDON
消息。 IoT 中心会重新将消息排队,并在以后再次发送。IotHubMessageResult.REJECT
消息。 IoT 中心不会重新将消息排队,并永久删除消息队列中的消息。
- 使用
MQTT
或MQTT_WS
的客户端无法ABANDON
或REJECT
消息。
如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。
若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息。
注意
如果使用 HTTPS(而不使用 MQTT 或 AMQP)作为传输,则 DeviceClient 实例不会频繁检查 IoT 中心发来的消息(频率最低为每 25 分钟一次)。 有关 MQTT、AMQP 和 HTTPS 支持之间的差异的详细信息,请参阅云到设备通信指南和选择通信协议。
创建消息状态回调方法
应用程序可以使用 registerConnectionStatusChangeCallback 注册一个当设备连接状态改变时执行的回调方法。 这样,应用程序可以检测到中断的消息连接并尝试重新连接。
在此示例中,我们将 IotHubConnectionStatusChangeCallbackLogger
注册为连接状态改变回调方法。
client.registerConnectionStatusChangeCallback(new IotHubConnectionStatusChangeCallbackLogger(), new Object());
系统会触发回调并传递 ConnectionStatusChangeContext
对象。
调用 connectionStatusChangeContext.getNewStatus()
以获取当前连接状态。
IotHubConnectionStatus status = connectionStatusChangeContext.getNewStatus();
返回的连接状态可以是以下值之一:
IotHubConnectionStatus.DISCONNECTED
IotHubConnectionStatus.DISCONNECTED_RETRYING
IotHubConnectionStatus.CONNECTED
调用 connectionStatusChangeContext.getNewStatusReason()
以获取连接状态更改的原因。
IotHubConnectionStatusChangeReason statusChangeReason = connectionStatusChangeContext.getNewStatusReason();
调用 connectionStatusChangeContext.getCause()
以查找连接状态更改的原因。 如果没有可用信息,则 getCause()
可能会返回 null
。
Throwable throwable = connectionStatusChangeContext.getCause();
if (throwable != null)
throwable.printStackTrace();
有关介绍如何使用状态改变回调方法提取连接状态更改、设备状态更改的原因以及上下文的完整示例,请参阅本文 SDK 接收消息示例部分中列出的 HandleMessages 示例。
打开设备和 IoT 中心之间的连接
使用 open 在设备和 IoT 中心之间创建连接。 设备现在可以以异步方式向 IoT 中心发送消息或接收来自 IoT 中心的消息。 如果客户端已打开,该方法将不执行任何操作。
client.open(true);
SDK 接收消息示例
HandleMessages:适用于 Java 的 Microsoft Azure IoT SDK 附带的示例设备应用,可连接到 IoT 中心并接收云到设备的消息。
创建后端应用程序
本部分介绍如何使用适用于 Java 的 Azure IoT SDK 中的 ServiceClient 类发送云到设备消息。 解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。
解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。
添加依赖项语句
添加依赖项以使用应用程序中的 iothub-java-service-client 包与 IoT 中心服务通信:
<dependency>
<groupId>com.microsoft.azure.sdk.iot</groupId>
<artifactId>iot-service-client</artifactId>
<version>1.7.23</version>
</dependency>
添加 import 语句
添加这些 import 语句以使用 Azure IoT Java SDK 和异常处理程序。
import com.microsoft.azure.sdk.iot.service.*;
import java.io.IOException;
import java.net.URISyntaxException;
连接到 IoT 中心
可以使用以下方法将后端服务连接到 IoT 中心:
- 共享访问策略
- Microsoft Entra
重要
本文介绍使用共享访问签名连接到服务的步骤。 虽然可使用此身份验证方法进行测试和评估,但使用 Microsoft Entra ID 或托管标识对设备进行身份验证是一种更安全的方法。 有关详细信息,请参阅安全最佳做法和云安全。
使用共享访问策略进行连接
定义连接协议
使用 IotHubServiceClientProtocol 定义服务客户端与 IoT 中心通信所使用的应用层协议。
IotHubServiceClientProtocol
仅接受 AMQPS
或 AMQPS_WS
枚举。
IotHubServiceClientProtocol protocol = IotHubServiceClientProtocol.AMQPS;
创建 ServiceClient 对象
创建 ServiceClient 对象,提供 Iot 中心连接字符串和协议。
String connectionString = "{yourhubconnectionstring}";
ServiceClient serviceClient (connectionString, protocol);
打开应用程序与 IoT 中心之间的连接
使用 open 方法打开 AMQP 发送方连接。 此方法会创建应用程序与 IoT 中心之间的连接。
serviceClient.open();
使用 Microsoft Entra 进行连接
使用 Microsoft Entra 的后端应用必须在连接到 IoT 中心之前成功完成身份验证并获取安全令牌凭据。 此令牌将传递给 IoT 中心连接方法。 有关为 IoT 中心设置和使用 Microsoft Entra 的一般信息,请参阅使用 Microsoft Entra ID 控制对 IoT 中心的访问。
有关 Java SDK 身份验证的概述,请参阅使用 Java 和 Azure 标识进行 Azure 身份验证。
为简单起见,本部分重点介绍如何使用客户端机密进行身份验证。
配置 Microsoft Entra 应用
必须设置一个针对首选身份验证凭据进行配置的 Microsoft Entra 应用。 该应用包含由后端应用程序用来进行身份验证的参数,例如客户端机密。 可用的应用身份验证配置如下:
- 客户端机密
- 证书
- 联合标识凭据
根据正在执行的操作,Microsoft Entra 应用可能需要特定的角色权限。 例如,需要 IoT 中心孪生参与者角色才能启用对 IoT 中心设备和模块孪生的读写访问权限。 有关详细信息,请参阅使用 Azure RBAC 角色分配管理对 IoT 中心的访问。
有关设置 Microsoft Entra 应用的详细信息,请参阅快速入门:将应用程序注册到 Microsoft 标识平台。
使用 DefaultAzureCredential 进行身份验证
使用 Microsoft Entra 对后端应用程序进行身份验证的最简单方法是使用 DefaultAzureCredential,但建议在生产环境中使用其他方法,包括特定的 TokenCredential
或精简的 ChainedTokenCredential
。
有关使用 DefaultAzureCredential
的利弊的详细信息,请参阅适用于 Java 的 Azure 标识客户端库中的凭据链。
DefaultAzureCredential 支持不同的身份验证机制,并根据其执行环境确定相应的凭据类型。 它会尝试按顺序使用多种凭据类型,直到找到有效的凭据。
可以使用 DefaultAzureCredentialBuilder 来验证 Microsoft Entra 应用凭据。 客户端机密 tenantID、clientID 和客户端机密值等连接参数将保存为环境变量。 创建 TokenCredential
后,请将其作为“credential”参数传递给 ServiceClient 或其他生成器。
在此示例中,DefaultAzureCredentialBuilder
尝试根据 DefaultAzureCredential 中描述的列表对连接进行身份验证。 成功 Microsoft Entra 身份验证的结果是传递给 ServiceClient 等构造函数的安全令牌凭据。
TokenCredential defaultAzureCredential = new DefaultAzureCredentialBuilder().build();
使用 ClientSecretCredentialBuilder 进行身份验证
可以使用 ClientSecretCredentialBuilder 来创建使用客户端机密信息的凭据。 如果成功,此方法将返回 TokenCredential,可将其作为“credential”参数传递给 ServiceClient 或其他生成器。
在此示例中,Microsoft Entra 应用注册客户端机密、客户端 ID 和租户 ID 值已添加到环境变量中。 ClientSecretCredentialBuilder
使用这些环境变量来生成凭据。
string clientSecretValue = System.getenv("AZURE_CLIENT_SECRET");
string clientID = System.getenv("AZURE_CLIENT_ID");
string tenantID = System.getenv("AZURE_TENANT_ID");
TokenCredential credential =
new ClientSecretCredentialBuilder()
.tenantId(tenantID)
.clientId(clientID)
.clientSecret(clientSecretValue)
.build();
其他身份验证类
Java SDK 还包括以下使用 Microsoft Entra 对后端应用程序进行身份验证的类:
- AuthorizationCodeCredential
- AzureCliCredential
- AzureDeveloperCliCredential
- AzurePipelinesCredential
- ChainedTokenCredential
- ClientAssertionCredential
- ClientCertificateCredential
- DeviceCodeCredential
- EnvironmentCredential
- InteractiveBrowserCredential
- ManagedIdentityCredential
- OnBehalfOfCredential
代码示例
有关 Microsoft Entra 服务身份验证的有效示例,请参阅基于角色的身份验证示例。
打开消息发送反馈的反馈接收器
可以使用 FeedbackReceiver 接收有关发送到 IoT 中心反馈的消息的发送状态。 FeedbackReceiver
是一种专门的接收器,其 Receive
方法会返回 FeedbackBatch
而不是 Message
。
在此示例中,系统将创建 FeedbackReceiver
对象,并调用 open()
语句来等待反馈。
FeedbackReceiver feedbackReceiver = serviceClient
.getFeedbackReceiver();
if (feedbackReceiver != null) feedbackReceiver.open();
添加消息属性
可以选择使用 setProperties 添加消息属性。 这些属性包含在发送到设备的消息中,设备应用程序在收到相应消息到后可以提取这些属性。
Map<String, String> propertiesToSend = new HashMap<String, String>();
propertiesToSend.put(messagePropertyKey,messagePropertyKey);
messageToSend.setProperties(propertiesToSend);
创建和发送异步消息
Message 对象会存储要发送的消息。 在此示例中,传送了“云到设备消息”。
使用 setDeliveryAcknowledgement 请求已传送/未传送至 IoT 中心的消息队列确认。 在此示例中,无论是已送达还是未送达,请求的确认均是 Full
。
使用 SendAsync 将异步消息从客户端发送到设备。 或者,可以使用 Send
(非异步)方法,但此函数在内部同步,因此一次只允许执行一个发送操作。 将消息从应用程序发送到 IoT 中心。 IoT 中心将消息放入消息队列中,准备发送给目标设备。
Message messageToSend = new Message("Cloud to device message.");
messageToSend.setDeliveryAcknowledgementFinal(DeliveryAcknowledgement.Full);
serviceClient.sendAsync(deviceId, messageToSend);
接收消息发送反馈
消息从应用程序发出后,应用程序可以调用 receive 方法,无论该方法是否带有超时值。 如果未提供超时值,则使用默认超时。 这会传回一个 FeedbackBatch 对象,该对象包含可以检查的消息发送反馈属性。
此示例会创建 FeedbackBatch
接收方,调用 getEnqueuedTimeUtc,并输出消息排队时间。
FeedbackBatch feedbackBatch = feedbackReceiver.receive(10000);
if (feedbackBatch != null) {
System.out.println("Message feedback received, feedback time: "
+ feedbackBatch.getEnqueuedTimeUtc().toString());
}
SDK 发送消息示例
有两个发送消息示例:
创建设备应用程序
本部分介绍如何接收云到设备的消息。
IoTHubDeviceClient 类包括用户创建从设备到 Azure IoT 中心的同步连接的方法以及从 IoT 中心接收消息的方法。
必须安装 azure-iot-device 库才能创建设备应用程序。
pip install azure-iot-device
对于基于 Python 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调消息处理程序来处理来自 IoT 中心的传入消息。
设备 import 语句
添加此段代码以从 azure.iot.device SDK 导入 IoTHubDeviceClient
函数。
from azure.iot.device import IoTHubDeviceClient
将设备连接到 IoT 中心
设备应用可以使用以下方法向 IoT 中心进行身份验证:
- 共享访问密钥
- X.509 证书
重要
本文包括使用共享访问签名(也称为对称密钥身份验证)连接设备的步骤。 此身份验证方法便于测试和评估,但使用 X.509 证书对设备进行身份验证是一种更安全的方法。 若要了解详细信息,请参阅“安全最佳做法 > 连接安全性”。
使用共享访问密钥进行身份验证
将设备连接到 IoT 中心:
- 调用 create_from_connection_string,以添加设备主连接字符串。
- 调用 connect 以连接设备客户端。
例如:
# Add your IoT hub primary connection string
CONNECTION_STRING = "{Device primary connection string}"
device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
# Connect the client
device_client.connect()
使用 X.509 证书进行身份验证
使用 X.509 证书将设备连接到 IoT 中心:
- 使用 create_from_x509_certificate 添加 X.509 证书参数
- 调用 connect 以连接设备客户端
清楚起见,此示例以本地变量的形式显示证书输入参数值。 在生产系统中,请将敏感输入参数存储在环境变量中或其他更安全的存储位置。 例如,使用 os.getenv("HOSTNAME")
读取主机名环境变量。
# The Azure IoT hub name
hostname = "xxxxx.azure-devices.net"
# The device that has been created on the portal using X509 CA signing or self-signing capabilities
device_id = "MyDevice"
# The X.509 certificate file name
cert_file = "~/certificates/certs/sensor-thl-001-device.cert.pfx"
key_file = "~/certificates/certs/sensor-thl-001-device.cert.key"
# The optional certificate pass phrase
pass_phrase = "1234"
x509 = X509(
cert_file,
key_file,
pass_phrase,
)
# The client object is used to interact with your Azure IoT hub.
device_client = IoTHubDeviceClient.create_from_x509_certificate(
hostname=hostname, device_id=device_id, x509=x509
)
# Connect to IoT Hub
await device_client.connect()
有关证书身份验证的详细信息,请参阅:
代码示例
有关设备 X.509 证书身份验证的工作示例,请参阅异步中心场景中文件名以 x509 结尾的示例。
处理重新连接
IoTHubDeviceClient
默认会尝试重新建立断开的连接。 重新连接行为由 IoTHubDeviceClient
connection_retry和 connection_retry_interval
参数控制。
创建消息处理程序
创建消息处理程序函数以处理到设备的传入消息。 这将由 on_message_received
(下一步)分配为回调消息处理程序。
在此示例中,系统会在收到消息时调用 message_handler
。 系统会使用循环将消息属性 (.items
) 输出到控制台。
def message_handler(message):
global RECEIVED_MESSAGES
RECEIVED_MESSAGES += 1
print("")
print("Message received:")
# print data from both system and application (custom) properties
for property in vars(message).items():
print (" {}".format(property))
print("Total calls received: {}".format(RECEIVED_MESSAGES))
分配消息处理程序
使用 on_message_received 方法分配消息处理程序方法。
在此示例中,系统会将名为 message_handler
的消息处理程序方法附加到 IoTHubDeviceClient
client
对象。 client
对象等待从 IoT 中心接收云到设备消息。 此代码最多等待 300 秒(5 分钟)以接收消息,或者在你按下键盘键时停止执行。
try:
# Attach the handler to the client
client.on_message_received = message_handler
while True:
time.sleep(300)
except KeyboardInterrupt:
print("IoT Hub C2D Messaging device sample stopped")
finally:
# Graceful exit
print("Shutting down IoT Hub Client")
client.shutdown()
SDK 接收消息示例
接收消息 - 接收从 Azure IoT 中心发送到设备的云到设备 (C2D) 消息。
创建后端应用程序
本部分介绍如何发送云到设备的消息。 解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。
IoTHubRegistryManager 类将公开创建后端应用程序以从服务与云到设备的消息进行交互所需的所有方法。 必须安装 azure-iot-hub 库才能创建后端服务应用程序。
pip install azure-iot-hub
导入 IoTHubRegistryManager 对象
添加以下 import
语句: IoTHubRegistryManager 包括用于 IoT 中心注册表管理器操作的 API。
from azure.iot.hub import IoTHubRegistryManager
连接到 IoT 中心
可使用以下方法将后端服务连接到 IoT 中心:
- 共享访问策略
- Microsoft Entra
重要
本文介绍使用共享访问签名连接到服务的步骤。 虽然可使用此身份验证方法进行测试和评估,但使用 Microsoft Entra ID 或托管标识对设备进行身份验证是一种更安全的方法。 有关详细信息,请参阅安全最佳做法和云安全。
使用共享访问策略进行连接
使用 from_connection_string 连接到 IoT 中心。
例如:
IoTHubConnectionString = "{IoT hub service connection string}"
registry_manager = IoTHubRegistryManager.from_connection_string(IoTHubConnectionString)
使用 Microsoft Entra 进行连接
使用 Microsoft Entra 的后端应用必须在连接到 IoT 中心之前成功完成身份验证并获取安全令牌凭据。 此令牌将传递给 IoT 中心连接方法。 有关为 IoT 中心设置和使用 Microsoft Entra 的一般信息,请参阅使用 Microsoft Entra ID 控制对 IoT 中心的访问。
配置 Microsoft Entra 应用
必须设置一个针对首选身份验证凭据进行配置的 Microsoft Entra 应用。 该应用包含由后端应用程序用来进行身份验证的参数,例如客户端机密。 可用的应用身份验证配置如下:
- 客户端机密
- 证书
- 联合标识凭据
根据正在执行的操作,Microsoft Entra 应用可能需要特定的角色权限。 例如,需要 IoT 中心孪生参与者角色才能启用对 IoT 中心设备和模块孪生的读写访问权限。 有关详细信息,请参阅使用 Azure RBAC 角色分配管理对 IoT 中心的访问。
有关设置 Microsoft Entra 应用的详细信息,请参阅快速入门:将应用程序注册到 Microsoft 标识平台。
使用 DefaultAzureCredential 进行身份验证
使用 Microsoft Entra 对后端应用程序进行身份验证的最简单方法是使用 DefaultAzureCredential,但建议在生产环境中使用其他方法,包括特定的 TokenCredential
或精简的 ChainedTokenCredential
。 为简单起见,本部分介绍如何使用 DefaultAzureCredential
和客户端机密进行身份验证。 有关使用 DefaultAzureCredential
的利弊的详细信息,请参阅 DefaultAzureCredential 的使用指南。
DefaultAzureCredential
支持不同的身份验证机制,并根据其执行环境确定相应的凭据类型。 它会尝试按顺序使用多种凭据类型,直到找到有效的凭据。
Microsoft Entra 需要这些 NuGet 包和相应的 using
语句:
- Azure.Core
- Azure.Identity
using Azure.Core;
using Azure.Identity;
在此示例中,Microsoft Entra 应用注册客户端机密、客户端 ID 和租户 ID 将添加到环境变量中。 DefaultAzureCredential
使用这些环境变量对应用程序进行身份验证。 成功 Microsoft Entra 身份验证的结果是传递给 IoT 中心连接方法的安全令牌凭据。
string clientSecretValue = "xxxxxxxxxxxxxxx";
string clientID = "xxxxxxxxxxxxxx";
string tenantID = "xxxxxxxxxxxxx";
Environment.SetEnvironmentVariable("AZURE_CLIENT_SECRET", clientSecretValue);
Environment.SetEnvironmentVariable("AZURE_CLIENT_ID", clientID);
Environment.SetEnvironmentVariable("AZURE_TENANT_ID", tenantID);
TokenCredential tokenCredential = new DefaultAzureCredential();
然后可以将生成的 TokenCredential 传递给任何接受 Microsoft Entra 凭据的 SDK 客户端的 IoT 中心连接方法:
在此示例中,TokenCredential
将传递给 ServiceClient.Create
,以创建 ServiceClient 连接对象。
string hostname = "xxxxxxxxxx.azure-devices.net";
using var serviceClient = ServiceClient.Create(hostname, tokenCredential, TransportType.Amqp);
在此示例中,TokenCredential
将传递给 RegistryManager.Create
,以创建 RegistryManager 对象。
string hostname = "xxxxxxxxxx.azure-devices.net";
registryManager = RegistryManager.Create(hostname, tokenCredential);
代码示例
有关 Microsoft Entra 服务身份验证的有效示例,请参阅基于角色的身份验证示例。
生成并发送消息
使用 send_c2d_message 通过云(IoT 中心)向设备发送消息。
send_c2d_message
使用以下参数:
deviceID
- 目标设备的字符串标识符。message
- 云到设备消息。 消息的类型为str
(字符串)。properties
-dict
类型属性的可选集合。 属性可以包含应用程序属性和系统属性。 默认值为{}
。
此示例会将测试消息发送到目标设备。
# define the device ID
deviceID = "Device-1"
# define the message
message = "{\"c2d test message\"}"
# include optional properties
props={}
props.update(messageId = "message1")
props.update(prop1 = "test property-1")
props.update(prop1 = "test property-2")
prop_text = "Test message"
props.update(testProperty = prop_text)
# send the message through the cloud (IoT Hub) to the device
registry_manager.send_c2d_message(deviceID, message, properties=props)
SDK 发送消息示例
适用于 Python 的 Azure IoT SDK 提供了一个服务应用的工作示例,演示如何发送云到设备的消息。 有关详细信息,请参阅 send_message.py,该示例演示了如何发送云到设备的消息。
创建设备应用程序
本部分介绍如何使用适用于 Node.js 的 Azure IoT SDK 中的 azure-iot-device 类接收云到设备消息。
对于基于 Node.js 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调消息处理程序来处理来自 IoT 中心的传入消息。 如果设备到 IoT 中心的消息连接断开,设备应用程序还应能够检测和处理断开连接。
安装 SDK 包
azure-iot-device 包包含与 IoT 设备交互的对象。 运行以下命令,在开发计算机上安装“azure-iot-device”设备 SDK:
npm install azure-iot-device --save
将设备连接到 IoT 中心
设备应用可以使用以下方法向 IoT 中心进行身份验证:
- X.509 证书
- 共享访问密钥
重要
本文包括使用共享访问签名(也称为对称密钥身份验证)连接设备的步骤。 此身份验证方法便于测试和评估,但使用 X.509 证书对设备进行身份验证是一种更安全的方法。 若要了解详细信息,请参阅“安全最佳做法 > 连接安全性”。
使用 X.509 证书进行身份验证
X.509 证书将附加到设备到 IoT 中心的连接传输。
若要配置一个使用 X.509 证书的设备到 IoT 中心的连接,请执行以下操作:
调用 fromConnectionString,以将设备或标识模块连接字符串以及传输类型添加到
Client
对象。 将x509=true
添加到连接字符串,以指示已将证书添加到DeviceClientOptions
。 例如:设备连接字符串:
HostName=xxxxx.azure-devices.net;DeviceId=Device-1;SharedAccessKey=xxxxxxxxxxxxx;x509=true
标识模块连接字符串:
HostName=xxxxx.azure-devices.net;DeviceId=Device-1;ModuleId=Module-1;SharedAccessKey=xxxxxxxxxxxxx;x509=true
配置包含证书详细信息的 JSON 变量,并将其传递给 DeviceClientOptions。
调用 setOptions,将 X.509 证书和密钥(以及可选的密码)添加到客户端传输。
调用 open 以打开从设备到 IoT 中心的连接。
此示例显示 JSON 变量中的证书配置信息。 认证配置 clientOptions
传递给 setOptions
,并使用 open
打开连接。
const Client = require('azure-iot-device').Client;
const Protocol = require('azure-iot-device-mqtt').Mqtt;
// Connection string illustrated for demonstration only. Never hard-code the connection string in production. Instead use an environmental variable or other secure storage.
const connectionString = `HostName=xxxxx.azure-devices.net;DeviceId=Device-1;SharedAccessKey=xxxxxxxxxxxxx;x509=true`
const client = Client.fromConnectionString(connectionString, Protocol);
var clientOptions = {
cert: myX509Certificate,
key: myX509Key,
passphrase: passphrase,
http: {
receivePolicy: {
interval: 10
}
}
}
client.setOptions(clientOptions);
client.open(connectCallback);
有关证书身份验证的详细信息,请参阅:
代码示例
有关设备 X.509 证书身份验证的工作示例,请参阅简单示例设备 X.509。
使用共享访问密钥进行身份验证
选择传输协议
Client
对象支持以下属性:
Amqp
Http
- 使用Http
时,Client
实例不会频繁地检查来自 IoT 中心的消息(至少每 25 分钟一次)。Mqtt
MqttWs
AmqpWs
在开发计算机上安装所需的传输协议。
例如,以下命令安装 Amqp
协议:
npm install azure-iot-device-amqp --save
有关 MQTT、AMQP 和 HTTPS 支持之间的差异的详细信息,请参阅云到设备通信指南和选择通信协议。
此示例将 AMQP 协议分配给 Protocol
变量。 系统会将此协议变量传递给本文“添加连接字符串”部分中的 Client.fromConnectionString
方法。
const Protocol = require('azure-iot-device-mqtt').Amqp;
消息完成、拒绝和放弃功能
可以根据所选协议使用消息完成、拒绝和放弃方法。
AMQP 和 HTTP
AMQP 和 HTTP 传输可以完成、拒绝或放弃消息:
- 完成 - 若要完成消息,发送云到设备消息的服务会收到对方已接收消息的通知。 IoT 中心从消息队列中删除消息。 该方法采用
client.complete(message, callback function)
形式。 - 拒绝 - 若要拒绝消息,发送云到设备消息的服务会收到通知,通知内容为“设备尚未处理该消息”。 IoT 中心会永久删除设备队列中的消息。 该方法采用
client.reject(message, callback function)
形式。 - 放弃 - 若要放弃消息,IoT 中心会立即尝试重新发送消息。 IoT 中心会保留设备队列中的消息以供将来使用。 该方法采用
client.abandon(message, callback function)
形式。
MQTT
MQTT 不支持消息完成、拒绝或放弃函数。 相反,MQTT 默认接受消息,并从 IoT 中心消息队列中删除该消息。
重新发送尝试
如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。
创建客户端对象
使用已安装的包创建 Client
对象。
例如:
const Client = require('azure-iot-device').Client;
创建协议对象
使用已安装的传输包创建 Protocol
对象。
此示例分配 AMQP 协议:
const Protocol = require('azure-iot-device-amqp').Amqp;
添加设备连接字符串和传输协议
调用 fromConnectionString 以提供设备连接参数:
- connStr - 设备连接字符串。
- transportCtor - 传输协议。
此示例使用 Amqp
传输协议:
const deviceConnectionString = "{IoT hub device connection string}"
const Protocol = require('azure-iot-device-mqtt').Amqp;
let client = Client.fromConnectionString(deviceConnectionString, Protocol);
创建传入消息处理程序
为每个传入消息调用消息处理程序。
成功接收消息后,如果使用 AMQP 或 HTTP 传输,则调用 client.complete
方法通知 IoT 中心可以从消息队列中删除该消息。
例如,此消息处理程序将消息 ID 和消息正文输出到控制台,然后调用 client.complete
通知 IoT 中心它已处理该消息并且可以安全地将其从设备队列中删除。 如果使用 MQTT 传输,则不需要调用 complete
并且可以省略它。 AMQP 或 HTTPS 传输需要调用 complete
。
function messageHandler(msg) {
console.log('Id: ' + msg.messageId + ' Body: ' + msg.data);
client.complete(msg, printResultFor('completed'));
}
创建连接中断处理程序
连接中断是调用中断处理程序。 中断处理程序可用于实现重新连接代码。
此示例捕获并显示控制台的断开连接错误消息。
function disconnectHandler() {
clearInterval(sendInterval);
sendInterval = null;
client.open().catch((err) => {
console.error(err.message);
});
}
添加事件侦听器
可以使用 .on 方法指定这些事件监听器。
- 连接处理程序
- 错误处理程序
- 中断处理程序
- 消息处理程序
此示例包括前面定义的消息和断开连接处理程序。
client.on('connect', connectHandler);
client.on('error', errorHandler);
client.on('disconnect', disconnectHandler);
client.on('message', messageHandler);
打开到 IoT 中心的连接
使用 open 方法打开 IoT 设备和 IoT 中心之间的连接。
使用 .catch(err)
捕获错误和调用处理程序代码。
例如:
client.open()
.catch((err) => {
console.error('Could not connect: ' + err.message);
});
SDK 设备示例
适用于 Node.js 的 Azure IoT SDK 提供了用于处理消息接收任务的设备应用的工作示例。 有关详细信息,请参阅:
simple_sample_device - 连接到 IoT 中心并接收云到设备消息的设备应用。
创建后端应用程序
本部分介绍如何发送云到设备的消息。 如前所述,解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。
解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。
安装服务 SDK 包
azure-iothub 包包含与 IoT 中心交互的对象。 本文介绍了通过 IoT 中心将消息从应用程序发送到设备的 Client
类代码。
运行以下命令,在开发计算机上安装“azure-iothub”:
npm install azure-iothub --save
加载客户端和消息模块
使用 azure-iothub
包中的 Client
类声明 Client
对象。
使用 azure-iot-common
包中的 Message
类声明 Message
对象。
'use strict';
var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;
连接到 IoT 中心
可使用以下方法将后端服务连接到 IoT 中心:
- 共享访问策略
- Microsoft Entra
重要
本文介绍使用共享访问签名连接到服务的步骤。 虽然可使用此身份验证方法进行测试和评估,但使用 Microsoft Entra ID 或托管标识对设备进行身份验证是一种更安全的方法。 有关详细信息,请参阅安全最佳做法和云安全。
使用共享访问策略进行连接
使用 fromConnectionString 连接到 IoT 中心。
在此示例中,我们使用 Amqp
传输类型创建 serviceClient
对象。
var connectionString = '{IoT hub device connection string}';
var serviceClient = Client.fromConnectionString(connectionString,`Amqp`);
打开客户端连接
调用 Client
open 方法以打开应用程序与 IoT 中心之间的连接。
无论是否有指定在 open
操作完成时调用的回调函数,均可调用 open
。
在此示例中,open
方法包括一个可选的 err
打开连接回调函数。 如果出现打开错误,则返回一个错误对象。 如果打开连接成功,则返回 null
回调值。
serviceClient.open(function (err)
if (err)
console.error('Could not connect: ' + err.message);
使用 Microsoft Entra 进行连接
使用 Microsoft Entra 的后端应用必须在连接到 IoT 中心之前成功完成身份验证并获取安全令牌凭据。 此令牌将传递给 IoT 中心连接方法。 有关为 IoT 中心设置和使用 Microsoft Entra 的一般信息,请参阅使用 Microsoft Entra ID 控制对 IoT 中心的访问。
有关 Node.js SDK 身份验证的概述,请参阅:
配置 Microsoft Entra 应用
必须设置一个针对首选身份验证凭据进行配置的 Microsoft Entra 应用。 该应用包含由后端应用程序用来进行身份验证的参数,例如客户端机密。 可用的应用身份验证配置如下:
- 客户端机密
- 证书
- 联合标识凭据
根据正在执行的操作,Microsoft Entra 应用可能需要特定的角色权限。 例如,需要 IoT 中心孪生参与者角色才能启用对 IoT 中心设备和模块孪生的读写访问权限。 有关详细信息,请参阅使用 Azure RBAC 角色分配管理对 IoT 中心的访问。
有关设置 Microsoft Entra 应用的详细信息,请参阅快速入门:将应用程序注册到 Microsoft 标识平台。
使用 DefaultAzureCredential 进行身份验证
使用 Microsoft Entra 对后端应用程序进行身份验证的最简单方法是使用 DefaultAzureCredential,但建议在生产环境中使用其他方法,包括特定的 TokenCredential
或精简的 ChainedTokenCredential
。 为简单起见,本部分介绍如何使用 DefaultAzureCredential
和客户端机密进行身份验证。
有关使用 DefaultAzureCredential
的利弊的详细信息,请参阅适用于 JavaScript 的 Azure 标识客户端库中的凭据链
DefaultAzureCredential 支持不同的身份验证机制,并根据其执行环境确定相应的凭据类型。 它会尝试按顺序使用多种凭据类型,直到找到有效的凭据。
Microsoft Entra 需要此包:
npm install --save @azure/identity
在此示例中,Microsoft Entra 应用注册客户端机密、客户端 ID 和租户 ID 已添加到环境变量中。 DefaultAzureCredential
使用这些环境变量对应用程序进行身份验证。 成功 Microsoft Entra 身份验证的结果是传递给 IoT 中心连接方法的安全令牌凭据。
import { DefaultAzureCredential } from "@azure/identity";
// Azure SDK clients accept the credential as a parameter
const credential = new DefaultAzureCredential();
然后可以将生成的凭据令牌传递给 fromTokenCredential,以连接到任何接受 Microsoft Entra 凭据的 SDK 客户端的 IoT 中心:
fromTokenCredential
需要两个参数:
- Azure 服务 URL - Azure 服务 URL 应采用
{Your Entra domain URL}.azure-devices.net
格式(不包括https://
前缀)。 例如,MyAzureDomain.azure-devices.net
。 - Azure 凭据令牌
在此示例中,使用 DefaultAzureCredential
获取 Azure 凭据。 然后将 Azure 域 URL 和凭据提供给 Registry.fromTokenCredential
,以便与 IoT 中心创建连接。
const { DefaultAzureCredential } = require("@azure/identity");
let Registry = require('azure-iothub').Registry;
// Define the client secret values
clientSecretValue = 'xxxxxxxxxxxxxxx'
clientID = 'xxxxxxxxxxxxxx'
tenantID = 'xxxxxxxxxxxxx'
// Set environment variables
process.env['AZURE_CLIENT_SECRET'] = clientSecretValue;
process.env['AZURE_CLIENT_ID'] = clientID;
process.env['AZURE_TENANT_ID'] = tenantID;
// Acquire a credential object
const credential = new DefaultAzureCredential()
// Create an instance of the IoTHub registry
hostName = 'MyAzureDomain.azure-devices.net';
let registry = Registry.fromTokenCredential(hostName,credential);
代码示例
有关 Microsoft Entra 服务身份验证的有效示例,请参阅 Azure 标识示例。
创建一条消息
消息对象包括异步云到设备消息。 消息功能在 AMQP、MQTT 和 HTTP 上的工作方式相同。
消息对象支持多个属性,包括这些属性。 请参阅消息属性,获取完整列表。
ack
- 发送反馈。 在下一部分中介绍。properties
- 包含用于存储自定义消息属性的字符串键和值的映射。- messageId - 用于关联双向通信。
在消息对象实例化时添加消息正文。 在此示例中,将添加一条 'Cloud to device message.'
消息。
var message = new Message('Cloud to device message.');
message.ack = 'full';
message.messageId = "My Message ID";
发送确认
发送程序可以向 IoT 中心请求每条云到设备消息的送达(或过期)确认。 此选项使发送程序能够使用通知、重试或补偿逻辑。 若要了解消息反馈操作和属性的完整描述,请参阅消息反馈。
每条消息都必须包含一个发送确认 ack 属性的值,以便接收发送反馈。 ack
属性可以是下列值之一:
none(默认):不生成反馈消息。
sent
:如果消息已完成,则会收到反馈消息。如果消息过期(或达到发送次数上限)且设备尚未完成,则会收到反馈消息。
full
:已发送结果和未发送结果的反馈。
在此示例中,已将 ack
属性设置为 full
,请求对一条消息同时提供消息已发送和消息未发送的发送反馈。
message.ack = 'full';
链接消息反馈接收器
消息反馈接收器回调函数使用 getFeedbackReceiver 链接到 Client
。
消息反馈接收器接收两个参数:
- 错误对象(可以为 null)
- AmqpReceiver 对象 - 客户端收到新的反馈消息时发出事件。
此示例函数接收发送反馈消息并将其输出到控制台。
function receiveFeedback(err, receiver){
receiver.on('message', function (msg) {
console.log('Feedback message:')
console.log(msg.getData().toString('utf-8'));
});
}
此代码使用 getFeedbackReceiver
将 receiveFeedback
反馈回调函数链接到服务 Client
对象。
serviceClient.getFeedbackReceiver(receiveFeedback);
定义消息完成结果处理程序
每条消息发送完成后,系统都会调用消息发送完成回调函数。
此示例函数将消息 send
操作结果输出到控制台。 在此示例中,printResultFor
函数将作为参数用于下一节中描述的 send
函数。
function printResultFor(op) {
return function printResult(err, res) {
if (err) console.log(op + ' error: ' + err.toString());
if (res) console.log(op + ' status: ' + res.constructor.name);
};
}
发送邮件
使用 send 函数通过 IoT 中心向设备应用发送异步云到设备消息。
send
支持以下参数:
- deviceID - 目标设备的设备 ID。
- message - 要发送到设备的消息正文。
- done - 操作完成时要调用的可选函数。 使用两个参数调用 Done:
- 错误对象(可以为 null)。
- 传输特定的响应对象,用于日志记录或调试。
此代码调用 send
,通过 IoT 中心将云到设备消息发送到设备应用。 上一节中定义的回调函数 printResultFor
会接收发送确认信息。
var targetDevice = '{device ID}';
serviceClient.send(targetDevice, message, printResultFor('send'));
本示例介绍在设备确认收到云到设备消息时,如何将消息发送到设备,并处理反馈消息:
serviceClient.open(function (err) {
if (err) {
console.error('Could not connect: ' + err.message);
} else {
console.log('Service client connected');
serviceClient.getFeedbackReceiver(receiveFeedback);
var message = new Message('Cloud to device message.');
message.ack = 'full';
message.messageId = "My Message ID";
console.log('Sending message: ' + message.getData());
serviceClient.send(targetDevice, message, printResultFor('send'));
}
});
SDK 发送消息示例
适用于 Node.js 的 Azure IoT SDK 提供了用于处理消息发送任务的设备应用的工作示例。 有关详细信息,请参阅:
send_c2d_message.js - 通过 IoT 中心向设备发送 C2D 消息。
连接重新连接策略
本文不演示设备到 IoT 中心连接或外部应用程序到 IoT 中心连接的消息重试策略。 在生产代码中,应该实施连接重试策略,如管理设备重新连接以创建可复原应用程序中所述。
消息保留时间、重试次数和发送次数上限
如从 IoT 中心发送云到设备消息中所述,可以使用门户 IoT 中心配置选项或 Azure CLI 查看和配置以下消息值的默认值。 这些配置选项可能会影响消息发送和反馈。
- 默认 TTL(生存时间)- 消息在被 IoT 中心判定为到期之前可用于设备的时长。
- 反馈保留时间 - 对于用于云到设备消息的过期或成功送达的反馈,IoT 中心的保存时长。
- IoT 中心尝试将云到设备消息发送到设备的次数。