Exercício - Processar os eventos e armazenar os dados no Azure Cosmos DB

Concluído

Uma segunda função pode ouvir eventos do namespace específico no hub de eventos do Azure e processá-los e armazená-los em um banco de dados criado com o Azure Cosmos DB.

Criar um banco de dados com o Azure Cosmos DB

Para criar o banco de dados, use o az cosmosdb create comando. O comando usa uma conta do Azure Cosmos DB, um banco de dados e um contêiner SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Para o nosso cenário, a temperatura é interessante. Assim, definimos temperatureStatus como a chave de partição.

Criar, configurar e implantar outra função do Azure

Com hubs de eventos, você pode começar com fluxos de dados em megabytes e crescer para gigabytes ou terabytes. O recurso de autoinflação é uma das muitas opções disponíveis para dimensionar o número de unidades de taxa de transferência para atender às suas necessidades de uso.

Os aplicativos de consumo para cada função têm uma visão separada do fluxo de eventos. Eles leem o fluxo de forma independente, ao seu próprio ritmo e com as suas próprias compensações.

Para o nosso cenário, você cria uma função do Azure consumindo como exemplo. Para criar a função, seguindo as melhores práticas, ela deve ser independente, com sua própria conta de armazenamento e ligações para acoplamento flexível e escalabilidade.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Recuperar as cadeias de conexão

A função de consumidor precisa estar ciente de sua conta de armazenamento e do hub de eventos. Ele também precisa estar ciente do banco de dados no qual grava os eventos processados.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Você pode usar o comando echo $EVENT_HUB_CONNECTION_STRING para verificar se a variável ainda está definida corretamente. Caso contrário, execute novamente o seguinte comando:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Essas cadeias de conexão precisam ser armazenadas nas configurações do aplicativo para sua conta do Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Nota

Para ambientes de produção, você pode usar uma instância do Azure Key Vault para armazenar e gerenciar as cadeias de conexão.

Criar o aplicativo de funções

Antes de criar a próxima função, verifique se você está na pasta correta.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

O comando cria um aplicativo como no último exercício. Excluir os arquivos de teste, atualizar o com o comando e, em seguida, substituir o local.settings.file fetch-app-settings arquivo existente Function.java .

cd telemetry-functions-consumer
rm -r src/test

Atualize as configurações locais para execução e depuração locais.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Em seguida, abra o arquivo e substitua o Function.java conteúdo pelo seguinte código:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Crie outro novo arquivo chamado TelemetryItem.java no mesmo local que Function.java e adicione o seguinte código:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Quando o hub de eventos recebe a mensagem, ele gera um evento. A processSensorData função é executada quando recebe o evento. Em seguida, ele processa os dados do evento e usa uma associação de saída do Azure Cosmos DB para enviar os resultados para o banco de dados. Nós usamos a TelemetryItem.java classe novamente. Os TelemetryItem objetos podem ser vistos como o contrato orientado pelo consumidor entre os participantes deste sistema orientado por eventos.

Executar localmente

Com o Azure Functions, pode receber eventos de todo o mundo. Sim, você pode até receber eventos localmente em sua máquina de desenvolvimento!

mvn clean package
mvn azure-functions:run

Após as mensagens de compilação e inicialização, você verá os eventos de entrada quando a função é executada:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

No portal do Azure, vá para sua conta do Azure Cosmos DB. Selecione Data Explorer, TelemetryInformation e, em seguida, selecione Itens para exibir seus dados quando eles chegarem.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Implantar no Azure

Agora, vamos mudar toda a carga de trabalho na nuvem. Para implantar as funções no Azure Functions, use o comando mvn azure-functions:deployMaven . Certifique-se de que ainda está no repositório correto, funções de telemetria.

mvn azure-functions:deploy

Maravilhoso! Implantamos todo o cenário de telemetria enviando os dados para um hub de eventos e consumindo os dados com uma função independente diferente. A função processa os dados e, em seguida, armazena o resultado em um banco de dados criado com o Azure Cosmos DB. Como podemos ter certeza de que nosso aplicativo está atendendo aos nossos requisitos predefinidos? Através da monitorização.