你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的 Azure 通信作业路由包客户端库 - 版本 1.0.0

此包包含一个 Python SDK for Azure 通信服务 for JobRouter。 在此处阅读有关Azure 通信服务的详细信息

源代码 | 包 (Pypi) | 产品文档

免责声明

对 Python 2.7 的 Azure SDK Python 包支持已于 2022 年 1 月 1 日结束。 有关详细信息和问题,请参阅 https://github.com/Azure/azure-sdk-for-python/issues/20691

入门

先决条件

需要 Azure 订阅通信服务资源 才能使用此包。

安装包

使用 pip 安装适用于 Python 的 Azure Communication JobRouter 客户端库:

pip install azure-communication-jobrouter

关键概念

作业

职位代表工作单元,需要路由到可聘用的工作人员。 实际示例可能是呼叫中心上下文中的传入呼叫或聊天。

辅助角色

辅助角色表示可用于处理作业的供应。 每个辅助角色注册一个或多个队列以接收作业。 实际示例可能是在呼叫中心工作的代理。

队列

队列表示等待辅助角色提供服务的作业的有序列表。 辅助角色将向队列注册以从队列接收工作。 实际示例可能是呼叫中心中的呼叫队列。

通道

通道表示按某种类型对作业进行分组。 当辅助角色注册以接收工作时,还必须指定可以处理工作的通道,以及可同时处理每个通道的数量。 实际示例可以是 voice callschats 在呼叫中心。

产品/服务

在确定匹配项时,JobRouter 将产品/服务扩展到辅助角色以处理特定作业,此通知通常通过 EventGrid 传递。 辅助角色可以使用 JobRouter API 接受或拒绝产品/服务,或者根据分发策略上配置的生存时间过期。 这种情况的一个实际示例可能是呼叫中心中代理的响铃。

分发类型

分发策略表示一个配置集,该配置集控制如何将队列中的作业分发到使用该队列注册的工作器。 此配置包括产品/服务在过期前的有效期以及分发模式,该模式定义当有多个可用辅助角色时选取工作器的顺序。

分发模式

这 3 种类型的模式是

  • 轮循机制:辅助角色按 Id 进行排序,并选择按顺序依次下一个获得聘约的辅助角色。
  • 最长空闲时间:未在作业上工作的时间最长的工作人员。
  • 最佳辅助角色:可以指定一个表达式来比较 2 个辅助角色,以确定要选取哪个辅助角色。

标签

可以将标签附加到辅助角色、作业和队列。 这些键值对可以是 stringnumberboolean 数据类型。 其中一个实际示例可能是特定工作人员、团队或地理位置的技能级别。

标签选择器

标签选择器可以附加到作业,以便针对为队列提供服务的辅助角色的子集。 这种情况的实际示例可能是传入呼叫的一个条件,即代理必须至少了解特定产品。

分类策略

分类策略可用于动态选择队列、确定作业优先级,以及利用规则引擎将辅助角色标签选择器附加到作业。

例外策略

例外策略基于触发器控制职位的行为并执行所需的操作。 例外策略附加到队列,因此可以控制队列中职位的行为。

示例

客户端初始化

若要初始化 SMS 客户端,可以使用 连接字符串 进行实例化。 或者,还可以使用 DefaultAzureCredential 使用 Active Directory 身份验证。

from azure.communication.jobrouter import (
    JobRouterClient,
    JobRouterAdministrationClient
)

connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)

分发类型

在创建队列之前,需要分发策略。

from azure.communication.jobrouter.models import (
    LongestIdleMode,
    DistributionPolicy
)

distribution_policy: DistributionPolicy = DistributionPolicy(
    offer_expires_after_seconds = 24 * 60 * 60,
    mode = LongestIdleMode(
        min_concurrent_offers = 1,
        max_concurrent_offers = 1
    )
)

distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
    "distribution-policy-1",
    distribution_policy
)

队列

接下来,我们可以创建队列。

from azure.communication.jobrouter.models import (
    RouterQueue
)

queue: RouterQueue = RouterQueue(
    distribution_policy_id = "distribution-policy-1"
)

queue: RouterQueue = router_admin_client.upsert_queue(
    "queue-1",
    queue
)

作业

现在,我们可以直接向该队列提交作业,使用辅助角色选择器要求辅助角色的标签 Some-Skill 大于 10。

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterWorkerSelector,
    LabelOperator
)

router_job: RouterJob = RouterJob(
    channel_id = "my-channel",
    queue_id = "queue-1",
    channel_reference = "12345",
    priority = 1,
    requested_worker_selectors = [
        RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.EQUAL, value = 10)
    ]
)

job: RouterJob = router_client.upsert_job(
    "jobId-1",
    router_job
)

辅助角色

现在,我们注册一个辅助角色以从该队列接收工作,其标签 Some-Skill 等于 11。

from azure.communication.jobrouter.models import (
    RouterWorker,
    RouterChannel
)

router_worker: RouterWorker = RouterWorker(
    capacity = 1,
    queues = [
        "queue-1"
    ],
    labels = {
        "Some-Skill": 11
    },
    channels = [
        RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
    ],
    available_for_offers = True
)

worker = router_client.upsert_worker(
    "worker-1",
    router_worker
)

产品/服务

我们应该从 EventGrid 订阅中获得一个 RouterWorkerOfferIssued

有几个不同的 Azure 服务充当 事件处理程序。 对于此方案,我们将假定 Webhook 用于事件传递。 详细了解 Webhook 事件传递

将事件传递到事件处理程序后,我们可以将 JSON 有效负载反序列化为事件列表。

# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json

## deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
    if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
        offer_id = event.data.offer_id
    else:
        continue

但是,我们也可以等待几秒钟,然后直接针对 JobRouter API 查询辅助角色,以查看是否向其发出了聘约。

from azure.communication.jobrouter.models import (
    RouterWorker,
)

router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")

for offer in router_worker.offers:
    print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")

接受产品/服务

辅助角色收到产品/服务后,可以采取两种可能的操作:接受或拒绝。 我们将接受报价。

from azure.communication.jobrouter.models import (
    RouterJobOffer,
    AcceptJobOfferResult,
    RouterJobStatus
)

# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id

# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
    worker_id = "worker-1",
    offer_id = offer_id
)

print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")

# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.Assigned and accept_job_offer_result.assignment_id in updated_job.assignments}")

完成作业

辅助角色完成作业后,该辅助角色必须将作业 completed标记为 。

import datetime
from azure.communication.jobrouter.models import (
    CompleteJobOptions
)
complete_job_result = router_client.complete_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CompleteJobOptions(
        note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully completed.")

关闭作业

作业完成后,辅助角色可以在关闭作业并最终释放其容量以接受更多传入作业之前对作业执行包装操作

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully closed.")

update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_in_future_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
        close_at = datetime.utcnow() + timedelta(seconds = 2)
    )
)

print(f"Job has been marked to close")
time.sleep(secs = 2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")

故障排除

遇到问题了? 本部分应包含有关该操作的详细信息。

后续步骤

更多示例代码

请查看 示例 目录,获取有关如何使用此库的详细示例。

提供反馈

如果遇到任何 bug 或有建议,请在项目的“ 问题 ”部分中提出问题

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 cla.microsoft.com

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。