Python을 사용하여 Event Hubs로 이벤트 보내기 또는 받기

이 빠른 시작에서는 azure-eventhubs Python 패키지를 사용하여 이벤트 허브와 이벤트를 주고 받는 방법을 보여줍니다.

필수 조건

Azure Event Hubs를 새로 사용하는 경우 이 빠른 시작을 하기 전에 Event Hubs 개요를 참조하세요.

이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.

  • Microsoft Azure 구독. Azure Event Hubs를 비롯한 Azure 서비스를 사용하려면 구독이 필요합니다. 기존 Azure 계정이 없으면 무료 평가판에 등록합니다.
  • pip가 설치되고 업데이트된 Python 3.8 이상.
  • Visual Studio Code(권장) 또는 기타 IDE(통합 개발 환경)입니다.
  • Event Hubs 네임스페이스 및 이벤트 허브 만들기 1단계에서는 Azure Portal을 사용하여 Event Hubs 네임스페이스를 만들고 애플리케이션이 이벤트 허브와 통신하는 데 필요한 관리 자격 증명을 얻습니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다.

이벤트를 보낼 패키지 설치

Event Hubs용 Python 패키지를 설치하려면 해당 경로에 Python이 있는 명령 프롬프트를 엽니다. 샘플을 보관할 폴더로 디렉터리를 변경합니다.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Azure에 앱 인증

이 빠른 시작에서는 Azure Event Hubs에 연결하는 두 가지 방법, 즉 암호 없는 방법과 연결 문자열 보여 줍니다. 첫 번째 옵션은 Microsoft Entra ID 및 RBAC(역할 기반 액세스 제어)에서 보안 주체를 사용하여 Event Hubs 네임스페이스에 연결하는 방법을 보여 줍니다. 코드 또는 구성 파일 또는 Azure Key Vault와 같은 보안 스토리지에 하드 코딩된 연결 문자열 있는 것에 대해 걱정할 필요가 없습니다. 두 번째 옵션은 연결 문자열을 사용하여 Event Hubs 네임스페이스에 연결하는 방법을 보여 줍니다. Azure를 접하는 경우 연결 문자열 옵션을 더 쉽게 따를 수 있습니다. 실제 애플리케이션 및 프로덕션 환경에서 암호 없는 옵션을 사용하는 것이 좋습니다. 자세한 내용은 인증 및 권한 부여를 참조하세요. 개요 페이지에서 암호 없는 인증에 대한 자세한 내용을 확인할 수도 있습니다.

Microsoft Entra 사용자에게 역할 할당

로컬로 개발할 때 Azure Event Hubs에 연결하는 사용자 계정에 올바른 권한이 있는지 확인합니다. 메시지를 보내고 받으려면 Azure Event Hubs 데이터 소유자 역할이 필요합니다. 이 역할을 자신에게 할당하려면 사용자 액세스 관리주체 역할 또는 작업을 포함하는 Microsoft.Authorization/roleAssignments/write 다른 역할이 필요합니다. Azure Portal, Azure CLI 또는 Azure PowerShell을 사용하여 사용자에게 Azure RBAC 역할을 할당할 수 있습니다. 범위 개요 페이지에서 역할 할당에 사용할 수 있는 범위에 대해 자세히 알아봅니다.

다음 예제에서는 Azure Event Hubs Azure Event Hubs Data Owner 리소스에 대한 모든 권한을 제공하는 사용자 계정에 역할을 할당합니다. 실제 시나리오에서는 최소 권한 원칙에 따라 사용자에게 보다 안전한 프로덕션 환경에 필요한 최소 권한만 부여합니다.

Azure Event Hubs에 대한 Azure 기본 제공 역할

Azure Event Hubs의 경우 Azure Portal 및 Azure 리소스 관리 API를 통한 네임스페이스 및 모든 관련 리소스의 관리는 이미 Azure RBAC 모델을 사용하여 보호됩니다. Azure는 Event Hubs 네임스페이스에 대한 액세스 권한을 부여하기 위한 아래 Azure 기본 제공 역할을 제공합니다.

  • Azure Event Hubs 데이터 소유자: Event Hubs 네임스페이스 및 해당 엔터티(큐, 토픽, 구독 및 필터)에 대한 데이터 액세스를 사용하도록 설정합니다.
  • Azure Event Hubs 데이터 발신자: 이 역할을 사용하여 보낸 사람에게 Event Hubs 네임스페이스 및 해당 엔터티에 대한 액세스 권한을 부여합니다.
  • Azure Event Hubs 데이터 수신기: 이 역할을 사용하여 수신기에 Event Hubs 네임스페이스 및 해당 엔터티에 대한 액세스 권한을 부여합니다.

사용자 지정 역할을 만들려면 Event Hubs 작업에 필요한 권한을 참조 하세요.

Important

대부분의 경우 역할 할당이 Azure에서 전파되는 데 1~2분이 걸립니다. 드문 경우지만 최대 8분이 소요될 수 있습니다. 코드를 처음 실행할 때 인증 오류가 발생하면 잠시 기다렸다가 다시 시도하세요.

  1. Azure Portal에서 기본 검색 창 또는 왼쪽 탐색을 사용하여 Event Hubs 네임스페이스를 찾습니다.

  2. 개요 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.

  3. 액세스 제어(IAM) 페이지에서 역할 할당 탭을 선택합니다.

  4. 위쪽 메뉴에서 + 추가를 선택한 다음, 드롭다운 메뉴에서 역할 할당 추가를 선택합니다.

    A screenshot showing how to assign a role.

  5. 검색 상자를 사용하여 결과를 원하는 역할로 필터링합니다. 이 예제에서는 일치하는 결과를 검색 Azure Event Hubs Data Owner 하여 선택합니다. 다음을 선택합니다.

  6. 다음에 대한 액세스 할당 아래에서 사용자, 그룹 또는 서비스 주체를 선택한 다음, + 멤버 선택을 선택합니다.

  7. 대화 상자에서 Microsoft Entra 사용자 이름(일반적으로 user@do기본 전자 메일 주소)을 검색한 다음 대화 상자 아래쪽에서 선택을 선택합니다.

  8. 검토 + 할당을 선택하여 최종 페이지로 이동한 다음, 검토 + 할당을 다시 선택하여 프로세스를 완료합니다.

이벤트 보내기

이 섹션에서는 이전에 만든 이벤트 허브에 이벤트를 보내는 Python 스크립트를 만듭니다.

  1. 선호하는 Python 편집기(예: Visual Studio Code)를 엽니다.

  2. send.py 스크립트를 만듭니다. 이 스크립트는 이전에 만든 이벤트 허브에 이벤트 일괄 처리를 보냅니다.

  3. 다음 코드를 send.py 붙여넣습니다.

    코드에서 실제 값을 사용하여 다음 자리 표시자를 대체합니다.

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    참고 항목

    연결 문자열을 사용하여 Event Hub에 이벤트를 비동기적으로 보내는 다른 옵션의 예는 GitHub send_async.py 페이지를 참조하세요. 여기에 표시된 패턴은 암호 없이 이벤트를 보내는 데에도 적용할 수 있습니다.

이벤트 수신

이 빠른 시작에서는 검사점 저장소로 Azure Blob 스토리지를 사용합니다. 검사포인트 저장소는 검사포인트(즉, 마지막 읽기 위치)를 유지하는 데 사용됩니다.

Azure Blob Storage를 검사점 저장소로 사용할 때 다음 권장 사항을 따릅니다.

  • 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
  • 컨테이너를 다른 용도로 사용하지 말고 스토리지 계정을 다른 용도로 사용하지 마세요.
  • 스토리지 계정은 배포된 애플리케이션이 있는 지역과 동일한 지역에 있어야 합니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.

Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.

  • 계층 구조 네임스페이스
  • Blob 일시 삭제
  • 버전 관리

Azure Storage 계정 및 Blob 컨테이너 만들기

다음 단계를 수행하여 Azure Storage 계정 및 Blob 컨테이너를 만듭니다.

  1. Azure Storage 계정 만들기
  2. Blob 컨테이너를 만듭니다.
  3. Blob 컨테이너에 인증

나중에 수신 코드에 사용할 수 있도록 연결 문자열과 컨테이너 이름을 기록해 두어야 합니다.

로컬로 개발하는 경우 Blob 데이터에 액세스하는 사용자 계정에 올바른 권한이 있는지 확인합니다. Blob 데이터를 읽고 쓰려면 Storage Blob 데이터 참가자가 필요합니다. 이 역할을 자신에게 할당하려면 사용자 액세스 관리자 역할 또는 Microsoft.Authorization/roleAssignments/write 작업을 포함하는 다른 역할이 필요합니다. Azure Portal, Azure CLI 또는 Azure PowerShell을 사용하여 사용자에게 Azure RBAC 역할을 할당할 수 있습니다. 범위 개요 페이지에서 역할 할당에 사용할 수 있는 범위에 대해 자세히 알아볼 수 있습니다.

이 시나리오에서는 최소 권한 원칙을 따르기 위해 범위가 스토리지 계정으로 지정된 사용자 계정에 권한을 할당합니다. 이 방법은 사용자에게 필요한 최소 권한만 부여하고 더 안전한 프로덕션 환경을 만듭니다.

다음 예제에서는 스토리지 계정의 Blob 데이터에 대한 읽기 및 쓰기 액세스를 모두 제공하는 Storage Blob 데이터 참가자 역할을 사용자 계정에 할당합니다.

Important

대부분의 경우 Azure에서 역할 할당이 전파되는 데 1~2분이 걸리지만 드문 경우이지만 최대 8분이 걸릴 수 있습니다. 코드를 처음 실행할 때 인증 오류가 발생하면 잠시 기다렸다가 다시 시도하세요.

  1. Azure Portal에서 기본 검색 창 또는 왼쪽 탐색 영역을 사용하여 스토리지 계정을 찾습니다.

  2. 스토리지 계정 개요 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.

  3. 액세스 제어(IAM) 페이지에서 역할 할당 탭을 선택합니다.

  4. 위쪽 메뉴에서 + 추가를 선택한 다음, 드롭다운 메뉴에서 역할 할당 추가를 선택합니다.

    A screenshot showing how to assign a storage account role.

  5. 검색 상자를 사용하여 결과를 원하는 역할로 필터링합니다. 이 예에서는 Storage Blob 데이터 기여자를 검색하고, 일치하는 결과를 선택하고, 다음을 선택합니다.

  6. 다음에 대한 액세스 할당 아래에서 사용자, 그룹 또는 서비스 주체를 선택한 다음, + 멤버 선택을 선택합니다.

  7. 대화 상자에서 Microsoft Entra 사용자 이름(일반적으로 user@do기본 전자 메일 주소)을 검색한 다음 대화 상자 아래쪽에서 선택을 선택합니다.

  8. 검토 + 할당을 선택하여 최종 페이지로 이동한 다음, 검토 + 할당을 다시 선택하여 프로세스를 완료합니다.

이벤트를 수신할 패키지 설치

수신측에서는 패키지를 하나 이상 설치해야 합니다. 이 빠른 시작에서는 프로그램이 이미 읽은 이벤트를 다시 읽지 않도록 Azure Blob 스토리지를 사용하여 검사점을 유지합니다. Blob에서 정기적으로 수신된 메시지에 대한 메타데이터 검사포인트를 수행합니다. 이 방법을 사용하면 나중에 중단된 위치에서 메시지를 계속 쉽게 받을 수 있습니다.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

이벤트를 수신하는 Python 스크립트 만들기

이 섹션에서는 이벤트 허브에서 이벤트를 수신하는 Python 스크립트를 만듭니다.

  1. 선호하는 Python 편집기(예: Visual Studio Code)를 엽니다.

  2. recv.py 스크립트를 만듭니다.

  3. 다음 코드를 recv.py에 붙여넣습니다.

    코드에서 실제 값을 사용하여 다음 자리 표시자를 대체합니다.

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    참고 항목

    연결 문자열을 사용하여 Event Hub에서 이벤트를 비동기적으로 수신하는 다른 옵션의 예는 GitHub recv_with_checkpoint_store_async.py 페이지를 참조하세요. 여기에 표시된 패턴은 암호 없이 이벤트를 수신하는 데에도 적용할 수 있습니다.

수신기 앱 실행

스크립트를 실행하려면 해당 경로에 Python이 있는 명령 프롬프트를 열고 다음 명령을 실행합니다.

python recv.py

보낸 사람 앱 실행

스크립트를 실행하려면 해당 경로에 Python이 있는 명령 프롬프트를 열고 다음 명령을 실행합니다.

python send.py

수신자 창에 이벤트 허브로 전송된 메시지가 표시됩니다.

문제 해결

수신기 창에 이벤트가 표시되지 않거나 코드에서 오류를 보고하는 경우 다음 문제 해결 팁을 시도해 보세요.

  • recy.py의 결과가 표시되지 않으면 send.py를 여러 번 실행합니다.

  • 암호 없는 코드(자격 증명 포함)를 사용할 때 "코루틴"에 대한 오류가 표시되는 경우 azure.identity.aio에서 가져오기를 사용하고 있는지 확인합니다.

  • 암호 없는 코드(자격 증명 포함)가 있는 "닫히지 않은 클라이언트 세션"이 표시되는 경우 완료되면 자격 증명을 닫아야 합니다. 자세한 내용은 비동기 자격 증명을 참조하세요.

  • 스토리지에 액세스할 때 recv.py에 권한 부여 오류가 표시되면 Azure 스토리지 계정 및 Blob 컨테이너 만들기의 단계를 수행하고 Storage Blob 데이터 기여자 역할을 서비스 주체에 할당했는지 확인합니다.

  • 파티션 ID가 다른 이벤트를 수신하는 경우 이런 결과가 예상됩니다. 파티션은 애플리케이션을 사용하는 데 필요한 다운스트림 병렬 처리와 관련된 데이터 조직 메커니즘입니다. Event Hub의 파티션 수는 예상되는 동시 판독기의 수와 직접적으로 관련이 있습니다. 자세한 내용은 파티션에 대해 자세히 알아보기를 참조하세요.

다음 단계

이 빠른 시작에서는 이벤트를 비동기적으로 보내고 받았습니다. 이벤트를 동기적으로 보내고 받는 방법을 알아보려면 GitHub sync_samples 페이지로 이동하세요.

GitHub의 모든 샘플(동기 및 비동기)의 경우 Python 샘플Azure Event Hubs 클라이언트 라이브러리로 이동합니다.