你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Python 的 Azure 通信聊天包客户端库 - 版本 1.2.0
此包包含用于聊天Azure 通信服务的 Python SDK。 在此处阅读有关Azure 通信服务的详细信息
源代码 | 包 (Pypi) | 包 (Conda) | API 参考文档 | 产品文档
免责声明
对 Python 2.7 的 Azure SDK Python 包支持已于 2022 年 1 月 1 日结束。 有关详细信息和问题,请参阅 https://github.com/Azure/azure-sdk-for-python/issues/20691
入门
先决条件
- 使用此包需要 Python 3.7 或更高版本。
- 已部署的通信服务资源。 可以使用 Azure 门户或Azure PowerShell对其进行设置。
安装包
安装 Azure 通信服务聊天 SDK。
pip install --pre azure-communication-chat
用户访问令牌
使用用户访问令牌可以生成直接对 Azure 通信服务进行身份验证的客户端应用程序。 可以使用 azure.communication.identity 模块生成这些令牌,然后使用它们初始化通信服务 SDK。 使用 azure.communication.identity 的示例:
pip install azure-communication-identity
from azure.communication.identity import CommunicationIdentityClient
identity_client = CommunicationIdentityClient.from_connection_string("<connection string of your Communication service>")
user = identity_client.create_user()
tokenresponse = identity_client.get_token(user, scopes=["chat"])
token = tokenresponse.token
user
稍后将使用上面创建的 ,因为在使用此令牌创建用户时,应将该用户添加为新聊天线程的参与者。 这是因为创建请求的发起方必须位于聊天线程的参与者列表中。
创建聊天客户端
这将允许你创建、获取、列出或删除聊天线程。
from azure.communication.chat import ChatClient, CommunicationTokenCredential
# Your unique Azure Communication service endpoint
endpoint = "https://<RESOURCE_NAME>.communcationservices.azure.com"
chat_client = ChatClient(endpoint, CommunicationTokenCredential(token))
创建聊天线程客户端
ChatThreadClient 将允许你执行特定于聊天线程的操作,例如发送消息、获取消息、更新聊天线程主题、将参与者添加到聊天线程等。
可以通过使用 ChatClient 创建新的聊天线程来获取它:
create_chat_thread_result = chat_client.create_chat_thread(topic)
chat_thread_client = chat_client.get_chat_thread_client(create_chat_thread_result.chat_thread.id)
此外,客户端还可以定向,以便请求可重复;也就是说,如果客户端多次使用同一 Idempotency-Token 发出请求,它将返回适当的响应,而无需服务器多次执行请求。 Idempotency-Token 的值是一个不透明的字符串,表示请求的客户端生成的全局唯一标识符。
create_chat_thread_result = chat_client.create_chat_thread(
topic,
thread_participants=thread_participants,
idempotency_token=idempotency_token
)
chat_thread_client = chat_client.get_chat_thread_client(create_chat_thread_result.chat_thread.id)
或者,如果之前已创建聊天线程,并且具有其thread_id,则可以通过以下方式创建它:
chat_thread_client = chat_client.get_chat_thread_client(thread_id) # thread_id is the id of an existing chat thread
关键概念
聊天会话由聊天线程表示。 线程中的每个用户称为线程参与者。 线程参与者可以在 1:1 聊天中私下相互聊天,也可以在 1:N 群组聊天中挤在一起。 用户还可以在其他人键入和阅读消息时获得近乎实时的更新。
初始化 ChatClient
类后,可以执行以下聊天操作:
创建、获取、更新和删除线程
对线程执行 CRD (创建-读取-删除) 操作
create_chat_thread(topic, **kwargs)
list_chat_threads(**kwargs)
delete_chat_thread(thread_id, **kwargs)
初始化 ChatThreadClient
类后,可以执行以下聊天操作:
更新线程
对线程主题执行更新操作
update_topic(topic, **kwargs)
获取聊天线程属性
get_properties(**kwargs)
发送、获取、更新和删除邮件
对消息执行 CRUD (Create-Read-Update-Delete) 操作
send_message(content, **kwargs)
get_message(message_id, **kwargs)
list_messages(**kwargs)
update_message(message_id, content, **kwargs)
delete_message(message_id, **kwargs)
获取、添加和删除参与者
对线程参与者执行 CRD (Create-Read-Delete) 操作
list_participants(**kwargs)
add_participants(thread_participants, **kwargs)
remove_participant(participant_identifier, **kwargs)
发送键入通知
通知服务键入通知
send_typing_notification(**kwargs)
发送并获取已读回执
通知服务已读取消息并获取已读消息列表。
send_read_receipt(message_id, **kwargs)
list_read_receipts(**kwargs)
示例
以下部分提供了几个代码片段,涵盖了一些最常见的任务,包括:
线程操作
创建线程
使用 create_chat_thread
方法创建聊天会话。
- 使用
topic
(必需)提供线程主题; - 使用
thread_participants
(可选)提供ChatParticipant
要添加到线程的列表;user
,必需,它是CommunicationUserIdentifier
CommunicationIdentityClient.create_user () 从用户访问令牌创建的
display_name
(可选)是会话参与者的显示名称。share_history_time
,可选,是开始与参与者共享聊天历史记录的时间。
- 使用
idempotency_token
(可选)指定请求的唯一标识符。
CreateChatThreadResult
是创建线程后返回的结果,可以使用它来提取已创建的聊天线程的 id
。 然后,可以使用此 id
通过 ChatThreadClient
方法提取 get_chat_thread_client
对象。 ChatThreadClient
可用于对此聊天线程执行其他聊天操作。
# Without idempotency_token and thread_participants
topic = "test topic"
create_chat_thread_result = chat_client.create_chat_thread(topic)
chat_thread_client = chat_client.get_chat_thread_client(create_chat_thread_result.chat_thread.id)
# With idempotency_token and thread_participants
from azure.communication.identity import CommunicationIdentityClient
from azure.communication.chat import ChatParticipant, ChatClient, CommunicationTokenCredential
import uuid
from datetime import datetime
# create an user
identity_client = CommunicationIdentityClient.from_connection_string('<connection_string>')
user = identity_client.create_user()
# user access tokens
tokenresponse = identity_client.get_token(user, scopes=["chat"])
token = tokenresponse.token
## OR pass existing user
# from azure.communication.chat import CommunicationUserIdentifier
# user_id = 'some_user_id'
# user = CommunicationUserIdentifier(user_id)
# create the chat_client
endpoint = "https://<RESOURCE_NAME>.communcationservices.azure.com"
chat_client = ChatClient(endpoint, CommunicationTokenCredential(token))
# modify function to implement customer logic
def get_unique_identifier_for_request(**kwargs):
res = uuid.uuid4()
return res
topic = "test topic"
thread_participants = [ChatParticipant(
identifier=user,
display_name='name',
share_history_time=datetime.utcnow()
)]
# obtains idempotency_token using some customer logic
idempotency_token = get_unique_identifier_for_request()
create_chat_thread_result = chat_client.create_chat_thread(
topic,
thread_participants=thread_participants,
idempotency_token=idempotency_token)
thread_id = create_chat_thread_result.chat_thread.id
# fetch ChatThreadClient
chat_thread_client = chat_client.get_chat_thread_client(create_chat_thread_result.chat_thread.id)
# Additionally, you can also check if all participants were successfully added or not
# and subsequently retry adding the failed participants again
def decide_to_retry(error, **kwargs):
"""
Insert some custom logic to decide if retry is applicable based on error
"""
return True
retry = [thread_participant for thread_participant, error in create_chat_thread_result.errors if decide_to_retry(error)]
if retry:
chat_thread_client.add_participants(retry)
获取线程
使用 get_properties
方法从服务中检索 ChatThreadProperties
;thread_id
是线程的唯一 ID。
chat_thread_properties = chat_thread_client.get_properties()
列出聊天会话
Use list_chat_threads
方法检索已创建的聊天线程的列表
- 使用
results_per_page
、(可选)每页要返回的最大消息数。 - 使用
start_time
、(可选)范围查询的开始时间。
[ChatThreadItem]
的迭代器是列出线程后返回的响应
from azure.communication.chat import ChatClient, CommunicationTokenCredential
from datetime import datetime, timedelta
token = "<token>"
endpoint = "https://<RESOURCE_NAME>.communcationservices.azure.com"
chat_client = ChatClient(endpoint, CommunicationTokenCredential(token))
start_time = datetime.utcnow() - timedelta(days=2)
chat_threads = chat_client.list_chat_threads(results_per_page=5, start_time=start_time)
for chat_thread_item_page in chat_threads.by_page():
for chat_thread_item in chat_thread_item_page:
print("thread id:", chat_thread_item.id)
更新线程主题
使用 update_topic
方法更新线程的属性。 topic
用于描述线程主题的更改
- 使用
topic
为线程提供新主题;
topic = "new topic"
chat_thread_client.update_topic(topic=topic)
chat_thread = chat_thread_client.get_properties(thread_id)
assert chat_thread.topic == topic
删除线程
使用 delete_chat_thread
方法删除线程; thread_id
是线程的唯一 ID。
- 使用必需的
thread_id
指定线程的唯一 ID。
chat_client.delete_chat_thread(thread_id=thread_id)
消息操作
发送消息
使用 send_message
方法将消息发送到 由 标识的 thread_id
线程。
- 使用
content
(必需)提供聊天消息内容。 - 使用
chat_message_type
(可选)提供聊天消息类型。 可能的值包括:ChatMessageType.TEXT
、ChatMessageType.HTML
、'text'
、'html'
;如果未指定,ChatMessageType.TEXT
将设置 - 使用
sender_display_name
,optional 指定发件人的显示名称,如果未指定,将设置空名称
SendChatMessageResult
是发送消息后返回的响应,它包含一个 ID,该 ID 是消息的唯一 ID。
from azure.communication.chat import ChatMessageType
topic = "test topic"
create_chat_thread_result = chat_client.create_chat_thread(topic)
thread_id = create_chat_thread_result.chat_thread.id
chat_thread_client = chat_client.get_chat_thread_client(create_chat_thread_result.chat_thread.id)
content='hello world'
sender_display_name='sender name'
chat_message_type = ChatMessageType.TEXT
# without specifying sender_display_name and chat_message_type
send_message_result = chat_thread_client.send_message(content)
send_message_result_id = send_message_result.id
print("Message sent: id: ", send_message_result_id)
# specifying sender_display_name and chat_message_type
send_message_result_w_type = chat_thread_client.send_message(
content,
sender_display_name=sender_display_name,
chat_message_type=chat_message_type # equivalent to chat_message_type = 'text'
)
send_message_result_w_type_id = send_message_result_w_type.id
print("Message sent: id: ", send_message_result_w_type_id)
获取消息
Use get_message
方法从服务检索消息; message_id
是消息的唯一 ID。
- 使用
message_id
,必需,指定现有消息ChatMessage
的消息 ID 是从获取消息返回的响应,它包含 ID,这是消息的唯一 ID,其他字段请参阅 azure.communication.chat.ChatMessage
chat_message = chat_thread_client.get_message(message_id=send_message_result_id)
print("get_chat_message succeeded, message id:", chat_message.id, "content: ", chat_message.content)
列出邮件
Use list_messages
方法从服务中检索消息。
- 使用
results_per_page
、(可选)每页要返回的最大消息数。 - 使用
start_time
、(可选)范围查询的开始时间。
[ChatMessage]
的迭代器是列出消息后返回的响应
from datetime import datetime, timedelta
start_time = datetime.utcnow() - timedelta(days=1)
chat_messages = chat_thread_client.list_messages(results_per_page=1, start_time=start_time)
for chat_message_page in chat_messages.by_page():
for chat_message in chat_message_page:
print("ChatMessage: Id=", chat_message.id, "; Content=", chat_message.content.message)
更新消息
使用 update_message
更新由 threadId 和 messageId 标识的消息。
- 使用必需的
message_id
指定消息的唯一 ID。 - 使用可选的
content
指定要更新的消息内容;如果不指定,则分配空内容
content = "updated message content"
chat_thread_client.update_message(send_message_result_id, content=content)
chat_message = chat_thread_client.get_message(message_id=send_message_result_id)
assert chat_message.content.message == content
删除消息
使用 delete_message
删除邮件。
- 使用
message_id
(必需)是消息的唯一 ID。
chat_thread_client.delete_message(message_id=send_message_result_id)
线程参与者操作
列出线程参与者
使用 list_participants
检索聊天会话的参与者。
- 使用
results_per_page
、可选、每页返回的最大参与者人数。 - 使用
skip
、可选跳过响应中指定位置的参与者。
[ChatParticipant]
的迭代器是所列参与者返回的响应
chat_participants = chat_thread_client.list_participants(results_per_page=5, skip=5)
for chat_participant_page in chat_participants.by_page():
for chat_participant in chat_participant_page:
print("ChatParticipant: ", chat_participant)
添加线程参与者
使用 add_participants
方法将线程参与者添加到线程。
- 使用
thread_participants
(必需)列出ChatParticipant
要添加到线程的 ;user
,必需,它是CommunicationUserIdentifier
CommunicationIdentityClient.create_user () 从用户访问令牌创建的
display_name
(可选)是会话参与者的显示名称。share_history_time
,可选,是开始与参与者共享聊天历史记录的时间。
将返回 list(tuple(ChatParticipant, ChatError))
。 如果已成功添加参与者,则预期会提供一个空列表。 如果在添加参与者时遇到错误,则会在该列表中填充未能添加的参与者以及遇到的错误。
from azure.communication.identity import CommunicationIdentityClient
from azure.communication.chat import ChatParticipant
from datetime import datetime
# create 2 users
identity_client = CommunicationIdentityClient.from_connection_string('<connection_string>')
new_users = [identity_client.create_user() for i in range(2)]
# # conversely, you can also add an existing user to a chat thread; provided the user_id is known
# from azure.communication.chat import CommunicationUserIdentifier
#
# user_id = 'some user id'
# user_display_name = "Wilma Flinstone"
# new_user = CommunicationUserIdentifier(user_id)
# participant = ChatParticipant(
# identifier=new_user,
# display_name=user_display_name,
# share_history_time=datetime.utcnow())
participants = []
for _user in new_users:
chat_participant = ChatParticipant(
identifier=_user,
display_name='Fred Flinstone',
share_history_time=datetime.utcnow()
)
participants.append(chat_participant)
response = chat_thread_client.add_participants(thread_participants=participants)
def decide_to_retry(error, **kwargs):
"""
Insert some custom logic to decide if retry is applicable based on error
"""
return True
# verify if all users has been successfully added or not
# in case of partial failures, you can retry to add all the failed participants
retry = [p for p, e in response if decide_to_retry(e)]
if retry:
chat_thread_client.add_participants(retry)
删除线程参与者
使用 remove_participant
方法将会话参与者从由 threadId 标识的会话中删除。
identifier
CommunicationUserIdentifier
是由 CommunicationIdentityClient.create_user () 从 中创建的azure-communication-identity
和 已添加到此聊天线程中。
- 使用
identifier
指定CommunicationUserIdentifier
创建的
chat_thread_client.remove_participant(identifier=new_user)
# # conversely you can also do the following; provided the user_id is known
# from azure.communication.chat import CommunicationUserIdentifier
#
# user_id = 'some user id'
# chat_thread_client.remove_participant(identifier=CommunicationUserIdentifier(new_user))
事件操作
发送键入通知
使用 send_typing_notification
方法代表用户将键入通知事件发布到线程。
chat_thread_client.send_typing_notification()
发送阅读回执
使用 send_read_receipt
方法代表用户将已读回执事件发布到线程。
- 使用
message_id
指定要发送已读回执的邮件的 ID
content='hello world'
send_message_result = chat_thread_client.send_message(content)
send_message_result_id = send_message_result.id
chat_thread_client.send_read_receipt(message_id=send_message_result_id)
列出阅读回执
使用 list_read_receipts
方法检索线程的已读回执。
- 使用
results_per_page
,可选,每页返回的最大已读回执数。 - 使用
skip
(可选)跳过响应中指定位置的已读回执。
[ChatMessageReadReceipt]
的迭代器是列出阅读回执后返回的响应
read_receipts = chat_thread_client.list_read_receipts(results_per_page=5, skip=5)
for read_receipt_page in read_receipts.by_page():
for read_receipt in read_receipt_page:
print(read_receipt)
print(read_receipt.sender)
print(read_receipt.chat_message_id)
print(read_receipt.read_on)
代码示例
这些代码示例演示了 Azure 通信聊天客户端库的常见方案操作。
示例的异步版本 (追加 _async
的 python 示例文件) 显示异步操作。
在运行示例代码之前,请参阅先决条件
创建资源,然后设置一些环境变量
set AZURE_COMMUNICATION_SERVICE_ENDPOINT="https://<RESOURCE_NAME>.communcationservices.azure.com"
set COMMUNICATION_SAMPLES_CONNECTION_STRING="<connection string of your Communication service>"
pip install azure-communication-identity
python samples\chat_client_sample.py
python samples\chat_client_sample_async.py
python samples\chat_thread_client_sample.py
python samples\chat_thread_client_sample_async.py
疑难解答
遇到问题了? 本部分应包含要执行哪些操作的详细信息。
后续步骤
此处 应提供更多示例代码,以及指向相应示例测试的链接。
贡献
如果遇到任何 bug 或有建议,请在项目的“ 问题 ”部分中提出问题。