Exercice - Traiter les événements et stocker les données dans Azure Cosmos DB

Effectué

Une deuxième fonction peut écouter les événements de l’espace de noms spécifique dans le hub d’événements Azure, les traiter et les stocker dans une base de données créée avec Azure Cosmos DB.

Créer une base de données avec Azure Cosmos DB

Pour créer la base de données, utilisez la commande az cosmosdb create. La commande utilise un compte Azure Cosmos DB, une base de données et un conteneur SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Pour notre scénario, la température est intéressante. Nous définissons donc temperatureStatus comme clé de partition.

Créer, configurer et déployer une autre fonction Azure

Avec les hubs d’événements, vous pouvez commencer avec des flux de données en mégaoctets et passer à des gigaoctets ou téraoctets. La fonctionnalité de majoration automatique est l’une des nombreuses options disponibles pour mettre à l’échelle le nombre d’unités de débit et ainsi répondre à vos besoins d’utilisation.

Les applications consommatrices pour chaque fonction ont une vue distincte du flux d’événements. Ils lisent le flux de manière indépendante à leur propre rythme et avec leurs propres décalages.

Pour notre scénario, vous créez un exemple de fonction Azure consommatrice. Pour respecter les bonnes pratiques, vous devez créer la fonction de sorte qu’elle soit indépendante, avec son propre compte de stockage ainsi que des liaisons pour un couplage faible et une scalabilité.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Récupérer les chaînes de connexion

La fonction consommatrice doit avoir connaissance de son compte de stockage et du hub d’événements. Elle doit également connaître la base de données où elle écrit les événements traités.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Vous pouvez utiliser la commande echo $EVENT_HUB_CONNECTION_STRING pour vérifier si la variable est toujours définie correctement. Sinon, réexécutez la commande suivante :

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Ces chaînes de connexion doivent être stockées dans les paramètres d’application de votre compte Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Notes

Pour les environnements de production, vous pouvez utiliser une instance d’Azure Key Vault pour stocker et gérer les chaînes de connexion.

Créer l’application de fonction

Avant de créer la fonction suivante, veillez à vous trouver dans le dossier approprié.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

La commande crée une application comme dans le dernier exercice. Vous supprimez les fichiers de test, mettez à jour local.settings.file avec la commande fetch-app-settings, puis remplacez le fichier Function.java existant.

cd telemetry-functions-consumer
rm -r src/test

Mettez à jour les paramètres locaux pour l’exécution locale et le débogage.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Ensuite, ouvrez le fichier Function.java et remplacez son contenu par le code suivant :

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Créez un autre fichier appelé TelemetryItem.java au même emplacement que le fichier Function.java et ajoutez le code suivant :

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Lorsque le hub d'événements reçoit le message, il génère un événement. La fonction processSensorData s'exécute lorsqu'il reçoit l'événement. Ensuite, elle traite les données d’événement et utilise une liaison de sortie d’Azure Cosmos DB pour envoyer les résultats à la base de données. Nous réutilisons la classe TelemetryItem.java. Les objets TelemetryItem peuvent être considérés comme le contrat piloté par le consommateur entre les participants de ce système piloté par les événements.

Exécution locale

Avec Azure Functions, vous pouvez recevoir des événements du monde entier. Oui, vous pouvez même recevoir des événements localement sur votre ordinateur de développement !

mvn clean package
mvn azure-functions:run

Après les messages de génération et de démarrage, les événements entrants sont visibles lorsque la fonction s’exécute :

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Dans le portail Azure, accédez à votre compte Azure Cosmos DB. Sélectionnez Explorateur de données, TelemetryInfo, puis Éléments pour afficher vos données lors de leur arrivée.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Déployer sur Azure

À présent, migrons toute la charge de travail sur le cloud. Pour déployer les fonctions sur Azure Functions, vous utilisez la commande Maven mvn azure-functions:deploy. Vérifiez que vous êtes toujours dans le dépôt approprié, telemetry-functions.

mvn azure-functions:deploy

Bravo ! Nous avons déployé la totalité du scénario de télémétrie en envoyant les données vers un hub d’événements, puis en les consommant avec une autre fonction indépendante. La fonction traite les données, puis stocke le résultat dans une base de données créée avec Azure Cosmos DB. Comment faire en sorte que notre application réponde à nos exigences prédéfinies ? À l’aide du monitoring.