다음을 통해 공유


EventHubProducerClient 클래스

EventHubProducerClient 클래스는 Azure Event Hubs 서비스로 이벤트를 보내기 위한 상위 수준 인터페이스를 정의합니다.

상속
azure.eventhub._client_base.ClientBase
EventHubProducerClient

생성자

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

매개 변수

fully_qualified_namespace
str
필수

Event Hubs 네임스페이스의 정규화된 호스트 이름입니다. 이는 .servicebus.windows.net 유사할 수 있습니다.

eventhub_name
str
필수

클라이언트를 연결할 특정 이벤트 허브의 경로입니다.

credential
TokenCredential 또는 AzureSasCredential 또는 AzureNamedKeyCredential
필수

토큰을 가져오기 위한 특정 인터페이스를 구현하는 인증에 사용되는 자격 증명 개체입니다. *get_token(자체, 범위) 메서드를 구현하는 개체 및 azure-identity 라이브러리에서 생성된 또는 자격 증명 개체를 허용EventHubSharedKeyCredential합니다.

buffered_mode
bool

True이면 생산자 클라이언트는 버퍼에서 이벤트를 수집하여 효율적으로 일괄 처리한 다음 게시합니다. 기본값은 False입니다.

buffer_concurrency
<xref:ThreadPoolExecutor> 또는 int 또는 None

이벤트를 게시하는 데 사용할 ThreadPoolExecutor 또는 ThreadPoolExecutor의 작업자 수입니다. 기본값은 없음이며, 기본 작업자 수가 있는 ThreadPoolExecutor는 에 따라 만들어집니다. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

일괄 처리가 성공적으로 게시되면 호출할 콜백입니다. 콜백은 다음 두 개의 매개 변수를 사용합니다.

  • events: 성공적으로 게시된 이벤트 목록

  • partition_id: 목록의 이벤트가 게시된 파티션 ID입니다.

콜백 함수는 on_success(이벤트, partition_id)와 같이 정의되어야 합니다. buffered_mode False인 경우buffered_mode True인 경우 필수입니다.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

일괄 처리를 게시하지 못한 후 호출할 콜백입니다. 콜백 함수는 다음과 같이 정의되어야 합니다. on_error(이벤트, partition_id, 오류). 여기서는 다음과 같습니다.

  • events: 게시하지 못한 이벤트 목록입니다.

  • partition_id: 목록의 이벤트가 및 에 게시하려고 시도한 파티션 ID입니다.

  • error: 송신 실패와 관련된 예외입니다.

buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.

  • 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

    그러면 오류 정보가 on_error 콜백에 전달되고, 이 콜백이 호출됩니다.

  • 클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우

    그러면 오류가 기본적으로 발생합니다.

buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다.

  • 이벤트가 지정된 시간 제한 내에 큐에 추가되지 않으면 오류가 직접 발생합니다.

  • 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.

max_buffer_length
int

버퍼링 모드에만 해당합니다. 플러시를 트리거하기 전에 버퍼링할 수 있는 파티션당 총 이벤트 수입니다. 기본값은 버퍼링 모드에서 1500입니다.

max_wait_time
Optional[float]

버퍼링 모드에만 해당합니다. 게시하기 전에 버퍼의 이벤트로 일괄 처리가 빌드될 때까지 대기하는 시간입니다. 기본값은 버퍼링 모드에서 1입니다.

logging_enable
bool

로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.

auth_timeout
float

서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.

user_agent
str

지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.

retry_total
int

오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다.

retry_backoff_factor
float

두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수 } - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.

retry_backoff_max
float

최대 백오프 시간입니다. 기본값은 120초(2분)입니다.

retry_mode
str

재시도 사이의 지연 동작입니다. 지원되는 값은 '고정' 또는 '지수'이며, 여기서 기본값은 '지수'입니다.

idle_timeout
float

시간 제한(초)으로, 활동이 없으면 이 클라이언트가 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.

transport_type
TransportType

Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 TransportType.AmqpOverWebsocket 을 대신 사용할 수 있습니다.

http_proxy
Dict

HTTP 프록시 설정. 'proxy_hostname'(str value) 및'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

custom_endpoint_address
Optional[str]

Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.

connection_verify
Optional[str]

연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 none입니다. 이 경우 certifi.where() 가 사용됩니다.

uamqp_transport
bool

uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.

socket_timeout
float

연결의 기본 소켓이 시간을 초과하기 전에 데이터를 보내고 받을 때 대기해야 하는 시간(초)입니다. 기본값은 TransportType.Amqp의 경우 0.2이고 TransportType.AmqpOverWebsocket의 경우 1입니다. 쓰기 시간 초과로 인해 EventHubsConnectionError 오류가 발생하는 경우 기본값보다 큰 를 전달해야 할 수 있습니다. 이는 고급 사용 시나리오용이며 일반적으로 기본값으로 충분해야 합니다.

예제

EventHubProducerClient의 새 instance 만듭니다.


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

메서드

close

생산자 클라이언트 기본 AMQP 연결 및 링크를 닫습니다.

create_batch

max_size_in_bytes 제한되는 모든 콘텐츠의 최대 크기를 사용하여 EventDataBatch 개체를 만듭니다.

max_size_in_bytes 서비스에서 정의한 최대 허용 메시지 크기보다 크지 않아야 합니다.

flush

버퍼링 모드에만 해당합니다. 클라이언트가 버퍼링 모드에서 작업하는 경우 즉시 전송될 버퍼의 이벤트를 플러시합니다.

from_connection_string

연결 문자열 EventHubProducerClient를 만듭니다.

get_buffered_event_count

버퍼링되고 지정된 파티션에 대해 게시되기를 기다리는 이벤트 수입니다. 버퍼링되지 않은 모드에서 None을 반환합니다. 참고: 이벤트 버퍼는 백그라운드 스레드에서 처리되므로 이 API에서 보고하는 버퍼의 이벤트 수는 근사치로만 간주되어야 하며 디버깅에만 사용하는 것이 좋습니다. 버퍼링된 이벤트가 없는 파티션 ID의 경우 해당 파티션 ID가 실제로 이벤트 허브 내에 있는지 여부에 관계없이 0이 반환됩니다.

get_eventhub_properties

이벤트 허브의 속성을 가져옵니다.

반환된 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

이벤트 허브의 파티션 ID를 가져옵니다.

get_partition_properties

지정된 파티션의 속성을 가져옵니다.

속성 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (부울)

send_batch

이벤트 데이터의 일괄 처리를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링 모드에서 실행되도록 구성된 경우 메서드는 지정된 시간 내에 이벤트를 버퍼로 큐에 넣고 반환합니다. 생산자는 버퍼링 모드로 백그라운드에서 자동 전송을 수행합니다.

buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.

  • 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

    그러면 오류 정보가 on_error 콜백에 전달되고, 이 콜백이 호출됩니다.

  • 클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우

    그러면 오류가 기본적으로 발생합니다.

buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다.

  • 이벤트가 지정된 시간 제한 내에 큐에 추가되지 않으면 오류가 직접 발생합니다.

  • 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.

버퍼링 모드에서 일괄 처리를 보내는 것은 그대로 유지되고 단일 단위로 전송됩니다. 일괄 처리는 다시 정렬되지 않습니다. 이로 인해 이벤트를 보내는 비효율성이 발생할 수 있습니다.

EventData 또는 AmqpAnnotatedMessage의 한정된 목록을 보내고 이벤트 허브 프레임 크기 제한 내에 있다는 것을 알고 있는 경우 send_batch 호출을 사용하여 보낼 수 있습니다. 그렇지 않으면 를 사용하여 create_batchEventDataBatch 를 만들고 크기 제한까지 EventData 또는 AmqpAnnotatedMessage 를 일괄 처리에 하나씩 추가한 다음, 이 메서드를 호출하여 일괄 처리를 보냅니다.

send_event

이벤트 데이터를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링 모드에서 실행되도록 구성된 경우 메서드는 지정된 시간 내에 이벤트를 버퍼로 큐에 넣고 반환합니다. 생산자는 버퍼링 모드로 백그라운드에서 자동 전송을 수행합니다.

buffered_mode False이면 on_error 콜백이 선택 사항이며 다음과 같이 오류가 처리됩니다. * 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다. * 지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

생산자 클라이언트 기본 AMQP 연결 및 링크를 닫습니다.

close(*, flush: bool = True, **kwargs: Any) -> None

매개 변수

flush
bool

버퍼링 모드에만 해당합니다. True로 설정하면 버퍼의 이벤트가 즉시 전송됩니다. 기본값은 True입니다.

timeout
float 또는 None

버퍼링 모드에만 해당합니다. 생산자를 닫을 시간 제한입니다. 기본값은 없음으로, 시간 제한이 없음을 의미합니다.

반환 형식

예외

플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.

예제

클라이언트를 닫습니다.


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

max_size_in_bytes 제한되는 모든 콘텐츠의 최대 크기를 사용하여 EventDataBatch 개체를 만듭니다.

max_size_in_bytes 서비스에서 정의한 최대 허용 메시지 크기보다 크지 않아야 합니다.

create_batch(**kwargs: Any) -> EventDataBatch

반환 형식

예외

플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.

예제

제한된 크기 내에서 EventDataBatch 개체 만들기


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

버퍼링 모드에만 해당합니다. 클라이언트가 버퍼링 모드에서 작업하는 경우 즉시 전송될 버퍼의 이벤트를 플러시합니다.

flush(**kwargs: Any) -> None

매개 변수

timeout
Optional[float]

버퍼링된 이벤트를 플러시하는 시간 제한, 기본값은 없음입니다. 즉, 시간 제한이 없습니다.

반환 형식

예외

생산자가 버퍼링 모드에서 지정된 시간 제한 내에서 버퍼를 플러시하지 못하는 경우

from_connection_string

연결 문자열 EventHubProducerClient를 만듭니다.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

매개 변수

conn_str
str
필수

이벤트 허브의 연결 문자열.

eventhub_name
str

클라이언트를 연결할 특정 이벤트 허브의 경로입니다.

buffered_mode
bool

True이면 생산자 클라이언트가 버퍼에서 이벤트를 수집하여 효율적으로 일괄 처리한 다음 게시합니다. 기본값은 False입니다.

buffer_concurrency
<xref:ThreadPoolExecutor> 또는 int 또는 None

이벤트를 게시하는 데 사용할 ThreadPoolExecutor 또는 ThreadPoolExecutor의 작업자 수입니다. 기본값은 None이며, 기본 작업자 수가 있는 ThreadPoolExecutor는 에 따라 만들어집니다. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

일괄 처리가 성공적으로 게시되면 호출할 콜백입니다. 콜백은 두 개의 매개 변수를 사용합니다.

  • events: 성공적으로 게시된 이벤트 목록

  • partition_id: 목록의 이벤트가 게시된 파티션 ID입니다.

콜백 함수는 on_success(이벤트, partition_id)와 같이 정의해야 합니다. buffered_mode True인 경우 필요하지만 buffered_mode False인 경우 선택 사항입니다.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

일괄 처리를 게시하지 못한 후 호출할 콜백입니다. buffered_mode 가 True인 경우 필요하지만 buffered_mode False인 경우 선택 사항입니다. 콜백 함수는 다음과 같이 정의해야 합니다. on_error(이벤트, partition_id, 오류). 여기서는 다음과 같습니다.

  • events: 게시하지 못한 이벤트 목록입니다.

  • partition_id: 목록의 이벤트가 및 에 게시하려고 시도한 파티션 ID입니다.

  • error: 송신 실패와 관련된 예외입니다.

buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.

  • 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

    그러면 오류 정보가 on_error 콜백에 전달되고 이 콜백이 호출됩니다.

  • 클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우

    그러면 오류가 기본적으로 발생합니다.

buffered_mode True이면 on_error 콜백이 필요하며 오류는 다음과 같이 처리됩니다.

  • 지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.

  • 큐에 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.

max_buffer_length
int

버퍼링 모드만. 플러시를 트리거하기 전에 버퍼링할 수 있는 파티션당 총 이벤트 수입니다. 기본값은 버퍼링 모드에서 1500입니다.

max_wait_time
Optional[float]

버퍼링 모드만. 게시하기 전에 버퍼의 이벤트로 일괄 처리가 빌드될 때까지 대기하는 시간입니다. 기본값은 버퍼링 모드에서 1입니다.

logging_enable
bool

로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.

http_proxy
Dict

HTTP 프록시 설정. 'proxy_hostname'(str value) 및 'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

auth_timeout
float

서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.

user_agent
str

지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.

retry_total
int

오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다.

retry_backoff_factor
float

두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수} - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.

retry_backoff_max
float

최대 백오프 시간입니다. 기본값은 120초(2분)입니다.

retry_mode
str

재시도 사이의 지연 동작입니다. 지원되는 값은 'fixed' 또는 'exponential'입니다. 여기서 기본값은 '지수'입니다.

idle_timeout
float

시간 제한(초)으로, 활동이 없으면 이 클라이언트가 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.

transport_type
TransportType

Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 대신 TransportType.AmqpOverWebsocket 을 사용할 수 있습니다.

http_proxy

HTTP 프록시 설정. 'proxy_hostname'(str value) 및 'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

custom_endpoint_address
Optional[str]

Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.

connection_verify
Optional[str]

연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 None입니다. 이 경우 certifi.where() 가 사용됩니다.

uamqp_transport
bool

uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.

반환 형식

예외

플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.

예제

연결 문자열 EventHubProducerClient의 새 instance 만듭니다.


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

버퍼링되고 지정된 파티션에 대해 게시되기를 기다리는 이벤트 수입니다. 버퍼링되지 않은 모드에서 None을 반환합니다. 참고: 이벤트 버퍼는 백그라운드 스레드에서 처리되므로 이 API에서 보고하는 버퍼의 이벤트 수는 근사치로만 간주되어야 하며 디버깅에만 사용하는 것이 좋습니다. 버퍼링된 이벤트가 없는 파티션 ID의 경우 해당 파티션 ID가 실제로 이벤트 허브 내에 있는지 여부에 관계없이 0이 반환됩니다.

get_buffered_event_count(partition_id: str) -> int | None

매개 변수

partition_id
str
필수

대상 파티션 ID입니다.

반환 형식

int,

예외

플러시를 True로 설정하거나 버퍼링 모드에서 기본 AMQP 연결을 닫는 경우 버퍼를 플러시 할 때 오류가 발생한 경우.

get_eventhub_properties

이벤트 허브의 속성을 가져옵니다.

반환된 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

반환

eventhub 속성을 포함하는 사전입니다.

반환 형식

예외

get_partition_ids

이벤트 허브의 파티션 ID를 가져옵니다.

get_partition_ids() -> List[str]

반환

파티션 ID 목록입니다.

반환 형식

예외

get_partition_properties

지정된 파티션의 속성을 가져옵니다.

속성 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (부울)

get_partition_properties(partition_id: str) -> Dict[str, Any]

매개 변수

partition_id
str
필수

대상 파티션 ID입니다.

반환

파티션 속성의 사전입니다.

반환 형식

예외

send_batch

이벤트 데이터의 일괄 처리를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링 모드에서 실행되도록 구성된 경우 메서드는 지정된 시간 내에 이벤트를 버퍼로 큐에 넣고 반환합니다. 생산자는 버퍼링 모드로 백그라운드에서 자동 전송을 수행합니다.

buffered_mode False이면 on_error 콜백은 선택 사항이며 오류는 다음과 같이 처리됩니다.

  • 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

    그러면 오류 정보가 on_error 콜백에 전달되고, 이 콜백이 호출됩니다.

  • 클라이언트 인스턴스화 중에 on_error 콜백이 전달되지 않는 경우

    그러면 오류가 기본적으로 발생합니다.

buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다.

  • 이벤트가 지정된 시간 제한 내에 큐에 추가되지 않으면 오류가 직접 발생합니다.

  • 성공적으로 큐에 추가한 후 이벤트가 전송되지 않으면 on_error 콜백이 호출됩니다.

버퍼링 모드에서 일괄 처리를 보내는 것은 그대로 유지되고 단일 단위로 전송됩니다. 일괄 처리는 다시 정렬되지 않습니다. 이로 인해 이벤트를 보내는 비효율성이 발생할 수 있습니다.

EventData 또는 AmqpAnnotatedMessage의 한정된 목록을 보내고 이벤트 허브 프레임 크기 제한 내에 있다는 것을 알고 있는 경우 send_batch 호출을 사용하여 보낼 수 있습니다. 그렇지 않으면 를 사용하여 create_batchEventDataBatch 를 만들고 크기 제한까지 EventData 또는 AmqpAnnotatedMessage 를 일괄 처리에 하나씩 추가한 다음, 이 메서드를 호출하여 일괄 처리를 보냅니다.

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

매개 변수

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
필수

보낼 EventDataBatch 개체 또는 일괄 처리로 보낼 EventData 목록입니다. 목록 또는 EventDataBatch의 모든 EventData 또는 AmqpAnnotatedMessage가 동일한 파티션에 배치됩니다.

timeout
float

이벤트 데이터를 버퍼링되지 않은 모드로 보내는 최대 대기 시간 또는 버퍼링 모드에서 이벤트 데이터를 버퍼에 큐에 넣기 위한 최대 대기 시간입니다. 버퍼링되지 않은 모드에서는 생산자를 만들 때 지정된 기본 대기 시간이 사용됩니다. 버퍼링 모드에서 기본 대기 시간은 없음입니다.

partition_id
str

보낼 특정 파티션 ID입니다. 기본값은 없음입니다. 이 경우 서비스는 라운드 로빈을 사용하여 모든 파티션에 할당됩니다. Partition_id 지정되고 EventDataBatch 자체가 partition_id 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다.

partition_key
str

지정된 partition_key 사용하여 이벤트 데이터는 서비스에서 결정한 이벤트 허브의 특정 파티션으로 전송됩니다. Partition_key 지정되고 EventDataBatch 자체가 partition_key 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다. partition_id 및 partition_key 모두 제공되면 partition_id 우선적으로 적용됩니다. 경고: 이벤트 허브 서비스에서 partition_key 무시되고 이벤트가 라운드 로빈을 사용하여 모든 파티션에 할당되므로 전송할 이벤트에 대한 문자열이 아닌 값의 partition_key 설정하는 것은 권장되지 않습니다. 또한 partition_key 문자열 형식일 것으로 예상되는 이벤트를 소비하기 위한 SDK가 있으며, 문자열이 아닌 값을 구문 분석하지 못할 수 있습니다.

반환 형식

예외

이벤트가 버퍼링되지 않은 모드로 전송되기 전에 시간 제한 매개 변수로 지정된 값이 경과하거나 이벤트를 버퍼링된 모드로 버퍼링된 에 큐에 넣을 수 없는 경우

예제

이벤트 데이터 보내기


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

이벤트 데이터를 보냅니다. 기본적으로 메서드는 승인이 수신되거나 작업이 시간 초과될 때까지 차단됩니다. EventHubProducerClient 가 버퍼링 모드에서 실행되도록 구성된 경우 메서드는 지정된 시간 내에 이벤트를 버퍼로 큐에 넣고 반환합니다. 생산자는 버퍼링 모드로 백그라운드에서 자동 전송을 수행합니다.

buffered_mode False이면 on_error 콜백이 선택 사항이며 다음과 같이 오류가 처리됩니다. * 생산자 클라이언트 인스턴스화 중에 on_error 콜백이 전달되는 경우

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

buffered_mode True이면 on_error 콜백이 필요하며 다음과 같이 오류가 처리됩니다. * 지정된 시간 제한 내에 이벤트가 큐에 추가되지 않으면 오류가 직접 발생합니다.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

매개 변수

event_data
Union[EventData, AmqpAnnotatedMessage]
필수

보낼 EventData 개체입니다.

timeout
float

버퍼링되지 않은 모드에서 이벤트 데이터를 보내는 최대 대기 시간 또는 버퍼링 모드에서 이벤트 데이터를 버퍼에 큐에 넣기 위한 최대 대기 시간입니다. 버퍼링되지 않은 모드에서는 생산자가 생성될 때 지정된 기본 대기 시간이 사용됩니다. 버퍼링 모드에서 기본 대기 시간은 없음입니다.

partition_id
str

보낼 특정 파티션 ID입니다. 기본값은 없음입니다. 이 경우 서비스는 라운드 로빈을 사용하여 모든 파티션에 할당됩니다. Partition_id 지정되고 EventDataBatch 자체가 partition_id 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다.

partition_key
str

지정된 partition_key 사용하여 이벤트 데이터는 서비스에서 결정한 이벤트 허브의 특정 파티션으로 전송됩니다. Partition_key 지정되고 EventDataBatch 자체가 partition_key 때문에 event_data_batch EventDataBatch인 경우 TypeError가 발생합니다. partition_id 및 partition_key 모두 제공되면 partition_id 우선적으로 적용됩니다. 경고: 이벤트 허브 서비스에서 partition_key 무시되고 이벤트가 라운드 로빈을 사용하여 모든 파티션에 할당되므로 전송할 이벤트에 대한 문자열이 아닌 값의 partition_key 설정하는 것은 권장되지 않습니다. 또한 partition_key 문자열 형식일 것으로 예상되는 이벤트를 소비하기 위한 SDK가 있으며, 문자열이 아닌 값을 구문 분석하지 못할 수 있습니다.

반환 형식

예외

시간 제한 매개 변수로 지정된 값이 경과한 경우 이벤트가 버퍼링되지 않은 모드로 전송되거나 이벤트가 버퍼링된 모드에서 버퍼링된 모드로 큐에 넣을 수 있습니다.

특성

total_buffered_event_count

현재 버퍼링되고 게시되기를 기다리는 모든 파티션의 총 이벤트 수입니다. 버퍼링되지 않은 모드에서 None을 반환합니다. 참고: 이벤트 버퍼는 백그라운드 스레드에서 처리되므로 이 API에서 보고한 버퍼의 이벤트 수는 근사치로 간주되어야 하며 디버깅에만 사용하는 것이 좋습니다.

반환 형식

int,