Aktualizowanie lub scalanie rekordów w usłudze Azure SQL Database za pomocą usługi Azure Functions
Obecnie usługa Azure Stream Analytics (ASA) obsługuje tylko wstawianie (dołączanie) wierszy do danych wyjściowych SQL (Azure SQL Databases i Azure Synapse Analytics). W tym artykule omówiono obejścia dotyczące włączania aktualizacji, operacji UPSERT lub MERGE w bazach danych SQL z usługą Azure Functions jako warstwą pośredniczącą.
Alternatywne opcje usługi Azure Functions są prezentowane na końcu.
Wymaganie
Zapisywanie danych w tabeli zwykle odbywa się w następujący sposób:
Tryb | Równoważna instrukcja języka T-SQL | Wymagania |
---|---|---|
Dołączanie | INSERT | Brak |
Replace | MERGE (UPSERT) | Unikatowy klucz |
Gromadzić | MERGE (UPSERT) z operatorem przypisania złożonego (+= , -= ...) |
Unikatowy klucz i akumulator |
Aby zilustrować różnice, zapoznaj się z tym, co się dzieje podczas pozyskiwania następujących dwóch rekordów:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
W trybie dołączania wstawiamy dwa rekordy. Równoważna instrukcja języka T-SQL to:
INSERT INTO [target] VALUES (...);
Wynikowe:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
W trybie zamiany uzyskujemy tylko ostatnią wartość według klucza. W tym miejscu użyjemy Device_Id jako klucza. Równoważna instrukcja języka T-SQL to:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Wynikowe:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Na koniec w trybie kumulowania sumujemy za Value
pomocą operatora przypisania złożonego (+=
). W tym miejscu użyjemy również Device_Id jako klucza:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Wynikowe:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
W przypadku zagadnień dotyczących wydajności adaptery wyjściowe bazy danych SQL usługi ASA obsługują obecnie tylko tryb dołączania natywnie. Te karty używają wstawiania zbiorczego, aby zmaksymalizować przepływność i ograniczyć ciśnienie wsteczne.
W tym artykule pokazano, jak za pomocą usługi Azure Functions zaimplementować tryby zastępowania i kumulowania dla usługi ASA. Jeśli używasz funkcji jako warstwy pośredniej, potencjalna wydajność zapisu nie wpłynie na zadanie przesyłania strumieniowego. W związku z tym korzystanie z usługi Azure Functions działa najlepiej z usługą Azure SQL. W przypadku usługi Synapse SQL przejście z instrukcji zbiorczych na wiersz po wiersz może spowodować większe problemy z wydajnością.
Dane wyjściowe usługi Azure Functions
W naszym zadaniu zastąpimy dane wyjściowe usługi ASA SQL danymi wyjściowymi usługi Azure Functions usługi ASA. Funkcje UPDATE, UPSERT lub MERGE są implementowane w funkcji .
Obecnie istnieją dwie opcje uzyskiwania dostępu do usługi SQL Database w funkcji. Najpierw jest powiązanie danych wyjściowych usługi Azure SQL. Obecnie jest on ograniczony do języka C#i oferuje tylko tryb zamiany. Po drugie należy utworzyć zapytanie SQL, które ma zostać przesłane za pomocą odpowiedniego sterownika SQL (Microsoft.Data.SqlClient dla platformy .NET).
W przypadku obu poniższych przykładów przyjęto założenie, że w poniższym schemacie tabeli. Opcja powiązania wymaga ustawienia klucza podstawowego w tabeli docelowej. Nie jest to konieczne, ale zalecane w przypadku korzystania ze sterownika SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Funkcja musi spełniać następujące oczekiwania, które mają być używane jako dane wyjściowe z usługi ASA:
- Usługa Azure Stream Analytics oczekuje stanu HTTP 200 z aplikacji usługi Functions dla partii, które zostały pomyślnie przetworzone
- Gdy usługa Azure Stream Analytics otrzymuje wyjątek 413 ("http Request Entity Too Large") z funkcji platformy Azure, zmniejsza rozmiar partii wysyłanych do funkcji platformy Azure
- Podczas połączenia testowego usługa Stream Analytics wysyła żądanie POST z pustą partią do usługi Azure Functions i oczekuje stanu HTTP 20x z powrotem w celu zweryfikowania testu
Opcja 1. Aktualizacja według klucza za pomocą powiązania SQL funkcji platformy Azure
Ta opcja używa powiązania danych wyjściowych SQL funkcji platformy Azure. To rozszerzenie może zastąpić obiekt w tabeli bez konieczności pisania instrukcji SQL. Obecnie nie obsługuje operatorów przypisania złożonego (akumulacji).
Ten przykład został skompilowany na:
- Środowisko uruchomieniowe usługi Azure Functions w wersji 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Aby lepiej zrozumieć podejście do powiązania, zaleca się wykonanie czynności z tego samouczka.
Najpierw utwórz domyślną aplikację funkcji HttpTrigger, wykonując czynności opisane w tym samouczku. Używane są następujące informacje:
- Język:
C#
- Środowisko uruchomieniowe:
.NET 6
(w obszarze function/runtime v4) - Szablon:
HTTP trigger
Zainstaluj rozszerzenie powiązania, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
SqlConnectionString
Dodaj element w Values
sekcji local.settings.json
pliku , wypełniając parametry połączenia serwera docelowego:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj własną przestrzeń nazw, nazwę klasy i nazwę funkcji:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Zaktualizuj nazwę tabeli docelowej w sekcji powiązania:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Zaktualizuj sekcję Device
klasy i mapowania, aby pasować do własnego schematu:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Możesz teraz przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie Visual Studio Code). Baza danych SQL musi być osiągalna z twojej maszyny. Program SSMS może służyć do sprawdzania łączności. Następnie wyślij żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić http 204. Żądanie z rzeczywistym ładunkiem powinno być utrwalane w tabeli docelowej (w trybie zamiany/aktualizacji). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkcję można teraz opublikować na platformie Azure. Dla ustawienia aplikacji należy ustawić SqlConnectionString
wartość . Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp do funkcji na żywo.
Następnie funkcję można zdefiniować jako dane wyjściowe w zadaniu asa i użyć do zastąpienia rekordów zamiast ich wstawiania.
Opcja 2. Scalanie z przypisaniem złożonym (skumulowanym) za pomocą niestandardowego zapytania SQL
Uwaga
Po ponownym uruchomieniu i odzyskiwaniu usługa ASA może ponownie wysyłać zdarzenia wyjściowe, które zostały już emitowane. Jest to oczekiwane zachowanie, które może spowodować niepowodzenie logiki akumulowania (podwojenie poszczególnych wartości). Aby temu zapobiec, zaleca się wyprowadzenie tych samych danych w tabeli za pośrednictwem natywnych danych wyjściowych USŁUGI ASA SQL. Ta tabela sterowania może następnie służyć do wykrywania problemów i ponownego synchronizowania akumulowania w razie potrzeby.
Ta opcja używa elementu Microsoft.Data.SqlClient. Ta biblioteka umożliwia nam wystawianie zapytań SQL w usłudze SQL Database.
Ten przykład został skompilowany na:
- Środowisko uruchomieniowe usługi Azure Functions w wersji 4
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
Najpierw utwórz domyślną aplikację funkcji HttpTrigger, wykonując czynności opisane w tym samouczku. Używane są następujące informacje:
- Język:
C#
- Środowisko uruchomieniowe:
.NET 6
(w obszarze function/runtime v4) - Szablon:
HTTP trigger
Zainstaluj bibliotekę SqlClient, uruchamiając następujące polecenie w terminalu znajdującym się w folderze projektu:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
SqlConnectionString
Dodaj element w Values
sekcji local.settings.json
pliku , wypełniając parametry połączenia serwera docelowego:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Zastąp całą funkcję (plik .cs w projekcie) następującym fragmentem kodu. Zaktualizuj własną przestrzeń nazw, nazwę klasy i nazwę funkcji:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Zaktualizuj sekcję sqltext
kompilowania poleceń, aby pasować do własnego schematu (zwróć uwagę, jak akumulacja jest osiągana za pośrednictwem +=
operatora podczas aktualizacji):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Teraz można przetestować okablowanie między funkcją lokalną a bazą danych, debugując (F5 w programie VS Code). Baza danych SQL musi być osiągalna z twojej maszyny. Program SSMS może służyć do sprawdzania łączności. Następnie wysyłaj żądania POST do lokalnego punktu końcowego. Żądanie z pustą treścią powinno zwrócić http 204. Żądanie z rzeczywistym ładunkiem powinno być utrwalane w tabeli docelowej (w trybie skumulowanym/scalania). Oto przykładowy ładunek odpowiadający schematowi użytemu w tym przykładzie:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funkcję można teraz opublikować na platformie Azure. Dla ustawienia aplikacji należy ustawić SqlConnectionString
wartość . Zapora programu Azure SQL Server powinna zezwalać usługom platformy Azure na dostęp do funkcji na żywo.
Następnie funkcję można zdefiniować jako dane wyjściowe w zadaniu asa i użyć do zastąpienia rekordów zamiast ich wstawiania.
Alternatywy
Poza usługą Azure Functions istnieje wiele sposobów osiągnięcia oczekiwanego wyniku. Ta sekcja zawiera niektóre z nich.
Przetwarzanie końcowe w docelowej usłudze SQL Database
Zadanie w tle działa po wstawieniu danych do bazy danych za pośrednictwem standardowych danych wyjściowych usługi ASA SQL.
W przypadku usługi Azure SQL INSTEAD OF
wyzwalacze DML mogą służyć do przechwytywania poleceń INSERT wydanych przez usługę ASA:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
W przypadku usługi Synapse SQL usługa ASA może wstawić do tabeli przejściowej. Zadanie cykliczne może następnie przekształcić dane zgodnie z potrzebami w tabelę pośredniczącą. Na koniec dane są przenoszone do tabeli produkcyjnej.
Przetwarzanie wstępne w usłudze Azure Cosmos DB
Usługa Azure Cosmos DB obsługuje natywnie funkcję UPSERT. Tutaj możliwe jest tylko dołączanie/zastępowanie. Akumulacje muszą być zarządzane po stronie klienta w usłudze Azure Cosmos DB.
Jeśli wymagania są zgodne, opcją jest zastąpienie docelowej bazy danych SQL przez wystąpienie usługi Azure Cosmos DB. W ten sposób wymagana jest ważna zmiana ogólnej architektury rozwiązania.
W przypadku usługi Synapse SQL usługa Azure Cosmos DB może służyć jako warstwa pośrednicząca za pośrednictwem usługi Azure Synapse Link dla usługi Azure Cosmos DB. Usługa Azure Synapse Link może służyć do tworzenia magazynu analitycznego. Ten magazyn danych można następnie wykonywać zapytania bezpośrednio w usłudze Synapse SQL.
Porównanie alternatyw
Każde podejście oferuje różne propozycje wartości i możliwości:
Typ | Opcja | Tryby | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Przetwarzanie końcowe | ||||
Wyzwalacze | Zamień, Akumuluj | + | Nie dotyczy, wyzwalacze nie są dostępne w usłudze Synapse SQL | |
Przygotowanie | Zamień, Akumuluj | + | + | |
Wstępne przetwarzanie | ||||
Azure Functions | Zamień, Akumuluj | + | - (wydajność wiersz po wierszu) | |
Zastąpienie usługi Azure Cosmos DB | Replace | Brak | Brak | |
Azure Cosmos DB Azure Synapse Link | Replace | Nie dotyczy | + |
Uzyskiwanie pomocy technicznej
Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.
Następne kroki
- Omówienie danych wyjściowych z usługi Azure Stream Analytics
- Dane wyjściowe usługi Azure Stream Analytics do usługi Azure SQL Database
- Zwiększanie wydajności przepływności do usługi Azure SQL Database z usługi Azure Stream Analytics
- Używanie tożsamości zarządzanych do uzyskiwania dostępu do usługi Azure SQL Database lub Azure Synapse Analytics z zadania usługi Azure Stream Analytics
- Używanie danych referencyjnych z usługi SQL Database dla zadania usługi Azure Stream Analytics
- Uruchamianie usługi Azure Functions w zadaniach usługi Azure Stream Analytics — samouczek dotyczący danych wyjściowych usługi Redis
- Szybki start: tworzenie zadania usługi Stream Analytics przy użyciu witryny Azure Portal