Cenário de fan-out/fan-in nas Funções Duráveis – Exemplo de backup em nuvem
Artigo
Fan-out/fan-in é o padrão de executar várias funções simultaneamente e, em seguida, executar alguma agregação dos resultados. Este artigo descreve um exemplo que usa as Funções Duráveis para implementar um cenário de fan-out/fan-in. O exemplo é uma função durável que faz o backup de todo ou de parte do conteúdo do site de um aplicativo no Armazenamento do Azure.
Observação
A versão 4 do modelo de programação Node.js para o Azure Functions está em disponibilidade geral. O novo modelo v4 é projetado para oferecer uma experiência mais flexível e intuitiva para desenvolvedores de JavaScript e TypeScript. Saiba mais sobre as diferenças entre v3 e v4 na guia de migração.
Nos trechos de código a seguir, o JavaScript (PM4) denota o modelo de programação V4, a nova experiência.
Neste exemplo, as funções carregam todos os arquivos em um diretório especificado, recursivamente, no armazenamento de blobs. Elas também contam o número total de bytes que foram carregados.
É possível escrever uma única função que cuida de tudo. O principal problema que você teria seria a escalabilidade. Uma única função só pode ser executada em uma única máquina virtual, de modo que a taxa de transferência seria limitada à taxa de transferência dessa VM. Outro problema é a confiabilidade. Se houver um falha no meio do caminho ou se o processo inteiro levar mais de 5 minutos, o backup poderá falhar com um estado parcialmente concluído. Em seguida, ele precisaria ser reiniciado.
Uma abordagem mais robusta seria escrever duas funções regulares: um enumeraria os arquivos e adicionaria os nomes dos arquivo a uma fila e a outra leria da fila e carregaria os arquivos no armazenamento de blobs. Essa abordagem é melhor em termos de taxa de transferência e confiabilidade, mas requer que você provisione e gerencie uma fila. E, mais importante, uma complexidade significativa é introduzida em termos de gerenciamento de estado e coordenação se você quiser fazer qualquer outra coisa, como relatar o número total de bytes carregados.
Uma abordagem com as Funções Duráveis fornece todos os benefícios mencionados, com pouquíssima sobrecarga.
As funções
Este artigo explica as seguintes funções no aplicativo de exemplo:
E2_BackupSiteContent: uma função de orquestrador que chama E2_GetFileList para obter uma lista de arquivos para fazer backup e, em seguida, chama E2_CopyFileToBlob para fazer backup de cada arquivo.
E2_GetFileList: uma função de atividade que retorna uma lista de arquivos em um diretório.
E2_CopyFileToBlob: uma função de atividade que faz backup de um único arquivo para o Armazenamento de Blobs do Azure.
Função de orquestrador E2_BackupSiteContent
Essa função de orquestrador faz, essencialmente, o seguinte:
Usa um valor de rootDirectory como um parâmetro de entrada.
Chama uma função para obter uma lista recursiva de arquivos em rootDirectory.
Faz várias chamadas de função paralelas para carregar cada arquivo no Armazenamento de Blobs do Azure.
Aguarda que todos os uploads sejam concluídos.
Retorna o total de bytes que foram carregados no Armazenamento de Blobs do Azure.
Este é o código que implementa a função de orquestrador:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Observe a linha await Task.WhenAll(tasks);. Todas as chamadas individuais para a E2_CopyFileToBlob função não foram aguardadas, o que permite que elas sejam executadas em paralelo. Quando passamos essa matriz de tarefas para Task.WhenAll, obtemos uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se você estiver familiarizado com a TPL (biblioteca de paralelismo de tarefas) no .NET, isso não é novidade para você. A diferença é que essas tarefas poderiam ser executadas simultaneamente em várias máquinas, virtuais e a extensão Durable Functions assegura que a execução de ponta a ponta seja resiliente em caso de reciclagem do processo.
Depois de aguardar Task.WhenAll, sabemos que todas as chamadas de função foram concluídas e retornaram valores para nós. Cada chamada para E2_CopyFileToBlob retorna o número de bytes carregados, de forma que calcular a contagem total de bytes é uma questão de adicionar todos esses valores retornados.
A função usa o function.json padrão para funções de orquestrador.
Este é o código que implementa a função de orquestrador:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Observe a linha yield context.df.Task.all(tasks);. Todas as chamadas individuais para a E2_CopyFileToBlob função não foram suspensas, o que permite que elas sejam executadas em paralelo. Quando passamos essa matriz de tarefas para context.df.Task.all, obtemos uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se estiver familiarizado com Promise.all em JavaScript, isso não será novidade para você. A diferença é que essas tarefas poderiam ser executadas simultaneamente em várias máquinas, virtuais e a extensão Durable Functions assegura que a execução de ponta a ponta seja resiliente em caso de reciclagem do processo.
Observação
Embora as tarefas sejam conceitualmente semelhantes a promessas JavaScript, funções de orquestrador devem usar context.df.Task.all e context.df.Task.any em vez de Promise.all e Promise.race para gerenciar a paralelização de tarefa.
Depois de suspender context.df.Task.all, sabemos que todas as chamadas de função foram concluídas e retornaram valores para nós. Cada chamada para E2_CopyFileToBlob retorna o número de bytes carregados, de forma que calcular a contagem total de bytes é uma questão de adicionar todos esses valores retornados.
Este é o código que implementa a função de orquestrador:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Observe a linha yield context.df.Task.all(tasks);. Todas as chamadas individuais para a copyFileToBlob função não foram suspensas, o que permite que elas sejam executadas em paralelo. Quando passamos essa matriz de tarefas para context.df.Task.all, obtemos uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se estiver familiarizado com Promise.all em JavaScript, isso não será novidade para você. A diferença é que essas tarefas poderiam ser executadas simultaneamente em várias máquinas, virtuais e a extensão Durable Functions assegura que a execução de ponta a ponta seja resiliente em caso de reciclagem do processo.
Observação
Embora as Tarefas sejam conceitualmente semelhantes às promessas do JavaScript, as funções do orquestrador devem usar context.df.Task.all e context.df.Task.any em vez de Promise.all e Promise.race para gerenciar a paralelização de tarefas.
Depois de suspender context.df.Task.all, sabemos que todas as chamadas de função foram concluídas e retornaram valores para nós. Cada chamada para copyFileToBlob retorna o número de bytes carregados, de forma que calcular a contagem total de bytes é uma questão de adicionar todos esses valores retornados.
A função usa o function.json padrão para funções de orquestrador.
Este é o código que implementa a função de orquestrador:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Observe a linha yield context.task_all(tasks);. Todas as chamadas individuais para a E2_CopyFileToBlob função não foram suspensas, o que permite que elas sejam executadas em paralelo. Quando passamos essa matriz de tarefas para context.task_all, obtemos uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se estiver familiarizado com asyncio.gather em Python, isso não será novidade para você. A diferença é que essas tarefas poderiam ser executadas simultaneamente em várias máquinas, virtuais e a extensão Durable Functions assegura que a execução de ponta a ponta seja resiliente em caso de reciclagem do processo.
Observação
Embora as tarefas sejam conceitualmente semelhantes ao Python awaitables, as funções de orquestrador devem usar yield, bem como as APIs context.task_all e context.task_any, para gerenciar a paralelização de tarefas.
Depois de suspender context.task_all, sabemos que todas as chamadas de função foram concluídas e retornaram valores para nós. Cada chamada para E2_CopyFileToBlob retorna o número de bytes carregados, para que possamos calcular a contagem total de bytes somando todos os valores retornados juntos.
Funções de atividade auxiliares
Funções de atividade auxiliares, assim como ocorre com os outros exemplos, são apenas funções regulares que usam a associação de gatilho activityTrigger.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Observação
Você deve estar se perguntando por que você não poderia simplesmente colocar esse código diretamente na função de orquestrador. Você poderia, mas isso violaria uma das regras fundamentais das funções de orquestrador, de que elas nunca devem executar E/S, incluindo no caso de acesso ao sistema de arquivos local. Para obter mais informações, confira Restrições de código na função de orquestrador.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Observação
Você precisará instalar o Microsoft.Azure.WebJobs.Extensions.Storage pacote do NuGet para executar o código de exemplo.
A função usa alguns recursos avançados das associações do Azure Functions (ou seja, o uso do Binder parâmetro), mas você não precisa se preocupar com esses detalhes no contexto deste passo a passo.
O arquivo function.json para E2_CopyFileToBlob também é simples:
A implementação de JavaScript de copyFileToBlob usa uma associação de saída do Armazenamento do Microsoft Azure para carregar os arquivos no armazenamento de Blobs do Azure.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
A implementação carrega o arquivo no disco e transmite de forma assíncrona o conteúdo para um blob de mesmo nome no contêiner "backups". O valor retornado é o número de bytes copiados para o armazenamento, que é usado pela função de orquestrador para calcular a soma agregada.
Observação
Este é um exemplo perfeito de movimentação de operações de E/S para uma função activityTrigger. Não só o trabalho pode ser distribuído entre várias máquinas virtuais diferentes, mas você também obtém os benefícios de fazer verificações pontuais do progresso. Se o processo de host for encerrado por algum motivo, você saberá quais carregamentos já foram concluídos.
Execute o exemplo
Você pode iniciar a orquestração, no Windows, enviando a solicitação HTTP POST a seguir.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Como alternativa, em um aplicativo de funções do Linux (o Python atualmente só é executado no Linux para o serviço de aplicativo), você pode iniciar a orquestração da seguinte maneira:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Observação
A função HttpStart que você está invocando funciona somente com conteúdo formatado em JSON. Por esse motivo, o cabeçalho Content-Type: application/json é obrigatório e o caminho do diretório é codificado como uma cadeia de caracteres JSON. Além disso, o fragmento de HTTP assume que existe uma entrada no arquivo host.jsonque remove o prefixo padrãoapi/ de todas as URLs de funções do acionador HTTP. Você pode encontrar a marcação para essa configuração no arquivo host.json nos exemplos.
Esta solicitação HTTP dispara o orquestrador E2_BackupSiteContent e passa a cadeia de caracteres D:\home\LogFiles como um parâmetro. A resposta fornece um link para obter o status da operação de backup:
Dependendo de quantos arquivos de log você tiver em seu aplicativo de funções, essa operação pode levar vários minutos para ser concluída. Você pode obter o status mais recente consultando a URL no cabeçalho Location da resposta HTTP 202 anterior.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Nesse caso, a função ainda está sendo executada. É possível ver a entrada que foi salva no estado do orquestrador e a hora da última atualização. Você pode continuar usando os valores de cabeçalho Location para sondar a conclusão. Quando o status for "Concluído", você verá um valor de resposta HTTP semelhante ao seguinte:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Agora, você pode ver que a orquestração foi concluída e aproximadamente quanto tempo ela levou para ser concluída. Você também verá um valor para o campo output, o que indica que cerca de 450 KB de logs foram carregados.
Próximas etapas
Este exemplo mostra como implementar o padrão de fan-out/fan-in. O próximo exemplo mostra como implementar o padrão de monitor usando temporizadores variáveis.