Apache Spark コネクタ: SQL Server と Azure SQL
SQL Server と Azure SQL 用の Apache Spark コネクタは、パフォーマンスの高いコネクタであり、ビッグ データ分析でトランザクション データを使用し、アドホック クエリやレポートの結果を保持できるようにします。 コネクタを使用すると、オンプレミスまたはクラウド内のあらゆる SQL データベースを、Spark ジョブの入力データ ソースまたは出力データ シンクとして使用できます。
このライブラリには、SQL Server と Azure SQL 用の Apache Spark コネクタのソース コードが含まれています。
Apache Spark は、"大規模なデータ処理のための統合された分析エンジン" です。
Maven を通じて利用できるコネクタには、2.4.x 互換バージョンと 3.0.x 互換バージョンの 2 つのバージョンがあります。 どちらのバージョンもこちらにあり、次の座標を使用してインポートできます。
コネクタ | Maven 座標 |
---|---|
Spark 2.4.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Spark 3.0.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Spark 3.1.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
また、ソースからコネクタを作成することも、GitHub のリリース セクションから jar をダウンロードすることもできます。 コネクタの最新情報については、SQL Spark コネクタの GitHub リポジトリに関する記事をご覧ください。
サポートされている機能
- すべての Spark バインド (Scala、Python、R) のサポート
- 基本認証と Active Directory (AD) のキー タブのサポート
- 並べ替えられた
dataframe
書き込みのサポート - SQL Server ビッグ データ クラスターでの SQL Server 単一インスタンスおよびデータ プールへの書き込みのサポート
- SQL Server 単一インスタンスに対する信頼性の高いコネクタのサポート
コンポーネント | サポートされているバージョン |
---|---|
Apache Spark | 2.4.x、3.0.x、3.1.x |
Scala | 2.11、2.12 |
SQL Server 用 Microsoft JDBC ドライバー | 8.4 |
Microsoft SQL Server | SQL Server 2008 以降 |
Azure SQL Databases | サポートされています |
サポートされるオプション
SQL Server と Azure SQL 用の Apache Spark コネクタは、こちらで定義されているオプションをサポートしています:SQL DataSource JDBC
さらに、次のオプションがサポートされています
オプション | Default | 説明 |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT または NO_DUPLICATES 。 NO_DUPLICATES を使用すると、実行プログラム再起動シナリオで、信頼性の高い挿入を実装できます |
dataPoolDataSource |
none |
none は、値が設定されておらず、SQL Server の単一インスタンスへの書き込みにコネクタを使用する必要があることを意味します。 ビッグ データ クラスターのデータ プール テーブルに書き込むデータ ソース名に、この値を設定します |
isolationLevel |
READ_COMMITTED |
分離レベルを指定します |
tableLock |
false |
書き込みパフォーマンスを向上させるために、TABLOCK オプションと共に挿入を実装します |
schemaCheckEnabled |
true |
false に設定されている場合は、データ フレームと sql テーブル スキーマの厳密なチェックを無効にします |
その他の一括コピー オプションを、dataframe
のオプションとして設定でき、書き込み時に bulkcopy
API に渡されます
パフォーマンスの比較
SQL Server および Azure SQL 用の Apache Spark コネクタは、SQL Server に書き込むための汎用 JDBC コネクタよりも最大 15 倍高速です。 パフォーマンス特性は、種類、データ量、使用するオプションによって異なり、実行ごとの変動が示される場合もあります。 次のパフォーマンス結果は、spark dataframe
で 143.9M 行の SQL テーブルを上書きするために要した時間です。 spark dataframe
は spark TPCDS ベンチマークを使用して生成された store_sales
HDFS テーブルを読み取ることによって構築されます。 dataframe
への store_sales
の読み取り時間は除外されます。 結果は、3 回の実行を平均したものです。
コネクタの種類 | オプション | 説明 | 書き込み時間 |
---|---|---|---|
JDBCConnector |
Default | 既定のオプションを使用した汎用 JDBC コネクタ | 1385 秒 |
sql-spark-connector |
BEST_EFFORT |
既定のオプションを使用したベスト エフォート sql-spark-connector |
580 秒 |
sql-spark-connector |
NO_DUPLICATES |
信頼性の高い sql-spark-connector |
709 秒 |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
テーブル ロックを有効にしたベスト エフォート sql-spark-connector |
72 秒 |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
テーブル ロックを有効にした信頼性の高い sql-spark-connector |
198 秒 |
構成
- Spark 構成: num_executors = 20、executor_memory = '1664 m'、executor_cores = 2
- データ生成構成: scale_factor=50、partitioned_tables=true
- 143,997,590 行の nr を含むデータ ファイル
store_sales
環境
- SQL Server ビッグ データ クラスター CU5
master
+ 6 ノード- 各ノード Gen 5 サーバー、512 GB RAM、4 TB NVM/ノード、NIC 10 GB
一般的に発生する問題
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
この問題は、Hadoop 環境で以前のバージョンの mssql ドライバー (現在このコネクタに含まれています) を使用した場合に発生します。 以前の Azure SQL コネクタを使用していて、Microsoft Entra 認証の互換性を保つためにそのクラスターにドライバーを手動でインストールしている場合は、それらのドライバーを削除する必要があります。
この問題を解決する手順:
汎用の Hadoop 環境を使用している場合は、mssql jar:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
を確認し、削除します。 Databricks を使用している場合は、グローバルまたはクラスターの init スクリプトを追加して、以前のバージョンの mssql ドライバーを/databricks/jars
フォルダーから削除するか、または既存のスクリプトにrm /databricks/jars/*mssql*
の行を追加します。adal4j
およびmssql
パッケージを追加します。 たとえば、Maven を使用することはできますが、どのような方法でも機能するはずです。注意事項
SQL spark コネクタをこのようにインストールしないでください。
接続構成にドライバー クラスを追加します。 たとえば、次のように入力します。
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
詳細と説明については、https://github.com/microsoft/sql-spark-connector/issues/26 の解決策を参照してください。
開始するには
SQL Server および Azure SQL 用の Apache Spark コネクタは、Spark DataSourceV1 API と SQL Server Bulk API に基づいており、組み込みの JDBC Spark SQL コネクタと同じインターフェイスを使用します。 この統合により、書式設定パラメーターを com.microsoft.sqlserver.jdbc.spark
で更新するだけで、コネクタを簡単に統合し、既存の Spark ジョブを移行させることができます。
このコネクタをプロジェクトに含めるには、このリポジトリをダウンロードし、SBT を使用して jar を構築します。
新しい SQL テーブルに書き込む
警告
overwrite
モードでは、テーブルが既定でデータベースに既に存在する場合は、最初にテーブルが削除されます。 予期しないデータ損失を避けるため、このオプションは注意して使用してください。
モード overwrite
を使用する場合、テーブルの再作成時にオプション truncate
を使用しないと、インデックスは失われます。 これで列ストア テーブルがヒープになります。 既存のインデックス作成を維持する場合は、オプション truncate
も値 true で指定してください。 たとえば、「 .option("truncate","true")
」のように入力します。
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
SQL テーブルに追加する
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
分離レベルを指定する
このコネクタは、データベースへの一括挿入を実行するときに、既定で READ_COMMITTED
分離レベルを使用します。 分離レベルをオーバーライドする場合は、次に示すように mssqlIsolationLevel
オプションを使用します。
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
SQL テーブルから読み取る
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Microsoft Entra 認証
サービス プリンシパルを使用した Python の例
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory パスワードを使用した Python の例
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory を使用して認証するには、必要な依存関係がインストールされている必要があります。
ActiveDirectoryPassword を使う場合の user
の形式は、UPN 形式にする必要があります (例: username@domainname.com
)。
Scala の場合、_com.microsoft.aad.adal4j_
アーティファクトをインストールする必要があります。
Python の場合、_adal_
ライブラリをインストールする必要があります。 これは、pip を介して利用できます。
例については、サンプル ノートブックを確認してください。
サポート
Azure SQL および SQL Server 用の Apache Spark コネクタは、オープンソース プロジェクトです。 このコネクタには、Microsoft サポートは付属していません。 コネクタに関する問題や質問については、このプロジェクト リポジトリに問題を作成してください。 コネクタ コミュニティはアクティブであり、送信を監視しています。
次のステップ
SQL Spark コネクタの GitHub リポジトリにアクセスします。
分離レベルについての詳細については、「SET TRANSACTION ISOLATION LEVEL (Transact-SQL)」を参照してください。