Sdílet prostřednictvím


Klientská knihovna balíčku Azure Communication JobRouter pro Python – verze 1.0.0

Tento balíček obsahuje sadu Python SDK pro Azure Communication Services pro JobRouter. Další informace o Azure Communication Services najdete tady.

Zdrojový kód | Balíček (Pypi) | Dokumentace k produktu

Právní omezení

Podpora balíčků Azure SDK Python pro Python 2.7 skončila 1. ledna 2022. Další informace a dotazy najdete na https://github.com/Azure/azure-sdk-for-python/issues/20691

Začínáme

Požadavky

K použití tohoto balíčku potřebujete předplatné Azure a prostředek komunikační služby .

Instalace balíčku

Nainstalujte klientskou knihovnu Azure Communication JobRouter pro Python pomocí pip:

pip install azure-communication-jobrouter

Klíčové koncepty

Úloha

Úloha představuje jednotku práce, kterou je potřeba směrovat na dostupného pracovního procesu. Reálným příkladem může být příchozí hovor nebo chat v kontextu call centra.

Pracovník

Pracovní proces představuje zdroj, který je k dispozici pro zpracování úlohy. Každý pracovní proces registruje pro příjem úloh nebo více front. Reálným příkladem může být agent pracující v call centru.

Fronta

Fronta představuje seřazený seznam úloh čekajících na obsluhu pracovním procesem. Pracovní procesy se zaregistrují do fronty, aby z ní mohli přijímat práci. Skutečným příkladem může být fronta volání v call centru.

Kanál

Kanál představuje seskupení úloh podle nějakého typu. Když se pracovní proces registruje k příjmu práce, musí také určit, pro které kanály může zpracovávat práci a kolik z každého z nich může zpracovávat současně. Skutečný příklad může být voice calls nebo chats v call centru.

Nabídka

JobRouter rozšiřuje nabídku na pracovní proces pro zpracování konkrétní úlohy, když určí shodu. Toto oznámení se obvykle doručuje přes EventGrid. Pracovní proces může nabídku buď přijmout, nebo odmítnout pomocí rozhraní th JobRouter API, nebo její platnost vyprší podle doby života nakonfigurované v zásadách distribuce. Reálným příkladem může být vyzvánění agenta v call centru.

Zásady distribuce

Zásady distribuce představují konfigurační sadu, která řídí způsob distribuce úloh ve frontě do pracovních procesů registrovaných v této frontě. Tato konfigurace zahrnuje, jak dlouho je nabídka platná, než vyprší její platnost, a režim distribuce, který definuje pořadí, ve kterém se vyberou pracovní procesy, pokud je k dispozici více pracovních procesů.

Režim distribuce

3 typy režimů jsou

  • Kruhové dotazování: Pracovní procesy se seřadí podle Id a po předchozím pracovníkovi, který dostal nabídku, se vybere další.
  • Nejdelší nečinnost: Pracovník, který nejdéle nepracoval na práci.
  • Nejlepší pracovní proces: Můžete zadat výraz pro porovnání 2 pracovních procesů a určit, který z nich vybrat.

Popisky

Popisky můžete připojit k pracovním procesům, úlohám a frontám. Jedná se o páry hodnot klíčů, které můžou být datového stringtypu , number nebo boolean . Příkladem z reálného světa může být úroveň dovedností konkrétního pracovníka nebo týmu nebo zeměpisné umístění.

Selektory popisků

Selektory popisků je možné připojit k úloze, aby cílily na podmnožinu pracovních procesů obsluhujících frontu. Reálným příkladem toho může být podmínka příchozího hovoru, že agent musí mít minimální úroveň znalostí konkrétního produktu.

Zásady klasifikace

Zásady klasifikace lze použít k dynamickému výběru fronty, určení priority úlohy a připojení selektorů popisků pracovních procesů k úloze pomocí modulu pravidel.

Zásady výjimek

Zásady výjimek řídí chování úlohy na základě triggeru a provádějí požadovanou akci. Zásady výjimky jsou připojené k frontě, aby mohly řídit chování úloh ve frontě.

Příklady

Inicializace klienta

K inicializaci klienta SMS je možné k vytvoření instance použít připojovací řetězec. Alternativně můžete použít také ověřování Active Directory pomocí DefaultAzureCredential.

from azure.communication.jobrouter import (
    JobRouterClient,
    JobRouterAdministrationClient
)

connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)

Zásady distribuce

Než budeme moct vytvořit frontu, potřebujeme zásady distribuce.

from azure.communication.jobrouter.models import (
    LongestIdleMode,
    DistributionPolicy
)

distribution_policy: DistributionPolicy = DistributionPolicy(
    offer_expires_after_seconds = 24 * 60 * 60,
    mode = LongestIdleMode(
        min_concurrent_offers = 1,
        max_concurrent_offers = 1
    )
)

distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
    "distribution-policy-1",
    distribution_policy
)

Fronta

Dále můžeme vytvořit frontu.

from azure.communication.jobrouter.models import (
    RouterQueue
)

queue: RouterQueue = RouterQueue(
    distribution_policy_id = "distribution-policy-1"
)

queue: RouterQueue = router_admin_client.upsert_queue(
    "queue-1",
    queue
)

Úloha

Teď můžeme odeslat úlohu přímo do této fronty, přičemž selektor pracovního procesu vyžaduje, aby měl popisek Some-Skill větší než 10.

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterWorkerSelector,
    LabelOperator
)

router_job: RouterJob = RouterJob(
    channel_id = "my-channel",
    queue_id = "queue-1",
    channel_reference = "12345",
    priority = 1,
    requested_worker_selectors = [
        RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.EQUAL, value = 10)
    ]
)

job: RouterJob = router_client.upsert_job(
    "jobId-1",
    router_job
)

Pracovník

Teď zaregistrujeme pracovní proces pro příjem práce z této fronty s popiskem rovnou Some-Skill 11.

from azure.communication.jobrouter.models import (
    RouterWorker,
    RouterChannel
)

router_worker: RouterWorker = RouterWorker(
    capacity = 1,
    queues = [
        "queue-1"
    ],
    labels = {
        "Some-Skill": 11
    },
    channels = [
        RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
    ],
    available_for_offers = True
)

worker = router_client.upsert_worker(
    "worker-1",
    router_worker
)

Nabídka

Měli bychom získat RouterWorkerOfferIssued z našeho předplatného EventGrid.

Existuje několik různých služeb Azure, které fungují jako obslužná rutina události. V tomto scénáři předpokládáme, že k doručování událostí používáme webhooky. Další informace o doručování událostí webhooku

Jakmile se události doručí do obslužné rutiny události, můžeme datovou část JSON deserializovat do seznamu událostí.

# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json

## deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
    if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
        offer_id = event.data.offer_id
    else:
        continue

Mohli bychom ale také několik sekund počkat a pak se dotázat pracovního procesu přímo na rozhraní API JobRouter, abychom zjistili, jestli pro něj nebyla vydána nabídka.

from azure.communication.jobrouter.models import (
    RouterWorker,
)

router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")

for offer in router_worker.offers:
    print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")

Přijetí nabídky

Jakmile pracovník obdrží nabídku, může provést dvě možné akce: přijmout nebo odmítnout. Nabídku přijmeme.

from azure.communication.jobrouter.models import (
    RouterJobOffer,
    AcceptJobOfferResult,
    RouterJobStatus
)

# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id

# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
    worker_id = "worker-1",
    offer_id = offer_id
)

print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")

# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.Assigned and accept_job_offer_result.assignment_id in updated_job.assignments}")

Dokončení úlohy

Jakmile pracovní proces dokončí úlohu, musí ji označit jako completed.

import datetime
from azure.communication.jobrouter.models import (
    CompleteJobOptions
)
complete_job_result = router_client.complete_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CompleteJobOptions(
        note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully completed.")

Zavření úlohy

Po dokončení úlohy může pracovní proces před uzavřením úlohy provést akce zabalení a nakonec uvolnit svou kapacitu pro příjem dalších příchozích úloh.

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully closed.")

update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_in_future_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
        close_at = datetime.utcnow() + timedelta(seconds = 2)
    )
)

print(f"Job has been marked to close")
time.sleep(secs = 2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")

Řešení potíží

Máte problémy? Tato část by měla obsahovat podrobnosti o tom, co tam dělat.

Další kroky

Další vzorový kód

Podrobné příklady použití této knihovny najdete v adresáři samples .

Zadání zpětné vazby

Pokud narazíte na nějaké chyby nebo máte návrhy, nahlaste problém v části Problémy projektu.

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete v cla.microsoft.com.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo kontaktujte s opencode@microsoft.com případnými dalšími dotazy nebo připomínkami.