SQL Server および Azure SQL 用の Apache Spark コネクタは、ビッグ データ分析でトランザクション データを使用し、アドホック クエリやレポートのために結果を保持できる高パフォーマンス コネクタです。 コネクタを使用すると、オンプレミスまたはクラウド内の任意の SQL データベースを、Spark ジョブの入力データ ソースまたは出力データ シンクとして使用できます。
注
このコネクタはアクティブに維持されません。 この記事はアーカイブ目的でのみ保持されます。
このライブラリには、SQL Server および Azure SQL プラットフォーム用の Apache Spark コネクタのソース コードが含まれています。
Apache Spark は、"大規模なデータ処理のための統合された分析エンジン" です。
Maven で使用できるコネクタには、2.4.x 互換バージョンと 3.0.x 互換バージョンの 2 つのバージョンがあります。 maven.org からコネクタをダウンロード し、座標を使用してインポートします。
コネクタ | Maven 座標 |
---|---|
Spark 2.4.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Spark 3.0.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Spark 3.1.x 互換コネクタ | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
ソースからコネクタをビルドしたり、GitHub のリリース セクションから jar をダウンロードしたりすることもできます。 コネクタの最新情報については、 SQL Spark コネクタの GitHub リポジトリを参照してください。
サポートされている機能
- すべての Spark バインドのサポート (Scala、Python、R)
- 基本認証と Active Directory (AD) キー タブのサポート
- 並べ替え
dataframe
書き込みサポート - SQL Server ビッグ データ クラスターでの SQL Server 単一インスタンスとデータ プールへの書き込みのサポート
- Sql Server 単一インスタンスに対する Reliable Connector のサポート
コンポーネント | サポートされているバージョン |
---|---|
Apache Spark | 2.4.x、3.0.x、3.1.x |
スカラ (プログラミング言語) | 2.11, 2.12 |
SQL Server 用 Microsoft JDBC ドライバー | 8.4 |
Microsoft SQL Server | SQL Server 2008 以降 |
Azure SQL データベース群 | サポートされています |
サポートされているオプション
APACHE Spark Connector for SQL Server と Azure SQL では、ここで定義されているオプションである SQL DataSource JDBC がサポートされています
さらに、次のオプションがサポートされています。
選択肢 | 既定値 | 説明 |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT または NO_DUPLICATES 。
NO_DUPLICATES は、Executor の再起動シナリオで信頼性の高い挿入を実装します |
dataPoolDataSource |
none |
none は、値が設定されていないことを意味し、コネクタは SQL Server の単一インスタンスに書き込む必要があります。 ビッグ データ クラスターでデータ プール テーブルを書き込むには、この値をデータ ソース名に設定します |
isolationLevel |
READ_COMMITTED |
分離レベルを指定する |
tableLock |
false |
TABLOCK オプションを使用して挿入を実装し、書き込みパフォーマンスを向上させます |
schemaCheckEnabled |
true |
false に設定すると、厳密なデータ フレームと SQL テーブルスキーマチェックを無効にします。 |
その他の 一括コピー オプション は、 dataframe
のオプションとして設定でき、書き込み時に bulkcopy
API に渡されます
パフォーマンスの比較
SQL Server および Azure SQL 用の Apache Spark コネクタは、SQL Server に書き込むための汎用 JDBC コネクタよりも最大 15 倍高速です。 パフォーマンス特性は、種類、データの量、使用されるオプションによって異なり、実行のバリエーションが表示される場合があります。 次のパフォーマンス結果は、Spark dataframe
内の 143.9M 行で SQL テーブルを上書きするのにかかった時間です。 spark dataframe
は、store_sales
を使用して生成された HDFS テーブル読み取ることによって構築されます。
store_sales
からdataframe
までの読み取り時間は除外されます。 結果は 3 回の実行で平均されます。
コネクタの種類 | オプション | 説明 | 書き込み時間 |
---|---|---|---|
JDBCConnector |
既定値 | 既定のオプションを含む汎用 JDBC コネクタ | 1,385 秒 |
sql-spark-connector |
BEST_EFFORT |
既定のオプションを使用した最善の努力 sql-spark-connector |
580 秒 |
sql-spark-connector |
NO_DUPLICATES |
頼もしい sql-spark-connector |
709 秒 |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
テーブルロックが有効な状態での最善努力 sql-spark-connector |
72 秒 |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
テーブル ロックが有効になっている信頼性の高いsql-spark-connector |
198 秒 |
設定
- Spark 構成: num_executors = 20、executor_memory = '1664 m'、executor_cores = 2
- データ生成設定: スケールファクター=50, パーティション化テーブル=true
- 行数 143,997,590 のデータ ファイル
store_sales
環境
- SQL Server ビッグ データ クラスター CU5
-
master
+ 6 ノード - 各ノード Gen 5 サーバー、512 GB Ram、ノードあたり 4 TB NVM、NIC 10 GB
一般的に直面する問題
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
この問題は、hadoop 環境で古いバージョンの mssql ドライバー (このコネクタに含まれるようになりました) を使用して発生します。 以前の Azure SQL コネクタを使用していて、Microsoft Entra 認証の互換性のためにそのクラスターにドライバーを手動でインストールしている場合は、それらのドライバーを削除する必要があります。
この問題を解決する手順:
汎用 Hadoop 環境を使用している場合は、mssql jar:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
を確認して削除します。 Databricks を使用している場合は、グローバルまたはクラスター init スクリプトを追加して、/databricks/jars
フォルダーから古いバージョンの mssql ドライバーを削除するか、既存のスクリプトに次の行を追加します。rm /databricks/jars/*mssql*
adal4j
パッケージとmssql
パッケージを追加します。 たとえば、Maven を使用することはできますが、どのような方法でも機能します。注意事項
この方法で SQL Spark コネクタをインストールしないでください。
接続構成にドライバー クラスを追加します。 例えば次が挙げられます。
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
詳細と説明については、解決策https://github.com/microsoft/sql-spark-connector/issues/26を参照してください。
作業を開始する
SQL Server および Azure SQL 用の Apache Spark コネクタは、Spark DataSourceV1 API と SQL Server Bulk API に基づいており、組み込みの JDBC Spark-SQL コネクタと同じインターフェイスを使用します。 この統合により、format パラメーターを com.microsoft.sqlserver.jdbc.spark
で更新するだけで、コネクタを簡単に統合し、既存の Spark ジョブを移行できます。
プロジェクトにコネクタを含めるには、このリポジトリをダウンロードし、SBT を使用して jar をビルドします。
新しい SQL テーブルへの書き込み
Warnung
overwrite
モードでは、既定でテーブルがデータベースに既に存在する場合は、まずテーブルが削除されます。 予期しないデータ損失を避けるために、このオプションを慎重に使用してください。
テーブルの再作成時にオプション overwrite
を使用しない場合、モード truncate
を使用すると、インデックスが失われます。 では、列ストア テーブルはヒープになります。 既存のインデックス作成を維持する場合は、値が true のオプション truncate
も指定してください。 たとえば、.option("truncate","true")
のようにします。
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
SQL テーブルへの追加
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
分離レベルを指定する
このコネクタは、既定では、データベースへの一括挿入を実行するときに READ_COMMITTED
分離レベルを使用します。 分離レベルをオーバーライドする場合は、 mssqlIsolationLevel
オプションを使用します。
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
SQL テーブルからの読み取り
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Microsoft Entra 認証
サービス プリンシパルを使用した Python の例
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory パスワードを使用した Python の例
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Active Directory を使用して認証するには、必要な依存関係がインストールされている必要があります。
ActiveDirectoryPassword を使う場合の user
の形式は、UPN 形式にする必要があります (例: username@domainname.com
)。
Scala の場合、_com.microsoft.aad.adal4j_
成果物をインストールする必要があります。
Python の場合、_adal_
ライブラリをインストールする必要があります。 これは、pip を介して利用できます。
サンプル ノートブックの例を確認します。
支援
Azure SQL および SQL Server 用の Apache Spark コネクタは、オープンソース プロジェクトです。 このコネクタには、Microsoft のサポートはありません。 コネクタに関する問題や質問については、このプロジェクト リポジトリに問題を作成してください。 コネクタ コミュニティはアクティブであり、送信を監視しています。
関連コンテンツ
SQL Spark コネクタの GitHub リポジトリにアクセスします。