Övning – Bearbeta händelserna och lagra data i Azure Cosmos DB
En andra funktion kan lyssna på händelser för det specifika namnområdet i Azure-händelsehubben och bearbeta och lagra dem i en databas som skapats med Azure Cosmos DB.
Skapa en databas med Azure Cosmos DB
Använd kommandot för az cosmosdb create
att skapa databasen. Kommandot använder ett Azure Cosmos DB-konto, en databas och en SQL-container.
az cosmosdb create \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--name TelemetryDb
az cosmosdb sql container create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--database-name TelemetryDb \
--name TelemetryInfo \
--partition-key-path '/temperatureStatus'
För vårt scenario är temperaturen intressant. Därför definierar temperatureStatus
vi som partitionsnyckel.
Skapa, konfigurera och distribuera en annan Azure-funktion
Med händelsehubbar kan du börja med dataströmmar i megabyte och växa till gigabyte eller terabyte. Funktionen autoinflate är ett av de många tillgängliga alternativen för att skala antalet dataflödesenheter för att uppfylla dina användningsbehov.
De program som används för varje funktion har en separat vy över händelseströmmen. De läser strömmen oberoende av varandra i egen takt och med sina egna offset.
I vårt scenario skapar du en azure-funktion som ett exempel. För att skapa funktionen, enligt bästa praxis, bör den vara oberoende, med sitt eget lagringskonto och bindningar för lös koppling och skalbarhet.
az storage account create \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--sku Standard_LRS
az functionapp create \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c"\
--storage-account $STORAGE_ACCOUNT"c" \
--consumption-plan-location $LOCATION \
--runtime java \
--functions-version 4
Hämta anslutningssträng
Konsumentfunktionen måste vara medveten om sitt lagringskonto och händelsehubben. Den måste också vara medveten om databasen som den skriver de bearbetade händelserna till.
AZURE_WEB_JOBS_STORAGE=$( \
az storage account show-connection-string \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--query connectionString \
--output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
az cosmosdb keys list \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT \
--type connection-strings \
--query 'connectionStrings[0].connectionString' \
--output tsv)
echo $COSMOS_DB_CONNECTION_STRING
Du kan använda kommandot echo $EVENT_HUB_CONNECTION_STRING
för att kontrollera om variabeln fortfarande är korrekt inställd. Annars kör du följande kommando igen:
EVENT_HUB_CONNECTION_STRING=$( \
az eventhubs eventhub authorization-rule keys list \
--resource-group $RESOURCE_GROUP \
--name $EVENT_HUB_AUTHORIZATION_RULE \
--eventhub-name $EVENT_HUB_NAME \
--namespace-name $EVENT_HUB_NAMESPACE \
--query primaryConnectionString \
--output tsv)
echo $EVENT_HUB_CONNECTION_STRING
Dessa anslutningssträng måste lagras i programinställningarna för ditt Azure Functions-konto.
az functionapp config appsettings set \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c" \
--settings \
AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING
Kommentar
För produktionsmiljöer kan du använda en instans av Azure Key Vault för att lagra och hantera anslutningssträng.
Skapa funktionsprogrammet
Kontrollera att du är i rätt mapp innan du skapar nästa funktion.
cd ..
mvn archetype:generate --batch-mode \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName=$FUNCTION_APP"-c" \
-DresourceGroup=$RESOURCE_GROUP \
-DappRegion=$LOCATION \
-DappServicePlanName=$LOCATION"plan" \
-DgroupId=com.learn \
-DartifactId=telemetry-functions-consumer
Kommandot skapar ett program som i den senaste övningen. Du tar bort testfilerna, uppdaterar local.settings.file
med fetch-app-settings
kommandot och ersätter sedan den befintliga Function.java
filen.
cd telemetry-functions-consumer
rm -r src/test
Uppdatera de lokala inställningarna för lokal körning och felsökning.
func azure functionapp fetch-app-settings $FUNCTION_APP"-c"
Öppna Function.java
sedan filen och ersätt innehållet med följande kod:
package com.learn;
import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
public class Function {
@FunctionName("processSensorData")
public void processSensorData(
@EventHubTrigger(
name = "msg",
eventHubName = "", // blank because the value is included in the connection string
cardinality = Cardinality.ONE,
connection = "EventHubConnectionString")
TelemetryItem item,
@CosmosDBOutput(
name = "databaseOutput",
databaseName = "TelemetryDb",
collectionName = "TelemetryInfo",
connectionStringSetting = "CosmosDBConnectionString")
OutputBinding<TelemetryItem> document,
final ExecutionContext context) {
context.getLogger().info("Event hub message received: " + item.toString());
if (item.getPressure() > 30) {
item.setNormalPressure(false);
} else {
item.setNormalPressure(true);
}
if (item.getTemperature() < 40) {
item.setTemperatureStatus(status.COOL);
} else if (item.getTemperature() > 90) {
item.setTemperatureStatus(status.HOT);
} else {
item.setTemperatureStatus(status.WARM);
}
document.setValue(item);
}
}
Skapa en annan ny fil med namnet TelemetryItem.java på samma plats som Function.java och lägg till följande kod:
package com.learn;
public class TelemetryItem {
private String id;
private double temperature;
private double pressure;
private boolean isNormalPressure;
private status temperatureStatus;
static enum status {
COOL,
WARM,
HOT
}
public TelemetryItem(double temperature, double pressure) {
this.temperature = temperature;
this.pressure = pressure;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getPressure() {
return pressure;
}
@Override
public String toString() {
return "TelemetryItem={id=" + id + ",temperature="
+ temperature + ",pressure=" + pressure + "}";
}
public boolean isNormalPressure() {
return isNormalPressure;
}
public void setNormalPressure(boolean isNormal) {
this.isNormalPressure = isNormal;
}
public status getTemperatureStatus() {
return temperatureStatus;
}
public void setTemperatureStatus(status temperatureStatus) {
this.temperatureStatus = temperatureStatus;
}
}
När händelsehubben tar emot meddelandet genereras en händelse. Funktionen processSensorData
körs när den tar emot händelsen. Sedan bearbetar den händelsedata och använder en utdatabindning av Azure Cosmos DB för att skicka resultaten till databasen. Vi använder TelemetryItem.java
klassen igen. Objekten TelemetryItem
kan ses som det konsumentdrivna kontraktet mellan deltagarna i det här händelsedrivna systemet.
Kör lokalt
Med Azure Functions kan du ta emot händelser från hela världen. Ja, du kan även ta emot händelser lokalt på utvecklingsdatorn!
mvn clean package
mvn azure-functions:run
Efter bygg- och startmeddelandena visas inkommande händelser när funktionen körs:
[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker
I Azure-portalen går du till ditt Azure Cosmos DB-konto. Välj Datautforskaren, välj TelemetryInfo och välj sedan Objekt för att visa dina data när de kommer.
Distribuera i Azure
Nu ska vi flytta hela arbetsbelastningen i molnet. Om du vill distribuera funktionerna till Azure Functions använder du Maven-kommandot mvn azure-functions:deploy
. Kontrollera att du fortfarande är på rätt lagringsplats, telemetrifunktioner.
mvn azure-functions:deploy
Underbart! Vi distribuerade hela telemetriscenariot genom att skicka data mot en händelsehubb och använda data med en annan oberoende funktion. Funktionen bearbetar data och lagrar sedan resultatet i en databas som skapats med Azure Cosmos DB. Hur kan vi se till att vårt program uppfyller våra fördefinierade krav? Genom att använda övervakning.