다음을 통해 공유


스트리밍 데이터 쿼리

Azure Databricks를 사용하여 구조적 스트리밍을 사용하여 스트리밍 데이터 원본을 쿼리할 수 있습니다. Azure Databricks는 Python 및 Scala의 스트리밍 워크로드에 대한 광범위한 지원을 제공하며 SQL을 사용하여 대부분의 구조적 스트리밍 기능을 지원합니다.

다음 예제에서는 Notebook에서 대화형으로 개발하는 동안 스트리밍 데이터를 수동으로 검사하기 위해 메모리 싱크를 사용하는 방법을 보여 줍니다. Notebook UI의 행 출력 제한으로 인해 스트리밍 쿼리에서 읽은 모든 데이터를 관찰하지 못할 수 있습니다. 프로덕션 워크로드에서는 대상 테이블 또는 외부 시스템에 쿼리를 작성하여 스트리밍 쿼리만 트리거해야 합니다.

참고 항목

스트리밍 데이터에 대한 대화형 쿼리에 대한 SQL 지원은 다목적 컴퓨팅에서 실행되는 Notebook으로 제한됩니다. Databricks SQL 또는 Delta Live Tables에서 스트리밍 테이블을 선언할 때 SQL을 사용할 수도 있습니다. Databricks SQL에서 스트리밍 테이블을 사용하여 데이터 로드 및 델타 라이브 테이블이란?을 참조하세요.

스트리밍 시스템에서 데이터 쿼리

Azure Databricks는 다음 스트리밍 시스템에 대한 스트리밍 데이터 판독기를 제공합니다.

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

이러한 시스템에 대해 쿼리를 초기화할 때 구성 세부 정보를 제공해야 합니다. 이는 구성된 환경 및 읽도록 선택한 시스템에 따라 달라집니다. 스트리밍 데이터 원본 구성을 참조 하세요.

스트리밍 시스템과 관련된 일반적인 워크로드에는 레이크하우스에 대한 데이터 수집 및 외부 시스템에 데이터를 싱크하는 스트림 처리가 포함됩니다. 스트리밍 워크로드에 대한 자세한 내용은 Azure Databricks의 스트리밍을 참조 하세요.

다음 예제에서는 Kafka에서 읽은 대화형 스트리밍을 보여 줍니다.

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

스트리밍 읽기로 테이블 쿼리

Azure Databricks는 기본적으로 Delta Lake를 사용하여 모든 테이블을 만듭니다. 델타 테이블에 대해 스트리밍 쿼리를 수행하면 테이블 버전이 커밋되면 쿼리가 자동으로 새 레코드를 선택합니다. 기본적으로 스트리밍 쿼리는 원본 테이블에 추가된 레코드만 포함해야 합니다. 업데이트 및 삭제가 포함된 스트리밍 데이터를 사용해야 하는 경우 Databricks는 Delta Live Tables 및 APPLY CHANGES INTO를 사용하는 것이 좋습니다. APPLY CHANGES API: Delta Live Tables에서 변경 데이터 캡처 간소화를 참조하세요.

다음 예제에서는 테이블에서 읽은 대화형 스트리밍을 수행하는 방법을 보여 줍니다.

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

자동 로더를 사용하여 클라우드 개체 스토리지의 데이터 쿼리

Azure Databricks 클라우드 데이터 커넥터인 자동 로더를 사용하여 클라우드 개체 스토리지에서 데이터를 스트리밍할 수 있습니다. 커넥터를 Unity 카탈로그 볼륨 또는 다른 클라우드 개체 스토리지 위치에 저장된 파일과 함께 사용할 수 있습니다. Databricks는 볼륨을 사용하여 클라우드 개체 스토리지의 데이터에 대한 액세스를 관리하는 것이 좋습니다. 데이터 원본에 대한 커넥트 참조하세요.

Azure Databricks는 인기 있는 구조적, 반구조적 및 구조화되지 않은 형식으로 저장된 클라우드 개체 스토리지의 데이터 스트리밍 수집을 위해 이 커넥터를 최적화합니다. Databricks는 수집된 데이터를 거의 원시 형식으로 저장하여 처리량을 최대화하고 손상된 레코드 또는 스키마 변경으로 인한 잠재적인 데이터 손실을 최소화하는 것이 좋습니다.

클라우드 개체 스토리지에서 데이터를 수집하는 데 대한 자세한 권장 사항은 Databricks Lakehouse로 데이터 수집을 참조 하세요.

다음 예제에서는 볼륨의 JSON 파일 디렉터리에서 읽은 대화형 스트리밍을 보여 줍니다.

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')