Übung: Verarbeiten der Ereignisse und Speichern der Daten in Azure Cosmos DB

Abgeschlossen

Eine zweite Funktion kann auf Ereignisse des spezifischen Namespace im Azure Event Hub lauschen, diese verarbeiten und in einer mit Azure Cosmos DB erstellten Datenbank speichern.

Erstellen einer Datenbank mit Azure Cosmos DB

Verwenden Sie den az cosmosdb create-Befehl, um die Datenbank zu erstellen. Im Befehl werden ein Azure Cosmos DB-Konto, eine Datenbank und einen SQL-Container verwendet.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

In unserem Szenario ist die Temperatur interessant. Sie definieren also temperatureStatus als Partitionsschlüssel.

Erstellen, Konfigurieren und Bereitstellen einer weiteren Azure-Funktion

Mit Event Hubs können Sie mit Datenströmen in Megabytes anfangen und zu Gigabytes oder Terabytes erweitern. Das Feature „Automatische Vergrößerung“ ist eine von vielen verfügbaren Optionen zum Skalieren der Anzahl von Durchsatzeinheiten, um Ihren Nutzungsanforderungen gerecht zu werden.

Die Consumeranwendungen der einzelnen Funktionen verfügen jeweils über eine separate Ansicht des Ereignisdatenstroms. Sie können den Datenstrom unabhängig voneinander im eigenen Tempo und mit eigenen Offsets lesen.

In diesem Szenario erstellen Sie eine Azure-Funktion, die als Beispiel dienen soll. Gemäß Best Practices wird die Funktion als unabhängige Funktion mit einem eigenen Speicherkonto und eigenen Bindungen für eine lose Kopplung und Skalierbarkeit erstellt.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Abrufen der Verbindungszeichenfolgen

Das Speicherkonto und der Event Hub müssen für die Consumerfunktion bekannt sein. Die Datenbank, in die die verarbeiteten Ereignisse geschrieben werden sollen, muss der Funktion ebenfalls bekannt sein.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Mit dem Befehl echo $EVENT_HUB_CONNECTION_STRING können Sie überprüfen, ob die Variable noch korrekt festgelegt ist. Führen Sie andernfalls den folgenden Befehl erneut aus:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Diese Verbindungszeichenfolgen müssen in den Anwendungseinstellungen Ihres Azure Functions-Kontos gespeichert werden.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Hinweis

In Produktionsumgebungen können Sie eine Azure Key Vault-Instanz verwenden, um die Verbindungszeichenfolgen zu speichern und zu verwalten.

Erstellen der Functions-Anwendung

Stellen Sie vor dem Erstellen der nächsten Funktion sicher, dass Sie sich im richtigen Ordner befinden.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

Mit dem Befehl wird eine Anwendung wie in der letzten Übung erstellt. Sie löschen die Testdateien, aktualisieren local.settings.file mit dem Befehl fetch-app-settings und ersetzen dann die vorhandene Function.java-Datei.

cd telemetry-functions-consumer
rm -r src/test

Aktualisieren Sie die lokalen Einstellungen für die lokale Ausführung und das Debuggen.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Öffnen Sie als Nächstes die Datei Function.java, und ersetzen Sie den Inhalt durch den folgenden Code:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Erstellen Sie eine weitere Datei mit dem Namen „TelemetryItem.java“ am gleichen Speicherort wie Function.java, und fügen Sie den folgenden Code hinzu:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Wenn der Event Hub die Nachricht empfängt, generiert er ein Ereignis. Die Funktion processSensorData wird ausgeführt, wenn sie das Ereignis empfängt. Anschließend werden die Ereignisdaten verarbeitet und eine Ausgabebindung von Azure Cosmos DB verwendet, um die Ergebnisse an die Datenbank zu senden. Wir verwenden wieder die Klasse TelemetryItem.java. Die TelemetryItem-Objekte können als Consumer Driven Contract zwischen den Teilnehmern dieses ereignisgesteuerten Systems angesehen werden.

Lokales Ausführen

Mit Azure Functions können Sie Ereignisse aus der ganzen Welt empfangen. Ja, Sie können Ereignisse lokal auf Ihrem Entwicklungscomputer erhalten.

mvn clean package
mvn azure-functions:run

Bei Ausführen der Funktion werden nach den Build- und Startnachrichten die eingehenden Ereignisse angezeigt:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Navigieren Sie im Azure-Portal zu Ihrem Azure Cosmos DB-Konto. Klicken Sie auf Daten-Explorer, klicken Sie auf TelemetryInfo, und klicken Sie dann auf Elemente, um Ihre Daten beim Eintreffen anzuzeigen.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Bereitstellen in Azure

Nun verschieben wir die gesamte Workload in die Cloud. Verwenden Sie den Maven-Befehl mvn azure-functions:deploy, um die Funktionen für Azure Functions bereitzustellen. Stellen Sie sicher, dass Sie sich noch im richtigen telemetry-functions-Repository befinden.

mvn azure-functions:deploy

Wunderbar! Das gesamte Telemetrieszenario wurde bereitgestellt, indem die Daten an einen Event Hub gesendet und die Daten von einer anderen unabhängigen Funktion genutzt wurden. Die Funktion verarbeitet die Daten und speichert das Ergebnis dann in einer Datenbank, die mit Azure Cosmos DB erstellt wurde. Wie können wir sicherstellen, dass unsere Anwendung unsere vorher definierten Anforderungen erfüllt? Mithilfe der Überwachung.