用于 Python 的 Azure SDK 抽象化对基础 Azure 服务通信协议(无论是 HTTP 还是 AMQP)的调用(用于消息传送 SDK,如 ServiceBus 和 EventHubs)。 例如,如果使用一个使用 HTTP 的库,则用于 Python 的 Azure SDK 发出 HTTP 请求,并在后台接收 HTTP 响应。 SDK 将消除这种复杂性,以便可以使用直观的 Python 对象,而不是原始 HTTP 响应或 JSON 有效负载。
了解从 SDK 操作中接收的对象类型对于编写有效的 Azure 应用程序至关重要。 本文介绍遇到的常见响应类型及其与基础 HTTP 通信的关系。
注释
本文仅检查 HTTP 方案,而不是 AMQP 方案。
反序列化的 Python 对象
用于 Python 的 Azure SDK 通过从服务作返回强类型 Python 对象来确定开发人员工作效率的优先级。 无需直接分析 JSON 或处理 HTTP 状态代码,而是使用将 Azure 资源表示为 Python 对象的资源模型。
例如,从 Azure 存储检索 Blob 时,会收到一个对象 BlobProperties,它具有 name、size 和 last_modified 等属性,而不是原始的 JSON 字典:
from azure.storage.blob import BlobServiceClient
# Connect to storage account
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client("mycontainer")
# Get blob properties - returns a BlobProperties object
blob_client = container_client.get_blob_client("myblob.txt")
properties = blob_client.get_blob_properties()
# Access properties as Python attributes
print(f"Blob name: {properties.name}")
print(f"Blob size: {properties.size} bytes")
print(f"Last modified: {properties.last_modified}")
数据来自何处
了解数据流有助于了解 SDK 在后台执行的作:
-
代码调用 SDK 方法: 调用类似于
get_blob_properties().. - SDK 构造 HTTP 请求: SDK 使用标头、身份验证和查询参数生成相应的 HTTP 请求。
- Azure 服务响应: 服务返回 HTTP 响应,通常在响应正文中使用 JSON 有效负载。
-
SDK 处理响应: The SDK:
- 检查 HTTP 状态代码。
- 分析响应正文(通常是 JSON)。
- 根据预期的架构验证数据。
- 将数据映射到 Python 模型对象。
- 代码接收 Python 对象: 使用反序列化的对象,而不是原始 HTTP 数据。
通过此抽象,可以专注于应用程序逻辑,而不是 HTTP 协议详细信息。
常见响应类型
适用于 Python 的 Azure SDK 在所有服务中使用多个标准响应类型。 了解这些类型有助于有效地处理任何 Azure 服务。
资源模型
大多数 SDK 操作返回资源模型。 这些 Python 对象表示 Azure 资源。 模型特定于服务,但遵循一致的模式:
# Azure Key Vault example
from azure.keyvault.secrets import SecretClient
secret_client = SecretClient(vault_url=vault_url, credential=credential)
secret = secret_client.get_secret("mysecret") # Returns KeyVaultSecret
print(f"Secret name: {secret.name}")
print(f"Secret value: {secret.value}")
print(f"Secret version: {secret.properties.version}")
# Azure Cosmos DB example
from azure.cosmos import CosmosClient
cosmos_client = CosmosClient(url=cosmos_url, credential=credential)
database = cosmos_client.get_database_client("mydatabase")
container = database.get_container_client("mycontainer")
item = container.read_item(item="item-id", partition_key="partition-value") # Returns dict
print(f"Item ID: {item['id']}")
集合结果的 ItemPaged
当 SDK 列出资源时,它将返回 ItemPaged 以透明方式处理分页的对象:
from azure.storage.blob import BlobServiceClient
from azure.core.paging import ItemPaged
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client("mycontainer")
# list_blobs returns ItemPaged[BlobProperties]
blobs: ItemPaged[BlobProperties] = container_client.list_blobs()
# Iterate naturally - SDK handles pagination
for blob in blobs:
print(f"Blob: {blob.name}, Size: {blob.size}")
访问原始 HTTP 响应
虽然 SDK 的高级抽象满足大多数需求,但有时需要访问基础 HTTP 响应。 常见方案包括:
- 调试失败的请求。
- 访问自定义响应标头。
- 实现自定义重试逻辑。
- 使用非标准响应格式。
大多数 SDK 方法都接受参数 raw_response_hook :
from azure.keyvault.secrets import SecretClient
secret_client = SecretClient(vault_url=vault_url, credential=credential)
def inspect_response(response):
# Access the raw HTTP response
print(f"Request URL: {response.http_request.url}")
print(f"Status code: {response.http_response.status_code}")
print(f"Response headers: {dict(response.http_response.headers)}")
# Access custom headers
request_id = response.http_response.headers.get('x-ms-request-id')
print(f"Request ID: {request_id}")
# Must return the response
return response
# Hook is called before deserialization
secret = secret_client.get_secret("mysecret", raw_response_hook=inspect_response)
分页和迭代器
Azure 服务通常会返回大量资源集合。 SDK 用于 ItemPaged 高效处理这些集合,而无需一次性将所有内容加载到内存中。
自动分页
SDK 会在循环访问时自动提取新页面:
# List all blobs - could be thousands
blobs = container_client.list_blobs()
# SDK fetches pages as needed during iteration
for blob in blobs:
process_blob(blob) # Pages loaded on-demand
显式地操作页面
还可以在需要时直接处理页面:
blobs = container_client.list_blobs()
# Process by page
for page in blobs.by_page():
print(f"Processing page with {len(list(page))} items")
for blob in page:
process_blob(blob)
控制页面大小
许多列表操作接受参数 results_per_page:
# Fetch 100 items per page instead of the default
blobs = container_client.list_blobs(results_per_page=100)
某些 Azure 服务的一些方法具有用于控制页面大小的其他机制。 例如,Azure Key Vault 和 Azure 搜索使用 top kwarg 来限制每个调用的结果。 有关使用 Azure 搜索 search() 方法的示例,请参阅 源代码。
特殊情况:长时间运行的作和轮询器
某些 Azure作无法立即完成。 示例包括:
- 创建或删除虚拟机。
- 部署 Azure 资源管理器模板。
- 训练机器学习模型。
- 复制大型 Blob。
这些作返回跟踪作进度的轮询器对象。
使用投票器
from azure.mgmt.storage import StorageManagementClient
storage_client = StorageManagementClient(credential, subscription_id)
# Start storage account creation
poller = storage_client.storage_accounts.begin_create(
resource_group_name="myresourcegroup",
account_name="mystorageaccount",
parameters=storage_parameters
)
# Option 1: Wait for completion (blocking)
storage_account = poller.result()
# Option 2: Check status periodically
while not poller.done():
print(f"Status: {poller.status()}")
time.sleep(5)
storage_account = poller.result()
异步轮询器
使用 async/await 模式时,可以使用 AsyncLROPoller:
from azure.storage.blob.aio import BlobServiceClient
async with BlobServiceClient.from_connection_string(connection_string) as client:
container_client = client.get_container_client("mycontainer")
# Start async copy operation
blob_client = container_client.get_blob_client("large-blob.vhd")
poller = await blob_client.begin_copy_from_url(source_url)
# Wait for async completion
copy_properties = await poller.result()
用于长时期运行作业的对象轮询示例:虚拟机
部署虚拟机是一个操作的示例,它需要时间来完成,并通过返回轮询器对象来进行处理(LROPoller 对于同步代码,AsyncLROPoller 对于异步代码):
from azure.mgmt.compute import ComputeManagementClient
from azure.core.polling import LROPoller
compute_client = ComputeManagementClient(credential, subscription_id)
# Start VM creation - returns immediately with a poller
poller: LROPoller = compute_client.virtual_machines.begin_create_or_update(
resource_group_name="myresourcegroup",
vm_name="myvm",
parameters=vm_parameters
)
# Wait for completion and get the result
vm = poller.result() # Blocks until operation completes
print(f"VM {vm.name} provisioned successfully")
访问分页结果的响应
对于分页结果,请使用 by_page() 以下方法 raw_response_hook:
def page_response_hook(response):
continuation_token = response.http_response.headers.get('x-ms-continuation')
print(f"Continuation token: {continuation_token}")
return response
blobs = container_client.list_blobs()
for page in blobs.by_page(raw_response_hook=page_response_hook):
for blob in page:
print(blob.name)
最佳做法
首选高级抽象。
尽可能使用 SDK 的资源模型,而不是原始响应。
避免访问以下划线(_)为前缀的任何方法。 按照惯例,这些方法在 Python 中是私有的。 与公共 API 相比,对重大更改等问题没有保证:
# Preferred: Work with typed objects secret = secret_client.get_secret("mysecret") if secret.properties.enabled: use_secret(secret.value) # Avoid: Manual JSON parsing (unless necessary) ... # AND avoid accessing any objects or methods that start with `_` response = secret_client._client.get(...) # Don't access internal clients data = json.loads(response.text) if data['attributes']['enabled']: use_secret(data['value'])合理处理分页。 始终遍历分页结果,而不是将其转换为列表。
# Good: Memory-efficient iteration for blob in container_client.list_blobs(): process_blob(blob) # Avoid: Loading everything into memory all_blobs = list(container_client.list_blobs()) # Could consume excessive memory使用 poller.result() 来用于执行长时间运行的操作。 始终使用
result()此方法来确保操作顺利完成。# Correct: Wait for operation completion poller = compute_client.virtual_machines.begin_delete( resource_group_name="myresourcegroup", vm_name="myvm" ) poller.result() # Ensures deletion completes print("VM deleted successfully") # Wrong: Assuming immediate completion poller = compute_client.virtual_machines.begin_delete(...) print("VM deleted successfully") # Deletion might still be in progress!仅在需要时访问原始响应。 请谨慎使用原始响应访问,并且仅用于特定需求场景:
# Good use case: Debugging or logging def log_request_id(response): request_id = response.http_response.headers.get('x-ms-request-id') logger.info(f"Operation request ID: {request_id}") return response blob_client.upload_blob(data, raw_response_hook=log_request_id) # Good use case: Custom error handling def check_custom_header(response): if response.http_response.headers.get('x-custom-error'): raise CustomApplicationError("Custom error condition detected") return response