Apache Spark 커넥터: SQL Server & Azure SQL
SQL Server 및 Azure SQL용 Apache Spark 커넥터는 빅 데이터 분석에서 트랜잭션 데이터를 사용하고 임시 쿼리 또는 보고를 위해 결과를 유지할 수 있는 고성능 커넥터입니다. 이 커넥터를 사용하면 온-프레미스 또는 클라우드의 모든 SQL 데이터베이스를 Spark 작업에 대한 입력 데이터 원본 또는 출력 데이터 싱크로 사용할 수 있습니다.
이 라이브러리에는 SQL Server 및 Azure SQL용 Apache Spark 커넥터의 소스 코드가 포함되어 있습니다.
Apache Spark는 대규모 데이터를 처리하기 위한 통합 분석 엔진입니다.
Maven을 통해 제공되는 커넥터에는 2.4.x 호환 버전과 3.0.x 호환 버전의 두 가지 버전이 있습니다. 두 버전 모두 여기에서 찾을 수 있으며 아래 좌표를 사용하여 가져올 수 있습니다:
커넥터 | Maven 좌표 |
---|---|
Spark 2.4.x 호환 커넥터 | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Spark 3.0.x 호환 커넥터 | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Spark 3.1.x 호환 커넥터 | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
원본에서 커넥터를 빌드하거나 GitHub의 릴리스 섹션에서 jar을 다운로드할 수도 있습니다. 커넥터에 대한 최신 정보는 SQL Spark 커넥터 GitHub 리포지토리를 참조해 주세요.
지원되는 기능
- 모든 Spark 바인딩 지원(Scala, Python, R)
- 기본 인증 및 AD(Active Directory) 키 탭 지원
- 순서가 다시 지정된
dataframe
쓰기 지원 - SQL Server 빅 데이터 클러스터에서 SQL Server 단일 인스턴스 및 데이터 풀에 쓰기 지원
- Sql Server 단일 인스턴스에 대한 신뢰할 수 있는 커넥터 지원
구성 요소 | 지원되는 버전 |
---|---|
Apache Spark | 2.4.x, 3.0.x, 3.1.x |
Scala | 2.11, 2.12 |
SQL Server용 Microsoft JDBC Driver | 8.4 |
Microsoft SQL Server | SQL Server 2008 이상 |
Azure SQL Database | 지원됨 |
지원되는 옵션
SQL Server 및 Azure SQL용 Apache Spark 커넥터는 SQL DataSource JDBC에 정의된 옵션을 지원합니다
게다가 다음에 나오는 옵션도 지원됩니다
옵션 | 기본값 | 설명 |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT 또는 NO_DUPLICATES . NO_DUPLICATES 는 실행기 다시 시작 시나리오에서 신뢰할 수 있는 삽입을 구현합니다 |
dataPoolDataSource |
none |
none 는 값이 설정되지 않았으며 커넥터가 SQL Server 단일 인스턴스에 씁니다. 빅 데이터 클러스터 데이터 풀 테이블을 작성하려면 이 값을 데이터 원본 이름으로 설정합니다 |
isolationLevel |
READ_COMMITTED |
격리 수준 지정 |
tableLock |
false |
쓰기 성능을 향상시키는 옵션을 사용하여 삽입 TABLOCK 구현 |
schemaCheckEnabled |
true |
false로 설정하면 엄격한 데이터 프레임 및 SQL 테이블 스키마 검사가 사용되지 않습니다 |
다른 대량 복사 옵션은 해당 옵션dataframe
으로 설정할 수 있으며 쓰기 시 API에bulkcopy
전달됩니다
성능 비교
SQL Server 및 Azure SQL용 Apache Spark 커넥트or는 SQL Server에 쓰기 위한 일반 JDBC 커넥터보다 최대 15배 빠릅니다. 성능 특성은 형식, 데이터 볼륨, 사용되는 옵션에 따라 다르며 실행 변형을 표시할 수 있습니다. 다음 성능 결과는 spark dataframe
에 약 143.9만 개 행이 있는 SQL 테이블을 덮어쓰는 데 걸리는 시간입니다. Spark는 spark TPCDS 벤치마크를 사용하여 생성된 dataframe
HDFS 테이블을 읽어 store_sales
생성됩니다. 읽을store_sales
dataframe
시간은 제외됩니다. 결과는 3회의 실행에서 평균입니다.
커넥터 형식 | 옵션 | 설명 | 작성 시간 |
---|---|---|---|
JDBCConnector |
기본값 | 기본 옵션이 있는 일반 JDBC 커넥터 | 1385초 |
sql-spark-connector |
BEST_EFFORT |
기본 옵션을 사용한 최상의 노력 sql-spark-connector |
580초 |
sql-spark-connector |
NO_DUPLICATES |
신뢰할 수 있는 sql-spark-connector |
709초 |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
테이블 잠금을 사용하는 최상의 노력 sql-spark-connector |
72초 |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
테이블 잠금을 사용하도록 설정된 신뢰할 수 있음 sql-spark-connector |
198초 |
Config
- Spark 구성: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
- Data Gen 구성: scale_factor=50, partitioned_tables=true
- nr의 행이 143,997,590인 데이터 파일
store_sales
환경
- SQL Server 빅 데이터 클러스터 CU5
master
+ 6개 노드- 각 노드 Gen 5 서버, 512GB Ram, 노드당 4TB NVM, NIC 10GB
일반적으로 발생하는 문제
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
이 문제는 hadoop 환경에서 이전 버전의 mssql 드라이버(현재 이 커넥터에 포함됨)를 사용할 때 발생합니다. 이전 Azure SQL 커넥터or를 사용하여 Microsoft Entra인증 호환성을 위해 해당 클라스터에 드라이버를 수동으로 설치한 경우 해당 드라이버를 제거해야 합니다.
문제를 해결하는 단계:
일반 Hadoop 환경을 사용하는 경우 mssql jar을 확인하고 제거합니다:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
. Databricks를 사용하는 경우 전역 또는 클러스터 init 스크립트를 추가하여/databricks/jars
폴더에서 이전 버전의 mssql 드라이버를 제거하거나 기존 스크립트에 이 줄을 추가합니다:rm /databricks/jars/*mssql*
adal4j
및mssql
패키지를 추가합니다. 예를 들어 Maven을 사용할 수 있지만 어떤 방법이든 작동합니다.주의
이러한 방식으로 SQL spark 커넥터를 설치하지 마십시오.
연결 구성에 드라이버 클래스를 추가합니다. 예시:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
자세한 정보 및 설명은 해결 방법 https://github.com/microsoft/sql-spark-connector/issues/26을 참조해 주세요.
시작하기
SQL Server 및 Azure SQL용 Apache Spark 커넥트or는 Spark DataSourceV1 API 및 SQL Server 대량 API를 기반으로 하며 기본 제공 JDBC Spark-SQL 커넥터와 동일한 인터페이스를 사용합니다. 이 통합을 사용하면 간단히 형식 매개 변수를 업데이트하여 커넥터를 쉽게 통합하고 기존 Spark 작업을 마이그레이션할 수 있습니다 com.microsoft.sqlserver.jdbc.spark
.
프로젝트에 커넥터를 포함하려면 이 리포지토리를 다운로드하고 SBT를 사용하여 jar을 빌드합니다.
새 SQL 테이블에 쓰기
경고
기본적으로 데이터베이스에 이미 있는 경우overwrite
모드는 먼저 테이블을 삭제합니다. 예기치 않은 데이터 손실을 방지하려면 이 옵션을 신중하게 사용해야 합니다.
모드overwrite
를 사용하는 경우 테이블의 다시 만들기에 옵션을truncate
사용하지 않으면 인덱스가 손실됩니다. 이제 columnstore 테이블이 힙이 됩니다. 기존 인덱싱을 기본 경우 true 값으로 옵션을truncate
지정하세요. 예들 들어 .option("truncate","true")
입니다.
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
SQL 테이블에 추가
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
격리 수준 지정
기본적으로 이 커넥터는 데이터베이스에 대량 삽입을 수행할 때 격리 수준을 사용합니다READ_COMMITTED
. 격리 수준을 재정의하려면 아래와 같이 mssqlIsolationLevel
옵션을 사용합니다.
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
SQL 테이블에서 읽기
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Microsoft Entra 인증
서비스 주체를 사용하는 Python 예
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory 암호를 사용하는 Python 예제
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory를 사용하여 인증하려면 필요한 종속성을 설치해야 합니다.
ActiveDirectoryPassword를 사용하는 경우의 user
형식은 UPN 형식이어야 합니다(예제: username@domainname.com
).
Scala의 경우 _com.microsoft.aad.adal4j_
아티팩트를 설치해야 합니다.
Python의 경우 _adal_
라이브러리 설치해야 합니다. 이 라이브러리는 PIP를 통해 사용할 수 있습니다.
예제는 샘플 Notebook을 확인합니다.
지원
Azure SQL 및 SQL Server용 Apache Spark 커넥터는 오픈 소스 프로젝트입니다. 이 커넥터에는 Microsoft 지원이 제공되지 않습니다. 커넥터에 대한 문제 또는 질문이 있는 경우 이 프로젝트 리포지토리에서 문제를 게시하세요. 커넥터 커뮤니티는 활발하게 제출을 모니터링하고 있습니다.
다음 단계
SQL Spark 커넥터 GitHub 리포지토리를 참조해 주세요.
격리 수준에 대한 자세한 내용은 SET TRANSACTION ISOLATION LEVEL(Transact-SQL)을 참조하세요.