Escenario de distribución ramificada de entrada/salida en Durable Functions: ejemplo de copia de seguridad en la nube
Artículo
La distribución ramificada de entrada y salida hace referencia al patrón de ejecución simultánea de varias funciones y la agregación de resultados. En este artículo se explica un ejemplo que usa Durable Functions para implementar un escenario de distribución ramificada de entrada y salida. El ejemplo es una instancia de Durable Functions que realiza una copia de seguridad parcial o total del contenido de una aplicación en Azure Storage.
Nota:
La versión 4 del modelo de programación de Node.js para Azure Functions está disponible de forma general. El nuevo modelo v4 está diseñado para que los desarrolladores de JavaScript y TypeScript tengan una experiencia más flexible e intuitiva. Obtenga más información sobre las diferencias entre v3 y v4 en la guía de migración.
En los siguientes fragmentos de código, JavaScript (PM4) denota el modelo de programación V4, la nueva experiencia.
En este ejemplo, las funciones cargan todos los archivos de un directorio especificado de forma repetitiva en Blob Storage. También cuenta el número de bytes que se han cargado.
Es posible escribir una sola función que se encargue de todo. El problema principal que encontraría es la escalabilidad. La ejecución de una función solo se puede realizar en una única máquina virtual, por lo que el rendimiento se verá limitado por el de esa VM concreta. Otro problema es la confiabilidad. Si se produce un error a mitad de camino, o si todo el proceso tarda más de 5 minutos, se podría producir un error de copia de seguridad en un estado completado parcialmente. por lo que tendría que reiniciarse.
Un enfoque más sólido sería escribir dos funciones normales: una que enumere los archivos y agregue los nombres de archivo a una cola, y otra que lea de la cola y cargue los archivos en Blob Storage. Este enfoque es mejor en cuanto a rendimiento y confiabilidad, pero habrá que aprovisionar y administrar una cola. Más importante aún, se introduce una complejidad considerable en términos de administración de estado y coordinación si desea hacer algo más, como notificar el total de bytes cargados.
Los enfoques de Durable Functions proporcionan todas las ventajas mencionadas con muy poca sobrecarga.
Funciones
En este artículo se explican las funciones siguientes en la aplicación de ejemplo:
E2_BackupSiteContent: función de orquestador que llama a E2_GetFileList para obtener una lista de archivos de los que se va a realizar una copia de seguridad y, a continuación, llama a E2_CopyFileToBlob para hacer una copia de seguridad de cada archivo.
E2_GetFileList: función de actividad que devuelve una lista de archivos de un directorio.
E2_CopyFileToBlob: función de actividad que realiza una copia de seguridad de un único archivo en Azure Blob Storage.
Función de orquestador E2_BackupSiteContent
Esta función de orquestador básicamente hace lo siguiente:
Toma un valor rootDirectory como parámetro de entrada.
Llama a una función para obtener una lista recursiva de los archivos de rootDirectory.
Realiza varias llamadas de función paralelas para cargar los archivos en Azure Blob Storage.
Espera a que finalicen todas las cargas.
Devuelve los bytes totales que se han cargado en Azure Blob Storage.
Este es el código que implementa la función de orquestador:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Observe la línea await Task.WhenAll(tasks);. No se esperaba ninguna de las llamadas individuales a la función E2_CopyFileToBlob, lo que les permite ejecutarse en paralelo. Cuando se pasa esta matriz de tareas a Task.WhenAll, obtenemos una tarea que no finalizará hasta que se completen todas las operaciones de copia. Si está familiarizado con la biblioteca TPL en. NET, esto no supondrá una novedad para usted. La diferencia es que estas tareas se pueden ejecutar en varias máquinas virtuales al mismo tiempo y la extensión Durable Functions garantiza que la ejecución de un extremo a otro es resistente al reciclaje de procesos.
Después de espera a Task.WhenAll, sabemos que todas las llamadas de función han finalizado y nos han devuelto valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
La función utiliza la norma function.json para las funciones de orquestador.
Este es el código que implementa la función de orquestador:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Observe la línea yield context.df.Task.all(tasks);. No se generó ninguna de las llamadas individuales a la función E2_CopyFileToBlob, lo que permite que se ejecuten en paralelo. Cuando se pasa esta matriz de tareas a context.df.Task.all, obtenemos una tarea que no finalizará hasta que se completen todas las operaciones de copia. Si está familiarizado con Promise.all en JavaScript, no le parecerá nada nuevo. La diferencia es que estas tareas se pueden ejecutar en varias máquinas virtuales al mismo tiempo y la extensión Durable Functions garantiza que la ejecución de un extremo a otro es resistente al reciclaje de procesos.
Nota
Aunque las tareas son conceptualmente similares a las promesas de JavaScript, las funciones de orquestador deben usar context.df.Task.all y context.df.Task.any, en lugar de Promise.all y Promise.race, para administrar la paralelización de la tarea.
Después de la generación desde context.df.Task.all, sabemos que todas las llamadas de función han finalizado y nos han devuelto valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
Este es el código que implementa la función de orquestador:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Observe la línea yield context.df.Task.all(tasks);. No se generó ninguna de las llamadas individuales a la función copyFileToBlob, lo que permite que se ejecuten en paralelo. Cuando se pasa esta matriz de tareas a context.df.Task.all, obtenemos una tarea que no finalizará hasta que se completen todas las operaciones de copia. Si está familiarizado con Promise.all en JavaScript, no le parecerá nada nuevo. La diferencia es que estas tareas se pueden ejecutar en varias máquinas virtuales al mismo tiempo y la extensión Durable Functions garantiza que la ejecución de un extremo a otro es resistente al reciclaje de procesos.
Nota
Aunque las tareas sean conceptualmente similares a las promesas de JavaScript, las funciones de orquestador deberían usar context.df.Task.all y context.df.Task.any en lugar de Promise.all y Promise.race para administrar la paralelización de tareas.
Después de la generación desde context.df.Task.all, sabemos que todas las llamadas de función han finalizado y nos han devuelto valores. Cada llamada a copyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
La función utiliza la norma function.json para las funciones de orquestador.
Este es el código que implementa la función de orquestador:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Observe la línea yield context.task_all(tasks);. No se generó ninguna de las llamadas individuales a la función E2_CopyFileToBlob, lo que permite que se ejecuten en paralelo. Cuando se pasa esta matriz de tareas a context.task_all, obtenemos una tarea que no finalizará hasta que se completen todas las operaciones de copia. Si está familiarizado con asyncio.gather en Python, no le parecerá nada nuevo. La diferencia es que estas tareas se pueden ejecutar en varias máquinas virtuales al mismo tiempo y la extensión Durable Functions garantiza que la ejecución de un extremo a otro es resistente al reciclaje de procesos.
Nota
Aunque las tareas son conceptualmente similares a las previstas de Python, las funciones de orquestador deben usar yield, así como las API context.task_all y context.task_any para administrar la paralelización de tareas.
Después de la generación desde context.task_all, sabemos que todas las llamadas de función han finalizado y nos han devuelto valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
Funciones auxiliares de actividad
Las funciones auxiliares de actividad, al igual que otros ejemplos, son básicamente funciones normales que usan el desencadenador de enlace activityTrigger.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Nota
Tal vez se pregunte por qué no se podía poner directamente el código en la función de orquestador. Se podría, pero esto infringiría una de las reglas fundamentales de funciones de orquestador, que es que nunca hacen E/S, acceso al sistema de archivos local incluido. Para más información, consulte el Restricciones de código de las funciones de orquestador.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Nota
Deberá instalar el paquete NuGet Microsoft.Azure.WebJobs.Extensions.Storage para ejecutar el código de ejemplo.
La función usa algunas características de enlace de Azure Functions avanzadas (por ejemplo, el parámetro Binder), pero no tiene que preocuparse de esos detalles para este tutorial.
El archivo function.json para E2_CopyFileToBlob es igual de simple:
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Con la implementación se carga el archivo desde el disco y transmite de forma asincrónica el contenido a un blob con el mismo nombre en el contenedor "backups". El valor devuelto es el número de bytes copiados en el almacenamiento, que utilizará la función de orquestador para calcular el agregado total.
Nota
Es un ejemplo perfecto de cómo se mueven las operaciones de E/S a una función activityTrigger. No solo puede distribuir el trabajo en muchas máquinas virtuales diferentes, sino que también obtiene las ventajas de establecer puntos de control del progreso. Si el proceso de host se finaliza por alguna razón, sabrá qué cargas ya se han completado.
Ejecución del ejemplo
Puede iniciar la orquestación en Windows mediante el envío de la siguiente solicitud HTTP POST.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Como alternativa, en una aplicación de funciones Linux (Python actualmente solo se ejecuta en Linux para App Service), puede iniciar la orquestación de la siguiente manera:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Nota
La función HttpStart que se está invocando solo funciona con contenido con formato JSON. Por este motivo, se necesita el encabezado Content-Type: application/json y se codifica la ruta de acceso de directorio como cadena JSON. Además, el fragmento de código HTTP da por supuesto que hay una entrada en el archivo host.json que elimina el prefijo api/ predeterminado de todas las direcciones URL de las funciones de activación de HTTP. Puede encontrar el marcador para esta configuración en el archivo host.json en los ejemplos.
Esta solicitud HTTP desencadena el orquestador E2_BackupSiteContent y pasa la cadena D:\home\LogFiles como un parámetro. La respuesta proporciona un vínculo para obtener el estado de esta operación de copia de seguridad:
Dependiendo de cuántos archivos de registro tenga en la aplicación de función, esta operación puede tardar varios minutos en completarse. Puede obtener el estado más reciente al consultar la dirección URL en el encabezado Location de la respuesta HTTP 202 anterior.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
En este caso, la función todavía se está ejecutando. Es posible que vea la entrada que se guardó en el estado de orquestador y la hora de la última actualización. Se pueden seguir usando los valores del encabezado Location para sondear la finalización. Cuando el estado sea "Completado", verá un valor de respuesta HTTP similar al siguiente:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Ahora verá que la orquestación ha finalizado y el tiempo aproximado que tardó en completarse. También verá un valor para el campo output, lo que indica que se han cargado alrededor de 450 KB de registros.
Pasos siguientes
Este ejemplo ha mostrado cómo implementar el modelo de distribución ramificada de salida y entrada. En el ejemplo siguiente se muestra cómo implementar el patrón de supervisión con temporizadores durables.