다음을 통해 공유


Apache Spark 커넥터: SQL Server & Azure SQL

SQL Server 및 Azure SQL용 Apache Spark 커넥터는 빅 데이터 분석에서 트랜잭션 데이터를 사용하고 임시 쿼리 또는 보고를 위해 결과를 유지할 수 있는 고성능 커넥터입니다. 이 커넥터를 사용하면 온-프레미스 또는 클라우드의 모든 SQL 데이터베이스를 Spark 작업에 대한 입력 데이터 원본 또는 출력 데이터 싱크로 사용할 수 있습니다.

이 라이브러리에는 SQL Server 및 Azure SQL용 Apache Spark 커넥터의 소스 코드가 포함되어 있습니다.

Apache Spark는 대규모 데이터를 처리하기 위한 통합 분석 엔진입니다.

Maven을 통해 제공되는 커넥터에는 2.4.x 호환 버전과 3.0.x 호환 버전의 두 가지 버전이 있습니다. 두 버전 모두 여기에서 찾을 수 있으며 아래 좌표를 사용하여 가져올 수 있습니다:

커넥터 Maven 좌표
Spark 2.4.x 호환 커넥터 com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x 호환 커넥터 com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x 호환 커넥터 com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

원본에서 커넥터를 빌드하거나 GitHub의 릴리스 섹션에서 jar을 다운로드할 수도 있습니다. 커넥터에 대한 최신 정보는 SQL Spark 커넥터 GitHub 리포지토리를 참조해 주세요.

지원되는 기능

  • 모든 Spark 바인딩 지원(Scala, Python, R)
  • 기본 인증 및 AD(Active Directory) 키 탭 지원
  • 순서가 다시 지정된 dataframe 쓰기 지원
  • SQL Server 빅 데이터 클러스터에서 SQL Server 단일 인스턴스 및 데이터 풀에 쓰기 지원
  • Sql Server 단일 인스턴스에 대한 신뢰할 수 있는 커넥터 지원
구성 요소 지원되는 버전
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
SQL Server용 Microsoft JDBC Driver 8.4
Microsoft SQL Server SQL Server 2008 이상
Azure SQL Database 지원됨

지원되는 옵션

SQL Server 및 Azure SQL용 Apache Spark 커넥터는 SQL DataSource JDBC에 정의된 옵션을 지원합니다

게다가 다음에 나오는 옵션도 지원됩니다

옵션 기본값 설명
reliabilityLevel BEST_EFFORT BEST_EFFORT 또는 NO_DUPLICATES. NO_DUPLICATES는 실행기 다시 시작 시나리오에서 신뢰할 수 있는 삽입을 구현합니다
dataPoolDataSource none none 는 값이 설정되지 않았으며 커넥터가 SQL Server 단일 인스턴스에 씁니다. 빅 데이터 클러스터 데이터 풀 테이블을 작성하려면 이 값을 데이터 원본 이름으로 설정합니다
isolationLevel READ_COMMITTED 격리 수준 지정
tableLock false 쓰기 성능을 향상시키는 옵션을 사용하여 삽입 TABLOCK구현
schemaCheckEnabled true false로 설정하면 엄격한 데이터 프레임 및 SQL 테이블 스키마 검사가 사용되지 않습니다

다른 대량 복사 옵션은 해당 옵션dataframe 으로 설정할 수 있으며 쓰기 시 API에bulkcopy 전달됩니다

성능 비교

SQL Server 및 Azure SQL용 Apache Spark 커넥트or는 SQL Server에 쓰기 위한 일반 JDBC 커넥터보다 최대 15배 빠릅니다. 성능 특성은 형식, 데이터 볼륨, 사용되는 옵션에 따라 다르며 실행 변형을 표시할 수 있습니다. 다음 성능 결과는 spark dataframe에 약 143.9만 개 행이 있는 SQL 테이블을 덮어쓰는 데 걸리는 시간입니다. Spark는 spark TPCDS 벤치마크를 사용하여 생성된 dataframeHDFS 테이블을 읽어 store_sales생성됩니다. 읽을store_salesdataframe 시간은 제외됩니다. 결과는 3회의 실행에서 평균입니다.

커넥터 형식 옵션 설명 작성 시간
JDBCConnector 기본값 기본 옵션이 있는 일반 JDBC 커넥터 1385초
sql-spark-connector BEST_EFFORT 기본 옵션을 사용한 최상의 노력 sql-spark-connector 580초
sql-spark-connector NO_DUPLICATES 신뢰할 수 있는 sql-spark-connector 709초
sql-spark-connector BEST_EFFORT + tabLock=true 테이블 잠금을 사용하는 최상의 노력 sql-spark-connector 72초
sql-spark-connector NO_DUPLICATES + tabLock=true 테이블 잠금을 사용하도록 설정된 신뢰할 수 있음 sql-spark-connector 198초

Config

  • Spark 구성: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Data Gen 구성: scale_factor=50, partitioned_tables=true
  • nr의 행이 143,997,590인 데이터 파일 store_sales

환경

일반적으로 발생하는 문제

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

이 문제는 hadoop 환경에서 이전 버전의 mssql 드라이버(현재 이 커넥터에 포함됨)를 사용할 때 발생합니다. 이전 Azure SQL 커넥터or를 사용하여 Microsoft Entra인증 호환성을 위해 해당 클라스터에 드라이버를 수동으로 설치한 경우 해당 드라이버를 제거해야 합니다.

문제를 해결하는 단계:

  1. 일반 Hadoop 환경을 사용하는 경우 mssql jar을 확인하고 제거합니다:rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Databricks를 사용하는 경우 전역 또는 클러스터 init 스크립트를 추가하여 /databricks/jars폴더에서 이전 버전의 mssql 드라이버를 제거하거나 기존 스크립트에 이 줄을 추가합니다:rm /databricks/jars/*mssql*

  2. adal4jmssql 패키지를 추가합니다. 예를 들어 Maven을 사용할 수 있지만 어떤 방법이든 작동합니다.

    주의

    이러한 방식으로 SQL spark 커넥터를 설치하지 마십시오.

  3. 연결 구성에 드라이버 클래스를 추가합니다. 예시:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

자세한 정보 및 설명은 해결 방법 https://github.com/microsoft/sql-spark-connector/issues/26을 참조해 주세요.

시작하기

SQL Server 및 Azure SQL용 Apache Spark 커넥트or는 Spark DataSourceV1 API 및 SQL Server 대량 API를 기반으로 하며 기본 제공 JDBC Spark-SQL 커넥터와 동일한 인터페이스를 사용합니다. 이 통합을 사용하면 간단히 형식 매개 변수를 업데이트하여 커넥터를 쉽게 통합하고 기존 Spark 작업을 마이그레이션할 수 있습니다 com.microsoft.sqlserver.jdbc.spark.

프로젝트에 커넥터를 포함하려면 이 리포지토리를 다운로드하고 SBT를 사용하여 jar을 빌드합니다.

새 SQL 테이블에 쓰기

경고

기본적으로 데이터베이스에 이미 있는 경우overwrite 모드는 먼저 테이블을 삭제합니다. 예기치 않은 데이터 손실을 방지하려면 이 옵션을 신중하게 사용해야 합니다.

모드overwrite를 사용하는 경우 테이블의 다시 만들기에 옵션을truncate 사용하지 않으면 인덱스가 손실됩니다. 이제 columnstore 테이블이 힙이 됩니다. 기존 인덱싱을 기본 경우 true 값으로 옵션을truncate 지정하세요. 예들 들어 .option("truncate","true")입니다.

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

SQL 테이블에 추가

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

격리 수준 지정

기본적으로 이 커넥터는 데이터베이스에 대량 삽입을 수행할 때 격리 수준을 사용합니다READ_COMMITTED. 격리 수준을 재정의하려면 아래와 같이 mssqlIsolationLevel 옵션을 사용합니다.

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

SQL 테이블에서 읽기

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra 인증

서비스 주체를 사용하는 Python 예

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory 암호를 사용하는 Python 예제

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory를 사용하여 인증하려면 필요한 종속성을 설치해야 합니다.

ActiveDirectoryPassword를 사용하는 경우의 user 형식은 UPN 형식이어야 합니다(예제: username@domainname.com).

Scala의 경우 _com.microsoft.aad.adal4j_ 아티팩트를 설치해야 합니다.

Python의 경우 _adal_ 라이브러리 설치해야 합니다. 이 라이브러리는 PIP를 통해 사용할 수 있습니다.

예제는 샘플 Notebook을 확인합니다.

지원

Azure SQL 및 SQL Server용 Apache Spark 커넥터는 오픈 소스 프로젝트입니다. 이 커넥터에는 Microsoft 지원이 제공되지 않습니다. 커넥터에 대한 문제 또는 질문이 있는 경우 이 프로젝트 리포지토리에서 문제를 게시하세요. 커넥터 커뮤니티는 활발하게 제출을 모니터링하고 있습니다.

다음 단계

SQL Spark 커넥터 GitHub 리포지토리를 참조해 주세요.

격리 수준에 대한 자세한 내용은 SET TRANSACTION ISOLATION LEVEL(Transact-SQL)을 참조하세요.