Share via


Python 用 Databricks SQL コネクタ

Databricks SQL Connector for Python は、Python コードを使用して Azure Databricks クラスターと Databricks SQL ウェアハウスで SQL コマンドを実行できるようにする Python ライブラリです。 Databricks SQL Connector for Python は、pyodbc のような類似の Python ライブラリよりも設定と使用が簡単です。 このライブラリは、PEP 249 – Python Database API Specification v2.0 に準拠しています。

Note

Databricks SQL Connector for Python には Azure Databricks の SQLAlchemy 言語も含まれています。 「Azure Databricks での SQLAlchemy の使用」を参照してください。

要件

  • Python >=3.8、<=3.11 を実行している開発マシン。
  • Databricks では、Python 仮想環境 (Python に含まれる venv によって提供されるものなど) を使用することをお勧めしています。 仮想環境は、正しいバージョンの Python と Databricks SQL Connector for Python を一緒に使用していることを確認するのに役立ちます。 仮想環境のセットアップと使用は、この記事の範囲外です。 詳細については、「仮想環境を作成する」を参照してください。
  • 既存のクラスターまたは SQL ウェアハウス

作業の開始

  • pip install databricks-sql-connector または python -m pip install databricks-sql-connector を実行して、Databricks SQL Connector for Python ライブラリを開発マシンにインストールします。

  • 使用するクラスターまたは SQL ウェアハウスに関する次の情報を収集します。

    クラスター

    • クラスターのサーバー ホスト名。 これは、クラスターの [詳細オプション]> [JDBC/ODBC] タブにある [サーバー ホスト名] の値から取得できます。
    • クラスターの HTTP パス。 これは、クラスターの [詳細オプション]> [JDBC/ODBC] タブにある [HTTP パス] の値から取得できます。

    SQL ウェアハウス

    • SQL ウェアハウスのサーバー ホスト名。 これは、SQL ウェアハウスの [接続の詳細] タブにある [サーバー ホスト名] の値から取得できます。
    • SQL ウェアハウスの HTTP パス。 これは、SQL ウェアハウスの [接続の詳細] タブにある [HTTP パス] の値から取得できます。

認証

Databricks SQL Connector for Python では、次の種類の Azure Databricks 認証がサポートされています。

Databricks SQL Connector for Python では、次の Azure Databricks 認証の種類はまだサポートされません。

Databricks 個人用アクセス トークン認証

Databricks SQL Connector for Python を Azure Databricks の個人用アクセス トークン認証で使用するには、まず、次のように Azure Databricks 個人用アクセス トークンを作成する必要があります。

  1. Azure Databricks ワークスペースの上部バーで、目的の Azure Databricks ユーザー名をクリックし、次にドロップダウンから [設定] を選択します。
  2. [開発者] をクリックします。
  3. [アクセス トークン] の横にある [管理] をクリックします。
  4. [新しいトークンの生成] をクリックします。
  5. (省略可能) 将来このトークンを識別するのに役立つコメントを入力し、トークンの既定の有効期間 90 日を変更します。 有効期間のないトークンを作成するには (推奨されません)、[有効期間 (日)] ボックスを空のままにします。
  6. [Generate](生成) をクリックします。
  7. 表示されたトークンを安全な場所にコピーし、[完了] をクリックします。

Note

コピーしたトークンは必ず安全な場所に保存してください。 コピーしたトークンは他人に見せないでください。 コピーしたトークンを失った場合、それとまったく同じトークンは再生成できません。 代わりに、この手順を繰り返して新しいトークンを作成する必要があります。 コピーしたトークンを紛失した場合や、トークンが侵害されていると思われる場合、Databricks では、[アクセス トークン] ページのトークンの横にあるごみ箱 ([取り消し]) アイコンをクリックして、ワークスペースからそのトークンをすぐに削除することを強くお勧めします。

ワークスペースでトークンを作成することや使用することができない場合は、ワークスペース管理者によってトークンが無効にされているか、トークンを作成または使用する権限が作業者に付与されていない可能性があります。 ワークスペース管理者に連絡するか、以下の情報を参照してください。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットでは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME は、お使いのクラスターまたは SQL ウェアハウスのサーバー ホスト名の値に設定します。
  • DATABRICKS_HTTP_PATH が、クラスターまたは SQL ウェアハウスの HTTP パスの値に設定されていること。
  • DATABRICKS_TOKEN が、Azure Databricks 個人用アクセス トークンに設定されていること。

環境変数を設定するには、オペレーティング システムのドキュメントを参照してください。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuth マシン間 (M2M) 認証

Databricks SQL Connector for Python バージョン 2.7.0 以降では、OAuth マシン間 (M2M) 認証をサポートしています。 Databricks SDK for Python 0.18.0 以上もインストールする必要があります (たとえば、pip install databricks-sdk または python -m pip install databricks-sdk を実行します)。

Databricks SQL Connector for Python と OAuth M2M 認証を使用するには、次を行う必要があります。

  1. Azure Databricks ワークスペースで Azure Databricks サービス プリンシパルを作成し、そのサービス プリンシパルの OAuth シークレットを作成します。

    サービス プリンシパルとその OAuth シークレットを作成するには、「OAuth マシン間 (M2M) 認証」をご参照ください。 サービス プリンシパルの UUID またはアプリケーション ID の値と、サービス プリンシパルの OAuth シークレットのシークレットを書き留めておいてください。

  2. そのサービス プリンシパルにクラスターまたはウェアハウスへのアクセス権を付与します。

    サービス プリンシパルにクラスターまたはウェアハウスへのアクセス権を付与するには、「コンピューティングのアクセス許可」または「SQL ウェアハウスを管理する」を参照してください。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットでは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME が、クラスターまたは SQL ウェアハウスのサーバー ホスト名の値に設定されていること。
  • DATABRICKS_HTTP_PATH が、クラスターまたは SQL ウェアハウスの HTTP パスの値に設定されていること。
  • DATABRICKS_CLIENT_ID。サービス プリンシパルの UUID またはアプリケーション ID の値に設定します。
  • DATABRICKS_CLIENT_SECRET が、サービス プリンシパルの OAuth シークレットの [シークレット] 値に設定されていること。

環境変数を設定するには、オペレーティング システムのドキュメントを参照してください。

from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os

server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
  config = Config(
    host          = f"https://{server_hostname}",
    client_id     = os.getenv("DATABRICKS_CLIENT_ID"),
    client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
  return oauth_service_principal(config)

with sql.connect(server_hostname      = server_hostname,
                 http_path            = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
# ...

Microsoft Entra ID (旧称 Azure Active Directory) トークン認証

Microsoft Entra ID (旧称: Azure Active Directory) トークン認証で Databricks SQL Connector for Python を使うには、Databricks SQL Connector for Python に Microsoft Entra ID トークンを指定する必要があります。 Microsoft Entra ID アクセス トークンを作成するには、次の操作を行います。

Microsoft Entra ID トークンの既定の有効期間は約 1 時間です。 新しい Microsoft Entra ID トークンを作成するには、このプロセスを繰り返します。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットでは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME は、お使いのクラスターまたは SQL ウェアハウスのサーバー ホスト名の値に設定します。
  • DATABRICKS_HTTP_PATH は、お使いのクラスターまたは SQL ウェアハウスの HTTP パスの値に設定します。
  • DATABRICKS_TOKEN は、Microsoft Entra ID トークンに設定します。

環境変数を設定するには、オペレーティング システムのドキュメントを参照してください。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuth ユーザー対マシン (U2M) 認証

Databricks SQL Connector for Python バージョン 2.7.0 以上では、OAuth ユーザー対マシン (U2M) 認証をサポートします。 Databricks SDK for Python 0.19.0 以上もインストールする必要があります (たとえば、pip install databricks-sdk または python -m pip install databricks-sdk を実行します)。

OAuth U2M 認証で Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 OAuth U2M 認証では、リアルタイムの人間のサインインと同意を使用して、ターゲットの Azure Databricks ユーザー アカウントを認証します。 このスニペットでは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME は、お使いのクラスターまたは SQL ウェアハウスのサーバー ホスト名の値に設定します。
  • DATABRICKS_HTTP_PATH は、お使いのクラスターまたは SQL ウェアハウスの HTTP パスの値に設定します。

環境変数を設定するには、オペレーティング システムのドキュメントを参照してください。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
# ...

次のコード例は、Databricks SQL Connector for Python を使用して、データのクエリと挿入、メタデータのクエリ、カーソルと接続の管理、ログ記録の構成を行う方法を示しています。

Note

以下のコード例は、認証に Azure Databricks 個人用アクセス トークンをどのように使用するかを示します。 代わりに他の種類の使用可能な Azure Databricks 認証を使用するには、「認証」をご覧ください。

このコード例では、これらの環境変数から server_hostnamehttp_pathaccess_token 接続変数の値を取得します。

  • DATABRICKS_SERVER_HOSTNAME は、要件からのサーバー ホスト名を表します。
  • DATABRICKS_HTTP_PATH は、要件からの HTTP パスを表します。
  • DATABRICKS_TOKEN は、要件からのアクセス トークンを表します。

これらの接続変数の値を取得するには、他の方法を使用できます。 環境変数の使用は、多くのアプローチの 1 つにすぎません。

クエリ データ

次のコード例は、Databricks SQL Connector for Python を呼び出して、クラスターまたは SQL ウェアハウスで基本的な SQL コマンドを実行する方法を示しています。 このコマンドを使用すると、samples カタログの nyctaxi スキーマの trips テーブルから最初の 2 行が返されます。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    result = cursor.fetchall()

    for row in result:
      print(row)

データを挿入する

次の例では、少量のデータ (数千行) を挿入する方法を示します。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])

    cursor.execute(f"INSERT INTO squares VALUES {values}")

    cursor.execute("SELECT * FROM squares LIMIT 10")

    result = cursor.fetchall()

    for row in result:
      print(row)

大量のデータの場合は、最初にデータをクラウド ストレージにアップロードしてから、COPY INTO コマンドを実行する必要があります。

メタデータのクエリを実行する

メタデータを取得するための専用のメソッドがあります。 次の例では、サンプル テーブルの列に関するメタデータを取得します。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())

カーソルと接続を管理する

使用されなくなった接続とカーソルを閉じるのがベスト プラクティスです。 これにより、Azure Databricks クラスターと Databricks SQL ウェアハウスのリソースが解放されます。

コンテキスト マネージャー (前の例で使用した with 構文) を使用してリソースを管理したり、明示的に close を呼び出したりできます。

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

ログの構成

Databricks SQL コネクタでは、Python の標準ログ モジュールが使用されます。 ログ レベルは、次のように構成できます。

from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level    = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
   logging.debug(row)

cursor.close()
connection.close()

テスト

コードをテストするには、pytest などの Python テスト フレームワークを使用します。 Azure Databricks REST API エンドポイントを呼び出したり、Azure Databricks アカウントまたはワークスペースの状態を変更したりせずに、シミュレートされた条件下でコードをテストするには、unittest.mock などの Python モック ライブラリを使用できます。

たとえば、Azure Databricks 個人用アクセス トークンを使用して Azure Databricks ワークスペースへの接続を返す get_connection_personal_access_token 関数を含む helpers.py という名前の次のファイルと、接続を使用して samples カタログの nyctaxi スキーマの trips テーブルから指定された数のデータ行を取得する select_nyctaxi_trips 関数を指定します。

# helpers.py

from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor

def get_connection_personal_access_token(
  server_hostname: str,
  http_path: str,
  access_token: str
) -> Connection:
  return sql.connect(
    server_hostname = server_hostname,
    http_path = http_path,
    access_token = access_token
  )

def select_nyctaxi_trips(
  connection: Connection,
  num_rows: int
) -> List[Row]:
  cursor: Cursor = connection.cursor()
  cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  result: List[Row] = cursor.fetchall()
  return result

get_connection_personal_access_token 関数と select_nyctaxi_trips 関数を呼び出す main.py という名前の次のファイルを指定します。

# main.py

from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips

connection: Connection = get_connection_personal_access_token(
  server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
  http_path = os.getenv("DATABRICKS_HTTP_PATH"),
  access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
  connection = connection,
  num_rows = 2
)

for row in rows:
  print(row)

test_helpers.py という名前の次のファイルは、select_nyctaxi_trips 関数が、想定される応答を返すかどうかをテストします。 このテストでは、ターゲット ワークスペースへの実際の接続を作成するのではなく、Connection オブジェクトをモックします。 また、このテストでは、実際のデータ内にあるスキーマと値に準拠するデータをモックします。 テストでは、モックされた接続を介してモックされたデータが返され、モックされたデータ行の値のいずれかが期待値と一致するかどうかを確認します。

# test_helpers.py

import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec

@pytest.fixture
def mock_data() -> List[Row]:
  return [
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
      tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
      trip_distance = 4.94,
      fare_amount = 19.0,
      pickup_zip = 10282,
      dropoff_zip = 10171
    ),
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
      tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
      trip_distance = 0.28,
      fare_amount = 3.5,
      pickup_zip = 10110,
      dropoff_zip = 10110
    )
  ]

def test_select_nyctaxi_trips(mock_data: List[Row]):
  # Create a mock Connection.
  mock_connection = create_autospec(Connection)

  # Set the mock Connection's cursor().fetchall() to the mock data.
  mock_connection.cursor().fetchall.return_value = mock_data

  # Call the real function with the mock Connection.
  response: List[Row] = select_nyctaxi_trips(
    connection = mock_connection,
    num_rows = 2)

  # Check the value of one of the mocked data row's columns.
  assert response[1].fare_amount == 3.5

select_nyctaxi_trips 関数には SELECT ステートメントが含まれ、そのため、trips テーブルの状態が変更されないため、モックはこの例では完全に不要です。 ただし、モックの場合、ワークスペースとの実際の接続が行われるのを待つことなく、テストを速やかに実行できます。 また、モックの場合、INSERT INTOUPDATEDELETE FROM など、テーブルの状態を変更する可能性のある関数に対してシミュレーション テストを複数回実行できます。

API リファレンス

Package

databricks-sql-connector

使用法: pip install databricks-sql-connector

Python Package Index (PyPI) の databricks-sql-connector も参照してください。

モジュール

databricks.sql

使用法: from databricks import sql

クラス

選択するクラスには、次のようなものがあります。

クラス
Connection

Azure Databricks コンピューティング リソース上のセッション。
Cursor

データ レコードを走査するためのメカニズム。
Row

SQL クエリ結果のデータ行。

Connection クラス

Connection オブジェクトを作成するには、次のパラメータを指定して databricks.sql.connect メソッドを呼び出します。

パラメーター
server_hostname

型: str

クラスターまたは SQL ウェアハウスのサーバー ホスト名。 サーバー ホスト名を取得するには、この記事の前の手順を参照してください。

このパラメーターは必須です。

例: adb-1234567890123456.7.azuredatabricks.net
http_path

型: str

クラスターまたは SQL ウェアハウスの HTTP パス。 HTTP パスを取得するには、この記事の前の手順を参照してください。

このパラメーターは必須です。

例:
クラスターの場合は sql/protocolv1/o/1234567890123456/1234-567890-test123
SQL ウェアハウスの場合は /sql/1.0/warehouses/a1b234c567d8e9fa
access_token, auth_type

型: str

Azure Databricks の認証設定に関する情報。 詳細については、「認証」を参照してください。
session_configuration

型: dict[str, Any]

Spark セッション構成パラメーターの辞書。 構成の設定は、SET key=val SQL コマンドを使用することと同じです。 SQL コマンド SET -v を実行して、使用可能な構成の完全なリストを取得します。

既定値は None です。

このパラメーターは省略可能です。

例: {"spark.sql.variable.substitute": True}
http_headers

型: List[Tuple[str, str]]]

クライアントが行う RPC 要求ごとに HTTP ヘッダーで設定する追加の (キー、値) ペア。 一般的な使用法では、追加の HTTP ヘッダーは設定されません。 既定値は None です。

このパラメーターは省略可能です。

バージョン 2.0 以降
catalog

型: str

接続に使用する初期カタログです。 既定値は None です (この場合、既定のカタログ (通常は hive_metastore) が使用されます)。

このパラメーターは省略可能です。

バージョン 2.0 以降
schema

型: str

接続に使用する初期スキーマです。 既定値は None です (この場合、既定のスキーマ (通常は default) が使用されます)。

このパラメーターは省略可能です。

バージョン 2.0 以降
use_cloud_fetch

型: bool

True では、フェッチ要求を直接、クラウド オブジェクト ストアに送信し、まとまったデータをダウンロードします。 False (既定値) では Azure Databricks に直接、フェッチ要求を送信します。

use_cloud_fetchTrue に設定されていてもネットワーク アクセスがブロックされている場合、フェッチ要求は失敗します。

バージョン 2.8 以降

選択する Connection メソッドには、次のようなものがあります。

メソッド
close

データベースへの接続を閉じ、サーバー上のすべての関連リソースを解放します。 この接続への呼び出しを追加すると、Error がスローされます。

パラメーターはありません。

戻り値はありません。
cursor

データベース内のレコードを走査できる新しい Cursor オブジェクトを返します。

パラメーターはありません。

Cursor クラス

Cursor オブジェクトを作成するには、Connection クラスの cursor メソッドを呼び出します。

選択する Cursor 属性には、次のようなものがあります。

属性
arraysize

fetchmany メソッドで使われ、内部バッファー サイズを指定します。これは、一度にサーバーから実際にフェッチされる行数でもあります。 既定値は 10000 です。 狭い結果 (各行に多くのデータが含まれていない結果) の場合は、パフォーマンスを向上させるために、この値を大きくする必要があります。

読み取り/書き込みアクセス。
description

Python の tuple オブジェクトの list が含まれます。 これらの tuple オブジェクトにはそれぞれ 7 つの値が含まれ、各 tuple オブジェクトの最初の 2 つの項目に、次のように単一の結果列を説明する情報が含まれます。

* name: 列の名前。
* type_code 列の型を表す文字列。 たとえば、整数列の型コードは int になります。

各 7 項目の tuple オブジェクトの残りの 5 項目は実装されておらず、それらの値は定義されていません。 通常、これらでは 4 が返されます
None 値と、その後に続く 1 つの True 値。

読み取り専用アクセス。

選択する Cursor メソッドには、次のようなものがあります。

メソッド
cancel

カーソルが起動したデータベース クエリまたはコマンドの実行を中断します。 サーバー上の関連リソースを解放するには、
cancel メソッドを呼び出した後で close メソッドを呼び出します。

パラメーターはありません。

戻り値はありません。
close

カーソルを閉じ、サーバー上の関連リソースを解放します。 既に閉じているカーソルを閉じると、エラーがスローされる可能性があります。

パラメーターはありません。

戻り値はありません。
execute

データベース クエリまたはコマンドを準備して実行します。

戻り値はありません。

パラメーター:

operation

型: str

準備してから実行するクエリまたはコマンド。

このパラメーターは必須です。

parameters パラメーターを指定しない例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2'
)

parameters パラメーターを指定する例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2',
{ 'pickup_zip': '10019' }
)

parameters

型: 辞書

operation パラメーターと一緒に使用するパラメーターのシーケンス。

このパラメーターは省略可能です。 既定値は、None です。
executemany

seq_of_parameters 引数内のすべてのパラメーター シーケンスを使用して、データベース クエリまたはコマンドを準備して実行します。 最終的な結果セットだけが保持されます。

戻り値はありません。

パラメーター:

operation

型: str

準備してから実行するクエリまたはコマンド。

このパラメーターは必須です。

seq_of_parameters

型: dictlist

以下で使用するパラメーター値の多くのセットのシーケンス:
operation パラメーター。

このパラメーターは必須です。
catalogs

カタログに関するメタデータ クエリを実行します。 実際の結果は、fetchmany または fetchall を使用してフェッチする必要があります。

結果セットの重要なフィールドには、以下が含まれます。

* フィールド名: TABLE_CAT。 次のコマンドを入力します: str カタログの名前です。

パラメーターはありません。

戻り値はありません。

"バージョン 1.0 以降"
schemas

スキーマに関するメタデータ クエリを実行します。 実際の結果は、fetchmany または fetchall を使用してフェッチする必要があります。

結果セットの重要なフィールドには、以下が含まれます。

* フィールド名: TABLE_SCHEM。 次のコマンドを入力します: str スキーマの名前です。
* フィールド名: TABLE_CATALOG。 次のコマンドを入力します: str スキーマが属するカタログ。

戻り値はありません。

"バージョン 1.0 以降"

パラメーター:

catalog_name

型: str

情報を取得するカタログ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

schema_name

型: str

情報を取得するスキーマ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。
tables

テーブルとビューに関するメタデータ クエリを実行します。 実際の結果は、fetchmany または fetchall を使用してフェッチする必要があります。

結果セットの重要なフィールドには、以下が含まれます。

* フィールド名: TABLE_CAT。 次のコマンドを入力します: str テーブルが属するカタログ。
* フィールド名: TABLE_SCHEM。 次のコマンドを入力します: str テーブルが属するスキーマ。
* フィールド名: TABLE_NAME。 次のコマンドを入力します: str テーブルの名前。
* フィールド名: TABLE_TYPE。 次のコマンドを入力します: str 関係の種類。例えば VIEWTABLE (Databricks Runtime 10.4 LTS 以降および Databricks SQL に適用されます。以前のバージョンの Databricks Runtime では空の文字列が返されます)。

戻り値はありません。

"バージョン 1.0 以降"

パラメーター

catalog_name

型: str

情報を取得するカタログ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

schema_name

型: str

情報を取得するスキーマ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

table_name

型: str

情報を取得するテーブル名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

table_types

型: List[str]

照合するテーブル型のリスト (例:TABLEVIEW)。

このパラメーターは省略可能です。
columns

列に関するメタデータ クエリを実行します。 実際の結果は、fetchmany または fetchall を使用してフェッチする必要があります。

結果セットの重要なフィールドには、以下が含まれます。

* フィールド名: TABLE_CAT。 次のコマンドを入力します: str 列が属するカタログ。
* フィールド名: TABLE_SCHEM。 次のコマンドを入力します: str 列が属するスキーマ。
* フィールド名: TABLE_NAME。 次のコマンドを入力します: str 列が属するテーブルの名前。
* フィールド名: COLUMN_NAME。 次のコマンドを入力します: str 列の名前です。

戻り値はありません。

"バージョン 1.0 以降"

パラメーター:

catalog_name

型: str

情報を取得するカタログ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

schema_name

型: str

情報を取得するスキーマ名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

table_name

型: str

情報を取得するテーブル名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。

column_name

型: str

情報を取得する列名。 % 文字は、ワイルドカードとして解釈されます。

このパラメーターは省略可能です。
fetchall

クエリのすべて (または残りすべて) の行を取得します。

パラメーターはありません。

クエリのすべて (または残りすべて) の行を、Python の次のものの list として返します。
Row オブジェクト。

execute メソッドの前回の呼び出しでデータが返されなかった場合、または execute の呼び出しがまだ行われていない場合は、Error をスローします。
fetchmany

クエリの次の行を取得します。

クエリの次の最大 size (または、size が指定されていない場合は arraysize 属性) 行を、Python の Row オブジェクトの list として返します。

残りのフェッチする行が size より少ない場合は、残りの行がすべて返されます。

execute メソッドの前回の呼び出しでデータが返されなかった場合、または execute の呼び出しがまだ行われていない場合は、Error をスローします。

パラメーター:

size

型: int

取得する次の行の数。

このパラメーターは省略可能です。 指定しない場合、arraysize 属性の値が使用されます。

例: cursor.fetchmany(10)
fetchone

データセットの次の行を取得します。

パラメーターはありません。

データセットの次の行を単一のシーケンスとして Python の
tuple オブジェクトで返すか、使用可能なデータがこれ以上ない場合は None を返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、または execute の呼び出しがまだ行われていない場合は、Error をスローします。
fetchall_arrow

クエリのすべて (または残りすべて) の行を、PyArrow Table オブジェクトとして取得します。 非常に大量のデータを返すクエリは、メモリ消費を減らす代わりに、fetchmany_arrow を使用する必要があります。

パラメーターはありません。

クエリのすべて (または残りすべて) の行を PyArrow テーブルとして返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、または execute の呼び出しがまだ行われていない場合は、Error をスローします。

バージョン 2.0 以降
fetchmany_arrow

クエリの次の行を PyArrow Table オブジェクトとして取得します。

クエリの次の最大 size 引数 (または size が指定されていない場合は arraysize 属性) 行を、Python PyArrow の次のものとして返します。
Table オブジェクト。

execute メソッドの前回の呼び出しでデータが返されなかった場合、または execute の呼び出しがまだ行われていない場合は、Error をスローします。

バージョン 2.0 以降

パラメーター:

size

型: int

取得する次の行の数。

このパラメーターは省略可能です。 指定しない場合、arraysize 属性の値が使用されます。

例: cursor.fetchmany_arrow(10)

Row クラス

行クラスは、個々の結果行を表すタプルのようなデータ構造です。 行に "my_column" という名前の列が含まれている場合は、row.my_column を介して row"my_column" フィールドにアクセスできます。 数値インデックスを使用して、row[0] などのフィールドにアクセスすることもできます。 列名が属性メソッド名として許可されていない場合 (たとえば、数字で始まる場合)、row["1_my_column"] としてフィールドにアクセスできます。

"バージョン 1.0 以降"

選択する Row メソッドには、次のようなものがあります。

| asDict

フィールド名のインデックスが付けられた行の辞書表現を返します。 重複するフィールド名がある場合は、重複するフィールドの 1 つ (ただし 1 つのみ) が辞書に返されます。 どの重複フィールドが返されるかは定義されていません。

パラメーターはありません。

フィールドの dict を返します。 |

型の変換

次の表は、Apache Spark SQL データ型と、同等の Python データ型の対応付けを示しています。

Apache Spark SQL データ型 Python データ型
array numpy.ndarray
bigint int
binary bytearray
boolean bool
date datetime.date
decimal decimal.Decimal
double float
int int
map str
null NoneType
smallint int
string str
struct str
timestamp datetime.datetime
tinyint int

トラブルシューティング

tokenAuthWrapperInvalidAccessToken: Invalid access token メッセージ

問題: コードを実行すると、Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token のようなメッセージが表示されます。

考えられる原因: access_token に渡された値が、Azure Databricks の有効な個人用アクセス トークンではありません。

推奨される修正: access_token に渡された値が正しいか確認し、もう一度試してください。

gaierror(8, 'nodename nor servname provided, or not known') メッセージ

問題: コードを実行すると、Error during request to server: gaierror(8, 'nodename nor servname provided, or not known') のようなメッセージが表示されます。

考えられる原因: server_hostname に渡された値が正しいホスト名ではありません。

推奨される修正: server_hostname に渡された値が正しいか確認し、もう一度試してください。

サーバー ホスト名の検索について詳しくは、「Azure Databricks コンピューティング リソースの接続の詳細を取得する」をご覧ください。

IpAclError メッセージ

問題: コードを実行すると、Azure Databricks ノートブックでコネクタを使用しようとしたときに Error during request to server: IpAclValidation メッセージが表示されます。

考えられる原因: Azure Databricks ワークスペースで IP 許可リストを有効にしている可能性があります。 IP 許可リストがあると、既定では Spark クラスターからコントロール プレーンへの接続が許可されません。

推奨される修正: コンピューティング プレーン サブネットを IP 許可リストに追加するように管理者に依頼してください。

その他のリソース

詳細については、次を参照してください。