Соединитель Azure Data Explorer для Apache Spark
Apache Spark — это единый аналитический механизм для крупномасштабной обработки данных. Azure Data Explorer — это быстрая и полностью управляемая служба для аналитики большого объема потоковых данных в реальном времени.
Соединитель Kusto для Spark — это проект открытый код, который может выполняться в любом кластере Spark. Он реализует источник и приемник данных для перемещения данных между кластерами Azure Data Explorer и Spark. Используя Azure Data Explorer и Apache Spark, вы можете создавать быстрые и масштабируемые приложения, ориентированные на сценарии, основанные на данных. Например, машинное обучение (ML), извлечение-преобразование-загрузка (ETL) и Log Analytics. С помощью соединителя Azure Data Explorer становится допустимым хранилищем данных для стандартных операций источника и приемника Spark, таких как запись, чтение и writeStream.
Вы можете записать в Azure Data Explorer с помощью приема в очереди или потоковой передачи. Чтение из Azure Data Explorer поддерживает обрезку столбцов и раскрытие предикатов, которые фильтруют данные в обозревателе данных Azure, уменьшая объем передаваемых данных.
Примечание.
Сведения о работе с соединителем Synapse Spark для Azure Data Explorer см. в статье Подключение к Azure Data Explorer с помощью Apache Spark для Azure Synapse Analytics.
В этом разделе описывается, как установить и настроить соединитель Spark Azure Data Explorer и перемещать данные между кластерами обозревателя данных Azure и Apache Spark.
Примечание.
Хотя некоторые из приведенных ниже примеров относятся к кластеру Azure Databricks Spark, соединитель Azure Data Explorer Spark не имеет прямых зависимостей от Databricks или любого другого дистрибутива Spark.
Необходимые компоненты
- Подписка Azure. Создайте бесплатную учетную запись Azure.
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данных.
- Кластер Spark
- Установите библиотеку соединителя:
- Предварительно созданные библиотеки для Spark 2.4+Scala 2.11 или Spark 3+scala 2.12
- Репозиторий Maven
- Maven 3.x установлен
Совет
Версии Spark 2.3.x также поддерживаются, но могут потребоваться некоторые изменения в зависимостях pom.xml.
Как собрать соединитель Spark
Начиная с версии 2.3.0 мы представляем новые идентификаторы артефактов, заменяющие spark-kusto-connector: kusto-spark_3.0_2.12 , предназначенные для Spark 3.x и Scala 2.12.
Примечание.
Версии до 2.5.1 больше не работают для вставки в существующую таблицу, пожалуйста, обновите до более поздней версии. Этот шаг необязательный. Если вы используете готовые библиотеки, например Maven, см. раздел Настройка кластера Spark.
Предварительные требования к сборке
Обратитесь к этому источнику для создания Spark Connector.
Для приложений Scala/Java с помощью определений проекта Maven свяжите приложение с последним артефактом. Найдите последний артефакт на Maven Central.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
Если вы не используете предварительно созданные библиотеки, необходимо установить библиотеки, перечисленные в зависимостях , включая следующие библиотеки пакета SDK Для Java Kusto. Чтобы найти нужную версию для установки, загляните в pom соответствующего выпуска.
Для создания JAR-файла и выполнения всех тестов выполните команду:
mvn clean package -DskipTests
Чтобы собрать jar, запустите все тесты и установите jar в локальный репозиторий Maven:
mvn clean install -DskipTests
Для получения дополнительной информации см. использование соединителя.
Настройка кластера Spark
Примечание.
Рекомендуется использовать последний выпуск соединителя Kusto Spark при выполнении следующих действий.
Настройте следующие параметры кластера Spark на основе кластера Azure Databricks Spark 3.0.1 и Scala 2.12:
Установите последнюю версию библиотеки spark-kusto-connector от Maven.
Убедитесь, что установлены все необходимые библиотеки:
Для установки с помощью JAR-файла убедитесь, что установлены другие зависимости:
Проверка подлинности
Соединитель Kusto Spark позволяет выполнять проверку подлинности с помощью идентификатора Microsoft Entra с помощью одного из следующих методов:
- Приложение Microsoft Entra
- Маркер доступа Microsoft Entra
- Проверка подлинности устройства (для сценариев, не являющихся продуктами)
- Azure Key Vault: чтобы получить доступ к ресурсу хранилища ключей, установите пакет azure-keyvault и укажите учетные данные приложения.
Проверка подлинности приложения Microsoft Entra
Проверка подлинности приложения Microsoft Entra является самым простым и наиболее распространенным методом проверки подлинности и рекомендуется для соединителя Kusto Spark.
Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.
az login
Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Создайте субъект-службу. В этом примере принципал службы называется
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Из возвращаемых данных JSON скопируйте
appId
password
данные иtenant
для дальнейшего использования.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Вы создали приложение Microsoft Entra и субъект-службу.
Соединитель Spark использует следующие свойства приложения Entra для проверки подлинности:
Свойства | Строка параметра | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Идентификатор приложения Microsoft Entra (клиент). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Центр проверки подлинности Microsoft Entra. Идентификатор каталога Microsoft Entra (клиента). Необязательный — по умолчанию используется microsoft.com. Дополнительные сведения см. в центре Microsoft Entra. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Ключ приложения Microsoft Entra для клиента. |
KUSTO_ACCESS_TOKEN | kustoAccessToken | Если у вас уже есть accessToken, созданный с доступом к Kusto, его можно использовать для проверки подлинности. |
Примечание.
Более старые версии API (менее 2.0.0) имеют следующие наименования: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID
Привилегии Kusto
Предоставьте следующие привилегии на стороне kusto на основе операции Spark, которую вы хотите выполнить.
Операция Spark | Привилегии |
---|---|
Чтение — один режим | Читатель |
Чтение — принудительно распределенный режим | Читатель |
Запись — режим очереди с параметром создания таблицы CreateTableIfNotExist | Административный |
Запись — режим очереди с параметром создания таблицы FailIfNotExist | Ingestor |
Запись — TransactionalMode | Административный |
Дополнительные сведения об основных ролях см. в разделе "Управление доступом на основе ролей". Для управления ролями безопасности см. Управление ролями безопасности.
Приемник Spark: запись в Kusto
Настроить параметры приемника:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Запись кадра данных Spark в кластер Kusto в виде пакета:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Или используйте упрощенный синтаксис:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Запись потоковых данных:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Источник Spark: чтение из Kusto
При чтении небольших объемов данных определите запрос данных:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Необязательно. Если вы предоставляете временное хранилище BLOB-объектов (а не Kusto), большие двоичные объекты создаются под ответственностью вызывающего объекта. Это включает в себя предоставление хранилища, ротацию ключей доступа и удаление временных артефактов. Модуль KustoBlobStorageUtils содержит вспомогательные функции для удаления больших двоичных объектов на основе координат учетной записи и контейнера и учетных данных либо полного URL-адреса SAS с разрешениями на запись, чтение и список. Когда соответствующий RDD больше не нужен, каждая транзакция сохраняет временные артефакты больших двоичных объектов в отдельном каталоге. Этот каталог записывается как часть журналов с информацией о транзакциях чтения, передаваемых на узле Spark Driver.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
В приведенном выше примере доступ к Key Vault через интерфейс соединителя отсутствует; используется более простой метод использования секретов Databricks.
Чтение из Kusto.
Если вы предоставляете временное хранилище BLOB-объектов, читайте из Kusto следующим образом:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Если Kusto предоставляет временное хранилище BLOB-объектов, считывается из Kusto следующим образом:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)