Övning – Bearbeta händelserna och lagra data i Azure Cosmos DB

Slutförd

En andra funktion kan lyssna på händelser för det specifika namnområdet i Azure-händelsehubben och bearbeta och lagra dem i en databas som skapats med Azure Cosmos DB.

Skapa en databas med Azure Cosmos DB

Använd kommandot för az cosmosdb create att skapa databasen. Kommandot använder ett Azure Cosmos DB-konto, en databas och en SQL-container.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

För vårt scenario är temperaturen intressant. Därför definierar temperatureStatus vi som partitionsnyckel.

Skapa, konfigurera och distribuera en annan Azure-funktion

Med händelsehubbar kan du börja med dataströmmar i megabyte och växa till gigabyte eller terabyte. Funktionen autoinflate är ett av de många tillgängliga alternativen för att skala antalet dataflödesenheter för att uppfylla dina användningsbehov.

De program som används för varje funktion har en separat vy över händelseströmmen. De läser strömmen oberoende av varandra i egen takt och med sina egna offset.

I vårt scenario skapar du en azure-funktion som ett exempel. För att skapa funktionen, enligt bästa praxis, bör den vara oberoende, med sitt eget lagringskonto och bindningar för lös koppling och skalbarhet.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Hämta anslutningssträng

Konsumentfunktionen måste vara medveten om sitt lagringskonto och händelsehubben. Den måste också vara medveten om databasen som den skriver de bearbetade händelserna till.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Du kan använda kommandot echo $EVENT_HUB_CONNECTION_STRING för att kontrollera om variabeln fortfarande är korrekt inställd. Annars kör du följande kommando igen:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Dessa anslutningssträng måste lagras i programinställningarna för ditt Azure Functions-konto.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Kommentar

För produktionsmiljöer kan du använda en instans av Azure Key Vault för att lagra och hantera anslutningssträng.

Skapa funktionsprogrammet

Kontrollera att du är i rätt mapp innan du skapar nästa funktion.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

Kommandot skapar ett program som i den senaste övningen. Du tar bort testfilerna, uppdaterar local.settings.file med fetch-app-settings kommandot och ersätter sedan den befintliga Function.java filen.

cd telemetry-functions-consumer
rm -r src/test

Uppdatera de lokala inställningarna för lokal körning och felsökning.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Öppna Function.java sedan filen och ersätt innehållet med följande kod:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Skapa en annan ny fil med namnet TelemetryItem.java på samma plats som Function.java och lägg till följande kod:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

När händelsehubben tar emot meddelandet genereras en händelse. Funktionen processSensorData körs när den tar emot händelsen. Sedan bearbetar den händelsedata och använder en utdatabindning av Azure Cosmos DB för att skicka resultaten till databasen. Vi använder TelemetryItem.java klassen igen. Objekten TelemetryItem kan ses som det konsumentdrivna kontraktet mellan deltagarna i det här händelsedrivna systemet.

Kör lokalt

Med Azure Functions kan du ta emot händelser från hela världen. Ja, du kan även ta emot händelser lokalt på utvecklingsdatorn!

mvn clean package
mvn azure-functions:run

Efter bygg- och startmeddelandena visas inkommande händelser när funktionen körs:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

I Azure-portalen går du till ditt Azure Cosmos DB-konto. Välj Datautforskaren, välj TelemetryInfo och välj sedan Objekt för att visa dina data när de kommer.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Distribuera i Azure

Nu ska vi flytta hela arbetsbelastningen i molnet. Om du vill distribuera funktionerna till Azure Functions använder du Maven-kommandot mvn azure-functions:deploy. Kontrollera att du fortfarande är på rätt lagringsplats, telemetrifunktioner.

mvn azure-functions:deploy

Underbart! Vi distribuerade hela telemetriscenariot genom att skicka data mot en händelsehubb och använda data med en annan oberoende funktion. Funktionen bearbetar data och lagrar sedan resultatet i en databas som skapats med Azure Cosmos DB. Hur kan vi se till att vårt program uppfyller våra fördefinierade krav? Genom att använda övervakning.