用于 Python 的 Databricks SQL 连接器是一个 Python 库,可用于使用 Python 代码在 Azure Databricks 全用途计算和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 – Python 数据库 API 规范 v2.0。
重要
用于 Python 版本 3.0.0 及更高版本的 Databricks SQL 连接器支持本机参数化查询执行,这可以防止 SQL 注入并改进查询性能。 以前版本使用内联参数化执行,这对于 SQL 注入不安全,并且存在其他缺点。 有关详细信息,请参阅 “使用本机参数”。
用于 Python 的 Databricks SQL 连接器还支持 Azure Databricks SQLAlchemy 方言,但必须安装它才能使用这些功能。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用。
要求
- 运行 Python >=3.8 和<=3.11 的开发计算机。
- Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境。
- 现有的 全用途计算 或 SQL 仓库。
开始使用
安装用于 Python 的 Databricks SQL 连接器。 PyArrow 是用于 Python 的 Databricks SQL 连接器的可选依赖项,默认情况下未在连接器版本 4.0.0 及更高版本中安装。 如果未安装 PyArrow,CloudFetch 和其他 Apache Arrow 功能等功能不可用,这可能会影响大量数据的性能。
- 若要安装精简连接器,请使用
pip install databricks-sql-connector
。 - 若要安装完整的连接器,包括 PyArrow,请使用
pip install databricks-sql-connector[pyarrow]
。
- 若要安装精简连接器,请使用
为要使用的通用计算或 SQL 仓库收集以下信息:
通用计算
SQL 仓库
- SQL 仓库的服务器主机名。 从 SQL 仓库的“连接详细信息”选项卡的“服务器主机名”值中可以获取此主机名。
- SQL 仓库的 HTTP 路径。 从 SQL 仓库的“连接详细信息”选项卡的“HTTP 路径”值中可以获取此路径。
身份验证
适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:
- Databricks 个人访问令牌身份验证
- Microsoft Entra ID 令牌身份验证
- OAuth 计算机到计算机 (M2M) 身份验证
- OAuth 用户到计算机 (U2M) 身份验证
适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:
Databricks 个人访问令牌身份验证
若要将用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,必须先创建 Azure Databricks 个人访问令牌。 为此,请遵循工作区用户的 Azure Databricks 个人访问令牌中的步骤。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
-
DATABRICKS_SERVER_HOSTNAME
将其设置为用于通用计算或 SQL 仓库的服务器主机名值。 -
DATABRICKS_HTTP_PATH
,设置为用于所有用途的计算或 SQL 仓库的 HTTP 路径 值。 -
DATABRICKS_TOKEN
,设置为 Azure Databricks 个人访问令牌。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth 计算机到计算机 (M2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 计算机到计算机 (M2M) 身份验证。 还必须安装用于 Python 0.18.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要将 Databricks SQL Connector for Python 与 OAuth M2M 身份验证配合使用,必须执行以下操作:
在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。
若要创建服务主体及其 OAuth 机密,请参阅 使用 OAuth通过服务主体授权对 Azure Databricks 资源的无人参与访问。 记下服务主体的 UUID 或应用程序 ID 值,以及服务主体的 OAuth 机密的机密值。
为该服务主体提供对通用计算或仓库的访问权限。
若要向服务主体授予对通用计算或仓库的访问权限,请参阅 “计算权限 ”或 “管理 SQL 仓库”。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
-
DATABRICKS_SERVER_HOSTNAME
设置为用于所有用途计算或 SQL 仓库 的服务器主机名 值。 -
DATABRICKS_HTTP_PATH
,设置为用于所有用途的计算或 SQL 仓库的 HTTP 路径 值。 -
DATABRICKS_CLIENT_ID
,设置为服务主体的 UUID 或应用程序 ID 值。 -
DATABRICKS_CLIENT_SECRET
,设置为服务主体的 OAuth 机密的“机密”值。
若要设置环境变量,请参阅操作系统的文档。
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)
with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...
Microsoft Entra ID 令牌身份验证
若要将适用于 Python 的 Databricks SQL 连接器与 Microsoft Entra ID 令牌身份验证配合使用,需要向适用于 Python 的 Databricks SQL 连接器提供 Microsoft Entra ID 令牌。 若要创建 Microsoft Entra ID 访问令牌,请执行以下操作:
- 对于 Azure Databricks 用户,可以使用 Azure CLI。 请参阅使用 Azure CLI 获取用户的 Microsoft Entra ID 令牌。
- 有关 Microsoft Entra ID 服务主体,请参阅使用 Azure CLI 获取 Microsoft Entra ID 访问令牌。 若要创建Microsoft Entra ID 托管服务主体,请参阅 服务主体。
Microsoft Entra ID 令牌的默认生存期约为 1 小时。 要创建新的 Microsoft Entra ID 令牌,请重复此过程。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
- 将
DATABRICKS_SERVER_HOSTNAME
设置为计算或 SQL 仓库的 服务器主机名 值,用于所有用途。 - 将
DATABRICKS_HTTP_PATH
设置为用于通用计算或 SQL 仓库的 HTTP 路径 值。 - 将
DATABRICKS_TOKEN
设置为 Microsoft Entra ID 令牌。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth 用户到计算机 (U2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk
或 python -m pip install databricks-sdk
)。
要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:
- 将
DATABRICKS_SERVER_HOSTNAME
设置为计算或 SQL 仓库的 服务器主机名 值,用于所有用途。 - 将
DATABRICKS_HTTP_PATH
设置为用于通用计算或 SQL 仓库的 HTTP 路径 值。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...
示例
以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接,以及配置日志记录。
注意
以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证。
这些代码示例从以下环境变量中检索它们的 server_hostname
、http_path
和 access_token
连接变量值:
-
DATABRICKS_SERVER_HOSTNAME
,表示要求中的“服务器主机名”值。 -
DATABRICKS_HTTP_PATH
,表示要求中的“HTTP 路径”值。 -
DATABRICKS_TOKEN
,表示要求中的访问令牌。
可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。
设置用户代理
下面的代码示例演示如何设置 User-Agent 应用程序 product_name
进行使用情况跟踪。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
user_agent_entry = "product_name") as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT 1 + 1")
result = cursor.fetchall()
for row in result:
print(row)
查询数据
下面的代码示例演示如何调用用于 Python 的 Databricks SQL 连接器,以便在通用计算或 SQL 仓库上运行基本 SQL 命令。 此命令返回 trips
目录的 samples
架构中的nyctaxi
表中的前两行。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()
for row in result:
print(row)
插入数据
以下示例演示如何插入少量数据(数千行):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])
cursor.execute(f"INSERT INTO squares VALUES {values}")
cursor.execute("SELECT * FROM squares LIMIT 10")
result = cursor.fetchall()
for row in result:
print(row)
对于大量数据,应先将数据上传到云存储,然后执行 COPY INTO 命令。
查询元数据
可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())
管理游标和连接
最好关闭不再使用的任何连接和游标。 这会释放 Azure Databricks 全用途计算和 Databricks SQL 仓库上的资源。
可以使用上下文管理器(在前面示例中使用的 with
语法)来管理资源,或显式调用 close
:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
管理 Unity Catalog 卷中的文件
Databricks SQL 连接器可让你将本地文件写入 Unity Catalog 卷、从卷下载文件以及从卷中删除文件,如下例所示:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:
with connection.cursor() as cursor:
# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)
# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)
# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)
配置日志记录
Databricks SQL 连接器使用 Python 的标准日志记录模块。 可以配置类似于以下内容的日志记录级别:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
测试
若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Python 模拟库(如 unittest.mock)。
例如,如果以下名为“helpers.py
”的文件,其中包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 get_connection_personal_access_token
函数,以及使用连接从 select_nyctaxi_trips
目录的 trips
架构中的 samples
表中获取指定数目的数据行的 nyctaxi
函数:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)
def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result
另外,假设以下名为 main.py
的文件调用 get_connection_personal_access_token
和 select_nyctaxi_trips
函数:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)
for row in rows:
print(row)
以下名为 test_helpers.py
的文件测试 select_nyctaxi_trips
函数是否返回预期的响应。 此测试将模拟 Connection
对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]
def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)
# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data
# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)
# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5
select_nyctaxi_trips
函数包含 SELECT
语句,因此不会更改 trips
表的状态,在此示例中并不是一定需要模拟。 但是,模拟让你能够快速运行测试,而无需等待与工作区建立实际连接。 此外,通过模拟,可以多次针对可能更改表状态的函数运行模拟测试,例如 INSERT INTO
、UPDATE
和 DELETE FROM
。
API 参考
程序包
databricks-sql-connector
使用情况:pip install databricks-sql-connector
另请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector。
模块
databricks.sql
使用情况:from databricks import sql
类
选定的类包括:
类 |
---|
Connection 一种 Azure Databricks 计算资源的会话。 |
Cursor 一种遍历数据记录的机制。 |
Row SQL 查询结果中的一行数据。 |
Connection
类
若要创建 Connection
对象,请使用以下参数调用 databricks.sql.connect
方法:
参数 |
---|
server_hostname 类型: str 用于全用途计算或 SQL 仓库的服务器主机名。 要获取服务器主机名,请参阅本文前面部分的说明。 此参数是必需的。 示例: adb-1234567890123456.7.azuredatabricks.net |
http_path 类型: str 通用计算或 SQL 仓库的 HTTP 路径。 要获取 HTTP 路径,请参阅本文前面部分的说明。 此参数是必需的。 例如: sql/protocolv1/o/1234567890123456/1234-567890-test123 用于通用计算。SQL 仓库的 /sql/1.0/warehouses/a1b234c567d8e9fa 。 |
access_token 、auth_type 类型: str 有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证。 |
session_configuration 类型: dict[str, Any] Spark 会话配置参数的字典。 设置一个等效于使用 SET key=val SQL 命令的配置。 运行 SQL 命令 SET -v 可以获取可用配置的完整列表。默认为 None 。此参数是可选的。 示例: {"spark.sql.variable.substitute": True} |
http_headers 类型: List[Tuple[str, str]]] 在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None 。此参数是可选的。 从版本 2.0 开始 |
catalog 类型: str 用于连接的初始目录。 默认为 None (在这种情况下,将使用默认目录,通常为 hive_metastore )。此参数是可选的。 从版本 2.0 开始 |
schema 类型: str 用于连接的初始架构。 默认为 None (在这种情况下,将使用默认架构 default )。此参数是可选的。 从版本 2.0 开始 |
use_cloud_fetch 类型: bool True 将提取请求直接发送到云对象存储以下载数据区块。
False 默认将提取请求直接发送到 Azure Databricks。如果 use_cloud_fetch 设置为 True 但网络访问被阻止,则提取请求将失败。从版本 2.8 开始 |
user_agent_entry 类型: str 包含在 HTTP 请求标头中的 User-Agent 条目,用于跟踪使用情况。 默认为 PyDatabricksSqlConnector 。此参数是可选的。 自版本 4.0.1 起 |
选定的 Connection
方法包括:
方法 |
---|
close 关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error 。无参数。 没有返回值。 |
cursor 返回一个新的 Cursor 对象,该对象允许遍历数据库中的各种记录。无参数。 |
Cursor
类
若要创建 Cursor
对象,请调用 Connection
类的 cursor
方法。
选定的 Cursor
属性包括:
特性 |
---|
arraysize 与 fetchmany 方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值为 10000 。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。读写访问。 |
description 包含 list 对象的 Python tuple 。 每个 tuple 对象都包含 7 个值,并且每个 tuple 对象的前 2 个项目包含如下所示的描述单个结果列的信息:
每个 7 项目 tuple 对象的剩余 5 个项目未实现,并且其值未定义。 它们通常以 4 个None 值的形式返回,后跟一个 True 值。只读访问。 |
选定的 Cursor
方法包括:
方法 |
---|
cancel 中断运行游标启动的任何数据库查询或命令。 要释放服务器上关联的资源,请在 调用 close 方法后,调用 cancel 方法。无参数。 没有返回值。 |
close 关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。 无参数。 没有返回值。 |
execute 准备并运行数据库查询或命令。 没有返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2' ) 不使用 parameters 参数的示例:cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2', { 'pickup_zip': '10019' } ) parameters 类型:字典 要与 operation 参数一起使用的参数序列。此参数是可选的。 默认为 None 。 |
executemany 使用 seq_of_parameters 实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。没有返回值。 参数: operation 类型: str 要准备并运行的查询或命令。 此参数是必需的。 seq_of_parameters 类型: list 的 dict 与 operation 参数一起使用的多个参数集的序列。此参数是必需的。 |
catalogs 执行有关目录的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括:
无参数。 没有返回值。 从版本 1.0 开始 |
schemas 执行有关架构的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括:
没有返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 |
tables 执行有关表和视图的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括:
没有返回值。 从版本 1.0 开始 参数 catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 table_types 类型: List[str] 要匹配的表类型列表,例如 TABLE 或 VIEW 。此参数是可选的。 |
columns 执行有关列的元数据查询。 然后应使用 fetchmany 或 fetchall 获取实际结果。结果集中的重要字段包括:
没有返回值。 从版本 1.0 开始 参数: catalog_name 类型: str 要检索其相关信息的目录名称。 % 字符解释为通配符。此参数是可选的。 schema_name 类型: str 要检索其相关信息的架构名称。 % 字符解释为通配符。此参数是可选的。 table_name 类型: str 要检索其相关信息的表名称。 % 字符解释为通配符。此参数是可选的。 column_name 类型: str 要检索其相关信息的列名称。 % 字符解释为通配符。此参数是可选的。 |
fetchall 获取查询的所有(或所有剩余)行。 无参数。 以 Python list 形式返回查询的所有(或所有剩余)行,该形式属于 Row 对象。如果之前对 Error 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 execute 。 |
fetchmany 获取查询的后续行。 以 size 对象的 Python arraysize 的形式返回查询后续行中 size 行及以前的行(如果没有指定 list ,则返回 Row 属性)。如果要提取的行数少于 size ,将返回所有剩余的行。如果之前对 Error 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 execute 。参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany(10) |
fetchone 获取数据集的下一行。 无参数。 将数据集的下一行以 Python tuple 对象的形式作为单个序列返回,或者如果没有更多可用数据,则返回 None 。如果之前对 Error 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 execute 。 |
fetchall_arrow 以 PyArrow Table 对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用 fetchmany_arrow ,以减少内存消耗。无参数。 以 PyArrow 表的形式返回查询的所有(或所有剩余)行。 如果之前对 Error 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 execute 。从版本 2.0 开始 |
fetchmany_arrow 以 PyArrow Table 对象的形式获取查询后续行。以 Python PyArrow 的形式返回查询后续行中 size 参数行及以前的行(如果没有指定 arraysize ,则返回 size 属性)Table 对象。如果之前对 Error 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 execute 。从版本 2.0 开始 参数: size 类型: int 要获取的后续行数。 此参数是可选的。 如果未指定,则使用 arraysize 属性的值。示例: cursor.fetchmany_arrow(10) |
Row
类
行类是一个类似于元组的数据结构,表示单个结果行。
如果行包含名为 "my_column"
的列,则你可以通过 "my_column"
访问 row
的 row.my_column
字段。 还可以使用数字索引来访问字段,例如 row[0]
。
如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"]
来访问。
从版本 1.0 开始
选定的 Row
方法包括:
| asDict
返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。
无参数。
返回字段的 dict
。 |
类型转换
下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。
Apache Spark SQL 数据类型 | Python 数据类型 |
---|---|
array |
numpy.ndarray |
bigint |
int |
binary |
bytearray |
boolean |
bool |
date |
datetime.date |
decimal |
decimal.Decimal |
double |
float |
int |
int |
map |
str |
null |
NoneType |
smallint |
int |
string |
str |
struct |
str |
timestamp |
datetime.datetime |
tinyint |
int |
疑难解答
tokenAuthWrapperInvalidAccessToken: Invalid access token
消息
问题:运行代码时看到类似于 的消息。
可能的原因:传递给 的值不是有效的 Azure Databricks 个人访问令牌。
建议的解决方法:检查传递给 的值是否正确,然后重试。
gaierror(8, 'nodename nor servname provided, or not known')
消息
问题:运行代码时看到类似于 的消息。
可能的原因:传递给 的值不是正确的主机名。
建议的解决方法:检查传递给 的值是否正确,然后重试。
有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息。
IpAclError
消息
问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 。
可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。
建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表。
其他资源
有关详细信息,请参阅: