Compartir a través de


Integración de Azure Functions con Azure Data Explorer mediante enlaces de entrada y salida (versión preliminar)

Importante

Este conector se puede usar en Inteligencia de tiempo real en Microsoft Fabric. Use las instrucciones de este artículo con las siguientes excepciones:

Azure Functions le permite ejecutar código sin servidor en la nube según una programación o en respuesta a un evento. Con los enlaces de entrada y salida de Azure Data Explorer para Azure Functions, puede integrar Azure Data Explorer en los flujos de trabajo para ingerir datos y ejecutar consultas en el clúster.

Requisitos previos

Prueba de la integración con nuestro proyecto de ejemplo

Uso de enlaces de Azure Data Explorer para Azure Functions

Para obtener información sobre cómo usar enlaces de Azure Data Explorer para Azure Functions, consulte los temas siguientes:

Escenarios para usar enlaces de Azure Data Explorer para Azure Functions

En las secciones siguientes se describen algunos escenarios comunes para usar enlaces de Azure Data Explorer para Azure Functions.

Enlaces de entrada

Los enlaces de entrada ejecutan una consulta del lenguaje de consulta Kusto (KQL) o una función KQL, opcionalmente con parámetros, y devuelven la salida a la función.

En las secciones siguientes se describe cómo usar enlaces de entrada en algunos escenarios comunes.

Escenario 1: un punto de conexión HTTP para consultar datos de un clúster

El uso de enlaces de entrada es aplicable en situaciones en las que es necesario exponer datos de Azure Data Explorer a través de una API de REST. En este escenario, se usa un desencadenador HTTP de Azure Functions para consultar datos en el clúster. El escenario es especialmente útil en situaciones en las que es necesario proporcionar acceso mediante programación a los datos de Azure Data Explorer para aplicaciones o servicios externos. Al exponer los datos a través de una API de REST, las aplicaciones pueden consumir fácilmente los datos sin necesidad de que se conecten directamente al clúster.

El código define una función con un desencadenador HTTP y un enlace de entrada de Azure Data Explorer. El enlace de entrada especifica la consulta que se ejecutará en la tabla Productos de la base de datos de productsdb. La función usa la columna productId como predicado que se pasa como parámetro.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

A continuación, se puede invocar la función, como se indica a continuación:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Escenario 2: un desencadenador programado para exportar datos desde un clúster

El escenario siguiente es aplicable en situaciones en las que los datos deben exportarse según una programación basada en el tiempo.

El código define una función con un desencadenador de temporizador que exporta una agregación de datos de ventas de la base de datos productsdb a un archivo CSV en Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Enlaces de salida

Los enlaces de salida toman una o varias filas e las insertan en una tabla de Azure Data Explorer.

En las secciones siguientes se describe cómo usar enlaces de salida en algunos escenarios comunes.

Escenario 1: punto de conexión HTTP para ingerir datos en un clúster

El escenario siguiente es aplicable en situaciones en las que es necesario procesar e ingerir solicitudes HTTP entrantes en el clúster. Mediante un enlace de salida, los datos entrantes de la solicitud se pueden escribir en tablas de Azure Data Explorer.

El código define una función con un desencadenador HTTP y un enlace de salida de Azure Data Explorer. Esta función toma una carga JSON en el cuerpo de la solicitud HTTP y la escribe en la tabla productos de la base de datos productsdb.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

A continuación, se puede invocar la función, como se indica a continuación:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Escenario 2: Ingesta de datos de RabbitMQ u otros sistemas de mensajería compatibles con Azure

El escenario siguiente es aplicable en situaciones en las que es necesario ingerir datos de un sistema de mensajería en el clúster. Mediante un enlace de salida, los datos entrantes del sistema de mensajería se pueden ingerir en tablas de Azure Data Explorer.

El código define una función con mensajes, datos en formato JSON, entrantes a través de un desencadenador RabbitMQ que se ingieren en la tablaproductos de la base de datos productsdb.

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Para más información sobre las funciones, consulte Documentación de Azure Functions. La extensión de Azure Data Explorer está disponible en: