在本快速入门中,你将使用 mssql-python 驱动程序在数据库之间批量复制数据。 应用程序使用 Apache Arrow 将源数据库架构中的表下载到本地 Parquet 文件,然后使用高性能 bulkcopy 方法将其上传到目标数据库。 可以使用此模式在 Fabric 中的 SQL Server、Azure SQL 数据库和 SQL 数据库之间迁移、复制或转换数据。
驱动程序 mssql-python 不需要 Windows 计算机上的任何外部依赖项。 驱动程序通过单个 pip 安装来安装它所需的一切,使你能够将最新版本的驱动程序用于新脚本,同时不会影响那些没有时间升级和测试的其他脚本。
mssql-python 文档 | mssql-python 源代码 | 包 (PyPI) | Uv
先决条件
Python 3
如果还没有 Python,请从 python.org 安装 Python 运行时和 pip 包管理器。
不想使用自己的环境? 使用 GitHub Codespaces 以 devcontainer 身份打开。
具有以下扩展的 Visual Studio Code:
Azure Command-Line 接口(CLI) 用于在 macOS 和 Linux 上进行无密码身份验证。
如果尚未安装
uv,请按照 安装说明操作。在 SQL Server、Azure SQL 数据库或 Fabric 的 SQL 数据库中,包含
AdventureWorks2025示例架构和有效连接字符串的源数据库。具有有效连接字符串的目标数据库,可以是 SQL Server、Azure SQL 数据库或 Fabric 中的 SQL 数据库。 用户必须具有创建和写入表的权限。 如果没有第二个数据库,可以将目标连接字符串更改为指向同一数据库,并为目标表使用不同的架构。
安装一次性操作系统特定的先决条件。
创建 SQL 数据库
本快速入门需要 AdventureWorks2025 轻型 架构作为源数据库。
创建项目并运行代码
创建新项目
在开发目录中打开命令提示符。 如果没有目录,请创建名为“
pythonscripts等”的新目录。避免 OneDrive 上的文件夹,同步可能会干扰管理虚拟环境。-
uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
添加依赖项
在同一目录中,安装mssql-python、python-dotenv和pyarrow包。
uv add mssql-python python-dotenv pyarrow
启动 Visual Studio Code
在同一目录中运行以下命令。
code .
更新 pyproject.toml
pyproject.toml 包含项目的元数据。 在喜欢的编辑器中打开该文件。
查看文件的内容。 它应类似于此示例。 请注意,Python 版本及其依赖项通过
mssql-python使用>=来定义最低版本。 如果更喜欢确切的版本,请将>=版本号之前的版本号更改为==。 然后,每个包的解析版本存储在 uv.lock 中。 锁定文件确保参与项目的开发人员使用一致的包版本。 它还可确保在将包分发给最终用户时使用完全相同的包版本集。 不应编辑uv.lock该文件。[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]更新说明以更具描述性。
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"保存并关闭该文件。
更新 main.py
打开名为
main.py的文件。 它应类似于此示例。def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()将
main.py以下内容的内容替换为以下代码块。 每个块都基于上一个块生成,应按顺序排列main.py。小窍门
如果 Visual Studio Code 在解决包时遇到问题,则需要 更新解释器以使用虚拟环境。
在顶部
main.py处添加导入和常量。 该脚本使用mssql_python进行数据库连接,使用pyarrow和pyarrow.parquet处理列式数据和 Parquet 文件 I/O,使用python-dotenv从.env文件加载连接字符串,并使用已编译的正则表达式模式来验证 SQL 标识符以防止注入。"""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return name请在导入部分下面添加 SQL 到 Arrow 类型的映射。 此字典将 SQL Server 列类型转换为其 Apache Arrow 等效项,以便在写入 Parquet 时保留数据保真度。 这两个帮助程序函数从
NVARCHAR(100)元数据生成确切的 SQL 类型字符串(例如,DECIMAL(18,2)或INFORMATION_SCHEMA)并解析每个列的匹配箭头类型。_SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return v添加架构反省和 DDL 生成函数。
_get_arrow_schema使用参数化查询对INFORMATION_SCHEMA.COLUMNS进行查询,构建箭头式架构,并将原始 SQL 类型存储为字段元数据,以便能够通过精确的列定义重新创建目标表。_create_table_ddl读取该元数据以生成DROP/CREATE TABLEDDL。timestamp(rowversion)类型被重新映射到VARBINARY(8),因为该类型是自动生成的,无法直接插入。def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )添加下载函数。
download_table使用fetchmany()以批处理的方式从源表流式传输行,将每个值转换为兼容 Arrow 的 Python 类型,然后使用ParquetWriter将记录批增量写入本地 Parquet 文件。 无论表大小如何,这都会保持内存的边界。 该函数使用两个单独的游标:一个用于读取列元数据,另一个用于流式传输数据。def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_count添加扩充挂钩。
enrich_parquet是一个占位符,你可以在上传数据之前向数据添加转换、派生列或联接。 在本快速入门中,这实际上是一个不执行任何操作(no-op)的过程,即返回未更改的文件路径。def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_file添加上传函数。
upload_parquet从 Parquet 文件读取 Arrow 模式,生成并执行DROP/CREATE TABLEDDL 来设置目标环境,然后分批读取文件并调用cursor.bulkcopy()来实现高性能的批量插入。 该table_lock=True选项通过最大程度地减少锁争用来提高吞吐量。 上传完成后,该函数将运行一个SELECT COUNT(*)来核对行计数是否匹配。def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploaded添加编排功能。
transfer_tables将三个阶段联系在一起。 它连接到源数据库,通过以下方式INFORMATION_SCHEMA.TABLES发现给定架构中的所有基表:将每个基表下载到本地 Parquet 文件,运行扩充挂钩,然后连接到目标数据库并上传每个文件。def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)最后,添加
main入口点。 它会加载.env文件、使用源连接字符串和目标连接字符串进行调用transfer_tables,并输出总运行时间。def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()保存并关闭
main.py。
保存连接字符串
打开
.gitignore文件,并为.env文件添加一个排除项。 文件应类似于此示例。 完成后,请务必保存并关闭它。# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .env在当前目录中,创建一个名为
.env的新文件。在
.env文件中,为源连接字符串和目标连接字符串添加条目。 将占位符值替换为实际的服务器和数据库名称。SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"小窍门
此处使用的连接字符串在很大程度上取决于要连接到的 SQL 数据库的类型。 如果要连接到 Fabric 中的 Azure SQL 数据库 或 SQL 数据库,请使用连接字符串选项卡中的 ODBC 连接字符串。可能需要根据方案调整身份验证类型。 有关连接字符串及其语法的详细信息,请参阅 连接字符串语法参考。
小窍门
在 macOS 上, ActiveDirectoryInteractive 和 ActiveDirectoryDefault 都可用于 Microsoft Entra 身份验证。
ActiveDirectoryInteractive 每次运行脚本时都会提示你登录。 若要避免重复的登录提示,请通过 Azure CLI 运行 az login 进行一次登录,然后使用 ActiveDirectoryDefault 来重用缓存的凭据。
使用 uv run 命令执行脚本
在之前所在的终端窗口中,或打开同一目录的新终端窗口,运行以下命令。
uv run main.py下面是脚本完成时的预期输出。
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35s使用 SQL Server Management Studio(SSMS) 或 VS Code 的 MSSQL 扩展 连接到目标数据库,并验证表和数据是否已成功创建。
若要将脚本部署到另一台计算机,请将除文件夹以外的
.venv所有文件复制到另一台计算机。 第一次运行时重新创建虚拟环境。
代码的工作原理
应用程序在三个阶段中执行完整往返数据传输:
-
下载:连接到源数据库,从
INFORMATION_SCHEMA.COLUMNS中读取列元数据,生成 Apache Arrow 架构,然后将每个表下载到本地 Parquet 文件中。 -
扩充 (可选):提供挂钩(
enrich_parquet),可在其中添加转换、派生列或联接,然后再上传。 -
上传:批量读取每个 Parquet 文件,使用从 Arrow 架构元数据生成的 DDL 重新创建目标数据库中的表,然后使用
cursor.bulkcopy()进行高性能大容量插入。
后续步骤
访问mssql-python 驱动程序的 GitHub 存储库以获取更多示例,并贡献想法或报告问题。