Biblioteca de cliente do Pacote JobRouter de Comunicação do Azure para Python – versão 1.0.0
Este pacote contém um SDK Python para Azure Communication Services para JobRouter. Leia mais sobre Azure Communication Services aqui
Código fonte | Pacote (Pypi) | Documentação do produto
Exclusão de Responsabilidade
O suporte de pacotes Python do SDK do Azure para Python 2.7 terminou a 01 de janeiro de 2022. Para obter mais informações e perguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691
Introdução
Pré-requisitos
Precisa de uma subscrição do Azure e de um Recurso do Serviço de Comunicação para utilizar este pacote.
- O Python 3.7 ou posterior é necessário para utilizar este pacote. Para obter mais detalhes, leia a nossa página na política de suporte de versões do Azure SDK para Python.
- Para criar um novo Serviço de Comunicação, pode utilizar o Portal do Azure, o Azure PowerShell
Instalar o pacote
Instale a biblioteca de cliente Azure Communication JobRouter para Python com pip:
pip install azure-communication-jobrouter
Conceitos-chave
Tarefa
Uma Tarefa representa a unidade de trabalho, que tem de ser encaminhada para uma Função de Trabalho disponível. Um exemplo real disto pode ser uma chamada ou conversa de entrada no contexto de um call center.
Trabalho
Uma Função de Trabalho representa a oferta disponível para processar uma Tarefa. Cada trabalho regista-se com ou mais filas para receber trabalhos. Um exemplo real disto pode ser um agente a trabalhar num call center.
Fila
Uma Fila representa uma lista ordenada de trabalhos à espera de serem servidos por um trabalhador. Os trabalhadores registar-se-ão numa fila para receber trabalho da mesma. Um exemplo real disto pode ser uma fila de chamadas num call center.
Canal
Um Canal representa um agrupamento de tarefas por algum tipo. Quando um trabalhador se regista para receber trabalho, também tem de especificar para que canais podem processar o trabalho e quanto de cada um deles pode processar em simultâneo.
Um exemplo real disto pode ser voice calls
ou chats
num call center.
Oferta
Uma Oferta é expandida pelo JobRouter para uma função de trabalho para processar uma determinada tarefa quando determina uma correspondência, esta notificação é normalmente entregue através do EventGrid. A função de trabalho pode aceitar ou recusar a oferta através da th JobRouter API ou irá expirar de acordo com o tempo de vida configurado na política de distribuição. Um exemplo real disto pode ser o toque de um agente num call center.
Política de Distribuição
Uma Política de Distribuição representa um conjunto de configuração que rege a forma como os trabalhos numa fila são distribuídos aos trabalhadores registados nessa fila. Esta configuração inclui quanto tempo uma Oferta é válida antes de expirar e o modo de distribuição, que define a ordem pela qual os trabalhadores são escolhidos quando existem vários disponíveis.
Modo de Distribuição
Os 3 tipos de modos são
- Round Robin: Os trabalhadores são ordenados por
Id
e o próximo trabalhador depois do anterior que recebeu uma oferta é escolhido. - Longest Idle: a função de trabalho que não trabalha num trabalho há mais tempo.
- Melhor Trabalho: pode especificar uma expressão para comparar 2 trabalhadores para determinar qual escolher.
Etiquetas
Pode anexar etiquetas a trabalhadores, trabalhos e filas. Estes são pares de valores-chave que podem ser de tipos de string
number
dados ou boolean
.
Um exemplo real disto pode ser o nível de competência de uma determinada função de trabalho ou da equipa ou localização geográfica.
Seletores de Etiquetas
Os seletores de etiquetas podem ser anexados a uma tarefa para direcionar um subconjunto de trabalhadores que servem a fila. Um exemplo real disto pode ser uma condição numa chamada recebida que o agente tem de ter um nível mínimo de conhecimento de um determinado produto.
Política de classificação
Uma política de classificação pode ser utilizada para selecionar dinamicamente uma fila, determinar a prioridade da tarefa e anexar seletores de etiquetas de trabalho a uma tarefa ao tirar partido de um motor de regras.
Política de exceção
Uma política de exceção controla o comportamento de uma Tarefa com base num acionador e executa uma ação pretendida. A política de exceção está anexada a uma Fila para que possa controlar o comportamento das Tarefas na Fila.
Exemplos
Inicialização do Cliente
Para inicializar o Cliente SMS, o cadeia de ligação pode ser utilizado para instanciar. Em alternativa, também pode utilizar a autenticação do Active Directory com DefaultAzureCredential.
from azure.communication.jobrouter import (
JobRouterClient,
JobRouterAdministrationClient
)
connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)
Política de Distribuição
Antes de podermos criar uma Fila, precisamos de uma Política de Distribuição.
from azure.communication.jobrouter.models import (
LongestIdleMode,
DistributionPolicy
)
distribution_policy: DistributionPolicy = DistributionPolicy(
offer_expires_after_seconds = 24 * 60 * 60,
mode = LongestIdleMode(
min_concurrent_offers = 1,
max_concurrent_offers = 1
)
)
distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
"distribution-policy-1",
distribution_policy
)
Fila
Em seguida, podemos criar a fila.
from azure.communication.jobrouter.models import (
RouterQueue
)
queue: RouterQueue = RouterQueue(
distribution_policy_id = "distribution-policy-1"
)
queue: RouterQueue = router_admin_client.upsert_queue(
"queue-1",
queue
)
Tarefa
Agora, podemos submeter uma tarefa diretamente para essa fila, com um seletor de trabalho a exigir que a função de trabalho tenha a etiqueta Some-Skill
superior a 10.
from azure.communication.jobrouter.models import (
RouterJob,
RouterWorkerSelector,
LabelOperator
)
router_job: RouterJob = RouterJob(
channel_id = "my-channel",
queue_id = "queue-1",
channel_reference = "12345",
priority = 1,
requested_worker_selectors = [
RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.EQUAL, value = 10)
]
)
job: RouterJob = router_client.upsert_job(
"jobId-1",
router_job
)
Trabalho
Agora, registamos um trabalhador para receber trabalho dessa fila, com uma etiqueta igual Some-Skill
a 11.
from azure.communication.jobrouter.models import (
RouterWorker,
RouterChannel
)
router_worker: RouterWorker = RouterWorker(
capacity = 1,
queues = [
"queue-1"
],
labels = {
"Some-Skill": 11
},
channels = [
RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
],
available_for_offers = True
)
worker = router_client.upsert_worker(
"worker-1",
router_worker
)
Oferta
Devemos obter um RouterWorkerOfferIssued a partir da nossa subscrição do EventGrid.
Existem vários serviços do Azure diferentes que atuam como um processador de eventos. Neste cenário, vamos assumir webhooks para entrega de eventos. Saiba mais sobre a entrega de eventos do Webhook
Assim que os eventos forem entregues ao processador de eventos, podemos anular a serialização do payload JSON numa lista de eventos.
# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json
## deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
offer_id = event.data.offer_id
else:
continue
No entanto, também podemos aguardar alguns segundos e, em seguida, consultar o trabalho diretamente na API jobRouter para ver se foi emitida uma oferta.
from azure.communication.jobrouter.models import (
RouterWorker,
)
router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")
for offer in router_worker.offers:
print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")
Aceitar uma oferta
Quando um trabalhador recebe uma oferta, pode tomar duas ações possíveis: aceitar ou recusar. Vamos aceitar a oferta.
from azure.communication.jobrouter.models import (
RouterJobOffer,
AcceptJobOfferResult,
RouterJobStatus
)
# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id
# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
worker_id = "worker-1",
offer_id = offer_id
)
print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")
# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.Assigned and accept_job_offer_result.assignment_id in updated_job.assignments}")
Concluir uma tarefa
Assim que a função de trabalho estiver concluída com o trabalho, a função de trabalho tem de marcar a tarefa como completed
.
import datetime
from azure.communication.jobrouter.models import (
CompleteJobOptions
)
complete_job_result = router_client.complete_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CompleteJobOptions(
note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
)
)
print(f"Job has been successfully completed.")
Fechar uma tarefa
Após a conclusão de uma tarefa, a função de trabalho pode realizar ações de encapsulamento para a tarefa antes de fechar a tarefa e, por fim, libertar a sua capacidade para aceitar mais tarefas de entrada
from azure.communication.jobrouter.models import (
RouterJob,
RouterJobStatus,
CloseJobOptions,
)
close_job_result = router_client.close_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CloseJobOptions(
note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
)
)
print(f"Job has been successfully closed.")
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
RouterJob,
RouterJobStatus,
CloseJobOptions,
)
close_job_in_future_result = router_client.close_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CloseJobOptions(
note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
close_at = datetime.utcnow() + timedelta(seconds = 2)
)
)
print(f"Job has been marked to close")
time.sleep(secs = 2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
Resolução de problemas
Está a ter problemas? Esta secção deve conter detalhes sobre o que fazer lá.
Passos seguintes
Mais código de exemplo
Veja o diretório de exemplos para obter exemplos detalhados sobre como utilizar esta biblioteca.
Enviar Comentários
Se encontrar erros ou tiver sugestões, submeta um problema na secção Problemas do projeto
Contribuir
Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para obter detalhes, visite cla.microsoft.com.
Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para obter mais informações, consulte as FAQ do Código de Conduta ou o contacto opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.
Azure SDK for Python