Udostępnij za pośrednictwem


Aktualizowanie lub scalanie rekordów w usłudze Azure SQL Database za pomocą usługi Azure Functions

Obecnie usługa Azure Stream Analytics (ASA) obsługuje tylko wstawianie (dołączanie) wierszy do danych wyjściowych SQL (Azure SQL Databases i Azure Synapse Analytics). W tym artykule omówiono obejścia dotyczące włączania aktualizacji, operacji UPSERT lub MERGE w bazach danych SQL z usługą Azure Functions jako warstwą pośredniczącą.

Alternatywne opcje usługi Azure Functions są prezentowane na końcu.

Wymaganie

Zapisywanie danych w tabeli zwykle odbywa się w następujący sposób:

Tryb Równoważna instrukcja języka T-SQL Wymagania
Dołączanie INSERT Brak
Replace MERGE (UPSERT) Unikatowy klucz
Gromadzić MERGE (UPSERT) z operatorem przypisania złożonego (+=, -=...) Unikatowy klucz i akumulator

Aby zilustrować różnice, zapoznaj się z tym, co się dzieje podczas pozyskiwania następujących dwóch rekordów:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

W trybie dołączania wstawiamy dwa rekordy. Równoważna instrukcja języka T-SQL to:

INSERT INTO [target] VALUES (...);

Wynikowe:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

W trybie zamiany uzyskujemy tylko ostatnią wartość według klucza. W tym miejscu użyjemy Device_Id jako klucza. Równoważna instrukcja języka T-SQL to:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Wynikowe:

Modified_Time Device_Key Measure_Value
10:05 A 20

Na koniec w trybie kumulowania sumujemy za Value pomocą operatora przypisania złożonego (+=). W tym miejscu użyjemy również Device_Id jako klucza:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Wynikowe:

Modified_Time Device_Key Measure_Value
10:05 A 21

W przypadku zagadnień dotyczących wydajności adaptery wyjściowe bazy danych SQL usługi ASA obsługują obecnie tylko tryb dołączania natywnie. Te karty używają wstawiania zbiorczego, aby zmaksymalizować przepływność i ograniczyć ciśnienie wsteczne.

W tym artykule pokazano, jak za pomocą usługi Azure Functions zaimplementować tryby zastępowania i kumulowania dla usługi ASA. Jeśli używasz funkcji jako warstwy pośredniej, potencjalna wydajność zapisu nie wpłynie na zadanie przesyłania strumieniowego. W związku z tym korzystanie z usługi Azure Functions działa najlepiej z usługą Azure SQL. W przypadku usługi Synapse SQL przejście z instrukcji zbiorczych na wiersz po wiersz może spowodować większe problemy z wydajnością.

Dane wyjściowe usługi Azure Functions

W naszym zadaniu zastąpimy dane wyjściowe usługi ASA SQL danymi wyjściowymi usługi Azure Functions usługi ASA. Funkcje UPDATE, UPSERT lub MERGE są implementowane w funkcji .

Obecnie istnieją dwie opcje uzyskiwania dostępu do usługi SQL Database w funkcji. Najpierw jest powiązanie danych wyjściowych usługi Azure SQL. Obecnie jest on ograniczony do języka C#i oferuje tylko tryb zamiany. Po drugie należy utworzyć zapytanie SQL, które ma zostać przesłane za pomocą odpowiedniego sterownika SQL (Microsoft.Data.SqlClient dla platformy .NET).

W przypadku obu poniższych przykładów przyjęto założenie, że w poniższym schemacie tabeli. Opcja powiązania wymaga ustawienia klucza podstawowego w tabeli docelowej. Nie jest to konieczne, ale zalecane w przypadku korzystania ze sterownika SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Funkcja musi spełniać następujące oczekiwania, które mają być używane jako dane wyjściowe z usługi ASA:

  • Usługa Azure Stream Analytics oczekuje stanu HTTP 200 z aplikacji usługi Functions dla partii, które zostały pomyślnie przetworzone
  • Gdy usługa Azure Stream Analytics otrzymuje wyjątek 413 ("http Request Entity Too Large") z funkcji platformy Azure, zmniejsza rozmiar partii wysyłanych do funkcji platformy Azure
  • Podczas połączenia testowego usługa Stream Analytics wysyła żądanie POST z pustą partią do usługi Azure Functions i oczekuje stanu HTTP 20x z powrotem w celu zweryfikowania testu

Opcja 1. Aktualizacja według klucza za pomocą powiązania SQL funkcji platformy Azure

Ta opcja używa powiązania danych wyjściowych SQL funkcji platformy Azure. To rozszerzenie może zastąpić obiekt w tabeli bez konieczności pisania instrukcji SQL. Obecnie nie obsługuje operatorów przypisania złożonego (akumulacji).

Ten przykład został skompilowany na:

Aby lepiej zrozumieć podejście do powiązania, zaleca się wykonanie czynności z tego samouczka.

Najpierw utwórz domyślną aplikację funkcji HttpTrigger, wykonując czynności opisane w tym samouczku. Używane są następujące informacje:

  • Język: C#
  • Środowisko uruchomieniowe: .NET 6 (w obszarze function/runtime v4)
  • Szablon: HTTP trigger

Zainstaluj rozszerzenie powiązania, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

SqlConnectionString Dodaj element w Values sekcji local.settings.jsonpliku , wypełniając parametry połączenia serwera docelowego:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj własną przestrzeń nazw, nazwę klasy i nazwę funkcji:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Zaktualizuj nazwę tabeli docelowej w sekcji powiązania:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Zaktualizuj sekcję Device klasy i mapowania, aby pasować do własnego schematu:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Możesz teraz przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie Visual Studio Code). Baza danych SQL musi być osiągalna z twojej maszyny. Program SSMS może służyć do sprawdzania łączności. Następnie wyślij żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić http 204. Żądanie z rzeczywistym ładunkiem powinno być utrwalane w tabeli docelowej (w trybie zamiany/aktualizacji). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkcję można teraz opublikować na platformie Azure. Dla ustawienia aplikacji należy ustawić SqlConnectionStringwartość . Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp do funkcji na żywo.

Następnie funkcję można zdefiniować jako dane wyjściowe w zadaniu asa i użyć do zastąpienia rekordów zamiast ich wstawiania.

Opcja 2. Scalanie z przypisaniem złożonym (skumulowanym) za pomocą niestandardowego zapytania SQL

Uwaga

Po ponownym uruchomieniu i odzyskiwaniu usługa ASA może ponownie wysyłać zdarzenia wyjściowe, które zostały już emitowane. Jest to oczekiwane zachowanie, które może spowodować niepowodzenie logiki akumulowania (podwojenie poszczególnych wartości). Aby temu zapobiec, zaleca się wyprowadzenie tych samych danych w tabeli za pośrednictwem natywnych danych wyjściowych USŁUGI ASA SQL. Ta tabela sterowania może następnie służyć do wykrywania problemów i ponownego synchronizowania akumulowania w razie potrzeby.

Ta opcja używa elementu Microsoft.Data.SqlClient. Ta biblioteka umożliwia nam wystawianie zapytań SQL w usłudze SQL Database.

Ten przykład został skompilowany na:

Najpierw utwórz domyślną aplikację funkcji HttpTrigger, wykonując czynności opisane w tym samouczku. Używane są następujące informacje:

  • Język: C#
  • Środowisko uruchomieniowe: .NET 6 (w obszarze function/runtime v4)
  • Szablon: HTTP trigger

Zainstaluj bibliotekę SqlClient, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

SqlConnectionString Dodaj element w Values sekcji local.settings.jsonpliku , wypełniając parametry połączenia serwera docelowego:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj własną przestrzeń nazw, nazwę klasy i nazwę funkcji:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Zaktualizuj sekcję sqltext kompilowania poleceń, aby pasować do własnego schematu (zwróć uwagę, jak akumulacja jest osiągana za pośrednictwem += operatora podczas aktualizacji):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Teraz można przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie VS Code). Baza danych SQL musi być osiągalna z twojej maszyny. Program SSMS może służyć do sprawdzania łączności. Następnie wysyłaj żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić http 204. Żądanie z rzeczywistym ładunkiem powinno być utrwalane w tabeli docelowej (w trybie skumulowanym/scalania). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Funkcję można teraz opublikować na platformie Azure. Dla ustawienia aplikacji należy ustawić SqlConnectionStringwartość . Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp do funkcji na żywo.

Następnie funkcję można zdefiniować jako dane wyjściowe w zadaniu asa i użyć do zastąpienia rekordów zamiast ich wstawiania.

Alternatywy

Poza usługą Azure Functions istnieje wiele sposobów osiągnięcia oczekiwanego wyniku. Ta sekcja zawiera niektóre z nich.

Przetwarzanie końcowe w docelowej usłudze SQL Database

Zadanie w tle działa po wstawieniu danych do bazy danych za pośrednictwem standardowych danych wyjściowych usługi ASA SQL.

W przypadku usługi Azure SQL INSTEAD OF wyzwalacze DML mogą służyć do przechwytywania poleceń INSERT wydanych przez usługę ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

W przypadku usługi Synapse SQL usługa ASA może wstawić do tabeli przejściowej. Zadanie cykliczne może następnie przekształcić dane zgodnie z potrzebami w tabelę pośredniczącą. Na koniec dane są przenoszone do tabeli produkcyjnej.

Przetwarzanie wstępne w usłudze Azure Cosmos DB

Usługa Azure Cosmos DB obsługuje natywnie funkcję UPSERT. Tutaj możliwe jest tylko dołączanie/zastępowanie. Akumulacje muszą być zarządzane po stronie klienta w usłudze Azure Cosmos DB.

Jeśli wymagania są zgodne, opcją jest zastąpienie docelowej bazy danych SQL przez wystąpienie usługi Azure Cosmos DB. W ten sposób wymagana jest ważna zmiana ogólnej architektury rozwiązania.

W przypadku usługi Synapse SQL usługa Azure Cosmos DB może służyć jako warstwa pośrednicząca za pośrednictwem usługi Azure Synapse Link dla usługi Azure Cosmos DB. Usługa Azure Synapse Link może służyć do tworzenia magazynu analitycznego. Ten magazyn danych można następnie wykonywać zapytania bezpośrednio w usłudze Synapse SQL.

Porównanie alternatyw

Każde podejście oferuje różne propozycje wartości i możliwości:

Typ Opcja Tryby Azure SQL Database Azure Synapse Analytics
Przetwarzanie końcowe
Wyzwalacze Zamień, Akumuluj + Nie dotyczy, wyzwalacze nie są dostępne w usłudze Synapse SQL
Przygotowanie Zamień, Akumuluj + +
Wstępne przetwarzanie
Azure Functions Zamień, Akumuluj + - (wydajność wiersz po wierszu)
Zastąpienie usługi Azure Cosmos DB Replace Brak Brak
Azure Cosmos DB Azure Synapse Link Replace Nie dotyczy +

Uzyskiwanie pomocy technicznej

Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.

Następne kroki