Интеграция Функции Azure с Azure Data Explorer с помощью входных и выходных привязок (предварительная версия)
Внимание
Этот соединитель можно использовать в аналитике в режиме реального времени в Microsoft Fabric. Используйте инструкции в этой статье со следующими исключениями:
- При необходимости создайте базы данных с помощью инструкций в статье "Создание базы данных KQL".
- При необходимости создайте таблицы с помощью инструкций в статье "Создание пустой таблицы".
- Получение URI запроса или приема с помощью инструкций в URI копирования.
- Выполнение запросов в наборе запросов KQL.
Функции Azure позволяют запускать бессерверный код в облаке по расписанию или в ответ на событие. С помощью входных и выходных привязок Azure Data Explorer для Функции Azure можно интегрировать Azure Data Explorer в рабочие процессы для приема данных и выполнения запросов к кластеру.
Необходимые компоненты
- Подписка Azure. Создайте бесплатную учетную запись Azure.
- Кластер и база данных Azure Data Explorer с примерами данных. Создайте кластер и базу данных.
- Учетная запись хранения.
Попробуйте выполнить интеграцию с примером проекта
Использование привязок Azure Data Explorer для Функции Azure
Сведения об использовании привязок Azure Data Explorer для Функции Azure см. в следующих разделах:
- Общие сведения о привязках Azure Data Explorer для Функции Azure
- Входные привязки Azure Data Explorer для Функции Azure
- Выходные привязки Azure Data Explorer для Функции Azure
Сценарии использования привязок Azure Data Explorer для Функции Azure
В следующих разделах описаны некоторые распространенные сценарии использования привязок Azure Data Explorer для Функции Azure.
Входные привязки
Входные привязки выполняют запрос язык запросов Kusto (KQL) или функцию KQL, при необходимости с параметрами и возвращают выходные данные функции.
В следующих разделах описывается использование входных привязок в некоторых распространенных сценариях.
Сценарий 1. Конечная точка HTTP для запроса данных из кластера
Использование входных привязок применимо в ситуациях, когда необходимо предоставить данные Azure Data Explorer через REST API. В этом сценарии для запроса данных в кластере используется триггер HTTP Функции Azure. Этот сценарий особенно полезен в ситуациях, когда необходимо предоставить программный доступ к данным Azure Data Explorer для внешних приложений или служб. Предоставляя данные через REST API, приложения могут легко использовать данные без необходимости подключаться непосредственно к кластеру.
Код определяет функцию с триггером HTTP и входной привязкой Azure Data Explorer. Входная привязка указывает запрос для выполнения в таблице Products в базе данных productsdb . Функция использует столбец productId в качестве предиката, передаваемого в качестве параметра.
{
[FunctionName("GetProduct")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
HttpRequest req,
[Kusto(Database:"productsdb" ,
KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
KqlParameters = "@productId={productId}",
Connection = "KustoConnectionString")]
IAsyncEnumerable<Product> products)
{
IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
var productList = new List<Product>();
while (await enumerator.MoveNextAsync())
{
productList.Add(enumerator.Current);
}
await enumerator.DisposeAsync();
return new OkObjectResult(productList);
}
}
Затем функцию можно вызвать следующим образом:
curl https://myfunctionapp.azurewebsites.net/api/getproducts/1
Сценарий 2. Запланированный триггер для экспорта данных из кластера
Следующий сценарий применим в ситуациях, когда данные необходимо экспортировать в расписание на основе времени.
Код определяет функцию с триггером таймера, который экспортирует агрегирование данных о продажах из базы данных productsdb в CSV-файл в Хранилище BLOB-объектов Azure.
public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
[Kusto(ConnectionStringSetting = "KustoConnectionString",
DatabaseName = "productsdb",
Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
// Write the query results to a CSV file
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
csv.WriteRecords(queryResults);
writer.Flush();
stream.Position = 0;
await outputBlob.UploadFromStreamAsync(stream);
}
}
Выходные привязки
Выходные привязки принимают одну или несколько строк и вставляют их в таблицу Azure Data Explorer.
В следующих разделах описывается использование выходных привязок в некоторых распространенных сценариях.
Сценарий 1. Конечная точка HTTP для приема данных в кластер
Следующий сценарий применим в ситуациях, когда входящие HTTP-запросы должны обрабатываться и приема в кластер. Используя выходную привязку, входящие данные из запроса можно записать в таблицы Azure Data Explorer.
Код определяет функцию с триггером HTTP и выходной привязкой Azure Data Explorer. Эта функция принимает полезные данные JSON в тексте HTTP-запроса и записывает его в таблицу продуктов в базе данных productsdb .
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
HttpRequest req, ILogger log,
[Kusto(Database:"productsdb" ,
TableName ="products" ,
Connection = "KustoConnectionString")] out Product product)
{
log.LogInformation($"AddProduct function started");
string body = new StreamReader(req.Body).ReadToEnd();
product = JsonConvert.DeserializeObject<Product>(body);
string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
product.Name, product.ProductID, product.Cost);
log.LogInformation("Ingested product {}", productString);
return new CreatedResult($"/api/addproductuni", product);
}
Затем функцию можно вызвать следующим образом:
curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'
Сценарий 2. Прием данных из RabbitMQ или других систем обмена сообщениями, поддерживаемых в Azure
Следующий сценарий применим в ситуациях, когда данные из системы обмена сообщениями необходимо принять в кластер. Используя выходную привязку, входящие данные из системы обмена сообщениями можно получать в таблицы Azure Data Explorer.
Код определяет функцию с сообщениями, данными в формате JSON, входящей с помощью триггера RabbitMQ, которые передаются в таблицу продуктов в базе данных productsdb .
public class QueueTrigger
{
[FunctionName("QueueTriggerBinding")]
[return: Kusto(Database: "productsdb",
TableName = "products",
Connection = "KustoConnectionString")]
public static Product Run(
[RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
ILogger log)
{
log.LogInformation($"Dequeued product {product.ProductID}");
return product;
}
}
Дополнительные сведения о функциях см. в Функции Azure документации. Расширение Azure Data Explorer доступно в следующих разделах: