Создание службы очередей
Служба очередей — это отличный пример длительно выполняемой службы, где рабочие элементы могут помещаться в очередь и обрабатываться последовательно по мере завершения предыдущих рабочих элементов. Опираясь на шаблон рабочей службы, вы создаете новые функциональные возможности на основе BackgroundServiceшаблона рабочей службы.
В этом руководстве описано следующее:
- Создание службы очередей.
- Делегирование работы в очередь задач.
- Регистрация прослушивателя ключа консоли из событий IHostApplicationLifetime.
Совет
Весь пример исходного кода "Рабочие роли в .NET" доступен для скачивания в Обозревателе примеров. Дополнительные сведения см. в разделе Обзор примеров кода: рабочие роли в .NET.
Необходимые компоненты
- Пакет SDK для .NET 8.0 или более поздней версии
- Интегрированная среда разработки .NET (IDE)
- Можно использовать Visual Studio
Создание нового проекта
Чтобы создать новый проект службы рабочей роли с помощью Visual Studio, выберите Файл>Создать>Проект. В диалоговом окне Создание нового проекта выполните поиск по запросу "служба рабочей роли" и выберите шаблон "Служба рабочей роли". Если вы предпочитаете использовать .NET CLI, откройте используемый терминал в рабочем каталоге. Выполните команду dotnet new
и замените <Project.Name>
именем проекта.
dotnet new worker --name <Project.Name>
Дополнительные сведения о команде .NET CLI для создания проекта службы рабочей роли см. здесь.
Совет
Если вы используете Visual Studio Code, вы можете выполнять команды .NET CLI из интегрированного терминала. Дополнительные сведения см. в статье Visual Studio Code: интегрированный терминал.
Создание служб очередей
Возможно, вам знакомы функциональные возможности QueueBackgroundWorkItem(Func<CancellationToken,Task>) из пространства имен System.Web.Hosting
.
Совет
Функции System.Web
пространства имен намеренно не переносились в .NET и остаются эксклюзивными для платформа .NET Framework. Дополнительные сведения см. в статье "Начало работы с добавочными ASP.NET для миграции ASP.NET Core".
В .NET для моделирования службы, вдохновленной функциональностью, начните с QueueBackgroundWorkItem
добавления IBackgroundTaskQueue
интерфейса в проект:
namespace App.QueueService;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
Существует два метода, один из которых предоставляет функциональные возможности очереди, а другой выводит из очереди рабочие элементы, которые были добавлены в нее ранее. Рабочий элемент — Func<CancellationToken, ValueTask>
. Затем добавьте в проект реализацию по умолчанию.
using System.Threading.Channels;
namespace App.QueueService;
public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
ArgumentNullException.ThrowIfNull(workItem);
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
Func<CancellationToken, ValueTask>? workItem =
await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
Предыдущая реализация использует в качестве очереди Channel<T>. BoundedChannelOptions(Int32) вызывается с явным значением емкости. Емкость должна быть установлена на основе ожидаемой загрузки приложения и количества параллельных потоков, обращающихся к очереди. BoundedChannelFullMode.Wait вызывает вызовы возвращать ChannelWriter<T>.WriteAsync задачу, которая завершается только после того, как пространство становится доступным. Что приводит к обратному выражению, в случае, если слишком много издателей или вызовов начинают накапливаться.
Повторное создание класса рабочей роли
В следующем примере QueueHostedService
:
- Метод
ProcessTaskQueueAsync
возвращает значение Task вExecuteAsync
. - Фоновые задачи в очереди выводятся из очереди и выполняются в
ProcessTaskQueueAsync
: - Рабочие элементы ожидают остановки службы через
StopAsync
.
Замените существующий класс Worker
следующим кодом C# и переименуйте файл в QueueHostedService.cs.
namespace App.QueueService;
public sealed class QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger) : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("""
{Name} is running.
Tap W to add a work item to the
background queue.
""",
nameof(QueuedHostedService));
return ProcessTaskQueueAsync(stoppingToken);
}
private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
Func<CancellationToken, ValueTask>? workItem =
await taskQueue.DequeueAsync(stoppingToken);
await workItem(stoppingToken);
}
catch (OperationCanceledException)
{
// Prevent throwing if stoppingToken was signaled
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred executing task work item.");
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
logger.LogInformation(
$"{nameof(QueuedHostedService)} is stopping.");
await base.StopAsync(stoppingToken);
}
}
Служба MonitorLoop
обрабатывает задачи постановки в очередь для размещенной службы при выборе на устройстве ввода ключа w
:
- В службу
MonitorLoop
внедряетсяIBackgroundTaskQueue
. IBackgroundTaskQueue.QueueBackgroundWorkItemAsync
вызывается для постановки рабочего элемента в очередь:- Рабочий элемент имитирует долго выполняющуюся фоновую задачу:
- Выполняется три 5-секундных задержки Delay.
- Оператор
try-catch
перехватывается OperationCanceledException, если задача отменена.
namespace App.QueueService;
public sealed class MonitorLoop(
IBackgroundTaskQueue taskQueue,
ILogger<MonitorLoop> logger,
IHostApplicationLifetime applicationLifetime)
{
private readonly CancellationToken _cancellationToken = applicationLifetime.ApplicationStopping;
public void StartMonitorLoop()
{
logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");
// Run a console user input loop in a background thread
Task.Run(async () => await MonitorAsync());
}
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
var keyStroke = Console.ReadKey();
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
await taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
}
}
}
private async ValueTask BuildWorkItemAsync(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid();
logger.LogInformation("Queued work item {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
++ delayLoop;
logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
}
if (delayLoop is 3)
{
logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
}
else
{
logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
}
}
}
Замените существующее содержимое Program
следующим кодом C#:
using App.QueueService;
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
{
queueCapacity = 100;
}
return new DefaultBackgroundTaskQueue(queueCapacity);
});
IHost host = builder.Build();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
host.Run();
Службы зарегистрированы в (Program.cs). Размещенная служба регистрируется с использованием метода расширения AddHostedService
. MonitorLoop
запускается в инструкции Program.cs верхнего уровня.
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
Дополнительные сведения о регистрации служб см. в статье Внедрение зависимостей в .NET.
Проверка функциональности службы
Чтобы запустить приложение из Visual Studio, нажмите клавишу F5 или выберите в меню Отладка>Начать отладку. Если вы используете .NET CLI, выполните команду dotnet run
из рабочего каталога:
dotnet run
Дополнительные сведения о выполнении команды .NET CLI см. в статье dotnet run.
При появлении запроса введите w
(или W
) по крайней мере один раз в очередь эмулированного рабочего элемента, как показано в примере выходных данных:
info: App.QueueService.MonitorLoop[0]
MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is running.
Tap W to add a work item to the background queue.
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is stopping.
Если приложение запускается из Visual Studio, выберите Отладка>Остановить отладку. Кроме того, можно нажать клавиши CTRL + C в окне консоли, чтобы сообщить об отмене.