Поделиться через


Взаимодействие с Azure Cosmos DB с помощью Apache Spark 3 в Azure Synapse Link

В этой статье показано, как реализовать взаимодействие с Azure Cosmos DB с помощью Synapse Apache Spark 3. Благодаря полной поддержке Scala, Python, SparkSQL и C# Synapse Apache Spark 3 можно использовать как единый центр в сценариях аналитики, инжиниринга, обработки, а также анализа и исследования данных в Azure Synapse Link для Azure Cosmos DB.

При взаимодействии с Azure Cosmos DB поддерживаются следующие возможности.

  • Synapse Apache Spark 3 позволяет анализировать данные в контейнерах Azure Cosmos DB, которые поддерживают Azure Synapse Link почти в реальном времени, не влияя на производительность транзакционных рабочих нагрузок. Следующие два варианта доступны для запроса аналитического хранилища Azure Cosmos DB из Spark:
    • Загрузка в кадр данных Spark.
    • Создать таблицу Spark
  • Synapse Apache Spark также позволяет принимать данные в Azure Cosmos DB. Важно отметить, что данные всегда поступают в контейнеры Azure Cosmos DB через хранилище транзакций. Если включена поддержка Synapse Link, все новые операции вставки, обновления и удаления автоматически синхронизируются с аналитическим хранилищем.
  • Synapse Apache Spark также поддерживает структурированную потоковую передачу Spark в Azure Cosmos DB в качестве источника и приемника.

В следующих разделах излагается синтаксис функций, описанных выше. Вы также можете проверить модуль Learn о том, как запрашивать Azure Cosmos DB с помощью Apache Spark для Azure Synapse Analytics. Жесты в рабочей области Azure Synapse Analytics разработаны с целью предоставления простого готового интерфейса для начала работы. Чтобы просмотреть жесты, щелкните правой кнопкой мыши контейнер Azure Cosmos DB на вкладке Данные рабочей области Synapse. С помощью жестов можно быстро создать код и скорректировать его в соответствии с потребностями. Жесты также идеально подходят для обнаружения данных одним щелчком мыши.

Внимание

Вы должны учитывать некоторые ограничения в аналитической схеме, которые могут привести к непредвиденному поведению в операциях загрузки данных. Например, в аналитической схеме доступны только первые 1000 свойств из схемы транзакций, свойства с пробелами и т. д. Если у вас возникли непредвиденные результаты, проверьте ограничения схемы аналитического хранилища для получения дополнительных сведений.

Запрос аналитического хранилища Azure Cosmos DB

Прежде чем вы узнаете о двух возможных вариантах для запроса аналитического хранилища Azure Cosmos DB, загрузки в Spark DataFrame и создании таблицы Spark, следует ознакомиться с различиями в работе. Это позволит выбрать вариант, который лучше всего соответствует вашим потребностям.

Различия заключаются в том, должны ли изменения базовых данных в контейнере Azure Cosmos DB автоматически отражаться в анализе, выполненном в Spark. При регистрации кадра данных Spark или создании таблицы Spark для аналитического хранилища контейнера метаданные, связанные с текущим моментальным снимком данных в аналитическом хранилище, извлекаются в Spark. Это позволяет выполнить эффективное включение последующего анализа. Важно отметить, что так как Spark следует политике отложенной оценки, если только не вызвано действие в кадре данных Spark или не будет выполнен запрос SparkSQL в таблице Spark, фактические данные не будут получены из аналитического хранилища базового контейнера.

В случае загрузки в кадр данных Spark извлеченные метаданные кэшируются в течение времени существования сеанса Spark. Поэтому последующие действия, вызываемые в кадре данных, оцениваются по моментальному снимку аналитического хранилища на момент создания кадра данных.

С другой стороны, в случае создания таблицы Spark метаданные состояния аналитического хранилища не кэшируются в Spark и повторно загружаются при каждом выполнении запроса SparkSQL в таблице Spark.

Таким образом, вы можете выполнить загрузку в кадр данных Spark или создать таблицу Spark в зависимости от того, хотите ли вы оценить ваш анализ Spark по фиксированному снимку аналитического хранилища или по последнему снимку аналитического хранилища соответственно.

Примечание.

Чтобы запросить учетные записи Azure Cosmos DB для MongoDB, узнайте больше о полном представлении схемы точности в аналитическом хранилище и именах расширенных свойств, используемых.

Примечание.

Обратите внимание, что во всех объектах options в приведенных ниже командах учитывается регистр.

Загрузка в кадр данных Spark.

В этом примере вы создадите кадр данных Spark, который указывает на аналитическое хранилище Azure Cosmos DB. Затем можно выполнить дополнительный анализ, вызвав действия Spark для кадра данных. Эта операция не влияет на хранилище транзакций.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Создать таблицу Spark

В этом примере вы создадите таблицу Spark, которая указывает на аналитическое хранилище Azure Cosmos DB. Затем можно выполнить дополнительный анализ, вызвав запросы SparkSQL к таблице. Эта операция никак не влияет на хранилище транзакций и не влечет за собой перемещение данных. Если вы решили удалить эту таблицу Spark, это никак не повлияет на базовый контейнер Azure Cosmos DB и соответствующее аналитическое хранилище.

Этот сценарий удобен для повторного использования таблиц Spark с применением средств сторонних разработчиков и предоставления доступа к базовым данным в среде выполнения.

Синтаксис для создания таблицы Spark выглядит следующим образом:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Примечание.

Если у вас есть сценарии, в которых схема базового контейнера Azure Cosmos DB изменяется с течением времени, и если вы хотите, чтобы обновленная схема автоматически отражалась в запросах к таблице Spark, в параметрах таблицы Spark установите для параметра spark.cosmos.autoSchemaMerge значение true.

Запись кадра данных Spark в контейнер Azure Cosmos DB

В этом примере вы запишете кадр данных Spark в контейнер Azure Cosmos DB. Эта операция влияет на производительность рабочих нагрузок транзакций и на использование единиц запросов, подготовленных в контейнере Azure Cosmos DB или общей базе данных.

Синтаксис на языке Python будет следующим:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Загрузка DataFrame для потоковой передачи из контейнера

В этом жесте вы будете использовать возможность потоковой передачи Spark для загрузки данных из контейнера в DataFrame. Данные будут храниться в основной учетной записи Data Lake (и файловой системе), подключенной к рабочей области.

Примечание.

Если вы хотите ссылаться на внешние библиотеки в Synapse Apache Spark, просмотрите дополнительные сведения здесь. Например, если вы хотите принять кадр данных Spark в контейнер Azure Cosmos DB для MongoDB, можно использовать соединитель MongoDB для Spark здесь.

Загрузка кадра данных для потоковой передачи из контейнера Azure Cosmos DB

В этом примере вы будете использовать функцию структурированной потоковой передачи Spark для загрузки данных из контейнера Azure Cosmos DB в кадр данных потоковой передачи Spark с помощью функции канала изменений в Azure Cosmos DB. Данные контрольных точек, используемые Spark, будут храниться в основной учетной записи озера данных (и файловой системе), подключенной к рабочей области.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Запись кадра данных для потоковой передачи в контейнер Azure Cosmos DB

В этом примере вы запишете кадр данных для потоковой передачи в контейнер Azure Cosmos DB. Эта операция влияет на производительность рабочих нагрузок транзакций и на использование единиц запросов, подготовленных в контейнере Azure Cosmos DB или общей базе данных. Если папка /localWriteCheckpointFolder отсутствует (в примере ниже), она будет создана автоматически.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Следующие шаги