На схеме эталонной архитектуры представлен сквозной конвейер обработки потоков данных. Четыре этапа этого конвейера — прием, обработка, хранение и анализ и отчет. В этой эталонной архитектуре конвейер принимает данные из двух источников, объединяет связанные записи из каждого потока, дополняет результат и вычисляет среднее значение в реальном времени. Затем результаты сохраняются для дальнейшего анализа.
Эталонная реализация этой архитектуры доступна на сайте GitHub.
Архитектура
Скачайте файл Visio данной архитектуры.
Рабочий процесс
Следующий поток данных соответствует предыдущей схеме:
В этой архитектуре существует два источника данных, которые создают потоки данных в реальном времени. Первый поток содержит сведения о поездке, а второй поток содержит сведения о тарифах. Эталонная архитектура включает имитированный генератор данных, который считывает из набора статических файлов и отправляет данные в Центры событий Azure. Источники данных в реальном приложении — это устройства, установленные в такси такси.
Центры событий — это служба приема событий. В этой архитектуре используется два экземпляра службы — по одной на каждый источник данных. Каждый источник данных отправляет поток данных в соответствующую службу.
Azure Databricks — это платформа аналитики на основе Apache Spark, оптимизированная для платформы облачных служб Microsoft Azure. Azure Databricks используется для сопоставления данных о поездках на такси и тарифах, а также для обогащения коррелированных данных с данными по соседству, хранящимся в файловой системе Azure Databricks.
Azure Cosmos DB — это полностью управляемая служба базы данных с несколькими моделями. Выходные данные задания Azure Databricks — это серия записей, которые записываются в Azure Cosmos DB для Apache Cassandra. Azure Cosmos DB для Apache Cassandra используется, так как он поддерживает моделирование данных временных рядов.
Azure Synapse Link для Azure Cosmos DB позволяет выполнять практически аналитику в режиме реального времени на операционные данные в Azure Cosmos DB без каких-либо последствий производительности или затрат на рабочую нагрузку транзакций. Эти результаты можно достичь с помощью бессерверных пулов SQL и пулов Spark. Эти подсистемы аналитики доступны в рабочей области Azure Synapse Analytics.
зеркальное отображение Azure Cosmos DB для NoSQL в Microsoft Fabric позволяет интегрировать данные Azure Cosmos DB с остальными данными в Microsoft Fabric.
Log Analytics — это средство в Azure Monitor, позволяющее запрашивать и анализировать данные журнала из различных источников. Данные журнала приложений, собираемые Azure Monitor, хранятся в рабочей области Log Analytics . Запросы Log Analytics можно использовать для анализа и визуализации метрик и проверки сообщений журнала для выявления проблем в приложении.
Подробности сценария
Компания такси собирает данные о каждой поездке на такси. В этом сценарии предполагается, что два отдельных устройства отправляют данные. Такси имеет метр, который отправляет информацию о каждой поездке, включая длительность, расстояние и посадку и раскрывающиеся места. Отдельное устройство принимает платежи от клиентов и отправляет данные о тарифах. Чтобы определить тенденции всадника, компания такси хочет вычислить средний чаевые на милю, управляемый для каждого района, в режиме реального времени.
Прием данных
Для имитации источника данных эта эталонная архитектура использует набор данных данных о такси Нью-Йорка1. Этот набор данных содержит данные о поездках по такси в Нью-Йорке с 2010 по 2013 год. Он содержит записи данных о поездках и тарифах. Данные о поездках включают длительность поездки, расстояние поездки, а также места сбора и удаления. Данные о тарифах включают сведения о тарифе, налоге и сумме чаевых. Поля в обоих типах записей включают номер медальона, лицензию взлома и идентификатор поставщика. Сочетание этих трех полей однозначно определяет такси и водителя. Данные хранятся в формате CSV.
[1] Донован, Брайан; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Иллинойсский университет в Урбане-Шампейне. https://doi.org/10.13012/J8PN93H8
Генератор данных — это приложение .NET Core, которое считывает записи и отправляет их в Центры событий. Генератор отправляет данные о поездке в формате JSON, а данные о тарифах — в формате CSV.
Для сегментации данных Центры событий используют секции. Они позволяют объекту-получателю считывать данные каждой секции параллельно. При отправке данных в Центры событий можно указать ключ секции напрямую. В противном случае записи назначаются секциям методом циклического перебора.
В этом сценарии данные о поездках и тарифы должны быть назначены одному и тому же идентификатору секции для конкретного такси такси. Это назначение позволяет Databricks применять степень параллелизма при сопоставлении двух потоков. Например, запись в разделе n данных о поездке соответствует записи в разделе n данных тарифа.
Скачайте файл Visio этой архитектуры.
В генераторе данных общая модель данных для обоих типов записей имеет свойство PartitionKey
, в котором объединены Medallion
, HackLicense
и VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Это свойство используется для предоставления явного ключа секции при отправке данных в Центры событий.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
Пропускная способность Центров событий вычисляется в единицах пропускной способности. Вы можете автомасштабировать концентратор событий, включив автоматическое расширение. Эта функция автоматически масштабирует единицы пропускной способности на основе трафика до заданного максимума.
Потоковая обработка
В Azure Databricks задание выполняет обработку данных. Задание назначается кластеру, а затем выполняется на нем. Задание может быть пользовательским кодом, написанным на Java или записной книжкой Spark.
В этой эталонной архитектуре задание — это архив Java, который содержит классы, написанные на Java и Scala. При указании архива Java для задания Databricks кластер Databricks указывает класс для операции.
main
Здесь метод com.microsoft.pnp.TaxiCabReader
класса содержит логику обработки данных.
Чтение потока из двух экземпляров концентратора событий
Для считывания данных из двух экземпляров концентратора событий Azure в логике обработки данных используется Spark Structured Streaming.
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Обогащение данных с помощью сведений о районе
Данные о поездке включают координаты широты и долготы мест сбора и удаления. Эти координаты полезны, но не легко используются для анализа. Поэтому эти данные обогащены данными по соседству, которые считываются из файла фигуры.
Формат файла фигуры является двоичным и не легко анализируется. Но библиотека GeoTools предоставляет средства для геопространственных данных, использующих формат файла фигуры. Эта библиотека используется в классе com.microsoft.pnp.GeoFinder
для определения имени района на основе координат для расположений сбора и удаления.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Присоединение к данным о поездках и тарифах
Сначала выполняется преобразование данных о поездках и тарифах:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Затем данные о поездках присоединяются к данным тарифа:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Обработка данных и его вставка в Azure Cosmos DB
Средняя сумма тарифа для каждого района рассчитывается для определенного интервала времени:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Затем средняя сумма тарифа вставляется в Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Рекомендации
Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая представляет собой набор руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.
Безопасность
Безопасность обеспечивает гарантии от преднамеренного нападения и неправильного использования ценных данных и систем. Дополнительные сведения см. в контрольном списке конструктора длябезопасности.
Доступ к рабочей области Azure Databricks управляется с помощью консоли администрирования. Консоль администрирования включает функции для добавления пользователей, управления разрешениями пользователей и настройки единого входа. Также в ней можно настроить управление доступом для рабочих областей, кластеров, заданий и таблиц.
Управление секретами
Azure Databricks включает в себя секретное хранилище, которое используется для хранения учетных данных и ссылки на них в записных книжках и заданиях. Области секционирования секретов в хранилище секретов Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Секреты добавляются на уровне области:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Примечание.
Используйте область, поддерживаемую Azure Key Vault, вместо собственной области Azure Databricks.
В коде для получения доступа к секретам используются соответствующие служебные программы Azure Databricks.
Оптимизация затрат
Оптимизация затрат фокусируется на способах сокращения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в контрольном списке конструктора дляоптимизации затрат.
Для оценки затрат используйте калькулятор цен Azure. Рассмотрим следующие службы, используемые в этой эталонной архитектуре.
Рекомендации по затратам центров событий
Эта эталонная архитектура развертывает центры событий на уровне "Стандартный". Модель ценообразования основана на единицах пропускной способности, событиях входящего трафика и записи событий. Событие входящего трафика — это единица данных, которая составляет 64 КБ или меньше. Оплата зависит от размера сообщений (одно событие до 64 КБ). Единицы пропускной способности указываются через API управления портал Azure или Центрами событий.
Если требуется больше дней хранения, рассмотрите уровень "Выделенный". Этот уровень предоставляет развертывания с одним клиентом, которые имеют строгие требования. Это предложение создает кластер, основанный на единицах емкости и не зависит от единиц пропускной способности. Плата за уровень "Стандартный" также взимается на основе событий входящего трафика и единиц пропускной способности.
Дополнительные сведения см. в ценовыхЦентров событий.
Рекомендации по затратам Azure Databricks
Azure Databricks предоставляет уровень "Стандартный" и уровень "Премиум", оба из которых поддерживают три рабочих нагрузки. Эта эталонная архитектура развертывает рабочую область Azure Databricks на уровне "Премиум".
Рабочие нагрузки проектирования данных должны выполняться в кластере заданий. Инженеры данных используют кластеры для создания и выполнения заданий. Рабочие нагрузки аналитики данных должны выполняться в кластере всех целей и предназначены для специалистов по обработке и анализу данных для интерактивного изучения, визуализации, управления данными и обмена ими.
Azure Databricks предоставляет несколько моделей ценообразования.
плана оплаты по мере использования
Плата за виртуальные машины, подготовленные в кластерах и единицах субД Azure Databricks, взимается на основе выбранного экземпляра виртуальной машины. DBU — это единица обработки, которая взимается за счет использования в секунду. Потребление DBU зависит от размера и типа экземпляра, работающего в Azure Databricks. Цены зависят от выбранной рабочей нагрузки и уровня.
план предварительной покупки
Вы фиксируете базы данных в качестве единиц фиксации Azure Databricks в течение одного или трех лет, чтобы снизить общую стоимость владения в течение этого периода времени по сравнению с моделью оплаты по мере использования.
Дополнительные сведения см. в цен на Azure Databricks.
Рекомендации по затратам Azure Cosmos DB
В этой архитектуре задание Azure Databricks записывает ряд записей в Azure Cosmos DB. Плата взимается за резервную емкость, которая измеряется в единицах запросов в секунду (ЕЗ/с). Эта емкость используется для выполнения операций вставки. Единица выставления счетов составляет 100 ЕЗ/с в час. Например, стоимость записи элементов размером 100 КБ составляет 50 ЕЗ/с.
Для операций записи подготовьте достаточно емкости для поддержки количества операций записи, необходимых в секунду. Вы можете увеличить подготовленную пропускную способность с помощью портала или Azure CLI перед выполнением операций записи, а затем сократить пропускную способность после завершения этих операций. Пропускная способность для периода записи — это сумма минимальной пропускной способности, необходимой для конкретных данных, и пропускной способности, необходимой для операции вставки. В этом вычислении предполагается, что не выполняется другая рабочая нагрузка.
Пример анализа затрат
Предположим, что вы настраиваете значение пропускной способности 1000 ЕЗ/с в контейнере. Он развертывается в течение 24 часов в течение 30 дней в общей сложности 720 часов.
Контейнер выставляется по 10 единицам ез/с в час за каждый час. 10 единиц в 0,008 долл. США (за 100 ЕЗ/с в час) взимается по 0,08 долл. США в час.
В течение 720 часов или 7200 единиц (из 100 единиц), вы выставляете счета за 57,60 долл. США в течение месяца.
Плата за хранение также взимается за каждый ГБ, используемый для хранимых данных и индекса. Дополнительные сведения см. в модели ценообразования Azure Cosmos DB.
Используйте калькулятор емкости Azure Cosmos DB для быстрой оценки затрат на рабочую нагрузку.
Операционное превосходство
Операционное превосходство охватывает процессы, которые развертывают приложение и продолжают работать в рабочей среде. Дополнительные сведения см. в контрольном списке проверки конструктора дляоперационного превосходства.
Наблюдение
Azure Databricks основан на Apache Spark. Azure Databricks и Apache Spark используют Apache Log4j в качестве стандартной библиотеки для ведения журнала. Помимо ведения журнала по умолчанию, которое предоставляет Apache Spark, можно реализовать ведение журнала в Log Analytics. Дополнительные сведения см. в статье Мониторинг в Azure Databricks.
Поскольку com.microsoft.pnp.TaxiCabReader
класс обрабатывает сообщения о поездках и тарифах, сообщение может быть неправильно сформировано и поэтому недопустимо. В рабочей среде важно проанализировать эти неправильные сообщения, чтобы определить проблему с источниками данных, чтобы ее можно было быстро устранить, чтобы предотвратить потерю данных. Класс com.microsoft.pnp.TaxiCabReader
регистрирует накопитель Apache Spark, отслеживающий количество неправильно сформированных записей тарифа и записей о поездках:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark использует библиотеку Dropwizard для отправки метрик. Некоторые из собственных полей метрик Dropwizard несовместимы с Log Analytics, поэтому эта эталонная архитектура включает в себя настраиваемый приемник Dropwizard и репортер. Он форматирует метрики в формате, который ожидает Log Analytics. Apache Spark передает метрики вместе с пользовательскими метриками для данных о поездках и тарифах неправильного формата.
Чтобы отслеживать работу задания потоковой передачи, можно использовать следующие примеры запросов в рабочей области Log Analytics. Аргумент ago(1d)
в каждом запросе возвращает все записи, созданные за последний день. Этот параметр можно настроить для просмотра другого периода времени.
Исключения, зарегистрированные во время операции потокового запроса
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Сбор данных неправильного формата о поездках и тарифах
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Операция задания с течением времени
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Организация ресурсов и развертывания
Создайте отдельные группы ресурсов для рабочей среды, сред разработки и тестирования. Так будет проще управлять развертываниями, удалять тестовые развертывания и назначать права доступа.
Используйте шаблон Azure Resource Manager для развертывания ресурсов Azure в соответствии с процессом инфраструктуры как кода. С помощью шаблонов можно легко автоматизировать развертывание с помощью служб Azure DevOps или других решений непрерывной интеграции и непрерывной доставки (CI/CD).
Поместите каждую рабочую нагрузку в отдельный шаблон развертывания и сохраните ресурсы в системах управления версиями. Можно развернуть шаблоны вместе или по отдельности в процессе непрерывной интеграции и непрерывного развертывания. Этот подход упрощает процесс автоматизации.
В этой архитектуре Центры событий, Log Analytics и Azure Cosmos DB определяются как одна рабочая нагрузка. Эти ресурсы включены в один шаблон Azure Resource Manager.
Рассмотрите возможность промежуточного хранения рабочих нагрузок. Развернитесь на различных этапах и выполните проверки на каждом этапе перед переходом к следующему этапу. Таким образом вы можете управлять отправкой обновлений в рабочие среды и свести к минимуму непредвиденные проблемы с развертыванием.
В этой архитектуре существует несколько этапов развертывания. Рассмотрите возможность создания конвейера Azure DevOps и добавления этих этапов. Вы можете автоматизировать следующие этапы:
- Запустите кластер Databricks.
- Настройка интерфейса командной строки Databricks.
- Установите средства Scala.
- Добавьте секреты Databricks.
Рассмотрите возможность написания автоматических тестов интеграции для повышения качества и надежности кода Databricks и его жизненного цикла.
Развертывание этого сценария
Чтобы развернуть и запустить эталонную реализацию, выполните действия, описанные в GitHub.