Jak zadania są dopasowywane do procesów roboczych
W tym dokumencie opisano rejestrację pracowników, przesyłanie zadań i sposób ich dopasowania do siebie nawzajem.
Rejestracja procesu roboczego
Aby proces roboczy mógł otrzymywać oferty obsługi zadania, należy go najpierw zarejestrować, ustawiając wartość availableForOffers
true. Następnie musimy określić, które kolejki proces roboczy nasłuchuje i które kanały może obsłużyć. Po zarejestrowaniu otrzymasz zdarzenie RouterWorkerRegistered z usługi Event Grid, a stan procesu roboczego zostanie zmieniony na active
.
W poniższym przykładzie rejestrujemy proces roboczy w następujący sposób:
- Nasłuchiwanie i
queue-1
queue-2
- Obsługa kanałów głosowych i czatów. W takim przypadku proces roboczy może wykonać jedno
voice
zadanie jednocześnie lub dwachat
zadania. To ustawienie jest konfigurowane przez określenie całkowitej pojemności procesu roboczego i przypisanie kosztu zadania dla każdego kanału. - Zestaw etykiet opisujących elementy procesu roboczego, które mogą pomóc w ustaleniu, czy jest to dopasowanie do określonego zadania.
var worker = await client.CreateWorkerAsync(new CreateWorkerOptions(workerId: "worker-1", capacity: 2)
{
AvailableForOffers = true,
Queues = { "queue1", "queue2" },
Channels =
{
new RouterChannel(channelId: "voice", capacityCostPerJob: 2),
new RouterChannel(channelId: "chat", capacityCostPerJob: 1)
},
Labels =
{
["Skill"] = new RouterValue(11),
["English"] = new RouterValue(true),
["French"] = new RouterValue(false),
["Vendor"] = new RouterValue("Acme")
}
});
let worker = await client.path("/routing/workers/{workerId}", "worker-1").patch({
body: {
availableForOffers: true,
capacity: 2,
queues: ["queue1", "queue2"],
channels: [
{ channelId: "voice", capacityCostPerJob: 2 },
{ channelId: "chat", capacityCostPerJob: 1 }
],
labels: {
Skill: 11,
English: true,
French: false,
Vendor: "Acme"
}
},
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(
worker_id = "worker-1",
available_for_offers = True,
capacity = 2,
queues = ["queue1", "queue2"],
channels = [
RouterChannel(channel_id = "voice", capacity_cost_per_job = 2),
RouterChannel(channel_id = "chat", capacity_cost_per_job = 1)
],
labels = {
"Skill": 11,
"English": True,
"French": False,
"Vendor": "Acme"
}
)
RouterWorker worker = client.createWorker(new CreateWorkerOptions("worker-1", 2)
.setAvailableForOffers(true)
.setQueues(List.of("queue1", "queue2"))
.setChannels(List.of(
new RouterChannel("voice", 2),
new RouterChannel("chat", 1)))
.setLabels(Map.of(
"Skill", new RouterValue(11),
"English", new RouterValue(true),
"French", new RouterValue(false),
"Vendor", new RouterValue("Acme"))));
Przesyłanie zadania
W poniższym przykładzie przesyłamy zadanie, które
- Przechodzi bezpośrednio do .
queue1
- Dla kanału
chat
. - Za pomocą selektora procesów roboczych, który określa, że każdy proces roboczy obsługujący to zadanie musi mieć etykietę
English
ustawioną natrue
. - W selektorze procesu roboczego, który określa, że każdy proces roboczy obsługujący to zadanie musi mieć etykietę
Skill
większą niż10
i ten warunek wygaśnie po jednej minucie. - Z etykietą ustawioną
name
naJohn
.
await client.CreateJobAsync(new CreateJobOptions("job1", "chat", "queue1")
{
RequestedWorkerSelectors =
{
new RouterWorkerSelector(key: "English", labelOperator: LabelOperator.Equal, value: new RouterValue(true)),
new RouterWorkerSelector(key: "Skill", labelOperator: LabelOperator.GreaterThan, value: new RouterValue(10))
{ ExpiresAfter = TimeSpan.FromMinutes(5) }
},
Labels = { ["name"] = new RouterValue("John") }
});
await client.path("/routing/jobs/{jobId}", "job1").patch({
body: {
channelId: "chat",
queueId: "queue1",
requestedWorkerSelectors: [
{ key: "English", labelOperator: "equal", value: true },
{ key: "Skill", labelOperator: "greaterThan", value: 10, expiresAfterSeconds: 300 },
],
labels: { name: "John" }
},
contentType: "application/merge-patch+json"
})
client.upsert_job(
job_id = "job1",
channel_id = "chat",
queue_id = "queue1",
requested_worker_selectors = [
RouterWorkerSelector(
key = "English",
label_operator = LabelOperator.EQUAL,
value = True
),
RouterWorkerSelector(
key = "Skill",
label_operator = LabelOperator.GREATER_THAN,
value = True,
expires_after_seconds = 300
)
],
labels = { "name": "John" }
)
client.createJob(new CreateJobOptions("job1", "chat", "queue1")
.setRequestedWorkerSelectors(List.of(
new RouterWorkerSelector("English", LabelOperator.EQUAL, new RouterValue(true)),
new RouterWorkerSelector("Skill", LabelOperator.GREATER_THAN, new RouterValue(10))
.setExpiresAfter(Duration.ofMinutes(5))))
.setLabels(Map.of("name", new RouterValue("John"))));
Router zadań próbuje dopasować to zadanie do dostępnego procesu roboczego nasłuchiwania dla kanału chat
z ustawioną wartością English
true
i Skill
większą niż 10
.queue1
Po utworzeniu dopasowania zostanie utworzona oferta. Zasady dystrybucji dołączone do kolejki kontrolują, ile aktywnych ofert może istnieć dla zadania i jak długo każda oferta jest prawidłowa. Otrzymasz zdarzenie z oferty, które będzie wyglądać następująco:
{
"workerId": "worker-1",
"jobId": "7f1df17b-570b-4ae5-9cf5-fe6ff64cc712",
"channelId": "chat",
"queueId": "queue1",
"offerId": "525fec06-ab81-4e60-b780-f364ed96ade1",
"offerTimeUtc": "2021-06-23T02:43:30.3847144Z",
"expiryTimeUtc": "2021-06-23T02:44:30.3847674Z",
"jobPriority": 1,
"jobLabels": {
"name": "John"
}
}
Zdarzenie offerIssued zawiera szczegółowe informacje o zadaniu, pracowniku, o tym, jak długo oferta jest prawidłowa, oraz informacje offerId
o tym, jak długo trzeba zaakceptować lub odrzucić zadanie.
Uwaga
Maksymalny okres istnienia zadania wynosi 90 dni, po którym automatycznie wygaśnie.
Wyrejestrowanie procesu roboczego
Jeśli proces roboczy chce przestać otrzymywać oferty, można go wyrejestrować, ustawiając wartość na false
wartość AvailableForOffers
podczas aktualizowania procesu roboczego i odbierając zdarzenie RouterWorkerDeregistered z usługi Event Grid. Wszystkie istniejące oferty dla procesu roboczego są odwoływane i otrzymujesz zdarzenie RouterWorkerOfferRevoked dla każdej oferty.
worker.AvailableForOffers = false;
worker = await client.UpdateWorkerAsync(worker);
worker = await client.path("/routing/workers/{workerId}", worker.body.id).patch({
body: { availableForOffers: false },
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(worker_id = worker.id, available_for_offers = False)
client.updateWorker(worker.getId(), BinaryData.fromObject(worker.setAvailableForOffers(false)), null);
Uwaga
Jeśli proces roboczy jest zarejestrowany i bezczynny przez ponad 7 dni, zostanie on automatycznie wyrejestrowany. Po wyrejestrowanym stanie procesu roboczego jest draining
to, czy co najmniej jedno zadanie jest nadal przydzielone, czy inactive
nie ma przypisanych zadań.