Azure Databricks Kafka 커넥터는 Kafka에 연결하기 위한 여러 인증 방법을 지원합니다. 이 문서에서는 Databricks에서 가장 일반적인 인증 방법 중 일부를 다룹니다. 지원되는 인증 방법의 전체 목록은 Kafka 설명서에서 찾을 수 있습니다.
Microsoft Entra ID 및 Azure Event Hubs를 사용한 서비스 주체 인증
Azure Databricks는 Event Hubs 서비스를 사용한 Spark 작업의 인증을 지원합니다. 이 인증은 Microsoft Entra ID와 OAuth를 통해 이루어집니다.
Unity 카탈로그 서비스 자격 증명으로 연결
Databricks Runtime 16.1이 릴리스된 이후 Azure Databricks는 Azure Event Hubs용 AWS Managed Streaming에 대한 액세스를 인증하기 위한 Unity 카탈로그 서비스 자격 증명을 지원합니다. Databricks는 특히 공유 클러스터 또는 서버리스 컴퓨팅에서 Kafka 스트리밍을 실행할 때 이 방법을 권장합니다.
인증에 Unity 카탈로그 서비스 자격 증명을 사용하려면 다음 단계를 수행합니다.
- 새 Unity 카탈로그 서비스 자격 증명을 만듭니다. 이 프로세스에 익숙하지 않은 경우 만들기에 대한 지침은 서비스 자격 증명 만들기 를 참조하세요.
- 서비스 자격 증명에 연결된 액세스 커넥터에 Azure Event Hubs에 연결하는 데 필요한 권한이 있는지 확인합니다.
- Kafka 구성에서 원본 옵션으로 Unity 카탈로그 서비스 자격 증명의 이름을 제공합니다. 이 옵션을
databricks.serviceCredential서비스 자격 증명의 이름으로 설정합니다.
다음 예제에서는 서비스 자격 증명을 사용하여 Kafka를 원본으로 구성합니다.
파이썬
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
참고: Unity 카탈로그 서비스 자격 증명을 사용하여 Kafka에 연결하는 경우 다음 옵션이 더 이상 필요하지 않습니다.
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
클라이언트 ID 및 비밀로 연결
Azure Databricks는 다음 컴퓨팅 환경에서 클라이언트 ID 및 비밀을 사용하여 Microsoft Entra ID 인증을 지원합니다.
- Databricks Runtime 12.2 LTS 이상이 전용 액세스 모드(이전의 단일 사용자 액세스 모드)로 구성된 컴퓨팅 환경에서.
- Databricks Runtime 14.3 LTS 및 그 이상의 버전을 표준 액세스 모드(이전의 공유 액세스 모드)로 구성된 컴퓨팅에서 사용.
- Unity 카탈로그 없이 구성된 Lakeflow Spark 선언적 파이프라인입니다.
Azure Databricks는 인증서를 사용한 Microsoft Entra ID 인증을 컴퓨팅 환경이나 Unity Catalog로 구성된 Lakeflow Spark 선언적 파이프라인에서 지원하지 않습니다.
이 인증은 표준 액세스 모드를 사용하는 컴퓨팅 또는 Unity Catalog Lakeflow Spark 선언적 파이프라인에서 작동하지 않습니다.
Microsoft Entra ID로 인증을 수행하려면 다음 값이 있어야 합니다.
임차인 ID. Microsoft Entra ID 서비스 탭에서 찾을 수 있습니다.
clientID(응용 프로그램 ID라고도 함).
클라이언트 암호. 암호를 획득하면 이를 Databricks 작업 영역에 비밀로 추가해야 합니다. 이 비밀을 추가하려면 비밀 관리를 참조하세요.
EventHubs 토픽. 특정 Event Hubs 네임스페이스 페이지의 엔터티 섹션 아래 Event Hubs 섹션에서 주제 목록을 찾을 수 있습니다. 여러 주제를 사용하려면 Event Hubs 수준에서 IAM 역할을 설정할 수 있습니다.
EventHubs 서버. Event Hubs 네임스페이스의 개요 페이지에서 이 정보를 확인할 수 있습니다.
또한 Entra ID를 사용하려면 Kafka에 OAuth SASL 메커니즘을 사용하도록 지시해야 합니다 (SASL은 일반적인 프로토콜이며 OAuth는 SASL "메커니즘"의 한 유형임):
-
kafka.security.protocol은SASL_SSL이어야 합니다. -
kafka.sasl.mechanism은OAUTHBEARER이어야 합니다. -
kafka.sasl.login.callback.handler.class는 음영 처리된 Kafka 클래스의 로그인 콜백 처리기에 대해kafkashaded값을 갖는 Java 클래스의 정규화된 이름이어야 합니다. 정확한 클래스에 대한 내용은 아래 예제를 참조하세요.
다음 예제에서는 클라이언트 ID 및 비밀로 Microsoft Entra ID 인증을 사용하여 Azure Event Hubs에 연결하도록 Kafka를 구성합니다.
파이썬
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
SSL을 사용하여 Azure Databricks를 Kafka에 연결하기
Kafka에 대한 SSL 연결을 사용하도록 설정하려면 Confluent 설명서의 SSL을 사용한 암호화 및 인증지침을 따르십시오. 해당 문서에서 설명하는, kafka. 접두사가 붙은 구성을 옵션으로 제공할 수 있습니다. 예를 들어 트러스트 저장소 위치는 속성 kafka.ssl.truststore.location으로 지정됩니다.
SSL을 사용하는 경우 Databricks에서 다음을 권장합니다.
- 인증서를 Unity 카탈로그 볼륨에 저장합니다. 볼륨에서 읽을 수 있는 액세스 권한이 있는 사용자는 Kafka 인증서를 사용할 수 있습니다.
- 인증서 암호를 비밀 범위에 비밀로 저장합니다.
다음 예제에서는 개체 스토리지 위치 및 Databricks 비밀을 사용하여 SSL 연결을 사용하도록 설정합니다.
파이썬
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Azure Databricks에 HDInsight의 Kafka 연결
HDInsight Kafka 클러스터를 만듭니다.
관련 지침은 Azure Virtual Network를 통해 HDInsight의 Kafka에 연결을 참조하세요.
올바른 주소를 보급하도록 Kafka 브로커를 구성합니다.
IP 광고를 위한 Kafka 구성의 지침을 따르세요. Azure Virtual Machines에서 직접 Kafka를 관리하는 경우, 브로커의
advertised.listeners설정이 호스트의 내부 IP로 되어 있는지 확인하세요.Azure Databricks 클러스터를 만듭니다.
Kafka 클러스터를 Azure Databricks 클러스터에 피어링합니다.
가상 네트워크 피어링의 지침을 따르세요.
잠재적 오류 처리
새 만들기 실패
KafkaAdminClient인증 옵션이 잘못되면 이 내부 Kafka 오류가 발생합니다.
- 클라이언트 ID (응용 프로그램 ID라고도 함)
- 임차인 ID
- Event Hubs 서버
오류를 해결하려면 이러한 옵션에 대한 값이 올바른지 확인합니다. 또한 예제에서 기본적으로 제공되는 구성 옵션(예:
kafka.security.protocol)을 수정하면 이 오류가 표시될 수 있습니다.레코드가 반환되지 않음
DataFrame을 표시하거나 처리하려고 하지만 결과가 표시되지 않는 경우 UI에 다음이 표시됩니다.
이 메시지는 인증에 성공했지만 EventHubs에서 데이터를 반환하지 않았음을 의미합니다. 가능한 몇 가지 이유는 다음과 같습니다(완전한 목록은 아님):
- 잘못된 EventHubs 항목을 지정했습니다.
- 기본 Kafka 구성 옵션은
startingOffsets로 설정되어 있으며, 현재 토픽을 통해 아직 데이터를 받고 있지 않습니다.startingOffsets를earliest로 설정하여 Kafka의 초기 오프셋에서부터 데이터 읽기를 시작할 수 있습니다.