练习 - 继续处理事件并将数据存储在 Azure Cosmos DB 中

已完成

第二个函数可以侦听 Azure 事件中心中特定命名空间的事件,对其进行处理,然后将其存储在使用 Azure Cosmos DB 创建的数据库中。

使用 Azure Cosmos DB 创建数据库

若要创建数据库,请使用 az cosmosdb create 命令。 此命令使用一个 Azure Cosmos DB 数据库帐户、一个数据库和一个 SQL 容器。

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

在我们的方案中,需要注意温度这个指标。 因此,我们将 temperatureStatus 定义为分区键。

构建、配置和部署另一个 Azure 函数

使用事件中心可以从 MB 量级的数据流着手,然后逐步扩展到 GB 甚至 TB 量级的处理。 自动扩充功能是用于根据用量需求扩展吞吐量单位数的众多选项之一。

每个函数的消耗应用都有单独的事件流视图。 使用者根据自身的步调和情况独立读取流。

在我们的场景中,你需要创建一个消耗 Azure 函数作为示例。 要创建该函数,应遵循最佳做法,独立创建该函数,并使用其自己的存储帐户和绑定来实现松散的耦合和可伸缩性。

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

检索连接字符串

使用者函数需要了解其存储帐户和事件中心。 它还需要了解将处理后的事件写入到的数据库。

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

可以使用 echo $EVENT_HUB_CONNECTION_STRING 命令检查变量是否仍然设置正确。 否则,请重新运行以下命令:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

这些连接字符串需要存储在 Azure Functions 帐户中的应用程序设置中。

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

注意

对于生产环境,可以使用 Azure Key Vault 实例来存储和管理连接字符串。

创建函数应用程序

在创建下一个函数之前,确保位于正确的文件夹中。

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

与上次练习一样,此命令会创建一个应用程序。 你需要删除测试文件,使用 fetch-app-settings 命令更新 local.settings.file,然后替换现有的 Function.java 文件。

cd telemetry-functions-consumer
rm -r src/test

更新本地执行和调试的本地设置。

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

接下来,打开 Function.java 文件并将其内容替换为以下代码:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

在与 Function.java 相同的位置创建另一个名为 TelemetryItem.java 的新文件,并添加以下代码:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

当事件中心收到消息时,它会生成一个事件。 processSensorData 函数在收到事件时运行。 然后,它会处理事件数据,并使用 Azure Cosmos DB 的输出绑定将结果发送到数据库。 我们将再次使用 TelemetryItem.java 类。 TelemetryItem 对象可以看作是此事件驱动系统参与者之间的消费者驱动契约。

在本地运行

利用 Azure Functions,可以接收来自世界各地的事件。 是的,你甚至可以在开发计算机上本地接收事件!

mvn clean package
mvn azure-functions:run

生成和启动消息后,你会在函数运行时看到传入的事件:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

在 Azure 门户中,转到 Azure Cosmos DB 帐户。 选择“数据资源管理器”,选择“TelemetryInfo”,然后选择“项”在数据到达时查看数据。

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

在 Azure 上部署

现在,让我们将整个工作负载转移到云中。 要将函数部署到 Azure Functions,需要使用 Maven 命令 mvn azure-functions:deploy。 确保仍处于正确的存储库 telemetry-functions 中。

mvn azure-functions:deploy

很好! 我们通过将数据发送到事件中心并使用另一个独立功能消耗数据来部署整个遥测场景。 该功能处理数据,然后将结果存储在由 Azure Cosmos DB 创建的数据库中。 如何确保应用程序满足预定义的需求? 方法是使用监视。