Compartilhar via


Atualizar ou mesclar registros em Banco de Dados SQL do Azure com Azure Functions

Atualmente, o Azure Stream Analytics (ASA) dá suporte apenas a inserir (anexar) linhas (acrescentar) a saídas SQL (Banco de Dados SQL do Azuree ao Azure Synapse Analytics). Este artigo aborda as soluções alternativas para habilitar UPDATE, UPSERT ou MERGE em bancos de dados SQL, tendo Azure Functions como a camada intermediária.

Opções alternativas para Azure Functions são apresentadas no final.

Requisito

A gravação de dados em uma tabela geralmente pode ser feita da seguinte maneira:

Mode Instrução T-SQL equivalente Requisitos
Acrescentar INSERT Nenhum
Substitua MERGE (UPSERT) Chave exclusiva
Accumulate MERGE (UPSERT) com operador de atribuição composta (+=, -=...) Chave exclusiva e acumulador

Para ilustrar as diferenças, podemos examinar o que acontece ao ingerir os dois registros a seguir:

Arrival_Time Device_Id Measure_Value
10:00 Um 1
10:05 A 20

No modo de acrescentar, inserimos os dois registros. A instrução T-SQL equivalente é:

INSERT INTO [target] VALUES (...);

Resultando em:

Modified_Time Device_Id Measure_Value
10:00 Um 1
10:05 A 20

No modo substituir, obtemos apenas o último valor por chave. Aqui, usamos Device_Id como chave. A instrução T-SQL equivalente é:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Modified_Time Device_Key Measure_Value
10:05 A 20

Por fim, no modo acumular, somamos Value com um operador de atribuição composta (+=). Aqui, também usamos Device_Id como a chave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Modified_Time Device_Key Measure_Value
10:05 A 21

Para considerações sobre desempenho, os adaptadores de saída de banco de dados ASA SQL atualmente dão suporte apenas ao modo acrescentar nativamente. Esses adaptadores usam inserção em massa para maximizar a taxa de transferência e limitar a pressão de retorno.

Este artigo mostra como usar Azure Functions para implementar modos Substituir e Acumular para o ASA. Ao usar uma função como camada intermediária, o desempenho de gravação potencial não afetará o trabalho de streaming. Nesse sentido, usar Azure Functions funciona melhor com o SQL do Azure. Com o SQL do Synapse, a alternância de instruções de massa para linha por linha pode causar problemas de desempenho maiores.

Saída do Azure Functions

Em nosso trabalho, vamos substituir a saída do ASA SQL pela saída do ASA Azure Functions. As funcionalidades UPDATE, UPSERT ou MERGE serão implementados na função.

Atualmente, há duas opções para acessar um Banco de Dados SQL em uma função. A primeira é a associação de saída do SQL do Azure. No momento, ela está limitada a C# e só oferece o modo substituir. A segunda é compor uma consulta SQL a ser enviada por meio do driver de SQL apropriado (Microsoft.Data.sqlclient para .NET).

Para ambos os exemplos a seguir, presumimos o esquema de tabela a seguir. A opção de associação requer que uma chave primária seja definida na tabela de destino. Isso não é necessário, mas é recomendado, ao usar um driver de SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Uma função deve atender às seguintes expectativas a serem usadas como uma saída do ASA:

  • O Azure Stream Analytics espera o status 200 do HTTP do aplicativo Functions para lotes que foram processados com êxito
  • Quando o Azure Stream Analytics recebe a exceção 413 (“Entidade de solicitação http muito grande”) de uma função do Azure, ele reduz o tamanho dos lotes que envia para o Azure Functions
  • Durante a conexão de teste, o Stream Analytics envia uma solicitação POST com um lote vazio para Azure Functions e espera o status HTTP 20x de volta para validar o teste

Opção 1: atualizar por chave com a associação de SQL do Azure Function

Essa opção usa a associação de saída do Azure Function SQL. Essa extensão pode substituir um objeto em uma tabela, sem a necessidade de escrever uma instrução SQL. Neste momento, ela não dá suporte a operadores de atribuição composta (acumulações).

Este exemplo foi criado em:

Para entender melhor a abordagem de associação, é recomendável seguir este tutorial.

Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. As seguintes informações são usadas:

  • Linguagem de programação: C#
  • Runtime: .NET 6 (em função/runtime v4)
  • Modelo de máquina virtual: HTTP trigger

Instale a extensão de associação executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Adicione o item SqlConnectionString na seção Values do local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Atualize o nome da tabela de destino na seção associação:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Atualize a classe Device e seção de mapeamento para corresponder ao seu próprio esquema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Agora você pode testar a conexão entre a função local e o banco de dados por depuração (F5 no Visual Studio Code). O banco de dados SQL precisa estar acessível de seu computador. SSMS pode ser usado para verificar a conectividade. Em seguida, envie solicitações POST para o ponto de extremidade local. Uma solicitação com um corpo vazio deve retornar http 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo substituir/atualizar). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do servidor do SQL do Azure deve permitir que os serviços do Azure entrem na função ao vivo para alcançá-lo.

A função pode ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.

Opção 2: mesclar com atribuição composta (acumular) por meio de uma consulta de SQL personalizada

Observação

Após a reinicialização e a recuperação, o ASA pode reenviar os eventos de saída que já tiverem sido emitidos. Esse é um comportamento esperado que pode fazer com que a lógica de acumulação falhe (duplicando valores individuais). Para evitar isso, é recomendável gerar os mesmos dados em uma tabela por meio do ASA nativo da saída SQL. Essa tabela de controle pode então ser usada para detectar problemas e sincronizar novamente o acúmulo quando necessário.

Essa opção usa Microsoft.Data.SqlClient. Essa biblioteca nos permite fazer consultas SQL a um Banco de Dados SQL.

Este exemplo foi criado em:

Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. As seguintes informações são usadas:

  • Linguagem de programação: C#
  • Runtime: .NET 6 (em função/runtime v4)
  • Modelo de máquina virtual: HTTP trigger

Instale a biblioteca SqlClient executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Adicione o item SqlConnectionString na seção Values do local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Atualize a seção de criação de comando sqltext para corresponder ao seu próprio esquema (observe como o acúmulo é obtido por meio do operador += na atualização):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Agora você pode testar a conexão entre a função local e o banco de dados por depuração (F5 no VS Code). O banco de dados SQL precisa estar acessível de seu computador. SSMS pode ser usado para verificar a conectividade. Em seguida, emita solicitações POST para o ponto de extremidade local. Uma solicitação com um corpo vazio deve retornar http 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo acumular/mesclar). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do servidor do SQL do Azure deve permitir que os serviços do Azure entrem na função ao vivo para alcançá-lo.

A função pode ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.

Alternativas

Fora do Azure Functions, há várias maneiras de atingir o resultado esperado. Essa seção fornece alguns deles.

Pós-processamento no Banco de Dados SQL de destino

Uma tarefa em segundo plano funcionará depois que os dados forem inseridos no banco de dados por meio das saídas do SQL ASA padrão.

Para o SQL do Azure, INSTEAD OF os gatilhos DML podem ser usados para interceptar os comandos INSERT emitidos pelo ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Para o SQL do Synapse, o ASA pode inserir em uma tabela de preparo. Uma tarefa recorrente pode então transformar os dados conforme necessário em uma tabela intermediária. Por fim, os dados são movidos para a tabela de produção.

Pré-processamento no Azure Cosmos DB

O Azure Cosmos DB dá suporte ao UPSERT nativamente. Aqui, somente é possível acrescentar/substituir. As acumulações devem ser gerenciadas no lado do cliente no Azure Cosmos DB.

Se os requisitos corresponderem, uma opção é substituir o banco de dados de SQL de destino por uma instância do Azure Cosmos DB. Isso requer uma mudança importante na arquitetura geral da solução.

No caso da SQL do Synapse, o Cosmos DB pode ser usado como uma camada intermediária por meio do Link do Azure Synapse para o Azure Cosmos DB. O Link do Synapse pode ser usado para criar um repositório analítico. Esse armazenamento de dados pode ser consultado diretamente no SQL do Synapse.

Comparação das alternativas

Cada abordagem oferece uma proposta de valor e funcionalidades diferentes:

Type Opção Modos Banco de Dados SQL do Azure Azure Synapse Analytics
Pós-processamento
Gatilhos Substituir, Acumular + N/A, os gatilhos não estão disponíveis no SQL do Synapse
Staging Substituir, Acumular + +
Pré-processamento
Azure Functions Substituir, Acumular + – (desempenho de linha por linha)
Substituição do Azure Cosmos DB Substitua N/D N/D
Link do Azure Synapse para Azure Cosmos DB Substitua N/D +

Obtenha suporte

Para obter mais assistência, confira nossa página de Perguntas e respostas do Microsoft do Azure Stream Analytics.

Próximas etapas