Ejercicio: Procesamiento de los eventos y almacenamiento de los datos en Azure Cosmos DB

Completado

Una segunda función puede escuchar eventos del espacio de nombres específico del centro de eventos de Azure, y procesarlos y almacenarlos en una base de datos creada con Azure Cosmos DB.

Creación de una base de datos con Azure Cosmos DB

Para crear la base de datos, use el comando az cosmosdb create. El comando usa una cuenta de Azure Cosmos DB, una base de datos y un contenedor de SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

En nuestro escenario, la temperatura es interesante. Por eso, definimos temperatureStatus como clave de partición.

Compilación, configuración e implementación de otra función de Azure

Con los centros de eventos, puede comenzar con los flujos de datos en megabytes, y aumentar a gigabytes o terabytes. La característica de inflado automático es una de las muchas opciones disponibles para escalar el número de unidades de procesamiento con el fin de satisfacer las necesidades de uso.

Las aplicaciones de consumo de cada función tienen una vista separada de la secuencia de eventos. Leen la secuencia de forma independiente, a su propio ritmo y con sus propios desplazamientos.

En nuestro escenario, crea una función de Azure de consumo como ejemplo. Para crear la función, según los procedimientos recomendados, debe ser independiente, con su propia cuenta de almacenamiento y enlaces para el acoplamiento flexible y la escalabilidad.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Recuperación de las cadenas de conexión

La función de consumidor necesita conocer su cuenta de almacenamiento y el centro de eventos, También debe tener en cuenta la base de datos en la que escribe los eventos procesados.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Puede usar el comando echo $EVENT_HUB_CONNECTION_STRING para comprobar si la variable todavía está establecida correctamente. De lo contrario, vuelva a ejecutar el siguiente comando:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Estas cadenas de conexión deben almacenarse en la configuración de la aplicación para la cuenta de Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Nota:

En entornos de producción, puede usar una instancia de Azure Key Vault para almacenar y administrar las cadenas de conexión.

Creación de una aplicación de Functions

Antes de crear la función siguiente, asegúrese de que se encuentra en la carpeta correcta.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

El comando crea una aplicación como en el último ejercicio. Elimine los archivos de prueba, actualice local.settings.file con el comando fetch-app-settings y, a continuación, reemplace el archivo Function.java existente.

cd telemetry-functions-consumer
rm -r src/test

Actualice la configuración local para la ejecución y depuración locales.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Después, abra el archivo Function.java y reemplace el contenido por el código siguiente:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Cree otro archivo denominado TelemetryItem.java en la misma ubicación que Function.java y agregue el código siguiente:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Cuando el centro de eventos recibe el mensaje, genera un evento. La función processSensorData se ejecuta cuando se recibe el evento. A continuación, procesa los datos del evento y usa un enlace de salida de Azure Cosmos DB para enviar los resultados a la base de datos. Usamos de nuevo la clase TelemetryItem.java. Los objetos TelemetryItem pueden verse como el contrato impulsado por el consumidor entre los participantes de este sistema controlado por eventos.

Ejecución en modo local

Con Azure Functions, puede recibir eventos de todo el mundo. Incluso puede recibir eventos localmente en el equipo de desarrollo.

mvn clean package
mvn azure-functions:run

Después de los mensajes de compilación e inicio, verá los eventos de entrada cuando se ejecute la función:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

En Azure Portal, vaya a la cuenta de Azure Cosmos DB. Seleccione Explorador de datos, TelemetryInfo y Elementos para ver los datos cuando lleguen.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Implementación en Azure

Ahora, vamos a desplazar toda la carga de trabajo a la nube. Para implementar las funciones en Azure Functions, use el comando de Maven mvn azure-functions:deploy. Asegúrese de que todavía está en el repositorio correcto (telemetry-functions).

mvn azure-functions:deploy

¡Estupendo! Hemos implementado todo el escenario de telemetría mediante el envío de los datos hacia un centro de eventos y el consumo de los datos con una función independiente diferente. La función procesa los datos y, luego, almacena el resultado en una base de datos creada con Azure Cosmos DB. ¿Cómo podemos asegurarnos de que nuestra aplicación cumple nuestros requisitos predefinidos? Mediante la supervisión.