Упражнение. Обработка событий и хранение данных в Azure Cosmos DB
Вторая функция может прослушивать события определенного пространства имен в концентраторе событий Azure и обрабатывать их и хранить в базе данных, созданной с помощью Azure Cosmos DB.
Создание базы данных с помощью Azure Cosmos DB
Чтобы создать базу данных, используйте команду az cosmosdb create
. Команда использует учетную запись Azure Cosmos DB, базу данных и контейнер SQL.
az cosmosdb create \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--name TelemetryDb
az cosmosdb sql container create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--database-name TelemetryDb \
--name TelemetryInfo \
--partition-key-path '/temperatureStatus'
Для нашего сценария температура интересна. Поэтому мы определяем temperatureStatus
в качестве ключа раздела.
Создание, настройка и развертывание другой функции Azure
С помощью центров событий можно начать с потоков данных в мегабайтах и увеличиться до гигабайтов или терабайтов. Функция автоинфляции — это один из многих вариантов, доступных для масштабирования количества единиц пропускной способности в соответствии с потребностями использования.
Приложения-потребители для каждой функции имеют отдельное представление о потоке событий. Они читают поток независимо, по своему темпу и со своими собственными смещениями.
В нашем сценарии вы создадите одну из используемых функций Azure в качестве примера. Чтобы создать функцию, следуя рекомендациям, она должна быть независимой, с собственной учетной записью хранения и привязками для свободного связывания и масштабируемости.
az storage account create \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--sku Standard_LRS
az functionapp create \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c"\
--storage-account $STORAGE_ACCOUNT"c" \
--consumption-plan-location $LOCATION \
--runtime java \
--functions-version 4
Получение строк подключения
Функция-получатель должна знать о своей учетной записи хранения и концентраторе событий. Кроме того, необходимо учитывать базу данных, в которую она записывает обработанные события.
AZURE_WEB_JOBS_STORAGE=$( \
az storage account show-connection-string \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--query connectionString \
--output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
az cosmosdb keys list \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT \
--type connection-strings \
--query 'connectionStrings[0].connectionString' \
--output tsv)
echo $COSMOS_DB_CONNECTION_STRING
Для проверки правильности установки переменной можно использовать команду echo $EVENT_HUB_CONNECTION_STRING
. В противном случае выполните следующую команду:
EVENT_HUB_CONNECTION_STRING=$( \
az eventhubs eventhub authorization-rule keys list \
--resource-group $RESOURCE_GROUP \
--name $EVENT_HUB_AUTHORIZATION_RULE \
--eventhub-name $EVENT_HUB_NAME \
--namespace-name $EVENT_HUB_NAMESPACE \
--query primaryConnectionString \
--output tsv)
echo $EVENT_HUB_CONNECTION_STRING
Эти строки подключения должны храниться в параметрах приложения для учетной записи Функций Azure.
az functionapp config appsettings set \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c" \
--settings \
AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING
Заметка
В рабочих средах можно использовать экземпляр Azure Key Vault для хранения строк подключения и управления ими.
Создание приложения функций
Перед созданием следующей функции убедитесь, что вы находитесь в правильной папке.
cd ..
mvn archetype:generate --batch-mode \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName=$FUNCTION_APP"-c" \
-DresourceGroup=$RESOURCE_GROUP \
-DappRegion=$LOCATION \
-DappServicePlanName=$LOCATION"plan" \
-DgroupId=com.learn \
-DartifactId=telemetry-functions-consumer
Команда создает приложение, как в последнем упражнении. Удалите тестовые файлы, обновите local.settings.file
с помощью команды fetch-app-settings
, а затем замените существующий файл Function.java
.
cd telemetry-functions-consumer
rm -r src/test
Обновите локальные параметры для локального выполнения и отладки.
func azure functionapp fetch-app-settings $FUNCTION_APP"-c"
Затем откройте файл Function.java
и замените содержимое следующим кодом:
package com.learn;
import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
public class Function {
@FunctionName("processSensorData")
public void processSensorData(
@EventHubTrigger(
name = "msg",
eventHubName = "", // blank because the value is included in the connection string
cardinality = Cardinality.ONE,
connection = "EventHubConnectionString")
TelemetryItem item,
@CosmosDBOutput(
name = "databaseOutput",
databaseName = "TelemetryDb",
collectionName = "TelemetryInfo",
connectionStringSetting = "CosmosDBConnectionString")
OutputBinding<TelemetryItem> document,
final ExecutionContext context) {
context.getLogger().info("Event hub message received: " + item.toString());
if (item.getPressure() > 30) {
item.setNormalPressure(false);
} else {
item.setNormalPressure(true);
}
if (item.getTemperature() < 40) {
item.setTemperatureStatus(status.COOL);
} else if (item.getTemperature() > 90) {
item.setTemperatureStatus(status.HOT);
} else {
item.setTemperatureStatus(status.WARM);
}
document.setValue(item);
}
}
Создайте другой новый файл с именем TelemetryItem.java в том же расположении, что и Function.java, и добавьте следующий код:
package com.learn;
public class TelemetryItem {
private String id;
private double temperature;
private double pressure;
private boolean isNormalPressure;
private status temperatureStatus;
static enum status {
COOL,
WARM,
HOT
}
public TelemetryItem(double temperature, double pressure) {
this.temperature = temperature;
this.pressure = pressure;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getPressure() {
return pressure;
}
@Override
public String toString() {
return "TelemetryItem={id=" + id + ",temperature="
+ temperature + ",pressure=" + pressure + "}";
}
public boolean isNormalPressure() {
return isNormalPressure;
}
public void setNormalPressure(boolean isNormal) {
this.isNormalPressure = isNormal;
}
public status getTemperatureStatus() {
return temperatureStatus;
}
public void setTemperatureStatus(status temperatureStatus) {
this.temperatureStatus = temperatureStatus;
}
}
Когда концентратор событий получает сообщение, он создает событие. Функция processSensorData
выполняется при получении события. Затем он обрабатывает данные события и использует выходную привязку Azure Cosmos DB для отправки результатов в базу данных. Мы снова используем класс TelemetryItem.java
. Объекты TelemetryItem
можно рассматривать как контракт, формируемый потребителями, между участниками этой системы, работающей на основе событий.
Запуск локально
С помощью Функций Azure вы можете получать события со всего мира. Да, вы даже можете получать события локально на вашей машине разработки!
mvn clean package
mvn azure-functions:run
После сообщений сборки и запуска вы увидите входящие события при выполнении функции:
[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker
На портале Azure перейдите к учетной записи Azure Cosmos DB. Выберите Data Explorer, выберите TelemetryInfo, а затем выберите Элементы, чтобы просмотреть ваши данные по их поступлении.
Развертывание в Azure
Теперь давайте переместим всю рабочую нагрузку в облаке. Чтобы развернуть функции в Azure Functions, используйте команду Maven mvn azure-functions:deploy
. Убедитесь, что вы все еще находитесь в правильном репозитории, модуль телеметрических функций.
mvn azure-functions:deploy
Замечательный! Мы развернули весь сценарий телеметрии, отправив данные в концентратор событий и обработав их с помощью независимой функции. Функция обрабатывает данные, а затем сохраняет результат в базе данных, созданной с помощью Azure Cosmos DB. Как убедиться, что наше приложение соответствует нашим предопределенным требованиям? С помощью мониторинга.