你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
教程:使用 Dapr 和 MQTT 代理生成事件驱动应用
在本演练中,你会将 Dapr 应用程序部署到群集。 Dapr 应用程序使用发布到 MQTT 代理的模拟 MQTT 数据,应用开窗函数,然后将结果发布回 MQTT 代理。 发布的输出表示如何在边缘聚合高量数据以减少消息频率和大小。 Dapr 应用程序是无状态的,并使用 MQTT 代理状态存储来缓存窗口计算所需的过去值。
Dapr 应用程序会执行以下步骤:
- 订阅有关传感器数据的
sensor/data
主题。 - 接收到有关该主题的数据时,数据将发布到 MQTT 代理状态存储。
- 它会每隔 10 秒从状态存储中提取一次数据,并计算过去 30 秒内添加时间戳的任何传感器数据的最小值、最大值、平均值、中位数和第 75 百分位值。
- 超过 30 秒的数据将会从状态存储区过期。
- 结果以 JSON 格式发布到
sensor/window_data
主题。
注意
本教程禁用 Dapr CloudEvents,这会支持它使用原始 MQTT 进行发布和订阅。
先决条件
- 已安装 Azure IoT 操作 - 快速入门:使用 K3s 在 GitHub Codespaces 中运行 Azure IoT 操作
- 已安装 MQTT 代理 Dapr 组件 - 安装 MQTT 代理 Dapr 组件
部署 Dapr 应用程序
此时可以部署 Dapr 应用程序。 注册组件不会部署打包在容器中的关联二进制文件。 若要连同应用程序一起部署二进制文件,可以使用部署将容器化 Dapr 应用程序和两个组件组合在一起。
首先,创建使用以下定义的 yaml 文件:
组件 | 说明 |
---|---|
volumes.mqtt-client-token |
用于通过 MQTT 中转站和状态存储对 Dapr 可插入组件进行身份验证的 SAT |
volumes.aio-internal-ca-cert-chain |
用于验证 MQTT 代理 TLS 证书的信任链 |
containers.mq-event-driven |
预生成的 Dapr 应用程序容器。 |
将以下部署 yaml 保存到名为
app.yaml
的文件:apiVersion: v1 kind: ServiceAccount metadata: name: dapr-client namespace: azure-iot-operations annotations: aio-broker-auth/group: dapr-workload --- apiVersion: apps/v1 kind: Deployment metadata: name: mq-event-driven-dapr namespace: azure-iot-operations spec: selector: matchLabels: app: mq-event-driven-dapr template: metadata: labels: app: mq-event-driven-dapr annotations: dapr.io/enabled: "true" dapr.io/inject-pluggable-components: "true" dapr.io/app-id: "mq-event-driven-dapr" dapr.io/app-port: "6001" dapr.io/app-protocol: "grpc" spec: serviceAccountName: dapr-client volumes: # SAT token used to authenticate between Dapr and the MQTT broker - name: mqtt-client-token projected: sources: - serviceAccountToken: path: mqtt-client-token audience: aio-internal expirationSeconds: 86400 # Certificate chain for Dapr to validate the MQTT broker - name: aio-ca-trust-bundle configMap: name: azure-iot-operations-aio-ca-trust-bundle containers: - name: mq-event-driven-dapr image: ghcr.io/azure-samples/explore-iot-operations/mq-event-driven-dapr:latest
通过运行以下命令来部署应用程序:
kubectl apply -f app.yaml
确认应用程序已成功部署。 Pod 应报告所有容器在短间隔后准备就绪,如以下命令所示:
kubectl get pods -l app=mq-event-driven-dapr -n azure-iot-operations
返回以下输出:
NAME READY STATUS RESTARTS AGE mq-event-driven-dapr 3/3 Running 0 30s
部署模拟器
通过部署 Kubernetes 工作负载来模拟测试数据。 它通过使用 sensor/data
主题上的 MQTT 客户端定期向 MQTT 中转站发送样本温度、振动和压力读数来模拟传感器。
从“浏览 IoT 操作”存储库部署模拟器:
kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/tutorials/mq-event-driven-dapr/simulate-data.yaml
确认模拟器正在正确运行:
kubectl logs deployment/mqtt-publisher-deployment -n azure-iot-operations -f
返回以下输出:
Get:1 http://deb.debian.org/debian stable InRelease [151 kB] Get:2 http://deb.debian.org/debian stable-updates InRelease [52.1 kB] Get:3 http://deb.debian.org/debian-security stable-security InRelease [48.0 kB] Get:4 http://deb.debian.org/debian stable/main amd64 Packages [8780 kB] Get:5 http://deb.debian.org/debian stable-updates/main amd64 Packages [6668 B] Get:6 http://deb.debian.org/debian-security stable-security/main amd64 Packages [101 kB] Fetched 9139 kB in 3s (3570 kB/s) ... Messages published in the last 10 seconds: 10 Messages published in the last 10 seconds: 10 Messages published in the last 10 seconds: 10
部署 MQTT 客户端
若要验证 MQTT 网桥是否正常工作,请将 MQTT 客户端部署到群集。
在名为
client.yaml
的新文件中,指定客户端部署:apiVersion: v1 kind: ServiceAccount metadata: name: mqtt-client namespace: azure-iot-operations --- apiVersion: v1 kind: Pod metadata: name: mqtt-client namespace: azure-iot-operations spec: serviceAccountName: mqtt-client containers: - image: alpine name: mqtt-client command: ["sh", "-c"] args: ["apk add mosquitto-clients mqttui && sleep infinity"] volumeMounts: - name: mqtt-client-token mountPath: /var/run/secrets/tokens - name: aio-ca-trust-bundle mountPath: /var/run/certs/aio-internal-ca-cert/ volumes: - name: mqtt-client-token projected: sources: - serviceAccountToken: path: mqtt-client-token audience: aio-internal expirationSeconds: 86400 - name: aio-ca-trust-bundle configMap: name: azure-iot-operations-aio-ca-trust-bundle
使用 kubectl 应用部署文件:
kubectl apply -f client.yaml
验证输出:
pod/mqtt-client created
验证 Dapr 应用程序输出
向 Mosquitto 客户端 Pod 打开 shell:
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
订阅
sensor/window_data
主题,以观察 Dapr 应用程序的已发布输出:mosquitto_sub -L mqtt://aio-broker/sensor/window_data
验证应用程序是否每 10 秒为各种传感器输出滑动窗口计算:
{ "timestamp": "2023-11-16T21:59:53.939690+00:00", "window_size": 30, "temperature": { "min": 553.024, "max": 598.907, "mean": 576.4647857142858, "median": 577.4905, "75_per": 585.96125, "count": 28 }, "pressure": { "min": 290.605, "max": 299.781, "mean": 295.521, "median": 295.648, "75_per": 297.64050000000003, "count": 28 }, "vibration": { "min": 0.00124192, "max": 0.00491257, "mean": 0.0031171810714285715, "median": 0.003199235, "75_per": 0.0038769150000000003, "count": 28 } }
可选 - 创建 Dapr 应用程序
此教程使用 Dapr 应用程序的预生成容器。 要自行修改和生成代码,请执行以下步骤:
先决条件
- Docker - 用于生成应用程序容器
- 容器注册表 - 用于托管应用程序容器
构建应用程序
克隆“浏览 IoT 操作”存储库:
git clone https://github.com/Azure-Samples/explore-iot-operations
更改为 Dapr 教程目录:
cd explore-iot-operations/tutorials/mq-event-driven-dapr/src
生成 Docker 映像:
docker build docker build . -t mq-event-driven-dapr
若要在 Kubernetes 群集中使用应用程序,需要将映像推送到容器注册表,例如 Azure 容器注册表。 还可以推送到本地容器注册表,例如 minikube 或 Docker。
docker tag mq-event-driven-dapr <container-alias> docker push <container-alias>
更新
app.yaml
以拉取新创建的映像。
故障排除
如果应用程序未启动,或者你看到了 CrashLoopBackoff
中的容器,则 daprd
容器日志通常包含有用的信息。
运行以下命令以查看 daprd 组件日志:
kubectl logs -l app=mq-event-driven-dapr -n azure-iot-operations -c daprd