Python용 Azure EventHubs 검사점 저장소 클라이언트 라이브러리 - 버전 1.1.4

Storage Blob 사용

Azure EventHubs 검사점 저장소는 Azure Event Hubs 이벤트를 처리하는 동안 검사점을 저장하는 데 사용됩니다. 이 검사점 저장소 패키지는 에 대한 플러그 인 패키지로 작동합니다 EventHubConsumerClient. 검사점 및 파티션 소유권 정보를 유지 관리하기 위한 영구 저장소로 Azure Storage Blob을 사용합니다.

Azure EventHubs 검사점 저장소 클라이언트 라이브러리의 동기화 버전에 대한 비동기 라이브러리는 azure-eventhub-checkpointstoreblob을 참조하세요.

소스 코드 | 패키지(PyPi) | API 참조 설명서 | Azure Eventhubs 설명서 | Azure Storage 설명서

시작

필수 조건

  • Python 3.6 이상.

  • Microsoft Azure 구독: Azure Event Hubs 포함하여 Azure 서비스를 사용하려면 구독이 필요합니다. 기존 Azure 계정이 없는 경우 무료 평가판에 등록하거나 계정을 만들 때 MSDN 구독자 혜택을 사용할 수 있습니다.

  • Event Hubs 네임스페이스(Event Hubs)를 사용하여 다음을 수행합니다. Azure Event Hubs 상호 작용하려면 네임스페이스와 Event Hub를 사용할 수 있어야 합니다. Azure 리소스를 만드는 데 익숙하지 않은 경우 Azure Portal 사용하여 Event Hub를 만들기 위한 단계별 가이드를 따를 수 있습니다. 또한 Azure CLI, Azure PowerShell 또는 ARM(Azure Resource Manager) 템플릿을 사용하여 이벤트 허브를 만들기 위한 자세한 지침을 찾을 수 있습니다.

  • Azure Storage 계정: Azure Storage 계정이 있고 blob과 함께 검사점 데이터를 저장하기 위한 Azure Blob Storage 블록 컨테이너를 만들어야 합니다. Azure 블록 Blob Storage 계정 만들기 가이드를 따를 수 있습니다.

패키지 설치

$ pip install azure-eventhub-checkpointstoreblob-aio

주요 개념

검사점 설정

검사점 은 판독기가 파티션 이벤트 시퀀스 내에서 위치를 표시하거나 커밋하는 프로세스입니다. 검사점은 소비자의 책임으로 소비자 그룹 내에서 파티션별로 발생합니다. 이러한 책임은 각 소비자 그룹에 대해 각 파티션 판독기는 이벤트 스트림의 현재 위치를 추적해야 하며 데이터 스트림이 완료된 것으로 간주되면 서비스를 알릴 수 있다는 것을 의미합니다. 판독기가 파티션에서 연결을 끊은 경우 다시 연결하면 해당 소비자 그룹에서 해당 파티션의 마지막 판독기에서 이전에 제출한 검사점에서 읽기 시작합니다. 판독기가 연결하면, 오프셋을 이벤트 허브로 전달하여 읽기 시작할 위치를 지정합니다. 이러한 방식으로, 서로 다른 머신에서 실행되는 판독기 간의 장애 조치(failover)가 발생하는 경우 복원력을 제공하고 다운스트림 애플리케이션에서 이벤트를 "완료"로 표시하는 데 검사점을 사용할 수 있습니다. 이 검사점 프로세스에서 더 낮은 오프셋을 지정하면 이전 데이터로 돌아갈 수 있습니다. 이 메커니즘을 통해 검사점을 지정하면 장애 조치 복원력 및 제어된 이벤트 스트림 재생 모두를 사용할 수 있습니다.

시퀀스 & 번호 오프셋

두 오프셋 & 시퀀스 번호는 파티션 내의 이벤트 위치를 나타냅니다. 클라이언트 쪽 커서로 생각할 수 있습니다. 오프셋은 이벤트의 바이트 번호입니다. 오프셋/시퀀스 번호를 사용하면 이벤트 소비자(판독기)가 이벤트 읽기를 시작할 이벤트 스트림의 지점을 지정할 수 있습니다. 지정된 타임스탬프 후에만 큐에 포함된 이벤트를 수신할 수 있도록 타임스탬프를 지정할 수 있습니다. 소비자는 Event Hubs 서비스 외부에 자신의 오프셋 값을 저장하는 일을 담당합니다. 파티션 내에서 각 이벤트에는 오프셋, 시퀀스 번호 및 큐에 추가된 시간의 타임스탬프가 포함됩니다.

예제

EventHubConsumerClient 만들기

를 만드는 EventHubConsumerClient 가장 쉬운 방법은 연결 문자열을 사용하는 것입니다.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

EventHubConsumerClient만드는 다른 방법은 EventHubs 라이브러리 를 참조하세요.

를 사용하여 이벤트를 사용하여 BlobCheckpointStore 검사점 사용

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

다른 버전의 Azure Storage Service API와 함께 사용 BlobCheckpointStore

일부 환경에는 서로 다른 버전의 Azure Storage Service API가 있습니다. BlobCheckpointStore 기본적으로 Storage Service API 버전 2019-07-07을 사용합니다. 다른 버전에 대해 사용하려면 개체를 만들 때를 지정 api_version 합니다 BlobCheckpointStore .

문제 해결

일반

로깅을 사용하도록 설정하면 문제 해결에 도움이 됩니다.

로깅

  • 로거가 라이브러리에서 추적을 수집하도록 설정합니다 azure.eventhub.extensions.checkpointstoreblobaio .
  • 로거가 기본 azure-eventhub 라이브러리에서 추적을 수집하도록 설정합니다 azure.eventhub .
  • 로거가 Azure Storage Blob 라이브러리에서 추적을 수집하도록 설정합니다 azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage .
  • 로거가 기본 uAMQP 라이브러리에서 추적을 수집하도록 설정합니다 uamqp .
  • 클라이언트를 만들 때 를 설정하여 AMQP 프레임 수준 추적을 사용하도록 설정합니다 logging_enable=True .

다음 단계

추가 샘플 코드

EventHubs 검사점 저장소 비동기 샘플을 시작합니다.

설명서

참조 설명서는 여기에서 확인할 수 있습니다.

피드백 제공

버그가 발생하거나 제안이 있는 경우 프로젝트의 문제 섹션에 문제를 제출하세요.

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 https://cla.microsoft.com 을 참조하세요.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. 이 작업은 CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.

Impressions