練習 - 處理事件並將資料儲存在 Azure Cosmos DB 中

已完成

第二個函式可以接聽並處理 Azure 事件中樞內特定命名空間的事件,然後將其儲存在使用 Azure Cosmos DB 建立的資料庫中。

使用 Azure Cosmos DB 建立資料庫

若要建立資料庫,請使用 az cosmosdb create 命令。 此命令會一個 Azure Cosmos DB 帳戶、一個資料庫和一個 SQL 容器。

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

在我們的案例中,我們對溫度感興趣。 因此,我們會將 temperatureStatus 定義為分割區索引鍵。

建立、設定及部署另一個 Azure 函式

透過事件中樞,您可以先使用以 MB 為單位的資料流,然後成長到 GB 或 TB。 自動擴充功能是可用來調整輸送量單位數以符合您的用量需求的多個選項之一。

每個函式的取用應用程式都有一個個別的事件資料流檢視。 他們會以自己的步調獨立讀取資料流,並使用自己的位移。

對於我們的案例,您將建立一個取用 Azure 函式作為範例。 若要建立此函式,請遵循最佳做法,它應該是獨立的,具有自己的儲存體帳戶以及可鬆散結合及擴縮的繫結。

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

擷取連接字串

取用者函式必須知道其儲存體帳戶和事件中樞。 它還需要知道將處理過的事件寫入其中的資料庫。

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

您可以使用 echo $EVENT_HUB_CONNECTION_STRING 命令來檢查該變數是否仍正確設定。 否則,請重新執行下列命令:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

這些連接字串必須儲存在您 Azure Functions 帳戶的應用程式設定中。

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

注意

針對生產環境,您可以使用 Azure Key Vault 的執行個體儲存及管理連接字串。

建立函式應用程式

建立下一個函式之前,請確定您在正確的資料夾中。

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

該命令會建立一個應用程式,如上一個練習中所示。 您將刪除測試檔案、使用 fetch-app-settings 命令來更新 local.settings.file,然後取代現有的 Function.java 檔案。

cd telemetry-functions-consumer
rm -r src/test

更新用於本機執行和偵錯的本機設定。

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

接下來,請開啟 Function.java 檔案,並以下列程式碼取代內容:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

在與 Function.java 相同的位置中建立名為 TelemetryItem.java 的另一個新檔案,並新增下列程式碼:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

當事件中樞收到訊息時,其會產生事件。 processSensorData 函式會在接收事件時執行。 然後,它會處理事件資料,並使用 Azure Cosmos DB 的輸出繫結來將結果傳送至資料庫。 我們會再次使用 TelemetryItem.java 類別。 TelemetryItem 物件可視為此事件驅動系統參與者之間的取用者驅動合約。

在本機執行

使用 Azure Functions,您可以接收來自全球各地的事件。 是的,您甚至可以接收本機開發電腦上的事件!

mvn clean package
mvn azure-functions:run

在組建和啟動訊息之後,您會在函式執行時看到傳入的事件:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

在 Azure 入口網站中,前往 Azure Cosmos DB 帳戶。 依序選取 [資料總管]、[TelemetryInfo] 和 [項目],以在資料抵達時進行檢視。

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

在 Azure 上進行部署

現在,讓我們將整個工作負載轉移到雲端。 若要將函式部署到 Azure Functions,您將使用 Maven 命令 mvn azure-functions:deploy。 確定您仍在正確的存放庫 telemetry-functions 中。

mvn azure-functions:deploy

很好! 我們已部署整個遙測案例,方法是將資料傳送至事件中樞,然後使用不同的獨立函式取用資料。 此函式會處理資料,然後將結果儲存在使用 Azure Cosmos DB 建立的資料庫中。 如何才能確保應用程式符合預先定義的需求? 透過使用監視。