EventHubProducerClient 클래스
EventHubProducerClient 클래스는 Azure Event Hubs 서비스로 이벤트를 보내기 위한 상위 수준 인터페이스를 정의합니다.
- 상속
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
생성자
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
매개 변수
- fully_qualified_namespace
- str
Event Hubs 네임스페이스의 정규화된 호스트 이름입니다. 이는 .servicebus.windows.net 유사할 수 있습니다.
- credential
- AsyncTokenCredential 또는 AzureSasCredential 또는 AzureNamedKeyCredential
토큰을 가져오기 위한 특정 인터페이스를 구현하는 인증에 사용되는 자격 증명 개체입니다. *get_token(self, scopes) 메서드를 구현하는 azure-identity 라이브러리 및 개체에서 생성된 또는 자격 증명 개체를 허용EventHubSharedKeyCredential합니다.
- buffered_mode
- bool
True이면 생산자 클라이언트가 버퍼에서 이벤트를 수집하여 효율적으로 일괄 처리한 다음 게시합니다. 기본값은 False입니다.
일괄 처리가 성공적으로 게시되면 호출할 콜백입니다. 콜백은 두 개의 매개 변수를 사용합니다.
events: 성공적으로 게시된 이벤트 목록
partition_id: 목록의 이벤트가 게시된 파티션 ID입니다.
콜백 함수는 on_success(이벤트, partition_id)와 같이 정의해야 합니다. buffered_mode True인 경우 필요하지만 buffered_mode False인 경우 선택 사항입니다.
일괄 처리를 게시하지 못한 후 호출할 콜백입니다. buffered_mode 가 True인 경우 필요하지만 buffered_mode False인 경우 선택 사항입니다. 콜백 함수는 다음과 같이 정의해야 합니다. on_error(이벤트, partition_id, 오류). 여기서는 다음과 같습니다.
events: 게시하지 못한 이벤트 목록입니다.
partition_id: 목록의 이벤트가 및 에 게시하려고 시도한 파티션 ID입니다.
error: 송신 실패와 관련된 예외입니다.
buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.
생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우
그러면 오류 정보가 on_error 콜백에 전달되고 이 콜백이 호출됩니다.
클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우
그러면 오류가 기본적으로 발생합니다.
buffered_mode True이면 on_error 콜백이 필요하며 오류는 다음과 같이 처리됩니다.
지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.
큐에 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.
- max_buffer_length
- int
버퍼링 모드만. 플러시를 트리거하기 전에 버퍼링할 수 있는 파티션당 총 이벤트 수입니다. 기본값은 버퍼링 모드에서 1500입니다.
버퍼링 모드만. 게시하기 전에 버퍼의 이벤트로 일괄 처리가 빌드될 때까지 대기하는 시간입니다. 기본값은 버퍼링 모드에서 1입니다.
- logging_enable
- bool
로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.
- auth_timeout
- float
서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.
- user_agent
- str
지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.
- retry_total
- int
오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다.
- retry_backoff_factor
- float
두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수} - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.
- retry_backoff_max
- float
최대 백오프 시간입니다. 기본값은 120초(2분)입니다.
- retry_mode
- str
재시도 사이의 지연 동작입니다. 지원되는 값은 'fixed' 또는 'exponential'입니다. 여기서 기본값은 '지수'입니다.
- idle_timeout
- float
시간 제한(초)으로, 활동이 없으면 이 클라이언트가 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.
- transport_type
- TransportType
Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 대신 TransportType.AmqpOverWebsocket 을 사용할 수 있습니다.
- http_proxy
- dict
HTTP 프록시 설정. 'proxy_hostname'(str value) 및 'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.
Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.
연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 None입니다. 이 경우 certifi.where() 가 사용됩니다.
- uamqp_transport
- bool
uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.
- socket_timeout
- float
연결의 기본 소켓이 시간이 초과되기 전에 데이터를 보내고 받을 때 대기해야 하는 시간(초)입니다. 기본값은 TransportType.Amqp의 경우 0.2이고 TransportType.AmqpOverWebsocket의 경우 1입니다. 쓰기 시간 초과로 인해 EventHubsConnectionError 오류가 발생하는 경우 기본값보다 큰 값을 전달해야 할 수 있습니다. 이는 고급 사용 시나리오용이며 일반적으로 기본값으로 충분해야 합니다.
예제
EventHubProducerClient의 새 instance 만듭니다.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
메서드
close |
생산자 클라이언트 기본 AMQP 연결 및 링크를 닫습니다. |
create_batch |
max_size_in_bytes 제한되는 모든 콘텐츠의 최대 크기를 사용하여 EventDataBatch 개체를 만듭니다. max_size_in_bytes 서비스에서 정의한 최대 허용 메시지 크기보다 크지 않아야 합니다. |
flush |
버퍼링 모드만. 클라이언트가 버퍼링 모드에서 작동하는 경우 즉시 전송될 버퍼의 이벤트를 플러시합니다. |
from_connection_string |
연결 문자열 EventHubProducerClient를 만듭니다. |
get_buffered_event_count |
버퍼링되고 지정된 파티션에 대해 게시되기를 기다리는 이벤트 수입니다. 버퍼링되지 않은 모드에서 None을 반환합니다. 참고: 이벤트 버퍼는 백그라운드 코루틴에서 처리되므로 이 API에서 보고하는 버퍼의 이벤트 수는 근사치로만 간주되어야 하며 디버깅에만 사용하는 것이 좋습니다. 버퍼링된 이벤트가 없는 파티션 ID의 경우 해당 파티션 ID가 실제로 이벤트 허브 내에 있는지 여부에 관계없이 0이 반환됩니다. |
get_eventhub_properties |
이벤트 허브의 속성을 가져옵니다. 반환된 사전의 키는 다음과 같습니다.
|
get_partition_ids |
이벤트 허브의 파티션 ID를 가져옵니다. |
get_partition_properties |
지정된 파티션의 속성을 가져옵니다. 속성 사전의 키는 다음과 같습니다.
|
send_batch |
이벤트 데이터의 일괄 처리를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링된 모드에서 실행되도록 구성된 경우 메서드는 이벤트를 로컬 버퍼에 큐에 넣고 반환합니다. 생산자는 백그라운드에서 자동 전송을 수행합니다. buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.
buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다.
버퍼링 모드에서 일괄 처리를 보내는 것은 그대로 유지되고 단일 단위로 전송됩니다. 일괄 처리는 다시 정렬되지 않습니다. 이로 인해 이벤트를 보내는 비효율성이 발생할 수 있습니다. EventData 또는 AmqpAnnotatedMessage의 한정된 목록을 보내고 이벤트 허브 프레임 크기 제한 내에 있다는 것을 알고 있는 경우 send_batch 호출을 사용하여 보낼 수 있습니다. 그렇지 않으면 를 사용하여 create_batchEventDataBatch 를 만들고 크기 제한까지 EventData 또는 AmqpAnnotatedMessage 를 일괄 처리에 하나씩 추가한 다음, 이 메서드를 호출하여 일괄 처리를 보냅니다. |
send_event |
이벤트 데이터를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링된 모드에서 실행되도록 구성된 경우 메서드는 이벤트를 로컬 버퍼에 큐에 넣고 반환합니다. 생산자는 백그라운드에서 자동 일괄 처리 및 전송을 수행합니다. buffered_mode False이면 on_error 콜백이 선택 사항이며 다음과 같이 오류가 처리됩니다. * 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우
buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다. * 지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.
|
close
생산자 클라이언트 기본 AMQP 연결 및 링크를 닫습니다.
async close(*, flush: bool = True, **kwargs: Any) -> None
매개 변수
- flush
- bool
버퍼링 모드만. True로 설정하면 버퍼의 이벤트가 즉시 전송됩니다. 기본값은 True입니다.
반환 형식
예외
플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.
예제
처리기를 닫습니다.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
max_size_in_bytes 제한되는 모든 콘텐츠의 최대 크기를 사용하여 EventDataBatch 개체를 만듭니다.
max_size_in_bytes 서비스에서 정의한 최대 허용 메시지 크기보다 크지 않아야 합니다.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
반환 형식
예외
플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.
예제
제한된 크기 내에서 EventDataBatch 개체 만들기
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
버퍼링 모드만. 클라이언트가 버퍼링 모드에서 작동하는 경우 즉시 전송될 버퍼의 이벤트를 플러시합니다.
async flush(**kwargs: Any) -> None
매개 변수
반환 형식
예외
생산자가 버퍼링 모드에서 지정된 시간 제한 내에서 버퍼를 플러시하지 못하는 경우
from_connection_string
연결 문자열 EventHubProducerClient를 만듭니다.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
매개 변수
- eventhub_name
- str
클라이언트를 연결할 특정 이벤트 허브의 경로입니다.
- buffered_mode
- bool
True이면 생산자 클라이언트가 버퍼에서 이벤트를 수집하여 효율적으로 일괄 처리한 다음 게시합니다. 기본값은 False입니다.
일괄 처리가 성공적으로 게시되면 호출할 콜백입니다. 콜백은 두 개의 매개 변수를 사용합니다.
events: 성공적으로 게시된 이벤트 목록
partition_id: 목록의 이벤트가 게시된 파티션 ID입니다.
콜백 함수는 on_success(이벤트, partition_id)와 같이 정의해야 합니다. buffered_mode False인 경우 선택 사항이지만 buffered_mode True인 경우 필요합니다.
일괄 처리를 게시하지 못한 후 호출할 콜백입니다. 콜백 함수는 다음과 같이 정의해야 합니다. on_error(이벤트, partition_id, 오류). 여기서는 다음과 같습니다.
events: 게시하지 못한 이벤트 목록입니다.
partition_id: 목록의 이벤트가 및 에 게시하려고 시도한 파티션 ID입니다.
error: 송신 실패와 관련된 예외입니다.
buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.
생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우
그러면 오류 정보가 on_error 콜백에 전달되고 이 콜백이 호출됩니다.
클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우
그러면 오류가 기본적으로 발생합니다.
buffered_mode True이면 on_error 콜백이 필요하며 오류는 다음과 같이 처리됩니다.
지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.
큐에 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.
- max_buffer_length
- int
버퍼링 모드만. 플러시를 트리거하기 전에 버퍼링할 수 있는 파티션당 총 이벤트 수입니다. 기본값은 버퍼링 모드에서 1500입니다.
버퍼링 모드만. 게시하기 전에 버퍼의 이벤트로 일괄 처리가 빌드될 때까지 대기하는 시간입니다. 기본값은 버퍼링 모드에서 1입니다.
- logging_enable
- bool
로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.
- http_proxy
- dict
HTTP 프록시 설정. 'proxy_hostname'(str value) 및 'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.
- auth_timeout
- float
서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.
- user_agent
- str
지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.
- retry_total
- int
오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다.
- retry_backoff_factor
- float
두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수} - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.
- retry_backoff_max
- float
최대 백오프 시간입니다. 기본값은 120초(2분)입니다.
- retry_mode
- str
재시도 사이의 지연 동작입니다. 지원되는 값은 'fixed' 또는 'exponential'입니다. 여기서 기본값은 '지수'입니다.
- idle_timeout
- float
시간 제한(초)으로, 활동이 없으면 이 클라이언트가 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.
- transport_type
- TransportType
Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 대신 TransportType.AmqpOverWebsocket 을 사용할 수 있습니다.
Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.
연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 None입니다. 이 경우 certifi.where() 가 사용됩니다.
- uamqp_transport
- bool
uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.
반환 형식
예외
플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.
예제
연결 문자열 EventHubProducerClient의 새 instance 만듭니다.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
버퍼링되고 지정된 파티션에 대해 게시되기를 기다리는 이벤트 수입니다. 버퍼링되지 않은 모드에서 None을 반환합니다. 참고: 이벤트 버퍼는 백그라운드 코루틴에서 처리되므로 이 API에서 보고하는 버퍼의 이벤트 수는 근사치로만 간주되어야 하며 디버깅에만 사용하는 것이 좋습니다. 버퍼링된 이벤트가 없는 파티션 ID의 경우 해당 파티션 ID가 실제로 이벤트 허브 내에 있는지 여부에 관계없이 0이 반환됩니다.
get_buffered_event_count(partition_id: str) -> int | None
매개 변수
반환 형식
예외
플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.
get_eventhub_properties
이벤트 허브의 속성을 가져옵니다.
반환된 사전의 키는 다음과 같습니다.
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
반환
이벤트 허브에 대한 정보가 포함된 사전입니다.
반환 형식
예외
get_partition_ids
이벤트 허브의 파티션 ID를 가져옵니다.
async get_partition_ids() -> List[str]
반환
파티션 ID 목록입니다.
반환 형식
예외
get_partition_properties
지정된 파티션의 속성을 가져옵니다.
속성 사전의 키는 다음과 같습니다.
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (부울)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
매개 변수
반환
파티션 속성의 받아쓰기입니다.
반환 형식
예외
send_batch
이벤트 데이터의 일괄 처리를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링된 모드에서 실행되도록 구성된 경우 메서드는 이벤트를 로컬 버퍼에 큐에 넣고 반환합니다. 생산자는 백그라운드에서 자동 전송을 수행합니다.
buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.
생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우
그러면 오류 정보가 on_error 콜백에 전달되고, 이 콜백이 호출됩니다.
클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우
그러면 오류가 기본적으로 발생합니다.
buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다.
이벤트가 지정된 시간 제한 내에 큐에 추가되지 않으면 오류가 직접 발생합니다.
성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.
버퍼링 모드에서 일괄 처리를 보내는 것은 그대로 유지되고 단일 단위로 전송됩니다. 일괄 처리는 다시 정렬되지 않습니다. 이로 인해 이벤트를 보내는 비효율성이 발생할 수 있습니다.
EventData 또는 AmqpAnnotatedMessage의 한정된 목록을 보내고 이벤트 허브 프레임 크기 제한 내에 있다는 것을 알고 있는 경우 send_batch 호출을 사용하여 보낼 수 있습니다. 그렇지 않으면 를 사용하여 create_batchEventDataBatch 를 만들고 크기 제한까지 EventData 또는 AmqpAnnotatedMessage 를 일괄 처리에 하나씩 추가한 다음, 이 메서드를 호출하여 일괄 처리를 보냅니다.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
매개 변수
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
보낼 EventDataBatch 개체 또는 일괄 처리로 보낼 EventData 목록입니다. 목록 또는 EventDataBatch의 모든 EventData 또는 AmqpAnnotatedMessage가 동일한 파티션에 배치됩니다.
- timeout
- float
이벤트 데이터를 버퍼링되지 않은 모드로 보내는 최대 대기 시간 또는 버퍼링 모드에서 이벤트 데이터를 버퍼에 큐에 넣기 위한 최대 대기 시간입니다. 버퍼링되지 않은 모드에서는 생산자를 만들 때 지정된 기본 대기 시간이 사용됩니다. 버퍼링 모드에서 기본 대기 시간은 없음입니다.
- partition_id
- str
보낼 특정 파티션 ID입니다. 기본값은 없음입니다. 이 경우 서비스는 라운드 로빈을 사용하여 모든 파티션에 할당됩니다. partition_id 지정되고 EventDataBatch 자체가 partition_id 때문에 event_data_batch EventDataBatch이면 TypeError가 발생합니다.
- partition_key
- str
지정된 partition_key 사용하여 이벤트 데이터는 서비스에서 결정한 이벤트 허브의 특정 파티션으로 전송됩니다. Partition_key 지정되고 EventDataBatch 자체가 partition_key 때문에 event_data_batch EventDataBatch이면 TypeError가 발생합니다. partition_id 및 partition_key 모두 제공되면 partition_id 우선합니다. 경고: 이벤트 허브 서비스에서 partition_key 무시되고 이벤트가 라운드 로빈을 사용하여 모든 파티션에 할당되므로 전송할 이벤트에 대한 문자열이 아닌 값의 partition_key 설정하는 것은 권장되지 않습니다. 또한 partition_key 문자열 형식일 것으로 예상되는 이벤트를 소비하기 위한 SDK가 있으며, 문자열이 아닌 값을 구문 분석하지 못할 수 있습니다.
반환 형식
예외
이벤트가 버퍼링되지 않은 모드로 전송되기 전에 시간 제한 매개 변수로 지정된 값이 경과하거나 이벤트를 버퍼링된 모드로 버퍼링된 에 큐에 넣을 수 있는 경우
예제
이벤트 데이터를 비동기적으로 보냅니다.
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
이벤트 데이터를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링된 모드에서 실행되도록 구성된 경우 메서드는 이벤트를 로컬 버퍼에 큐에 넣고 반환합니다. 생산자는 백그라운드에서 자동 일괄 처리 및 전송을 수행합니다.
buffered_mode False이면 on_error 콜백이 선택 사항이며 다음과 같이 오류가 처리됩니다. * 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다. * 지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
매개 변수
- timeout
- float
버퍼링되지 않은 모드에서 이벤트 데이터를 보내는 최대 대기 시간 또는 버퍼링 모드에서 이벤트 데이터를 버퍼에 큐에 넣기 위한 최대 대기 시간입니다. 버퍼링되지 않은 모드에서는 생산자가 생성될 때 지정된 기본 대기 시간이 사용됩니다. 버퍼링 모드에서 기본 대기 시간은 없음입니다.
- partition_id
- str
보낼 특정 파티션 ID입니다. 기본값은 없음입니다. 이 경우 서비스는 라운드 로빈을 사용하여 모든 파티션에 할당됩니다. Partition_id 지정되고 EventDataBatch 자체가 partition_id 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다.
- partition_key
- str
지정된 partition_key 사용하여 이벤트 데이터는 서비스에서 결정한 이벤트 허브의 특정 파티션으로 전송됩니다. Partition_key 지정되고 EventDataBatch 자체가 partition_key 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다. partition_id 및 partition_key 모두 제공되면 partition_id 우선적으로 적용됩니다. 경고: 이벤트 허브 서비스에서 partition_key 무시되고 이벤트가 라운드 로빈을 사용하여 모든 파티션에 할당되므로 전송할 이벤트에 대한 문자열이 아닌 값의 partition_key 설정하는 것은 권장되지 않습니다. 또한 partition_key 문자열 형식일 것으로 예상되는 이벤트를 소비하기 위한 SDK가 있으며, 문자열이 아닌 값을 구문 분석하지 못할 수 있습니다.
반환 형식
예외
시간 제한 매개 변수로 지정된 값이 경과한 경우 이벤트가 버퍼링되지 않은 모드로 전송되거나 버퍼링된 모드에서 버퍼링된 에 이벤트를 큐에 넣을 수 없습니다.
특성
total_buffered_event_count
Azure SDK for Python