你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:使用 Dapr 和 MQTT 代理生成事件驱动应用

在本演练中,你会将 Dapr 应用程序部署到群集。 Dapr 应用程序使用发布到 MQTT 代理的模拟 MQTT 数据,应用开窗函数,然后将结果发布回 MQTT 代理。 发布的输出表示如何在边缘聚合高量数据以减少消息频率和大小。 Dapr 应用程序是无状态的,并使用 MQTT 代理状态存储来缓存窗口计算所需的过去值。

Dapr 应用程序会执行以下步骤:

  1. 订阅有关传感器数据的 sensor/data 主题。
  2. 接收到有关该主题的数据时,数据将发布到 MQTT 代理状态存储。
  3. 它会每隔 10 秒从状态存储中提取一次数据,并计算过去 30 秒内添加时间戳的任何传感器数据的最小值、最大值、平均值、中位数和第 75 百分位值
  4. 超过 30 秒的数据将会从状态存储区过期。
  5. 结果以 JSON 格式发布到 sensor/window_data 主题。

注意

本教程禁用 Dapr CloudEvents,这会支持它使用原始 MQTT 进行发布和订阅。

先决条件

部署 Dapr 应用程序

此时可以部署 Dapr 应用程序。 注册组件不会部署打包在容器中的关联二进制文件。 若要连同应用程序一起部署二进制文件,可以使用部署将容器化 Dapr 应用程序和两个组件组合在一起。

首先,创建使用以下定义的 yaml 文件:

组件 说明
volumes.mqtt-client-token 用于通过 MQTT 中转站和状态存储对 Dapr 可插入组件进行身份验证的 SAT
volumes.aio-internal-ca-cert-chain 用于验证 MQTT 代理 TLS 证书的信任链
containers.mq-event-driven 预生成的 Dapr 应用程序容器。
  1. 将以下部署 yaml 保存到名为 app.yaml 的文件:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: dapr-client
      namespace: azure-iot-operations
      annotations:
        aio-broker-auth/group: dapr-workload
    ---    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: mq-event-driven-dapr
      namespace: azure-iot-operations
    spec:
      selector:
        matchLabels:
          app: mq-event-driven-dapr
      template:
        metadata:
          labels:
            app: mq-event-driven-dapr
          annotations:
            dapr.io/enabled: "true"
            dapr.io/inject-pluggable-components: "true"
            dapr.io/app-id: "mq-event-driven-dapr"
            dapr.io/app-port: "6001"
            dapr.io/app-protocol: "grpc"
        spec:
          serviceAccountName: dapr-client
    
          volumes:
          # SAT token used to authenticate between Dapr and the MQTT broker
          - name: mqtt-client-token
            projected:
              sources:
                - serviceAccountToken:
                    path: mqtt-client-token
                    audience: aio-internal
                    expirationSeconds: 86400
    
          # Certificate chain for Dapr to validate the MQTT broker
          - name: aio-ca-trust-bundle
            configMap:
              name: azure-iot-operations-aio-ca-trust-bundle
    
          containers:
          - name: mq-event-driven-dapr
            image: ghcr.io/azure-samples/explore-iot-operations/mq-event-driven-dapr:latest
    
  2. 通过运行以下命令来部署应用程序:

    kubectl apply -f app.yaml
    
  3. 确认应用程序已成功部署。 Pod 应报告所有容器在短间隔后准备就绪,如以下命令所示:

    kubectl get pods -l app=mq-event-driven-dapr -n azure-iot-operations
    

    返回以下输出:

    NAME                         READY   STATUS              RESTARTS   AGE
    mq-event-driven-dapr         3/3     Running             0          30s
    

部署模拟器

通过部署 Kubernetes 工作负载来模拟测试数据。 它通过使用 sensor/data 主题上的 MQTT 客户端定期向 MQTT 中转站发送样本温度、振动和压力读数来模拟传感器。

  1. 从“浏览 IoT 操作”存储库部署模拟器:

    kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/tutorials/mq-event-driven-dapr/simulate-data.yaml    
    
  2. 确认模拟器正在正确运行:

    kubectl logs deployment/mqtt-publisher-deployment -n azure-iot-operations -f
    

    返回以下输出:

    Get:1 http://deb.debian.org/debian stable InRelease [151 kB]
    Get:2 http://deb.debian.org/debian stable-updates InRelease [52.1 kB]
    Get:3 http://deb.debian.org/debian-security stable-security InRelease [48.0 kB]
    Get:4 http://deb.debian.org/debian stable/main amd64 Packages [8780 kB]
    Get:5 http://deb.debian.org/debian stable-updates/main amd64 Packages [6668 B]
    Get:6 http://deb.debian.org/debian-security stable-security/main amd64 Packages [101 kB]
    Fetched 9139 kB in 3s (3570 kB/s)
    ...
    Messages published in the last 10 seconds: 10
    Messages published in the last 10 seconds: 10
    Messages published in the last 10 seconds: 10
    

部署 MQTT 客户端

若要验证 MQTT 网桥是否正常工作,请将 MQTT 客户端部署到群集。

  1. 在名为 client.yaml 的新文件中,指定客户端部署:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: mqtt-client
      namespace: azure-iot-operations
    ---
    apiVersion: v1
    kind: Pod
    metadata:
      name: mqtt-client
      namespace: azure-iot-operations
    spec:
      serviceAccountName: mqtt-client
      containers:
      - image: alpine
        name: mqtt-client
        command: ["sh", "-c"]
        args: ["apk add mosquitto-clients mqttui && sleep infinity"]
        volumeMounts:
        - name: mqtt-client-token
          mountPath: /var/run/secrets/tokens
        - name: aio-ca-trust-bundle
          mountPath: /var/run/certs/aio-internal-ca-cert/
      volumes:
      - name: mqtt-client-token
        projected:
          sources:
          - serviceAccountToken:
              path: mqtt-client-token
              audience: aio-internal
              expirationSeconds: 86400
      - name: aio-ca-trust-bundle
        configMap:
          name: azure-iot-operations-aio-ca-trust-bundle
    
  2. 使用 kubectl 应用部署文件:

    kubectl apply -f client.yaml
    

    验证输出:

    pod/mqtt-client created
    

验证 Dapr 应用程序输出

  1. 向 Mosquitto 客户端 Pod 打开 shell:

    kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
    
  2. 订阅 sensor/window_data 主题,以观察 Dapr 应用程序的已发布输出:

    mosquitto_sub -L mqtt://aio-broker/sensor/window_data
    
  3. 验证应用程序是否每 10 秒为各种传感器输出滑动窗口计算:

    {
        "timestamp": "2023-11-16T21:59:53.939690+00:00",
        "window_size": 30,
        "temperature": {
            "min": 553.024,
            "max": 598.907,
            "mean": 576.4647857142858,
            "median": 577.4905,
            "75_per": 585.96125,
            "count": 28
        },
        "pressure": {
            "min": 290.605,
            "max": 299.781,
            "mean": 295.521,
            "median": 295.648,
            "75_per": 297.64050000000003,
            "count": 28
        },
        "vibration": {
            "min": 0.00124192,
            "max": 0.00491257,
            "mean": 0.0031171810714285715,
            "median": 0.003199235,
            "75_per": 0.0038769150000000003,
            "count": 28
        }
    }
    

可选 - 创建 Dapr 应用程序

此教程使用 Dapr 应用程序的预生成容器。 要自行修改和生成代码,请执行以下步骤:

先决条件

  1. Docker - 用于生成应用程序容器
  2. 容器注册表 - 用于托管应用程序容器

构建应用程序

  1. 克隆“浏览 IoT 操作”存储库

    git clone https://github.com/Azure-Samples/explore-iot-operations
    
  2. 更改为 Dapr 教程目录:

    cd explore-iot-operations/tutorials/mq-event-driven-dapr/src
    
  3. 生成 Docker 映像:

    docker build docker build . -t mq-event-driven-dapr
    
  4. 若要在 Kubernetes 群集中使用应用程序,需要将映像推送到容器注册表,例如 Azure 容器注册表。 还可以推送到本地容器注册表,例如 minikubeDocker

    docker tag mq-event-driven-dapr <container-alias>
    docker push <container-alias>
    
  5. 更新 app.yaml 以拉取新创建的映像。

故障排除

如果应用程序未启动,或者你看到了 CrashLoopBackoff 中的容器,则 daprd 容器日志通常包含有用的信息。

运行以下命令以查看 daprd 组件日志:

kubectl logs -l app=mq-event-driven-dapr -n azure-iot-operations -c daprd

后续步骤