Python용 Azure Event Hubs 클라이언트 라이브러리 - 버전 5.11.5

Azure Event Hubs 초당 수백만 개의 이벤트를 수집하여 여러 소비자에게 스트리밍할 수 있는 확장성이 뛰어난 게시 구독 서비스입니다. 이렇게 하면 연결된 디바이스 및 애플리케이션에서 생성되는 대량의 데이터를 처리하고 분석할 수 있습니다. Event Hubs가 데이터를 수집하면 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 데이터를 검색, 변환 및 저장할 수 있습니다. Azure Event Hubs 대해 자세히 알아보려면 Event Hubs란?

Azure Event Hubs 클라이언트 라이브러리를 사용하면 Azure Event Hubs 이벤트를 게시하고 사용할 수 있으며 다음을 수행할 수 있습니다.

  • 비즈니스 인텔리전스 및 진단 목적으로 애플리케이션에 대한 원격 분석을 내보냅니다.
  • 이해 당사자가 관찰하고 작업을 수행하기 위한 트리거로 사용할 수 있는 애플리케이션 상태에 대한 사실을 게시합니다.
  • 느슨하게 연결된 시스템을 함께 묶을 필요 없이 상호 작용할 수 있도록 비즈니스 또는 기타 에코시스템 내에서 일어나는 흥미로운 작업 및 상호 작용을 관찰합니다.
  • 하나 이상의 게시자로부터 이벤트를 수신하고 에코시스템의 요구 사항을 더 잘 충족하도록 변환한 다음, 소비자가 관찰할 수 있도록 변환된 이벤트를 새 스트림에 게시합니다.

소스 코드 | 패키지(PyPi) | 패키지(Conda) | API 참조 설명서 | 제품 설명서 | 샘플

시작

필수 구성 요소

  • Python 3.7 이상

  • Microsoft Azure 구독: Azure Event Hubs 포함하여 Azure 서비스를 사용하려면 구독이 필요합니다. 기존 Azure 계정이 없는 경우 무료 평가판에 등록하거나 계정을 만들 때 MSDN 구독자 혜택을 사용할 수 있습니다.

  • Event Hubs 네임스페이스(Event Hubs)를 사용하여 다음을 수행합니다. Azure Event Hubs 상호 작용하려면 네임스페이스와 Event Hub를 사용할 수 있어야 합니다. Azure 리소스를 만드는 데 익숙하지 않은 경우 Azure Portal 사용하여 Event Hub를 만들기 위한 단계별 가이드를 따를 수 있습니다. 또한 Azure CLI, Azure PowerShell 또는 ARM(Azure Resource Manager) 템플릿을 사용하여 이벤트 허브를 만들기 위한 자세한 지침을 찾을 수 있습니다.

패키지 설치

pip를 사용하여 Python용 Azure Event Hubs 클라이언트 라이브러리를 설치합니다.

$ pip install azure-eventhub

클라이언트 인증

Event Hubs와의 상호 작용은 EventHubConsumerClient 또는 EventHubProducerClient 클래스의 instance 시작합니다. 클라이언트 개체를 인스턴스화하려면 호스트 이름, SAS/AAD 자격 증명 및 이벤트 허브 이름 또는 연결 문자열 필요합니다.

연결 문자열 클라이언트 만들기:

Event Hubs 클라이언트 라이브러리가 Event Hubs와 상호 작용할 수 있도록 가장 쉬운 방법은 Event Hubs 네임스페이스를 만들 때 자동으로 만들어지는 연결 문자열 사용하는 것입니다. Azure의 공유 액세스 정책에 익숙하지 않은 경우 단계별 가이드에 따라 Event Hubs 연결 문자열 가져올 수 있습니다.

  • 메서드는 from_connection_stringEndpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> 및 엔터티 이름의 연결 문자열 Event Hub instance 사용합니다. Azure Portal 연결 문자열 가져올 수 있습니다.

azure-identity 라이브러리를 사용하여 클라이언트를 만듭니다.

또는 Credential 개체를 사용하여 azure-identity 패키지를 사용하여 AAD를 통해 인증할 수 있습니다.

  • 위의 샘플에 설명된 이 생성자는 TokenCredential 프로토콜을 구현하는 이벤트 허브 instance 및 자격 증명의 호스트 이름 및 엔터티 이름을 사용합니다. azure-identity 패키지에서 사용할 수 있는 프로토콜의 TokenCredential 구현이 있습니다. 호스트 이름은 형식 <yournamespace.servicebus.windows.net>입니다.
  • 에서 제공하는 자격 증명 형식을 azure-identity사용하려면 패키지를 설치하세요. pip install azure-identity
  • 또한 비동기 API를 사용하려면 먼저 와 같은 aiohttp비동기 전송을 설치해야 합니다. pip install aiohttp
  • Azure Active Directory를 사용하는 경우 보안 주체에 Azure Event Hubs 데이터 소유자 역할과 같은 Event Hubs에 대한 액세스를 허용하는 역할이 할당되어야 합니다. Event Hubs에서 Azure Active Directory 권한 부여를 사용하는 방법에 대한 자세한 내용은 관련 설명서를 참조하세요.

주요 개념

  • EventHubProducerClient는 포함된 디바이스 솔루션, 모바일 디바이스 애플리케이션, 콘솔 또는 다른 디바이스에서 실행되는 게임 타이틀, 일부 클라이언트 또는 서버 기반 비즈니스 솔루션 또는 웹 사이트의 일부로 원격 분석 데이터, 진단 정보, 사용 로그 또는 기타 로그 데이터의 원본입니다.

  • EventHubConsumerClient는 이벤트 허브에서 이러한 정보를 선택하고 처리합니다. 처리에는 집계, 복잡한 계산 및 필터링이 포함될 수 있습니다. 또한 처리에 원시 또는 변환된 방식으로 정보를 배포하거나 저장하는 작업이 포함될 수도 있습니다. 이벤트 허브 소비자는 Azure Stream Analytics, Apache Spark, Apache Storm 같은 기본 제공 분석 기능이 있는 강력하고 확장성이 뛰어난 플랫폼 인프라 부분인 경우가 많습니다.

  • 파티션은 이벤트 허브에 보관되는 순서가 지정된 이벤트 시퀀스입니다. Azure Event Hubs는 각 소비자가 메시지 스트림의 특정 하위 집합 또는 파티션만 읽는 분할된 소비자 패턴을 통해 메시지 스트리밍을 제공합니다. 최신 이벤트가 도착하면 이 시퀀스의 끝에 추가됩니다. 파티션 수는 이벤트 허브를 만들 때 지정되며 변경할 수 없습니다.

  • 소비자 그룹은 전체 이벤트 허브의 보기입니다. 소비자 그룹을 통해 데이터를 사용하는 여러 애플리케이션이 각각 개별 이벤트 스트림 보기를 사용할 수 있으며 고유한 위치에서 고유한 속도로 독립적으로 스트림을 읽을 수 있습니다. 소비자 그룹당 파티션에는 최대 5개의 동시 reader가 있을 수 있습니다. 그러나 지정된 파티션 및 소비자 그룹 페어링에 대해 활성 소비자가 하나만 있는 것이 좋습니다. 각 활성 reader는 해당 파티션의 모든 이벤트를 수신합니다. 동일한 파티션에 여러 reader가 있는 경우 중복된 이벤트를 수신합니다.

더 많은 개념과 심층적인 논의는 Event Hubs 기능을 참조하세요. 또한 AMQP에 대한 개념은 OASIS AMQP(Advanced Messaging Queuing Protocol) 버전 1.0에 잘 설명되어 있습니다.

스레드로부터의 안전성

EventHubProducerClient 또는 EventHubConsumerClient가 스레드로부터 안전하다는 보장은 없습니다. 스레드 간에 이러한 인스턴스를 다시 사용하지 않는 것이 좋습니다. 스레드로부터 안전한 방식으로 이러한 클래스를 사용하는 것은 실행 중인 애플리케이션에 달려 있습니다.

데이터 모델 형식 EventDataBatch 은 스레드로부터 안전하지 않습니다. 스레드 간에 공유하거나 클라이언트 메서드와 동시에 사용해서는 안 됩니다.

예제

다음 섹션에서는 다음을 포함하여 가장 일반적인 Event Hubs 작업 중 일부를 다루는 몇 가지 코드 조각을 제공합니다.

이벤트 허브 검사

이벤트 허브의 파티션 ID를 가져옵니다.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

이벤트 허브에 이벤트 게시

에 메서드를 create_batch 사용하여 메서드를 EventDataBatch 사용하여 보낼 수 있는 개체를 만듭니다send_batch.EventHubProducerClient 최대 일괄 처리 크기 제한(바이트)에 도달할 때까지 메서드를 사용하여 add 에 이벤트를 추가할 EventDataBatch 수 있습니다.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

이벤트 허브에서 이벤트 사용

EventHub에서 이벤트를 사용하는 방법에는 여러 가지가 있습니다. 이벤트가 수신될 때 콜백을 트리거하기 위해 메서드는 EventHubConsumerClient.receive 다음과 같이 사용합니다.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

이벤트 허브의 이벤트를 일괄 처리로 사용

위의 샘플은 수신되는 각 메시지에 대한 콜백을 트리거하는 반면, 다음 샘플은 이벤트 일괄 처리에서 콜백을 트리거하여 한 번에 숫자를 수신하려고 시도합니다.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

이벤트 허브에 이벤트를 비동기적으로 게시

에 메서드를 create_batch 사용하여 메서드를 EventDataBatch 사용하여 보낼 수 있는 개체를 만듭니다send_batch.EventHubProducer 최대 일괄 처리 크기 제한(바이트)에 도달할 때까지 메서드를 사용하여 add 에 이벤트를 추가할 EventDataBatch 수 있습니다.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

이벤트 허브의 이벤트를 비동기적으로 사용

이 SDK는 동기 및 asyncio 기반 코드를 모두 지원합니다. 위의 샘플에서 설명한 대로 수신하려면 aio 내에서 다음이 필요합니다.

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

이벤트 허브의 이벤트를 비동기적으로 일괄 처리로 사용

모든 동기 함수는 aio에서도 지원됩니다. 동기 일괄 처리 수신에 대해 위에서 설명한 것처럼 다음과 같이 asyncio 내에서 동일한 작업을 수행할 수 있습니다.

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

검사점 저장소를 사용하여 이벤트 사용 및 검사점 저장

EventHubConsumerClient 는 한 번에 여러 파티션에서 이벤트를 수신하고 동일한 이벤트 허브 및 소비자 그룹을 사용하여 다른 소비자와 부하를 분산할 수 있는 높은 수준의 구문입니다.

또한 사용자는 검사점에서 이벤트를 처리할 때 진행률을 추적할 수 있습니다.

검사점은 이벤트 허브 instance 소비자 그룹의 특정 파티션에서 사용자가 마지막으로 성공적으로 처리한 이벤트를 나타내기 위한 것입니다. 는 EventHubConsumerClient 의 instance CheckpointStore 사용하여 검사점을 업데이트하고 부하 분산 알고리즘에 필요한 관련 정보를 저장합니다.

접두사를 azure-eventhub-checkpointstore 사용하여 pypi를 검색하여 이를 지원하는 패키지를 찾고 이러한 패키지 중 하나에서 구현을 CheckpointStore 사용합니다. 동기화 라이브러리와 비동기 라이브러리가 모두 제공됩니다.

아래 예제에서는 의 instance EventHubConsumerClient 만들고 를 BlobCheckpointStore사용합니다. 코드를 실행하려면 Azure Storage 계정Blob 컨테이너 를 만들어야 합니다.

Azure Blob Storage 검사점 저장소 비동기Azure Blob Storage 검사점 저장소 동기화는 영구 저장소로 Azure Blob Storage 적용하는 구현 중 하나입니다CheckpointStore.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

EventHubConsumerClient를 사용하여 IoT Hub 작업

를 사용하여 EventHubConsumerClient IoT Hub 작업할 수도 있습니다. 이는 연결된 EventHub에서 IoT Hub 원격 분석 데이터를 수신하는 데 유용합니다. 연결된 연결 문자열 보내기 클레임이 없으므로 이벤트를 보낼 수 없습니다.

이벤트 허브 호환 엔드포인트에 대한 연결 문자열 필요합니다(예: "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Event Hubs 호환 엔드포인트를 가져오는 방법에는 두 가지가 있습니다.

  • Azure Portal에서 IoT Hub "기본 제공 엔드포인트"를 수동으로 가져와서 수신합니다.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

문제 해결

azure-eventhubs 다양한 오류 시나리오를 진단하는 방법에 대한 자세한 내용은 문제 해결 가이드를 참조하세요.

다음 단계

추가 샘플 코드

이 라이브러리를 사용하여 Event Hubs에 이벤트를 보내고 받는 방법에 대한 자세한 예제는 샘플 디렉터리를 참조하세요.

설명서

참조 설명서는 여기에서 확인할 수 있습니다.

스키마 레지스트리 및 Avro 인코더

EventHubs SDK는 스키마 레지스트리 서비스 및 Avro와 원활하게 통합됩니다. 자세한 내용은 스키마 레지스트리 SDK스키마 레지스트리 Avro 인코더 SDK를 참조하세요.

순수 Python AMQP 전송 및 이전 버전과의 호환성 지원

이제 Azure Event Hubs 클라이언트 라이브러리는 순수 Python AMQP 구현을 기반으로 합니다. uAMQP 가 필수 종속성으로 제거되었습니다.

기본 전송으로 사용하려면 다음을 수행 uAMQP 합니다.

  1. pip를 사용하여 설치 uamqp 합니다.
$ pip install uamqp 
  1. 클라이언트를 생성하는 동안 전달 uamqp_transport=True 합니다.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

참고: message 이전에 을EventDataBatchEventData/ 노출uamqp.Message한 의 특성은 더 이상 사용되지 않습니다. 에서 반환된 EventData.message/EventDataBatch.message "레거시" 개체는 전환을 용이하게 하기 위해 도입되었습니다.

원본에서 uAMQP 휠 빌드

uAMQP를 의 기본 AMQP 프로토콜 구현azure-eventhub으로 사용하려는 경우 대부분의 주요 운영 체제에서 uAMQP 휠을 찾을 수 있습니다.

사용 uAMQP 하려는 경우 uAMQP 휠이 제공되지 않는 플랫폼에서 실행 중인 경우 uAMQP 설치 지침에 따라 원본에서 설치하세요.

피드백 제공

버그가 발생하거나 제안이 있는 경우 프로젝트의 문제 섹션에 문제를 제출하세요.

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 https://cla.microsoft.com 을 참조하세요.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. 이 작업은 CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.

Impressions