Azure Databricks는 JDBC를 사용하여 외부 데이터베이스에 연결할 수 있도록 지원합니다. 이 문서에서는 Python, SQL 및 Scala의 예제와 함께 이러한 연결을 구성하고 사용하기 위한 기본 구문을 제공합니다.
중요합니다
레거시 쿼리 페더레이션 설명서가 사용 중지되었으며 업데이트되지 않을 수 있습니다. 이 콘텐츠에 언급된 구성은 Databricks에서 공식적으로 승인되거나 테스트되지 않습니다. Lakehouse Federation이 원본 데이터베이스를 지원하는 경우 Databricks는 대신 이 데이터베이스를 사용하는 것이 좋습니다.
Partner Connect는 많은 외부 데이터 원본과 데이터를 동기화하기 위한 최적화된 통합을 제공합니다. Databricks Partner Connect란?을 참조하세요.
중요합니다
이 문서의 예제에는 JDBC URL의 사용자 이름 및 암호가 포함되지 않습니다. Databricks는 데이터베이스 자격 증명을 저장하기 위해 시크릿을 사용할 것을 권장합니다. 다음은 그 예입니다.
파이썬
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
스칼라
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
SQL을 사용하여 Databricks 비밀을 참조하려면 클러스터를 초기화하는 동안 Spark 구성 속성을 구성해야 합니다.
비밀 관리의 전체 예제는 자습서: Databricks 비밀 만들기 및 사용을 참조하세요.
JDBC를 사용하여 데이터 읽기
JDBC를 사용하여 데이터를 읽으려면 여러 설정을 구성해야 합니다. 각 데이터베이스는 <jdbc-url>에 다른 형식을 사용합니다.
파이썬
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
스칼라
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark는 데이터베이스 테이블에서 스키마를 자동으로 읽고 해당 형식을 Spark SQL 형식에 다시 매핑합니다.
파이썬
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
스칼라
employees_table.printSchema
이 JDBC 테이블에 대해 쿼리를 실행할 수 있습니다.
파이썬
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
스칼라
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBC를 사용하여 데이터 쓰기
JDBC를 사용하여 테이블에 데이터를 저장하는 것은 읽기에 유사한 구성을 사용합니다. 다음 예제를 참조하세요.
파이썬
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
스칼라
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
기본 동작은 새 테이블을 만들려고 시도하며, 해당 이름의 테이블이 이미 존재하는 경우 오류를 발생시킵니다.
다음 구문을 사용하여 기존 테이블에 데이터를 추가할 수 있습니다.
파이썬
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
스칼라
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
다음 구문을 사용하여 기존 테이블을 덮어쓸 수 있습니다.
파이썬
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
스칼라
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
JDBC 쿼리에 대한 병렬 처리 제어
기본적으로 JDBC 드라이버는 단일 스레드로만 원본 데이터베이스를 쿼리합니다. 읽기 성능을 향상하려면 Azure Databricks에서 데이터베이스에 대해 수행하는 동시 쿼리 수를 제어하는 여러 옵션을 지정해야 합니다. 작은 클러스터의 경우 numPartitions 옵션을 클러스터의 실행기 코어 수와 동일하게 설정하면 모든 노드가 데이터를 병렬로 쿼리할 수 있습니다.
경고
대규모 클러스터에서 numPartitions를 높은 값으로 설정할 경우 동시에 너무 많이 쿼리하면 서비스에 과부하가 발생할 수 있으므로 원격 데이터베이스의 성능이 저하될 수 있습니다. 이는 애플리케이션 데이터베이스에서 특히 번거롭습니다. 이 값을 50 이상으로 설정하지 않는 것이 좋습니다.
비고
partitionColumn원본 데이터베이스에서 계산된 인덱스가 있는 열을 선택하여 쿼리 속도를 향상합니다.
다음 코드 예제에서는 8개의 코어가 있는 클러스터에 대한 병렬 처리를 구성하는 방법을 보여 줍니다.
파이썬
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
스칼라
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
비고
Azure Databricks는 JDBC를 구성하기 위한 모든 Apache Spark 옵션을 지원합니다.
JDBC를 사용하여 데이터베이스에 쓸 때 Apache Spark는 메모리의 파티션 수를 사용하여 병렬 처리를 제어합니다. 병렬 처리를 제어하기 위해 쓰기 전에 데이터를 다시 분할할 수 있습니다. 원격 데이터베이스를 압도하지 않도록 대규모 클러스터에서 많은 수의 파티션을 방지합니다. 다음 예제에서는 쓰기 전에 8개의 파티션으로 다시 분할하는 방법을 보여 줍니다.
파이썬
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
스칼라
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
데이터베이스 엔진에 쿼리 전달
전체 쿼리를 데이터베이스로 푸시 다운하고 결과만 반환할 수 있습니다.
table 매개 변수는 읽을 JDBC 테이블을 식별합니다. SQL 쿼리 FROM 절에서 유효한 모든 항목을 사용할 수 있습니다.
파이썬
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
스칼라
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
쿼리당 가져오는 행 수 제어
JDBC 드라이버에는 원격 데이터베이스에서 한 번에 가져온 행 수를 제어하는 fetchSize 매개 변수가 있습니다.
| 설정 | 결과 |
|---|---|
| 너무 낮음 | 많은 왕복으로 인한 높은 대기 시간(쿼리당 반환되는 행은 거의 없음) |
| 너무 높음 | 메모리 부족 오류(한 쿼리에서 반환된 데이터가 너무 많음) |
최적의 값은 워크로드에 따라 달라집니다. 고려 사항은 다음과 같습니다.
- 쿼리에서 반환되는 열 수는 몇 개입니까?
- 반환되는 데이터 형식은 무엇인가요?
- 각 열에서 반환되는 문자열의 길이는 얼마나 되나요?
시스템은 매우 작은 기본값을 가지며 튜닝의 이점을 누릴 수 있습니다. 예: Oracle의 기본값 fetchSize 은 10입니다. 100으로 늘리면 실행해야 하는 총 쿼리 수가 10배로 줄어듭니다. JDBC 결과는 네트워크 트래픽이므로 매우 많은 수를 방지하지만 최적의 값은 많은 데이터 세트에 대해 수천 개에 있을 수 있습니다.
다음 예제와 같이 fetchSize 옵션을 사용합니다.
파이썬
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
스칼라
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()