EventHubConsumerClient 클래스

EventHubConsumerClient 클래스는 Azure Event Hubs 서비스에서 이벤트를 수신하기 위한 상위 수준 인터페이스를 정의합니다.

EventHubConsumerClient의 기본 목표는 부하 분산 및 검사점이 있는 EventHub의 모든 파티션에서 이벤트를 수신하는 것입니다.

여러 EventHubConsumerClient 인스턴스가 동일한 이벤트 허브, 소비자 그룹 및 검사점 위치에 대해 실행되는 경우 파티션이 균등하게 분산됩니다.

부하 분산 및 지속형 검사점 사용하려면 EventHubConsumerClient를 만들 때 checkpoint_store 설정해야 합니다. 검사점 저장소가 제공되지 않으면 검사점은 메모리에서 내부적으로 유지 관리됩니다.

EventHubConsumerClient는 해당 메서드 receive() 또는 receive_batch()를 호출하고 partition_id 지정할 때 특정 파티션에서 수신할 수도 있습니다. 부하 분산은 단일 파티션 모드에서 작동하지 않습니다. 그러나 checkpoint_store 설정된 경우에도 사용자는 검사점도 저장할 수 있습니다.

상속
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

생성자

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

매개 변수

fully_qualified_namespace
str
필수

Event Hubs 네임스페이스의 정규화된 호스트 이름입니다. 네임스페이스 형식은 .servicebus.windows.net.

eventhub_name
str
필수

클라이언트를 연결할 특정 이벤트 허브의 경로입니다.

consumer_group
str
필수

이 소비자 그룹에 대한 이벤트 허브에서 이벤트를 수신합니다.

credential
TokenCredential 또는 AzureSasCredential 또는 AzureNamedKeyCredential
필수

토큰을 가져오기 위한 특정 인터페이스를 구현하는 인증에 사용되는 자격 증명 개체입니다. *get_token(자체, 범위) 메서드를 구현하는 개체 및 azure-identity 라이브러리에서 생성된 또는 자격 증명 개체를 허용EventHubSharedKeyCredential합니다.

logging_enable
bool

로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.

auth_timeout
float

서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.

user_agent
str

지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.

retry_total
int

오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다. 수신의 retry_total 컨텍스트는 특별합니다. 수신 메서드는 각 반복에서 내부 수신 메서드를 호출하는 while-loop에 의해 구현됩니다. 수신 사례에서 retry_total while 루프의 내부 수신 메서드에 의해 발생한 오류 후 재시도 횟수를 지정합니다. 재시도 시도가 모두 완료되면 오류 정보와 함께 on_error 콜백이 호출됩니다(제공된 경우). 실패한 내부 파티션 소비자는 닫힙니다(제공된 경우 on_partition_close 호출됨). 새 내부 파티션 소비자가 만들어지고(제공된 경우 on_partition_initialize 호출됨) 수신을 다시 시작합니다.

retry_backoff_factor
float

두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수 } - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.

retry_backoff_max
float

최대 백오프 시간입니다. 기본값은 120초(2분)입니다.

retry_mode
str

재시도 사이의 지연 동작입니다. 지원되는 값은 '고정' 또는 '지수'이며, 여기서 기본값은 '지수'입니다.

idle_timeout
float

시간 제한(초)이며, 이후 이 클라이언트는 추가 작업이 없는 경우 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.

transport_type
TransportType

Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 TransportType.AmqpOverWebsocket 을 대신 사용할 수 있습니다.

http_proxy
dict[str, str 또는 int]

HTTP 프록시 설정. 'proxy_hostname'(str value) 및'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

checkpoint_store
CheckpointStore 또는 None

이벤트를 수신할 때 파티션 부하 분산 및 검사점 데이터를 저장하는 관리자입니다. 검사점 저장소는 모든 파티션 또는 단일 파티션에서 수신하는 두 경우 모두에 사용됩니다. 후자의 경우 부하 분산이 적용되지 않습니다. 검사점 저장소가 제공되지 않으면 검사점이 메모리에 내부적으로 유지되고 EventHubConsumerClient instance 부하 분산 없이 이벤트를 받습니다.

load_balancing_interval
float

부하 분산이 시작되는 경우. 두 부하 분산 평가 사이의 간격(초)입니다. 기본값은 30초입니다.

partition_ownership_expiration_interval
float

파티션 소유권은 이 시간(초) 후에 만료됩니다. 모든 부하 분산 평가는 소유권 만료 시간을 자동으로 연장합니다. 기본값은 6 * load_balancing_interval, 즉 기본 load_balancing_interval 사용하는 경우 180초입니다.

load_balancing_strategy
str 또는 LoadBalancingStrategy

부하 분산이 시작되면 이 전략을 사용하여 파티션 소유권을 클레임하고 균형을 조정합니다. greedy 전략에는 "greedy" 또는 LoadBalancingStrategy.GREEDY 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 부하를 분산하는 데 필요한 클레임되지 않은 파티션을 많이 가져옵니다. 균형 잡힌 전략에는 "balanced" 또는 LoadBalancingStrategy.BALANCED 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 다른 EventHubConsumerClient에서 클레임하지 않는 하나의 파티션만 클레임합니다. EventHub의 모든 파티션이 다른 EventHubConsumerClient 에서 클레임되고 이 클라이언트가 너무 적은 파티션을 주장한 경우 이 클라이언트는 부하 분산 전략에 관계없이 모든 부하 분산 평가에 대해 다른 클라이언트에서 하나의 파티션을 도용합니다. Greedy 전략은 기본적으로 사용됩니다.

custom_endpoint_address
str 또는 None

Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.

connection_verify
str 또는 None

연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 none입니다. 이 경우 certifi.where() 가 사용됩니다.

uamqp_transport
bool

uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.

socket_timeout
float

연결의 기본 소켓이 시간을 초과하기 전에 데이터를 보내고 받을 때 대기해야 하는 시간(초)입니다. 기본값은 TransportType.Amqp의 경우 0.2이고 TransportType.AmqpOverWebsocket의 경우 1입니다. 쓰기 시간 초과로 인해 EventHubsConnectionError 오류가 발생하는 경우 기본값보다 큰 를 전달해야 할 수 있습니다. 이는 고급 사용 시나리오용이며 일반적으로 기본값으로 충분해야 합니다.

예제

EventHubConsumerClient의 새 instance 만듭니다.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

메서드

close

이벤트 허브에서 이벤트 검색을 중지하고 기본 AMQP 연결 및 링크를 닫습니다.

from_connection_string

연결 문자열 EventHubConsumerClient를 만듭니다.

get_eventhub_properties

이벤트 허브의 속성을 가져옵니다.

반환된 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

이벤트 허브의 파티션 ID를 가져옵니다.

get_partition_properties

지정된 파티션의 속성을 가져옵니다.

속성 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (부울)

receive

선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다.

receive_batch

선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다.

close

이벤트 허브에서 이벤트 검색을 중지하고 기본 AMQP 연결 및 링크를 닫습니다.

close() -> None

반환 형식

예제

클라이언트를 닫습니다.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

연결 문자열 EventHubConsumerClient를 만듭니다.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

매개 변수

conn_str
str
필수

이벤트 허브의 연결 문자열.

consumer_group
str
필수

이 소비자 그룹에 대한 이벤트 허브에서 이벤트를 수신합니다.

eventhub_name
str

클라이언트를 연결할 특정 이벤트 허브의 경로입니다.

logging_enable
bool

로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.

auth_timeout
float

서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.

user_agent
str

지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.

retry_total
int

오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다. 수신의 retry_total 컨텍스트는 특별합니다. 수신 메서드는 각 반복에서 내부 수신 메서드를 호출하는 while-loop에 의해 구현됩니다. 수신 사례에서 retry_total while 루프의 내부 수신 메서드에 의해 발생한 오류 후 재시도 횟수를 지정합니다. 재시도 시도가 모두 완료되면 오류 정보와 함께 on_error 콜백이 호출됩니다(제공된 경우). 실패한 내부 파티션 소비자는 닫힙니다(제공된 경우 on_partition_close 호출됨). 새 내부 파티션 소비자가 만들어지고(제공된 경우 on_partition_initialize 호출됨) 수신을 다시 시작합니다.

retry_backoff_factor
float

두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수 } - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.

retry_backoff_max
float

최대 백오프 시간입니다. 기본값은 120초(2분)입니다.

retry_mode
str

재시도 사이의 지연 동작입니다. 지원되는 값은 '고정' 또는 '지수'이며, 여기서 기본값은 '지수'입니다.

idle_timeout
float

시간 제한(초)이며, 이후 이 클라이언트는 furthur 작업이 없는 경우 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.

transport_type
TransportType

Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 TransportType.AmqpOverWebsocket 을 대신 사용할 수 있습니다.

http_proxy
dict

HTTP 프록시 설정. 'proxy_hostname'(str value) 및'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

checkpoint_store
CheckpointStore 또는 None

이벤트를 수신할 때 파티션 부하 분산 및 검사점 데이터를 저장하는 관리자입니다. 검사점 저장소는 모든 파티션 또는 단일 파티션에서 수신하는 두 경우 모두에 사용됩니다. 후자의 경우 부하 분산이 적용되지 않습니다. 검사점 저장소가 제공되지 않으면 검사점이 메모리에 내부적으로 유지되고 EventHubConsumerClient instance 부하 분산 없이 이벤트를 받습니다.

load_balancing_interval
float

부하 분산이 시작되는 경우. 두 부하 분산 평가 사이의 간격(초)입니다. 기본값은 10초입니다.

partition_ownership_expiration_interval
float

파티션 소유권은 이 시간(초) 후에 만료됩니다. 모든 부하 분산 평가는 소유권 만료 시간을 자동으로 연장합니다. 기본값은 6 * load_balancing_interval, 즉 기본 load_balancing_interval 사용하는 경우 60초입니다.

load_balancing_strategy
str 또는 LoadBalancingStrategy

부하 분산이 시작되면 이 전략을 사용하여 파티션 소유권을 클레임하고 균형을 조정합니다. greedy 전략에는 "greedy" 또는 LoadBalancingStrategy.GREEDY 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 부하를 분산하는 데 필요한 클레임되지 않은 파티션을 많이 가져옵니다. 균형 잡힌 전략에는 "balanced" 또는 LoadBalancingStrategy.BALANCED 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 다른 EventHubConsumerClient에서 클레임하지 않는 하나의 파티션만 클레임합니다. EventHub의 모든 파티션이 다른 EventHubConsumerClient 에서 클레임되고 이 클라이언트가 너무 적은 파티션을 주장한 경우 이 클라이언트는 부하 분산 전략에 관계없이 모든 부하 분산 평가에 대해 다른 클라이언트에서 하나의 파티션을 도용합니다. Greedy 전략은 기본적으로 사용됩니다.

custom_endpoint_address
str 또는 None

Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.

connection_verify
str 또는 None

연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 none입니다. 이 경우 certifi.where() 가 사용됩니다.

uamqp_transport
bool

uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.

반환 형식

예제

연결 문자열 EventHubConsumerClient의 새 instance 만듭니다.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

이벤트 허브의 속성을 가져옵니다.

반환된 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

반환

이벤트 허브에 대한 정보를 포함하는 사전입니다.

반환 형식

예외

get_partition_ids

이벤트 허브의 파티션 ID를 가져옵니다.

get_partition_ids() -> List[str]

반환

파티션 ID 목록입니다.

반환 형식

예외

get_partition_properties

지정된 파티션의 속성을 가져옵니다.

속성 사전의 키는 다음과 같습니다.

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (부울)

get_partition_properties(partition_id: str) -> Dict[str, Any]

매개 변수

partition_id
str
필수

대상 파티션 ID입니다.

반환

파티션 속성을 포함하는 사전입니다.

반환 형식

예외

receive

선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

매개 변수

on_event
callable[PartitionContext, EventData 또는 None]
필수

수신된 이벤트를 처리하기 위한 콜백 함수입니다. 콜백은 파티션 컨텍스트와 수신된 이벤트가 포함된 partition_context 두 개의 매개 변수를 사용합니다. 콜백 함수는 on_event(partition_context, 이벤트)와 같이 정의되어야 합니다. 자세한 파티션 컨텍스트 정보는 를 PartitionContext참조하세요.

max_wait_time
float

이벤트 프로세서가 콜백을 호출하기 전에 대기하는 최대 간격(초)입니다. 이 간격 내에 이벤트가 수신되지 않으면 없음을 사용하여 on_event 콜백이 호출됩니다. 이 값이 None 또는 0(기본값)으로 설정된 경우 이벤트가 수신될 때까지 콜백이 호출되지 않습니다.

partition_id
str

지정된 경우 클라이언트는 이 파티션에서만 수신됩니다. 그렇지 않으면 클라이언트는 모든 파티션에서 수신됩니다.

owner_level
int

독점 소비자의 우선 순위입니다. owner_level 설정된 경우 단독 소비자가 만들어집니다. owner_level 높은 소비자는 배타적 우선 순위가 높습니다. 소유자 수준은 소비자의 'epoch 값'으로도 알고 있습니다.

prefetch
int

처리를 위해 서비스에서 프리페치할 이벤트 수입니다. 기본값은 300입니다.

track_last_enqueued_event_properties
bool

소비자가 연결된 파티션에서 마지막으로 큐에 추가된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부를 나타냅니다. 마지막으로 큐에 추가된 파티션에 대한 정보가 추적되는 경우 Event Hubs 서비스에서 받은 각 이벤트는 파티션에 대한 메타데이터를 전달합니다. 이로 인해 이벤트 허브 클라이언트를 사용하여 파티션 속성에 대해 주기적으로 요청하는 것을 고려할 때 일반적으로 유리한 절전 모드인 적은 양의 추가 네트워크 대역폭 소비가 발생합니다. 기본적으로 False 로 설정됩니다.

starting_position
str, int, datetime 또는 dict[str,any]

파티션에 대한 검사점 데이터가 없는 경우 이 이벤트 위치에서 수신을 시작합니다. 검사점 데이터는 사용 가능한 경우 사용됩니다. 파티션 ID를 키로 사용하고 개별 파티션의 값으로 위치를 지정하는 받아쓰기 또는 모든 파티션에 대한 단일 값일 수 있습니다. 값 형식은 str, int 또는 datetime.datetime일 수 있습니다. 또한 스트림의 시작 부분에서 수신하기 위한 값 "-1"과 새 이벤트만 수신하기 위한 "@latest" 값도 지원됩니다. 기본값은 "@latest"입니다.

starting_position_inclusive
bool 또는 dict[str,bool]

지정된 starting_position 포함(>=)인지 여부를 확인합니다(>). 포함의 경우 True이고 단독의 경우 False입니다. 파티션 ID를 키로 사용하고 특정 파티션에 대한 starting_position 포함 여부를 나타내는 값으로 bool인 받아쓰기일 수 있습니다. 이 값은 모든 starting_position 단일 부울 값일 수도 있습니다. 기본값은 False입니다.

on_error
callable[[PartitionContext, Exception]]

재시도 시도가 소진된 후 또는 부하 분산 과정에서 오류가 발생할 때 호출되는 콜백 함수입니다. 콜백은 파티션 정보가 포함된 partition_context 예외인 오류 라는 두 가지 매개 변수를 사용합니다. 부하 분산 과정에서 오류가 발생하면 partition_context 없음일 수 있습니다. 콜백은 다음과 같이 정의해야 합니다. on_error(partition_context, 오류). on_event 콜백 중에 처리되지 않은 예외가 발생하는 경우에도 on_error 콜백이 호출됩니다.

on_partition_initialize
callable[[PartitionContext]]

특정 파티션에 대한 소비자가 초기화를 완료한 후 호출될 콜백 함수입니다. 실패하고 닫힌 내부 파티션 소비자에 대한 수신 프로세스를 인수하기 위해 새 내부 파티션 소비자를 만들 때도 호출됩니다. 콜백은 파티션 정보를 포함하는 단일 매개 변수 인 partition_context 사용합니다. 콜백은 on_partition_initialize(partition_context)와 같이 정의되어야 합니다.

on_partition_close
callable[[PartitionContext, CloseReason]]

특정 파티션에 대한 소비자가 닫힌 후 호출될 콜백 함수입니다. 재시도를 모두 마친 후 수신하는 동안 오류가 발생할 때도 호출됩니다. 콜백은 파티션 정보와 닫기 이유를 포함하는 partition_context 두 개의 매개 변수를 사용합니다. 콜백은 on_partition_close(partition_context, 이유)와 같이 정의해야 합니다. 닫는 다양한 이유로 을 참조 CloseReason 하세요.

반환 형식

예제

EventHub에서 이벤트를 수신합니다.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

매개 변수

on_event_batch
callable[PartitionContext, list[EventData]]
필수

수신된 이벤트의 일괄 처리를 처리하기 위한 콜백 함수입니다. 콜백은 파티션 컨텍스트를 포함하는 partition_context 수신된 이벤트인 event_batch 두 매개 변수를 사용합니다. 콜백 함수는 다음과 같이 정의해야 합니다. on_event_batch(partition_context, event_batch). event_batch max_wait_time 없음 또는 0이 아니고 max_wait_time후에 이벤트가 수신되지 않는 경우 빈 목록이 될 수 있습니다. 자세한 파티션 컨텍스트 정보는 를 PartitionContext참조하세요.

max_batch_size
int

백 on_event_batch 전달된 일괄 처리의 최대 이벤트 수입니다. 실제로 받은 이벤트 수가 max_batch_size보다 큰 경우 수신된 이벤트는 일괄 처리로 나뉘며 최대 max_batch_size 이벤트가 있는 각 일괄 처리에 대한 콜백을 호출합니다.

max_wait_time
float

콜백을 호출하기 전에 이벤트 프로세서가 대기하는 최대 간격(초)입니다. 이 간격 내에 이벤트가 수신되지 않으면 on_event_batch 콜백이 빈 목록으로 호출됩니다.

partition_id
str

지정된 경우 클라이언트는 이 파티션에서만 수신됩니다. 그렇지 않으면 클라이언트가 모든 파티션에서 수신됩니다.

owner_level
int

독점 소비자의 우선 순위입니다. owner_level 설정된 경우 단독 소비자가 만들어집니다. owner_level 높은 소비자는 배타적 우선 순위가 높습니다. 소유자 수준은 소비자의 'epoch 값'으로도 알고 있습니다.

prefetch
int

처리를 위해 서비스에서 프리페치할 이벤트 수입니다. 기본값은 300입니다.

track_last_enqueued_event_properties
bool

소비자가 연결된 파티션에서 마지막으로 큐에 추가된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부를 나타냅니다. 마지막으로 큐에 추가된 파티션 이벤트에 대한 정보를 추적할 때 Event Hubs 서비스에서 받은 각 이벤트는 파티션에 대한 메타데이터를 전달합니다. 이로 인해 소량의 추가 네트워크 대역폭 소비가 발생하며, 이벤트 허브 클라이언트를 사용하여 파티션 속성에 대해 주기적으로 요청하는 것을 고려할 때 일반적으로 양호한 절차입니다. 기본적으로 False 로 설정됩니다.

starting_position
str, int, datetime 또는 dict[str,any]

파티션에 대한 검사점 데이터가 없는 경우 이 이벤트 위치에서 수신을 시작합니다. 검사점 데이터는 사용 가능한 경우 사용됩니다. 파티션 ID를 키로 사용하고 개별 파티션의 값으로 위치를 지정하는 받아쓰기 또는 모든 파티션에 대한 단일 값일 수 있습니다. 값 형식은 str, int 또는 datetime.datetime일 수 있습니다. 또한 스트림의 시작 부분에서 수신하기 위한 값 "-1"과 새 이벤트만 수신하기 위한 "@latest" 값도 지원됩니다. 기본값은 "@latest"입니다.

starting_position_inclusive
bool 또는 dict[str,bool]

지정된 starting_position 포함(>=)인지 여부(>)를 확인합니다. 포함의 경우 True이고, False이면 배타적입니다. 파티션 ID를 키로 사용하고 bool을 특정 파티션에 대한 starting_position 포함 여부를 나타내는 값으로 사용하는 받아쓰기일 수 있습니다. 이 값은 모든 starting_position 단일 부울 값일 수도 있습니다. 기본값은 False입니다.

on_error
callable[[PartitionContext, Exception]]

재시도 시도가 소진된 후 또는 부하 분산 프로세스 중에 수신하는 동안 오류가 발생할 때 호출되는 콜백 함수입니다. 콜백은 두 개의 매개 변수를 사용합니다. partition_context 파티션 정보를 포함하고 오류가 예외입니다. 부하 분산 프로세스 중에 오류가 발생하면 partition_context 없음일 수 있습니다. 콜백은 다음과 같이 정의해야 합니다. on_error(partition_context, 오류). on_event 콜백 중에 처리되지 않은 예외가 발생하는 경우에도 on_error 콜백이 호출됩니다.

on_partition_initialize
callable[[PartitionContext]]

특정 파티션에 대한 소비자가 초기화를 완료한 후 호출되는 콜백 함수입니다. 또한 실패하고 닫힌 내부 파티션 소비자에 대한 수신 프로세스를 인수하기 위해 새 내부 파티션 소비자를 만들 때 호출됩니다. 콜백은 단일 매개 변수인 partition_context 파티션 정보를 포함합니다. 콜백은 on_partition_initialize(partition_context)와 같이 정의해야 합니다.

on_partition_close
callable[[PartitionContext, CloseReason]]

특정 파티션에 대한 소비자가 닫힌 후 호출될 콜백 함수입니다. 재시도를 모두 마친 후 수신하는 동안 오류가 발생할 때도 호출됩니다. 콜백은 파티션 정보와 종료 이유를 포함하는 partition_context 두 매개 변수를 사용합니다. 콜백은 on_partition_close(partition_context, 이유)와 같이 정의해야 합니다. 닫는 다양한 이유로 을 참조 CloseReason 하세요.

반환 형식

예제

EventHub에서 이벤트를 일괄 처리로 수신합니다.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)