次の方法で共有


Apache Spark コネクタ: SQL Server と Azure SQL

SQL Server および Azure SQL 用の Apache Spark コネクタは、ビッグ データ分析でトランザクション データを使用し、アドホック クエリやレポートのために結果を保持できる高パフォーマンス コネクタです。 コネクタを使用すると、オンプレミスまたはクラウド内の任意の SQL データベースを、Spark ジョブの入力データ ソースまたは出力データ シンクとして使用できます。

このコネクタはアクティブに維持されません。 この記事はアーカイブ目的でのみ保持されます。

このライブラリには、SQL Server および Azure SQL プラットフォーム用の Apache Spark コネクタのソース コードが含まれています。

Apache Spark は、"大規模なデータ処理のための統合された分析エンジン" です。

Maven で使用できるコネクタには、2.4.x 互換バージョンと 3.0.x 互換バージョンの 2 つのバージョンがあります。 maven.org からコネクタをダウンロード し、座標を使用してインポートします。

コネクタ Maven 座標
Spark 2.4.x 互換コネクタ com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x 互換コネクタ com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x 互換コネクタ com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

ソースからコネクタをビルドしたり、GitHub のリリース セクションから jar をダウンロードしたりすることもできます。 コネクタの最新情報については、 SQL Spark コネクタの GitHub リポジトリを参照してください。

サポートされている機能

  • すべての Spark バインドのサポート (Scala、Python、R)
  • 基本認証と Active Directory (AD) キー タブのサポート
  • 並べ替えdataframe書き込みサポート
  • SQL Server ビッグ データ クラスターでの SQL Server 単一インスタンスとデータ プールへの書き込みのサポート
  • Sql Server 単一インスタンスに対する Reliable Connector のサポート
コンポーネント サポートされているバージョン
Apache Spark 2.4.x、3.0.x、3.1.x
スカラ (プログラミング言語) 2.11, 2.12
SQL Server 用 Microsoft JDBC ドライバー 8.4
Microsoft SQL Server SQL Server 2008 以降
Azure SQL データベース群 サポートされています

サポートされているオプション

APACHE Spark Connector for SQL Server と Azure SQL では、ここで定義されているオプションである SQL DataSource JDBC がサポートされています

さらに、次のオプションがサポートされています。

選択肢 既定値 説明
reliabilityLevel BEST_EFFORT BEST_EFFORT または NO_DUPLICATESNO_DUPLICATES は、Executor の再起動シナリオで信頼性の高い挿入を実装します
dataPoolDataSource none none は、値が設定されていないことを意味し、コネクタは SQL Server の単一インスタンスに書き込む必要があります。 ビッグ データ クラスターでデータ プール テーブルを書き込むには、この値をデータ ソース名に設定します
isolationLevel READ_COMMITTED 分離レベルを指定する
tableLock false TABLOCK オプションを使用して挿入を実装し、書き込みパフォーマンスを向上させます
schemaCheckEnabled true false に設定すると、厳密なデータ フレームと SQL テーブルスキーマチェックを無効にします。

その他の 一括コピー オプション は、 dataframe のオプションとして設定でき、書き込み時に bulkcopy API に渡されます

パフォーマンスの比較

SQL Server および Azure SQL 用の Apache Spark コネクタは、SQL Server に書き込むための汎用 JDBC コネクタよりも最大 15 倍高速です。 パフォーマンス特性は、種類、データの量、使用されるオプションによって異なり、実行のバリエーションが表示される場合があります。 次のパフォーマンス結果は、Spark dataframe内の 143.9M 行で SQL テーブルを上書きするのにかかった時間です。 spark dataframeは、store_salesを使用して生成された HDFS テーブル読み取ることによって構築されます。 store_salesからdataframeまでの読み取り時間は除外されます。 結果は 3 回の実行で平均されます。

コネクタの種類 オプション 説明 書き込み時間
JDBCConnector 既定値 既定のオプションを含む汎用 JDBC コネクタ 1,385 秒
sql-spark-connector BEST_EFFORT 既定のオプションを使用した最善の努力 sql-spark-connector 580 秒
sql-spark-connector NO_DUPLICATES 頼もしい sql-spark-connector 709 秒
sql-spark-connector BEST_EFFORT + tabLock=true テーブルロックが有効な状態での最善努力 sql-spark-connector 72 秒
sql-spark-connector NO_DUPLICATES + tabLock=true テーブル ロックが有効になっている信頼性の高いsql-spark-connector 198 秒

設定

  • Spark 構成: num_executors = 20、executor_memory = '1664 m'、executor_cores = 2
  • データ生成設定: スケールファクター=50, パーティション化テーブル=true
  • 行数 143,997,590 のデータ ファイルstore_sales

環境

一般的に直面する問題

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

この問題は、hadoop 環境で古いバージョンの mssql ドライバー (このコネクタに含まれるようになりました) を使用して発生します。 以前の Azure SQL コネクタを使用していて、Microsoft Entra 認証の互換性のためにそのクラスターにドライバーを手動でインストールしている場合は、それらのドライバーを削除する必要があります。

この問題を解決する手順:

  1. 汎用 Hadoop 環境を使用している場合は、mssql jar: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jarを確認して削除します。 Databricks を使用している場合は、グローバルまたはクラスター init スクリプトを追加して、 /databricks/jars フォルダーから古いバージョンの mssql ドライバーを削除するか、既存のスクリプトに次の行を追加します。 rm /databricks/jars/*mssql*

  2. adal4jパッケージとmssql パッケージを追加します。 たとえば、Maven を使用することはできますが、どのような方法でも機能します。

    注意事項

    この方法で SQL Spark コネクタをインストールしないでください。

  3. 接続構成にドライバー クラスを追加します。 例えば次が挙げられます。

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

詳細と説明については、解決策https://github.com/microsoft/sql-spark-connector/issues/26を参照してください。

作業を開始する

SQL Server および Azure SQL 用の Apache Spark コネクタは、Spark DataSourceV1 API と SQL Server Bulk API に基づいており、組み込みの JDBC Spark-SQL コネクタと同じインターフェイスを使用します。 この統合により、format パラメーターを com.microsoft.sqlserver.jdbc.spark で更新するだけで、コネクタを簡単に統合し、既存の Spark ジョブを移行できます。

プロジェクトにコネクタを含めるには、このリポジトリをダウンロードし、SBT を使用して jar をビルドします。

新しい SQL テーブルへの書き込み

Warnung

overwrite モードでは、既定でテーブルがデータベースに既に存在する場合は、まずテーブルが削除されます。 予期しないデータ損失を避けるために、このオプションを慎重に使用してください。

テーブルの再作成時にオプション overwriteを使用しない場合、モード truncateを使用すると、インデックスが失われます。 では、列ストア テーブルはヒープになります。 既存のインデックス作成を維持する場合は、値が true のオプション truncate も指定してください。 たとえば、.option("truncate","true") のようにします。

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

SQL テーブルへの追加

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

分離レベルを指定する

このコネクタは、既定では、データベースへの一括挿入を実行するときに READ_COMMITTED 分離レベルを使用します。 分離レベルをオーバーライドする場合は、 mssqlIsolationLevel オプションを使用します。

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

SQL テーブルからの読み取り

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra 認証

サービス プリンシパルを使用した Python の例

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory パスワードを使用した Python の例

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory を使用して認証するには、必要な依存関係がインストールされている必要があります。

ActiveDirectoryPassword を使う場合の user の形式は、UPN 形式にする必要があります (例: username@domainname.com)。

Scala の場合_com.microsoft.aad.adal4j_成果物をインストールする必要があります。

Python の場合_adal_ ライブラリをインストールする必要があります。 これは、pip を介して利用できます。

サンプル ノートブックの例を確認します。

支援

Azure SQL および SQL Server 用の Apache Spark コネクタは、オープンソース プロジェクトです。 このコネクタには、Microsoft のサポートはありません。 コネクタに関する問題や質問については、このプロジェクト リポジトリに問題を作成してください。 コネクタ コミュニティはアクティブであり、送信を監視しています。

SQL Spark コネクタの GitHub リポジトリにアクセスします。