Поделиться через


Соединитель Spark для хранилища данных Microsoft Fabric

Соединитель Spark для хранилища данных Fabric позволяет разработчикам Spark и специалистам по обработке и анализу данных получать доступ к данным из хранилища и конечной точки аналитики SQL в лейкхаусе. Соединитель предлагает следующие возможности:

  • Вы можете работать с данными из конечной точки хранилища или аналитики SQL в одной рабочей области или в нескольких рабочих областях.
  • Конечная точка аналитики SQL в Lakehouse автоматически обнаруживается в зависимости от контекста рабочей области.
  • Соединитель имеет упрощенный API Spark, абстрагирует базовую сложность и работает только с одной строкой кода.
  • При доступе к таблице или представлению соединитель поддерживает модели безопасности, определенные на уровне ядра SQL. К этим моделям относятся безопасность на уровне объектов (OLS), безопасность на уровне строк (RLS) и безопасность на уровне столбцов (CLS).
  • Соединитель предварительно установлен в среде выполнения Fabric, что устраняет необходимость отдельной установки.

Примечание.

Соединитель в настоящее время находится в предварительной версии. Дополнительные сведения см . в текущих ограничениях далее в этой статье.

Проверка подлинности

Проверка подлинности Microsoft Entra — это интегрированный подход к проверке подлинности. Пользователи войдите в рабочую область Microsoft Fabric и их учетные данные автоматически передаются в обработчик SQL для проверки подлинности и авторизации. Учетные данные автоматически сопоставляются, и пользователи не требуются для предоставления определенных параметров конфигурации.

Разрешения

Чтобы подключиться к подсистеме SQL, пользователям требуется по крайней мере разрешение на чтение (аналогично разрешение CONNECT в SQL Server) на конечной точке хранилища или аналитики SQL (уровень элемента). Пользователям также требуются подробные разрешения на уровень объекта для чтения данных из определенных таблиц или представлений. Дополнительные сведения см. в статье "Безопасность хранения данных" в Microsoft Fabric.

Шаблоны кода и примеры

Использование сигнатуры метода

Следующая команда показывает сигнатуру synapsesql метода для запроса на чтение. Для доступа к таблицам или представлениям из хранилища и конечной точки аналитики SQL в лейкхаусе требуется три части tableName . Обновите аргумент со следующими именами на основе вашего сценария:

  • Часть 1. Имя склада или озера.
  • Часть 2. Имя схемы.
  • Часть 3. Имя таблицы или представления.
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

Помимо чтения из таблицы или представления напрямую, этот соединитель также позволяет указать настраиваемый или сквозной запрос, который передается подсистеме SQL и результат возвращается обратно в Spark.

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

Хотя этот соединитель автоматически обнаруживает конечную точку для указанного хранилища или озера, если вы хотите явно указать ее, это можно сделать.

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") 

Чтение данных в одной рабочей области

Внимание

Выполните следующие инструкции импорта в начале записной книжки или перед началом использования соединителя:

Для Scala

import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._

import com.microsoft.spark.fabric.Constants

Для PySpark (Python)

import com.microsoft.spark.fabric

from com.microsoft.spark.fabric.Constants import Constants

Следующий код является примером для чтения данных из таблицы или представления в кадре данных Spark:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

Следующий код является примером для чтения данных из таблицы или представления в кадре данных Spark с ограничением количества строк 10:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

Следующий код является примером для чтения данных из таблицы или представления в кадре данных Spark после применения фильтра:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

Следующий код является примером для чтения данных из таблицы или представления в кадре данных Spark только для выбранных столбцов:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

Чтение данных в рабочих областях

Для доступа к данным из хранилища или озера в рабочих областях можно указать идентификатор рабочей области, в котором существует хранилище или озеро, а затем идентификатор элемента lakehouse или хранилища. В следующей строке приведен пример чтения данных из таблицы или представления в кадре данных Spark из хранилища или озера с указанным идентификатором рабочей области и идентификатором lakehouse или хранилища:

# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

Примечание.

При запуске записной книжки соединитель по умолчанию ищет указанный склад или lakehouse в рабочей области lakehouse, подключенной к записной книжке. Чтобы ссылаться на склад или lakehouse из другой рабочей области, укажите идентификатор рабочей области и идентификатор элемента lakehouse или склада, как описано выше.

Создание таблицы Lakehouse на основе данных из хранилища

Эти строки кода предоставляют пример для чтения данных из таблицы или представления в кадре данных Spark и его использования для создания таблицы lakehouse:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

Устранение неполадок

По завершении фрагмент ответа чтения отображается в выходных данных ячейки. Сбой в текущей ячейке также отменяет последующие выполнения ячеек записной книжки. Подробные сведения об ошибке доступны в журналах приложений Spark.

Текущие ограничения

В настоящее время соединитель:

  • Поддерживает извлечение данных из хранилищ Fabric и конечных точек аналитики SQL для элементов lakehouse.
  • Структура DW теперь поддерживает, однако этот соединитель не работает Time Travel для запроса с синтаксисом перемещения по времени.
  • Сохраняет подпись использования, например ту, которая поставляется с Apache Spark для Azure Synapse Analytics для согласованности. Однако он не совместим с подключением и работой с выделенным пулом SQL в Azure Synapse Analytics.
  • Имена столбцов со специальными символами будут обрабатываться путем добавления escape-символа перед отправкой запроса на основе имени таблицы или представления 3 части. В случае чтения на основе пользовательского или сквозного запроса пользователи должны экранировать имена столбцов, которые будут содержать специальные символы.