Scenariusz fan-out/fan-in w usłudze Durable Functions — przykład tworzenia kopii zapasowej w chmurze
Artykuł
Fan-out/fan-in odnosi się do wzorca wykonywania wielu funkcji jednocześnie, a następnie wykonywania agregacji na wynikach. W tym artykule wyjaśniono przykład, który używa rozszerzenia Durable Functions do implementowania scenariusza fan-in/fan-out. Przykład jest trwałą funkcją, która tworzy kopię zapasową całej zawartości witryny aplikacji lub jej zawartości w usłudze Azure Storage.
Uwaga
Wersja 4 modelu programowania Node.js dla usługi Azure Functions jest ogólnie dostępna. Nowy model w wersji 4 został zaprojektowany z myślą o bardziej elastycznym i intuicyjnym środowisku dla deweloperów języków JavaScript i TypeScript. Dowiedz się więcej o różnicach między wersjami 3 i v4 w przewodniku migracji.
W poniższych fragmentach kodu javaScript (PM4) oznacza model programowania W wersji 4, nowe środowisko.
W tym przykładzie funkcje przekazują wszystkie pliki w określonym katalogu rekursywnie do magazynu obiektów blob. Zliczają również łączną liczbę przekazanych bajtów.
Można napisać jedną funkcję, która zajmuje się wszystkim. Głównym problemem, który można napotkać, jest skalowalność. Wykonywanie jednej funkcji może być uruchamiane tylko na jednej maszynie wirtualnej, więc przepływność będzie ograniczona przez przepływność tej pojedynczej maszyny wirtualnej. Innym problemem jest niezawodność. Jeśli wystąpił błąd w połowie procesu lub cały proces trwa dłużej niż 5 minut, tworzenie kopii zapasowej może zakończyć się niepowodzeniem w stanie częściowo ukończonym. Następnie należy go ponownie uruchomić.
Bardziej niezawodną metodą byłoby zapisanie dwóch regularnych funkcji: jeden wylicza pliki i dodaje nazwy plików do kolejki, a drugi odczytuje z kolejki i przekazuje pliki do magazynu obiektów blob. Takie podejście jest lepsze pod względem przepływności i niezawodności, ale wymaga aprowizacji kolejki i zarządzania nią. Co ważniejsze, znacząca złożoność jest wprowadzana w zakresie zarządzania stanem i koordynacji , jeśli chcesz zrobić coś więcej, na przykład zgłosić łączną liczbę przekazanych bajtów.
Podejście Durable Functions zapewnia wszystkie wymienione korzyści z bardzo niskim obciążeniem.
Funkcje
W tym artykule opisano następujące funkcje w przykładowej aplikacji:
E2_BackupSiteContent: Funkcja orkiestratora, która wywołuje E2_GetFileList metodę uzyskiwania listy plików do utworzenia kopii zapasowej, a następnie wywołuje E2_CopyFileToBlob kopię zapasową każdego pliku.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Zwróć uwagę na await Task.WhenAll(tasks); wiersz. Wszystkie poszczególne wywołania E2_CopyFileToBlob funkcji nie były oczekiwane, co umożliwia ich równoległe uruchamianie. Gdy przekażemy tę tablicę zadań do programu , wrócimy do Task.WhenAllzadania, które nie zostanie ukończone, dopóki wszystkie operacje kopiowania nie zostaną ukończone. Jeśli znasz bibliotekę równoległą zadań (TPL) na platformie .NET, to nie jest to dla Ciebie nowość. Różnica polega na tym, że te zadania mogą być uruchomione na wielu maszynach wirtualnych jednocześnie, a rozszerzenie Durable Functions gwarantuje, że kompleksowe wykonywanie jest odporne na odtwarzanie procesów.
Po oczekiwaniu na polecenie Task.WhenAllwiemy, że wszystkie wywołania funkcji zostały ukończone i zwróciły do nas wartości. Każde wywołanie funkcji zwraca E2_CopyFileToBlob liczbę przekazanych bajtów, dlatego obliczenie łącznej liczby bajtów jest kwestią dodania wszystkich tych zwracanych wartości.
Funkcja używa standardowego function.json dla funkcji orkiestratora.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Zwróć uwagę na yield context.df.Task.all(tasks); wiersz. Wszystkie poszczególne wywołania E2_CopyFileToBlob funkcji nie zostały zwrócone, co umożliwia ich równoległe uruchamianie. Gdy przekażemy tę tablicę zadań do programu , wrócimy do context.df.Task.allzadania, które nie zostanie ukończone, dopóki wszystkie operacje kopiowania nie zostaną ukończone. Jeśli znasz Promise.all język JavaScript, nie jest to dla Ciebie nowości. Różnica polega na tym, że te zadania mogą być uruchomione na wielu maszynach wirtualnych jednocześnie, a rozszerzenie Durable Functions gwarantuje, że kompleksowe wykonywanie jest odporne na odtwarzanie procesów.
Uwaga
Chociaż zadania są koncepcyjnie podobne do obietnic języka JavaScript, funkcje orkiestratora powinny używać context.df.Task.allcontext.df.Task.any funkcji i zamiast Promise.all i Promise.race do zarządzania równoległościami zadań.
Po dokonaniu zwrotu z context.df.Task.allparametru wiemy, że wszystkie wywołania funkcji zostały ukończone i zwróciły nam wartości. Każde wywołanie funkcji zwraca E2_CopyFileToBlob liczbę przekazanych bajtów, dlatego obliczenie łącznej liczby bajtów jest kwestią dodania wszystkich tych zwracanych wartości.
Oto kod implementujący funkcję orkiestratora:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Zwróć uwagę na yield context.df.Task.all(tasks); wiersz. Wszystkie poszczególne wywołania copyFileToBlob funkcji nie zostały zwrócone, co umożliwia ich równoległe uruchamianie. Gdy przekażemy tę tablicę zadań do programu , wrócimy do context.df.Task.allzadania, które nie zostanie ukończone, dopóki wszystkie operacje kopiowania nie zostaną ukończone. Jeśli znasz Promise.all język JavaScript, nie jest to dla Ciebie nowości. Różnica polega na tym, że te zadania mogą być uruchomione na wielu maszynach wirtualnych jednocześnie, a rozszerzenie Durable Functions gwarantuje, że kompleksowe wykonywanie jest odporne na odtwarzanie procesów.
Uwaga
Chociaż zadania są koncepcyjnie podobne do obietnic języka JavaScript, funkcje orkiestratora powinny używać context.df.Task.allcontext.df.Task.any funkcji i zamiast Promise.all i Promise.race do zarządzania równoległościami zadań.
Po dokonaniu zwrotu z context.df.Task.allparametru wiemy, że wszystkie wywołania funkcji zostały ukończone i zwróciły nam wartości. Każde wywołanie funkcji zwraca copyFileToBlob liczbę przekazanych bajtów, dlatego obliczenie łącznej liczby bajtów jest kwestią dodania wszystkich tych zwracanych wartości.
Funkcja używa standardowego function.json dla funkcji orkiestratora.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Zwróć uwagę na yield context.task_all(tasks); wiersz. Wszystkie poszczególne wywołania E2_CopyFileToBlob funkcji nie zostały zwrócone, co umożliwia ich równoległe uruchamianie. Gdy przekażemy tę tablicę zadań do programu , wrócimy do context.task_allzadania, które nie zostanie ukończone, dopóki wszystkie operacje kopiowania nie zostaną ukończone. Jeśli znasz asyncio.gather język Python, nie jest to dla Ciebie nowości. Różnica polega na tym, że te zadania mogą być uruchomione na wielu maszynach wirtualnych jednocześnie, a rozszerzenie Durable Functions gwarantuje, że kompleksowe wykonywanie jest odporne na odtwarzanie procesów.
Uwaga
Chociaż zadania są koncepcyjnie podobne do funkcji oczekujących w języku Python, funkcje orkiestratora powinny być używane yield , a także context.task_all interfejsy API i context.task_any do zarządzania równoległymi zadaniami.
Po dokonaniu zwrotu z context.task_allparametru wiemy, że wszystkie wywołania funkcji zostały ukończone i zwróciły nam wartości. Każde wywołanie metody zwraca E2_CopyFileToBlob liczbę przekazanych bajtów, dzięki czemu możemy obliczyć łączną liczbę bajtów przez dodanie wszystkich zwracanych wartości.
Funkcje działania pomocnika
Funkcje działania pomocnika, podobnie jak w przypadku innych przykładów, są tylko zwykłymi funkcjami korzystającymi z activityTrigger powiązania wyzwalacza.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Uwaga
Być może zastanawiasz się, dlaczego nie można po prostu umieścić tego kodu bezpośrednio w funkcji orkiestratora. Można jednak przerwać jedną z podstawowych reguł funkcji orkiestratora, która polega na tym, że nigdy nie powinny wykonywać operacji we/wy, w tym lokalnego dostępu do systemu plików. Aby uzyskać więcej informacji, zobacz Ograniczenia kodu funkcji programu Orchestrator.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Uwaga
Aby uruchomić przykładowy kod, należy zainstalować Microsoft.Azure.WebJobs.Extensions.Storage pakiet NuGet.
Funkcja używa niektórych zaawansowanych funkcji powiązań usługi Azure Functions (czyli użycia parametruBinder), ale nie musisz martwić się o te szczegóły na potrzeby tego przewodnika.
Plik function.json dla E2_CopyFileToBlob programu jest podobnie prosty:
Implementacja języka JavaScript używa powiązania wyjściowego copyFileToBlob usługi Azure Storage w celu przekazania plików do usługi Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Implementacja ładuje plik z dysku i asynchronicznie przesyła strumieniowo zawartość do obiektu blob o tej samej nazwie w kontenerze "kopie zapasowe". Wartość zwracana jest liczbą bajtów skopiowanych do magazynu, która jest następnie używana przez funkcję orkiestratora do obliczenia sumy agregującej.
Uwaga
Jest to doskonały przykład przenoszenia operacji we/wy do activityTrigger funkcji. Praca może być dystrybuowana nie tylko na wielu różnych maszynach, ale także korzyści wynikające z tworzenia punktów kontrolnych postępu. Jeśli proces hosta zostanie przerwany z jakiegokolwiek powodu, wiesz, które przekazywanie zostało już ukończone.
Uruchamianie aplikacji przykładowej
Orkiestrację można uruchomić w systemie Windows, wysyłając następujące żądanie HTTP POST.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternatywnie w aplikacji funkcji systemu Linux (język Python działa obecnie tylko w systemie Linux dla usługi App Service), możesz uruchomić aranżację w następujący sposób:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Uwaga
Wywoływana HttpStart funkcja działa tylko z zawartością sformatowaną w formacie JSON. Z tego powodu Content-Type: application/json nagłówek jest wymagany, a ścieżka katalogu jest kodowana jako ciąg JSON. Ponadto fragment kodu HTTP zakłada, że w host.json pliku znajduje się wpis, który usuwa domyślny api/ prefiks ze wszystkich adresów URL funkcji wyzwalacza HTTP. Znaczniki dla tej konfiguracji można znaleźć w pliku w host.json przykładach.
To żądanie HTTP wyzwala E2_BackupSiteContent koordynatora i przekazuje ciąg D:\home\LogFiles jako parametr. Odpowiedź zawiera link umożliwiający uzyskanie stanu operacji tworzenia kopii zapasowej:
W zależności od liczby plików dziennika w aplikacji funkcji wykonanie tej operacji może potrwać kilka minut. Najnowszy stan można uzyskać, wysyłając zapytanie o adres URL w Location nagłówku poprzedniej odpowiedzi HTTP 202.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
W tym przypadku funkcja jest nadal uruchomiona. Możesz zobaczyć dane wejściowe zapisane w stanie orkiestratora i czas ostatniej aktualizacji. Możesz nadal używać wartości nagłówka do sondowania pod kątem Location ukończenia. Gdy stan to "Ukończono", zostanie wyświetlona wartość odpowiedzi HTTP podobna do następującej:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Teraz możesz zobaczyć, że aranżacja jest ukończona i w przybliżeniu ile czasu zajęło ukończenie. Zostanie również wyświetlona wartość output pola, która wskazuje, że przekazano około 450 KB dzienników.
Następne kroki
W tym przykładzie pokazano, jak zaimplementować wzorzec fan-out/fan-in. W następnym przykładzie pokazano, jak zaimplementować wzorzec monitora przy użyciu trwałych czasomierzy.