Kanál změn procesů ve službě Azure Blob Storage
Kanál změn poskytuje transakční protokoly všech změn, ke kterým dochází u objektů blob a metadat objektů blob ve vašem účtu úložiště. V tomto článku se dozvíte, jak číst záznamy kanálu změn pomocí knihovny procesoru kanálu změn objektů blob.
Další informace o kanálu změn najdete v tématu Kanál změn ve službě Azure Blob Storage.
Nastavení projektu
Tato část vás provede přípravou projektu pro práci s klientskou knihovnou kanálu změn objektů blob pro .NET.
Instalace balíčků
Z adresáře projektu pomocí příkazu nainstalujte balíček pro klientskou knihovnu dotnet add package
kanálu změn Azure Storage Blobs pro .NET. V tomto příkladu --prerelease
přidáme příznak k příkazu pro instalaci nejnovější verze Preview.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Příklady kódu v tomto článku také používají balíčky Azure Blob Storage a Azure Identity .
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Přidání using
direktiv
Do souboru kódu přidejte následující using
direktivy:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Vytvoření objektu klienta
Pokud chcete aplikaci připojit ke službě Blob Storage, vytvořte instanci BlobServiceClient
třídy. Následující příklad ukazuje, jak vytvořit objekt klienta pro DefaultAzureCredential
autorizaci. Další informace najdete v tématu Autorizace přístupu a připojení ke službě Blob Storage. Pokud chcete pracovat s kanálem změn, potřebujete integrovanou roli Čtenář dat objektů blob služby Storage nebo vyšší role Azure RBAC.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Objekt klienta se předává jako parametr některým metodám uvedeným v tomto článku.
Čtení záznamů v kanálu změn
Poznámka:
Kanál změn je neměnná a jen pro čtení entita ve vašem účtu úložiště. Libovolný počet aplikací může číst a zpracovávat informační kanál změn současně a nezávisle na svých vlastních pohodlích. Záznamy se z kanálu změn neodeberou, když je aplikace přečte. Stav čtení nebo iterace každého čtenáře, který využívá, je nezávislý a spravovaný pouze vaší aplikací.
Následující příklad kódu iteruje všechny záznamy v kanálu změn, přidá je do seznamu a vrátí seznam událostí kanálu změn:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Následující příklad kódu vytiskne některé hodnoty ze seznamu událostí kanálu změn:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Obnovení čtení záznamů z uložené pozice
Umístění pro čtení můžete uložit v informačním kanálu změn a pak pokračovat v iterování záznamů v budoucnu. Umístění pro čtení můžete uložit získáním kurzoru kanálu změn. Kurzor je řetězec a aplikace může tento řetězec uložit jakýmkoli způsobem, který dává smysl pro návrh vaší aplikace, například do souboru nebo databáze.
Tento příklad prochází všechny záznamy v kanálu změn, přidá je do seznamu a uloží kurzor. Volajícímu se vrátí seznam a kurzor.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Zpracování datových proudů záznamů
Můžete se rozhodnout zpracovávat záznamy kanálu změn při jejich potvrzení do kanálu změn. Viz specifikace. Události změn se publikují v kanálu změn v průměru po dobu 60 sekund. Při zadávání intervalu hlasování doporučujeme dotazovat se na nové změny s ohledem na toto období.
Tento příklad se pravidelně dotazuje na změny. Pokud existují záznamy změn, tento kód tyto záznamy zpracuje a uloží kurzor kanálu změn. Tímto způsobem, pokud je proces zastaven a znovu spuštěn, může aplikace pomocí kurzoru pokračovat ve zpracování záznamů, kde naposledy skončila. Tento příklad uloží kurzor do místního souboru pro demonstrační účely, ale vaše aplikace ji může uložit v libovolném formuláři, který dává pro váš scénář největší smysl.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Čtení záznamů v určitém časovém rozsahu
Můžete číst záznamy, které spadají do určitého časového rozsahu. Tento příklad iteruje všechny záznamy v kanálu změn, které spadají do určitého data a časového rozsahu, přidá je do seznamu a vrátí seznam:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Zadaný počáteční čas se zaokrouhlí dolů na nejbližší hodinu a koncový čas se zaokrouhlí nahoru na nejbližší hodinu. Je možné, že uživatelé můžou vidět události, ke kterým došlo před počátečním časem a po koncovém čase. Je také možné, že se některé události, ke kterým dochází mezi počátečním a koncovým časem, nezobrazí. Důvodem je to, že události můžou být zaznamenány během hodiny předcházející počátečnímu času nebo během hodiny po koncovém čase.