다음을 통해 공유


Google Pub/Sub 구독

Azure Databricks는 Databricks Runtime 13.3 LTS 이상에서 Google Pub/Sub를 구독하는 기본 제공 커넥터를 제공합니다. 이 커넥터는 구독자의 레코드에 대해 정확히 한 번 처리 의미 체계를 제공합니다.

참고 항목

Pub/Sub는 중복 레코드를 게시할 수 있으며 레코드가 구독자에게 순서대로 도착할 수 있습니다. 중복된 레코드와 순서가 다른 레코드를 처리하려면 Azure Databricks 코드를 작성해야 합니다.

구문 예제

다음 코드 예제에서는 Pub/Sub에서 읽은 구조적 스트리밍을 구성하기 위한 기본 구문을 보여 줍니다.

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

자세한 구성 옵션은 Pub/Sub 스트리밍 읽기에 대한 구성 옵션을 참조 하세요.

Pub/Sub에 대한 액세스 구성

Databricks는 권한 부여 옵션을 제공할 때 비밀을 사용하는 것이 좋습니다. 연결 권한을 부여하려면 다음 옵션이 필요합니다.

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

다음 표에서는 구성된 자격 증명에 필요한 역할을 설명합니다.

Roles 필수 또는 선택 사항 사용 방법
roles/pubsub.viewer 또는 roles/viewer Required 구독이 있는지 확인하고 구독 가져오기
roles/pubsub.subscriber Required 구독에서 데이터 가져오기
roles/pubsub.editor 또는 roles/editor 선택 사항 구독이 없는 경우 구독을 만들 수 있으며 deleteSubscriptionOnStreamStop 스트림 종료 시 구독을 삭제할 수도 있습니다.

Pub/Sub 스키마

스트림의 스키마는 다음 표에 설명된 대로 Pub/Sub에서 가져온 레코드와 일치합니다.

필드 Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Pub/Sub 스트리밍 읽기에 대한 옵션 구성

다음 표에서는 Pub/Sub에 지원되는 옵션에 대해 설명합니다. 모든 옵션은 구문을 사용하여 .option("<optionName>", "<optionValue>") 구조적 스트리밍 읽기의 일부로 구성됩니다.

참고 항목

일부 Pub/Sub 구성 옵션은 마이크로 일괄 처리 대신 페치 개념을 사용합니다. 이는 내부 구현 세부 정보를 반영하며, 옵션은 레코드가 페치된 다음 처리된다는 점을 제외하고 다른 구조적 스트리밍 커넥터의 corollaries와 유사하게 작동합니다.

옵션 기본값 설명
numFetchPartitions 스트림 초기화에 있는 실행기 수의 절반으로 설정합니다. 구독에서 레코드를 가져오는 병렬 Spark 작업의 수입니다.
deleteSubscriptionOnStreamStop false 이 경우 true스트리밍 작업이 종료될 때 스트림에 전달된 구독이 삭제됩니다.
maxBytesPerTrigger 없음 트리거된 각 마이크로 일괄 처리 중에 처리할 일괄 처리 크기에 대한 소프트 제한입니다.
maxRecordsPerFetch 1000 레코드를 처리하기 전에 태스크당 가져올 레코드 수입니다.
maxFetchPeriod 10초 각 태스크가 레코드를 처리하기 전에 가져올 기간입니다. Databricks는 기본값을 사용하는 것이 좋습니다.

Pub/Sub에 대한 증분 일괄 처리 의미 체계

Pub/Sub 원본에서 사용 가능한 레코드를 증분 일괄 처리로 사용하는 데 사용할 Trigger.AvailableNow 수 있습니다.

Azure Databricks는 설정을 사용하여 읽기 Trigger.AvailableNow 를 시작할 때 타임스탬프를 기록합니다. 일괄 처리에서 처리되는 레코드에는 이전에 가져온 모든 데이터와 기록된 스트림 시작 타임스탬프보다 작은 타임스탬프가 있는 새로 게시된 레코드가 포함됩니다.

증분 일괄 처리 구성을 참조하세요.

스트리밍 메트릭 모니터링

구조적 스트리밍 진행률 메트릭은 가져오고 처리할 준비가 된 레코드 수, 가져오고 처리할 준비가 된 레코드의 크기, 스트림 시작 이후 표시되는 중복 횟수를 보고합니다. 다음은 이러한 메트릭의 예입니다.

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

제한 사항

Pub/Sub에서 투기적 실행(spark.speculation)은 지원되지 않습니다.