-out/-in scenario i Durable Functions – Exempel på molnsäkerhetskopiering
Artikel
-out/-in refererar till mönstret för att köra flera funktioner samtidigt och sedan utföra en aggregering på resultaten. I den här artikeln beskrivs ett exempel som använder Durable Functions för att implementera ett scenario med in-/ut-fläktar. Exemplet är en varaktig funktion som säkerhetskopierar allt eller en del av en apps webbplatsinnehåll till Azure Storage.
Kommentar
Version 4 av Node.js programmeringsmodellen för Azure Functions är allmänt tillgänglig. Den nya v4-modellen är utformad för att ha en mer flexibel och intuitiv upplevelse för JavaScript- och TypeScript-utvecklare. Läs mer om skillnaderna mellan v3 och v4 i migreringsguiden.
I följande kodfragment anger JavaScript (PM4) programmeringsmodellen V4, den nya upplevelsen.
I det här exemplet laddar funktionerna upp alla filer under en angiven katalog rekursivt till bloblagring. De räknar också det totala antalet byte som har laddats upp.
Det går att skriva en enda funktion som tar hand om allt. Det största problemet du stöter på är skalbarhet. En enda funktionskörning kan bara köras på en enda virtuell dator, så dataflödet begränsas av dataflödet för den enskilda virtuella datorn. Ett annat problem är tillförlitlighet. Om det uppstår ett fel halvvägs igenom, eller om hela processen tar mer än 5 minuter, kan säkerhetskopieringen misslyckas i ett delvis slutfört tillstånd. Den skulle sedan behöva startas om.
En mer robust metod skulle vara att skriva två vanliga funktioner: en skulle räkna upp filerna och lägga till filnamnen i en kö, och en annan skulle läsa från kön och ladda upp filerna till Blob Storage. Den här metoden är bättre när det gäller dataflöde och tillförlitlighet, men du måste etablera och hantera en kö. Ännu viktigare är att betydande komplexitet införs när det gäller statlig hantering och samordning om du vill göra något mer, som att rapportera det totala antalet uppladdade byte.
En Durable Functions-metod ger dig alla de nämnda fördelarna med mycket låga omkostnader.
Funktionerna
I den här artikeln beskrivs följande funktioner i exempelappen:
E2_BackupSiteContent: En orchestrator-funktion som anropar E2_GetFileList för att hämta en lista över filer som ska säkerhetskopieras och sedan anropar E2_CopyFileToBlob för att säkerhetskopiera varje fil.
E2_GetFileList: En aktivitetsfunktion som returnerar en lista över filer i en katalog.
E2_CopyFileToBlob: En aktivitetsfunktion som säkerhetskopierar en enskild fil till Azure Blob Storage.
E2_BackupSiteContent orchestrator-funktion
Den här orkestreringsfunktionen gör i princip följande:
Tar ett rootDirectory värde som indataparameter.
Anropar en funktion för att hämta en rekursiv lista över filer under rootDirectory.
Gör flera parallella funktionsanrop för att ladda upp varje fil till Azure Blob Storage.
Väntar på att alla uppladdningar ska slutföras.
Returnerar summan av totalt antal byte som har laddats upp till Azure Blob Storage.
Här är koden som implementerar orchestrator-funktionen:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Lägg märke till await Task.WhenAll(tasks); raden. Alla enskilda anrop till E2_CopyFileToBlob funktionen väntades inte , vilket gör att de kan köras parallellt. När vi skickar den här uppgiftsmatrisen till Task.WhenAllfår vi tillbaka en uppgift som inte slutförs förrän alla kopieringsåtgärder har slutförts. Om du är bekant med TPL (Task Parallel Library) i .NET är detta inte nytt för dig. Skillnaden är att dessa uppgifter kan köras på flera virtuella datorer samtidigt, och Durable Functions-tillägget säkerställer att körningen från slutpunkt till slutpunkt är motståndskraftig mot processåtervinning.
När vi har väntat från Task.WhenAllvet vi att alla funktionsanrop har slutförts och har returnerat värden tillbaka till oss. Varje anrop till E2_CopyFileToBlob returnerar antalet uppladdade byte, så att beräkna det totala antalet byte handlar om att lägga till alla dessa returvärden tillsammans.
Funktionen använder standard-function.json för orchestrator-funktioner.
Här är koden som implementerar orchestrator-funktionen:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Lägg märke till yield context.df.Task.all(tasks); raden. Alla enskilda anrop till E2_CopyFileToBlob funktionen har inte getts, vilket gör att de kan köras parallellt. När vi skickar den här uppgiftsmatrisen till context.df.Task.allfår vi tillbaka en uppgift som inte slutförs förrän alla kopieringsåtgärder har slutförts. Om du är bekant med Promise.all JavaScript är detta inte nytt för dig. Skillnaden är att dessa uppgifter kan köras på flera virtuella datorer samtidigt, och Durable Functions-tillägget säkerställer att körningen från slutpunkt till slutpunkt är motståndskraftig mot processåtervinning.
Kommentar
Även om uppgifter begreppsmässigt liknar JavaScript-löften bör orkestreringsfunktionerna använda context.df.Task.all och context.df.Task.any i stället för Promise.all och Promise.race hantera uppgiftsparallellisering.
När vi har gett från context.df.Task.allvet vi att alla funktionsanrop har slutförts och har returnerat värden tillbaka till oss. Varje anrop till E2_CopyFileToBlob returnerar antalet uppladdade byte, så att beräkna det totala antalet byte handlar om att lägga till alla dessa returvärden tillsammans.
Här är koden som implementerar orchestrator-funktionen:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Lägg märke till yield context.df.Task.all(tasks); raden. Alla enskilda anrop till copyFileToBlob funktionen har inte getts, vilket gör att de kan köras parallellt. När vi skickar den här uppgiftsmatrisen till context.df.Task.allfår vi tillbaka en uppgift som inte slutförs förrän alla kopieringsåtgärder har slutförts. Om du är bekant med Promise.all JavaScript är detta inte nytt för dig. Skillnaden är att dessa uppgifter kan köras på flera virtuella datorer samtidigt, och Durable Functions-tillägget säkerställer att körningen från slutpunkt till slutpunkt är motståndskraftig mot processåtervinning.
Kommentar
Även om Uppgifter begreppsmässigt liknar JavaScript-löften bör orkestreringsfunktionerna använda context.df.Task.all och context.df.Task.any i stället för Promise.all och Promise.race hantera uppgiftsparallellisering.
När vi har gett från context.df.Task.allvet vi att alla funktionsanrop har slutförts och har returnerat värden tillbaka till oss. Varje anrop till copyFileToBlob returnerar antalet uppladdade byte, så att beräkna det totala antalet byte handlar om att lägga till alla dessa returvärden tillsammans.
Funktionen använder standard-function.json för orchestrator-funktioner.
Här är koden som implementerar orchestrator-funktionen:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Lägg märke till yield context.task_all(tasks); raden. Alla enskilda anrop till E2_CopyFileToBlob funktionen har inte getts, vilket gör att de kan köras parallellt. När vi skickar den här uppgiftsmatrisen till context.task_allfår vi tillbaka en uppgift som inte slutförs förrän alla kopieringsåtgärder har slutförts. Om du är bekant med asyncio.gather i Python är detta inte nytt för dig. Skillnaden är att dessa uppgifter kan köras på flera virtuella datorer samtidigt, och Durable Functions-tillägget säkerställer att körningen från slutpunkt till slutpunkt är motståndskraftig mot processåtervinning.
Kommentar
Även om uppgifter konceptuellt liknar Python-väntande funktioner, bör orchestrator-funktioner använda yield samt API:erna context.task_all och context.task_any för att hantera uppgiftsparallellisering.
När vi har gett från context.task_allvet vi att alla funktionsanrop har slutförts och har returnerat värden tillbaka till oss. Varje anrop till E2_CopyFileToBlob returnerar antalet uppladdade byte, så att vi kan beräkna det totala antalet byte genom att lägga till alla returvärden tillsammans.
Aktivitetsfunktioner för hjälp
Hjälpaktivitetsfunktionerna är precis som med andra exempel bara vanliga funktioner som använder utlösarbindningen activityTrigger .
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Kommentar
Du kanske undrar varför du inte bara kunde placera koden direkt i orchestrator-funktionen. Det kan du, men detta skulle bryta mot en av de grundläggande reglerna för orkestreringsfunktioner, vilket är att de aldrig ska göra I/O, inklusive lokal filsystemåtkomst. Mer information finns i Begränsningar för Orchestrator-funktionskod.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Kommentar
Du måste installera Microsoft.Azure.WebJobs.Extensions.Storage NuGet-paketet för att köra exempelkoden.
Funktionen använder vissa avancerade funktioner i Azure Functions-bindningar (dvs. användningen av parameternBinder), men du behöver inte bekymra dig om informationen i den här genomgången.
Den function.json filen för E2_CopyFileToBlob är lika enkel:
Python-implementeringen använder Azure Storage SDK för Python för att ladda upp filerna till Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Implementeringen läser in filen från disken och strömmar asynkront innehållet till en blob med samma namn i containern "säkerhetskopior". Returvärdet är antalet byte som kopieras till lagringen, som sedan används av orchestrator-funktionen för att beräkna den aggregerade summan.
Kommentar
Det här är ett perfekt exempel på hur du flyttar I/O-åtgärder till en activityTrigger funktion. Arbetet kan inte bara distribueras på många olika datorer, utan du får också fördelarna med att kontrollera förloppet. Om värdprocessen avslutas av någon anledning vet du vilka uppladdningar som redan har slutförts.
Kör exemplet
Du kan starta orkestreringen i Windows genom att skicka följande HTTP POST-begäran.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Du kan också starta orkestreringen på en Linux-funktionsapp (Python körs för närvarande endast på Linux för App Service):
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Kommentar
Funktionen HttpStart som du anropar fungerar bara med JSON-formaterat innehåll. Därför Content-Type: application/json krävs rubriken och katalogsökvägen kodas som en JSON-sträng. Dessutom förutsätter HTTP-kodfragment att det finns en post i host.json filen som tar bort standardprefixet api/ från alla URL:er för HTTP-utlösare. Du hittar markering för den här konfigurationen host.json i filen i exemplen.
Den här HTTP-begäran utlöser orkestratorn E2_BackupSiteContent och skickar strängen D:\home\LogFiles som en parameter. Svaret innehåller en länk för att hämta status för säkerhetskopieringsåtgärden:
Beroende på hur många loggfiler du har i funktionsappen kan det ta flera minuter att slutföra den här åtgärden. Du kan få den senaste statusen genom att fråga URL:en i rubriken för Location föregående HTTP 202-svar.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
I det här fallet körs funktionen fortfarande. Du kan se indata som sparades i orkestreringstillståndet och den senaste uppdaterade tiden. Du kan fortsätta att använda Location huvudvärdena för att söka efter slutförande. När statusen är "Slutförd" visas ett HTTP-svarsvärde som liknar följande:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Nu kan du se att orkestreringen är klar och ungefär hur lång tid det tog att slutföra. Du ser också ett värde för fältet output , vilket anger att cirka 450 kB loggar har laddats upp.
Nästa steg
Det här exemplet har visat hur du implementerar mönstret-out/-in. Nästa exempel visar hur du implementerar övervakningsmönstret med hjälp av varaktiga timers.