Sdílet prostřednictvím


Integrace Azure Functions s Azure Data Explorerem pomocí vstupních a výstupních vazeb (Preview)

Důležité

Tento konektor se dá použít v sadě Microsoft Fabric v reálném čase. Postupujte podle pokynů v tomto článku s následujícími výjimkami:

Azure Functions umožňuje spouštět bezserverový kód v cloudu podle plánu nebo v reakci na událost. Pomocí vstupních a výstupních vazeb Azure Data Exploreru pro Azure Functions můžete Azure Data Explorer integrovat do pracovních postupů a ingestovat data a spouštět dotazy v clusteru.

Požadavky

Vyzkoušejte integraci s naším ukázkovým projektem

Jak používat vazby Azure Data Exploreru pro Azure Functions

Informace o používání vazeb Azure Data Exploreru pro Azure Functions najdete v následujících tématech:

Scénáře použití vazeb Azure Data Exploreru pro Azure Functions

Následující části popisují některé běžné scénáře použití vazeb Azure Data Exploreru pro Azure Functions.

Vstupní vazby

Vstupní vazby spouští dotaz dotazovací jazyk Kusto (KQL) nebo funkci KQL, volitelně s parametry a vrací výstup funkce.

Následující části popisují, jak v některých běžných scénářích používat vstupní vazby.

Scénář 1: Koncový bod HTTP pro dotazování dat z clusteru

Použití vstupních vazeb je použitelné v situacích, kdy potřebujete zveřejnit data Azure Data Exploreru prostřednictvím rozhraní REST API. V tomto scénáři použijete trigger HTTP služby Azure Functions k dotazování dat v clusteru. Scénář je zvlášť užitečný v situacích, kdy potřebujete poskytnout programový přístup k datům Azure Data Exploreru pro externí aplikace nebo služby. Díky zveřejnění dat prostřednictvím rozhraní REST API můžou aplikace snadno využívat data, aniž by se musely připojovat přímo ke clusteru.

Kód definuje funkci s triggerem HTTP a vstupní vazbou Azure Data Exploreru. Vstupní vazba určuje dotaz, který se má spustit v tabulce Products v databázi productsdb . Funkce používá sloupec productId jako predikát předaný jako parametr.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Funkci pak lze vyvolat následujícím způsobem:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Scénář 2: Naplánovaný trigger pro export dat z clusteru

Následující scénář je použitelný v situacích, kdy je potřeba exportovat data podle časového plánu.

Kód definuje funkci s triggerem časovače, který exportuje agregaci prodejních dat z databáze productsdb do souboru CSV ve službě Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Výstupní vazby

Výstupní vazby přebíjí jeden nebo více řádků a vloží je do tabulky Azure Data Exploreru.

Následující části popisují, jak v některých běžných scénářích používat výstupní vazby.

Scénář 1: Koncový bod HTTP pro příjem dat do clusteru

Následující scénář je použitelný v situacích, kdy je potřeba zpracovat a ingestovat příchozí požadavky HTTP do clusteru. Pomocí výstupní vazby je možné příchozí data z požadavku zapsat do tabulek Azure Data Exploreru.

Kód definuje funkci s triggerem HTTP a výstupní vazbou Azure Data Exploreru. Tato funkce přebírá datovou část JSON v textu požadavku HTTP a zapíše ji do tabulky produktů v databázi productsdb .

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Funkci pak lze vyvolat následujícím způsobem:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Scénář 2: Příjem dat z RabbitMQ nebo jiných systémů zasílání zpráv podporovaných v Azure

Následující scénář je použitelný v situacích, kdy je potřeba do clusteru ingestovat data ze systému zasílání zpráv. Pomocí výstupní vazby je možné příchozí data ze systému zasílání zpráv ingestovat do tabulek Azure Data Exploreru.

Kód definuje funkci se zprávami, daty ve formátu JSON, příchozí prostřednictvím triggeru RabbitMQ, který se ingestuje do tabulky produktů v databázi productsdb .

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Další informace o funkcích najdete v dokumentaci ke službě Azure Functions. Rozšíření Azure Data Exploreru je k dispozici na: