Ćwiczenie — przetwarzanie zdarzeń i przechowywanie danych w usłudze Azure Cosmos DB

Ukończone

Druga funkcja może nasłuchiwać zdarzeń określonej przestrzeni nazw w centrum zdarzeń platformy Azure i przetwarzać je w bazie danych utworzonej za pomocą usługi Azure Cosmos DB.

Tworzenie bazy danych za pomocą usługi Azure Cosmos DB

Aby utworzyć bazę danych, użyj polecenia az cosmosdb create. Polecenie używa konta usługi Azure Cosmos DB, bazy danych i kontenera SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

W naszym scenariuszu temperatura jest interesująca. Dlatego definiujemy temperatureStatus jako klucz partycji.

Kompilowanie, konfigurowanie i wdrażanie innej funkcji platformy Azure

Za pomocą centrów zdarzeń można zacząć od strumieni danych w megabajtach i rozwijać się do gigabajtów lub terabajtów. Funkcja automatycznego rozszerzania jest jedną z wielu opcji dostępnych do skalowania liczby jednostek przepływności w celu spełnienia wymagań dotyczących użycia.

Aplikacje zużywające dla każdej funkcji mają oddzielny widok strumienia zdarzeń. Odczytują strumień niezależnie we własnym tempie i z własnymi przesunięciami.

W naszym scenariuszu utworzysz funkcję platformy Azure, która jako przykład używa zasobów. Aby utworzyć funkcję, zgodnie z najlepszymi praktykami, powinna być niezależna, z własnym kontem magazynu i powiązaniami, co zapewnia luźne sprzężenie i skalowalność.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Pobieranie parametrów połączenia

Funkcja konsumenta musi mieć świadomość konta przechowywania i koncentratora zdarzeń. Trzeba również być świadomym bazy danych, do której zapisuje przetworzone zdarzenia.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Możesz użyć polecenia echo $EVENT_HUB_CONNECTION_STRING, aby sprawdzić, czy zmienna jest nadal ustawiona poprawnie. W przeciwnym razie uruchom ponownie następujące polecenie:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Te parametry połączenia muszą być przechowywane w ustawieniach aplikacji dla konta usługi Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Notatka

W środowiskach produkcyjnych można użyć wystąpienia usługi Azure Key Vault do przechowywania parametrów połączenia i zarządzania nimi.

Utwórz aplikację funkcji

Przed utworzeniem następnej funkcji upewnij się, że znajdujesz się we właściwym folderze.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

Polecenie tworzy aplikację podobną do w ostatnim ćwiczeniu. Pliki testowe należy usunąć, zaktualizować local.settings.file za pomocą polecenia fetch-app-settings, a następnie zastąpić istniejący plik Function.java.

cd telemetry-functions-consumer
rm -r src/test

Zaktualizuj ustawienia lokalne na potrzeby lokalnego wykonywania i debugowania.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Następnie otwórz plik Function.java i zastąp zawartość następującym kodem:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Utwórz inny nowy plik o nazwie TelemetryItem.java w tej samej lokalizacji co Function.java i dodaj następujący kod:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Gdy centrum zdarzeń odbiera komunikat, generuje zdarzenie. Funkcja processSensorData jest uruchamiana po odebraniu zdarzenia. Następnie przetwarza dane zdarzenia i używa powiązania wyjściowego usługi Azure Cosmos DB do wysyłania wyników do bazy danych. Ponownie użyjemy klasy TelemetryItem.java. Obiekty TelemetryItem można postrzegać jako kontrakt napędzany przez konsumentów między uczestnikami tego sterowanego zdarzeniami systemu.

Uruchamianie lokalne

Usługa Azure Functions umożliwia odbieranie zdarzeń z całego świata. Tak, możesz nawet odbierać zdarzenia lokalnie na maszynie deweloperskiej.

mvn clean package
mvn azure-functions:run

Po komunikatach kompilacji i uruchamiania po uruchomieniu funkcji są wyświetlane zdarzenia przychodzące:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

W witrynie Azure Portal przejdź do konta usługi Azure Cosmos DB. Wybierz Eksplorator danych, TelemetryInfo, a następnie Items, aby wyświetlić dane po ich nadejściu.

Zrzut ekranu przedstawiający informacje telemetryczne w Eksploratorze danych usługi Azure Cosmos DB.

Wdrażanie na platformie Azure

Teraz przenieśmy całe obciążenie w chmurze. Aby wdrożyć funkcje w usłudze Azure Functions, użyj polecenia Maven mvn azure-functions:deploy. Upewnij się, że nadal znajdujesz się w poprawnym repozytorium, funkcje telemetrii.

mvn azure-functions:deploy

Wspaniały! Wdrożyliśmy cały scenariusz telemetrii, kierując dane do centrum zdarzeń i przetwarzając je za pomocą innej niezależnej funkcji. Funkcja przetwarza dane, a następnie przechowuje wynik w bazie danych utworzonej za pomocą usługi Azure Cosmos DB. Jak upewnić się, że nasza aplikacja spełnia nasze wstępnie zdefiniowane wymagania? Za pomocą monitorowania.