通过


快速入门:使用适用于 Python 的 mssql-python 驱动程序批量复制

在本快速入门中,你将使用 mssql-python 驱动程序在数据库之间批量复制数据。 应用程序使用 Apache Arrow 将源数据库架构中的表下载到本地 Parquet 文件,然后使用高性能 bulkcopy 方法将其上传到目标数据库。 可以使用此模式在 Fabric 中的 SQL Server、Azure SQL 数据库和 SQL 数据库之间迁移、复制或转换数据。

驱动程序 mssql-python 不需要 Windows 计算机上的任何外部依赖项。 驱动程序通过单个 pip 安装来安装它所需的一切,使你能够将最新版本的驱动程序用于新脚本,同时不会影响那些没有时间升级和测试的其他脚本。

mssql-python 文档 | mssql-python 源代码 | 包 (PyPI) | Uv

先决条件

  • Python 3

    • 如果还没有 Python,请从 python.org 安装 Python 运行时pip 包管理器

    • 不想使用自己的环境? 使用 GitHub Codespaces 以 devcontainer 身份打开。

  • 具有以下扩展的 Visual Studio Code

  • Azure Command-Line 接口(CLI) 用于在 macOS 和 Linux 上进行无密码身份验证。

  • 如果尚未安装 uv,请按照 安装说明操作

  • 在 SQL Server、Azure SQL 数据库或 Fabric 的 SQL 数据库中,包含 AdventureWorks2025 示例架构和有效连接字符串的源数据库。

  • 具有有效连接字符串的目标数据库,可以是 SQL Server、Azure SQL 数据库或 Fabric 中的 SQL 数据库。 用户必须具有创建和写入表的权限。 如果没有第二个数据库,可以将目标连接字符串更改为指向同一数据库,并为目标表使用不同的架构。

  • 安装一次性操作系统特定的先决条件。

    apk add libtool krb5-libs krb5-dev
    

创建 SQL 数据库

本快速入门需要 AdventureWorks2025 轻型 架构作为源数据库。

创建项目并运行代码

  1. 创建新项目
  2. 添加依赖项
  3. 启动 Visual Studio Code
  4. 更新 pyproject.toml
  5. 更新 main.py
  6. 保存连接字符串
  7. 使用 uv 运行执行脚本

创建新项目

  1. 在开发目录中打开命令提示符。 如果没有目录,请创建名为“pythonscripts等”的新目录。避免 OneDrive 上的文件夹,同步可能会干扰管理虚拟环境。

  2. 使用 . 创建一个新uv

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

添加依赖项

在同一目录中,安装mssql-pythonpython-dotenvpyarrow包。

uv add mssql-python python-dotenv pyarrow

启动 Visual Studio Code

在同一目录中运行以下命令。

code .

更新 pyproject.toml

  1. pyproject.toml 包含项目的元数据。 在喜欢的编辑器中打开该文件。

  2. 查看文件的内容。 它应类似于此示例。 请注意,Python 版本及其依赖项通过mssql-python 使用>=来定义最低版本。 如果更喜欢确切的版本,请将 >= 版本号之前的版本号更改为 ==。 然后,每个包的解析版本存储在 uv.lock 中。 锁定文件确保参与项目的开发人员使用一致的包版本。 它还可确保在将包分发给最终用户时使用完全相同的包版本集。 不应编辑 uv.lock 该文件。

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. 更新说明以更具描述性。

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. 保存并关闭该文件。

更新 main.py

  1. 打开名为main.py的文件。 它应类似于此示例。

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. main.py 以下内容的内容替换为以下代码块。 每个块都基于上一个块生成,应按顺序排列 main.py

    小窍门

    如果 Visual Studio Code 在解决包时遇到问题,则需要 更新解释器以使用虚拟环境

  3. 在顶部main.py处添加导入和常量。 该脚本使用 mssql_python 进行数据库连接,使用 pyarrowpyarrow.parquet 处理列式数据和 Parquet 文件 I/O,使用 python-dotenv.env 文件加载连接字符串,并使用已编译的正则表达式模式来验证 SQL 标识符以防止注入。

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. 请在导入部分下面添加 SQL 到 Arrow 类型的映射。 此字典将 SQL Server 列类型转换为其 Apache Arrow 等效项,以便在写入 Parquet 时保留数据保真度。 这两个帮助程序函数从NVARCHAR(100)元数据生成确切的 SQL 类型字符串(例如,DECIMAL(18,2)INFORMATION_SCHEMA)并解析每个列的匹配箭头类型。

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. 添加架构反省和 DDL 生成函数。 _get_arrow_schema 使用参数化查询对 INFORMATION_SCHEMA.COLUMNS 进行查询,构建箭头式架构,并将原始 SQL 类型存储为字段元数据,以便能够通过精确的列定义重新创建目标表。 _create_table_ddl 读取该元数据以生成 DROP/CREATE TABLE DDL。 timestamp(rowversion)类型被重新映射到VARBINARY(8),因为该类型是自动生成的,无法直接插入。

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. 添加下载函数。 download_table 使用 fetchmany() 以批处理的方式从源表流式传输行,将每个值转换为兼容 Arrow 的 Python 类型,然后使用 ParquetWriter 将记录批增量写入本地 Parquet 文件。 无论表大小如何,这都会保持内存的边界。 该函数使用两个单独的游标:一个用于读取列元数据,另一个用于流式传输数据。

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. 添加扩充挂钩。 enrich_parquet 是一个占位符,你可以在上传数据之前向数据添加转换、派生列或联接。 在本快速入门中,这实际上是一个不执行任何操作(no-op)的过程,即返回未更改的文件路径。

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. 添加上传函数。 upload_parquet 从 Parquet 文件读取 Arrow 模式,生成并执行 DROP/CREATE TABLE DDL 来设置目标环境,然后分批读取文件并调用 cursor.bulkcopy() 来实现高性能的批量插入。 该 table_lock=True 选项通过最大程度地减少锁争用来提高吞吐量。 上传完成后,该函数将运行一个 SELECT COUNT(*) 来核对行计数是否匹配。

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. 添加编排功能。 transfer_tables 将三个阶段联系在一起。 它连接到源数据库,通过以下方式 INFORMATION_SCHEMA.TABLES发现给定架构中的所有基表:将每个基表下载到本地 Parquet 文件,运行扩充挂钩,然后连接到目标数据库并上传每个文件。

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. 最后,添加 main 入口点。 它会加载 .env 文件、使用源连接字符串和目标连接字符串进行调用 transfer_tables ,并输出总运行时间。

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. 保存并关闭 main.py

保存连接字符串

  1. 打开.gitignore文件,并为.env文件添加一个排除项。 文件应类似于此示例。 完成后,请务必保存并关闭它。

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. 在当前目录中,创建一个名为 .env 的新文件。

  3. .env 文件中,为源连接字符串和目标连接字符串添加条目。 将占位符值替换为实际的服务器和数据库名称。

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    小窍门

    此处使用的连接字符串在很大程度上取决于要连接到的 SQL 数据库的类型。 如果要连接到 Fabric 中的 Azure SQL 数据库SQL 数据库,请使用连接字符串选项卡中的 ODBC 连接字符串。可能需要根据方案调整身份验证类型。 有关连接字符串及其语法的详细信息,请参阅 连接字符串语法参考

小窍门

在 macOS 上, ActiveDirectoryInteractiveActiveDirectoryDefault 都可用于 Microsoft Entra 身份验证。 ActiveDirectoryInteractive 每次运行脚本时都会提示你登录。 若要避免重复的登录提示,请通过 Azure CLI 运行 az login 进行一次登录,然后使用 ActiveDirectoryDefault 来重用缓存的凭据。

使用 uv run 命令执行脚本

  1. 在之前所在的终端窗口中,或打开同一目录的新终端窗口,运行以下命令。

     uv run main.py
    

    下面是脚本完成时的预期输出。

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. 使用 SQL Server Management Studio(SSMS)VS Code 的 MSSQL 扩展 连接到目标数据库,并验证表和数据是否已成功创建。

  3. 若要将脚本部署到另一台计算机,请将除文件夹以外的 .venv 所有文件复制到另一台计算机。 第一次运行时重新创建虚拟环境。

代码的工作原理

应用程序在三个阶段中执行完整往返数据传输:

  1. 下载:连接到源数据库,从 INFORMATION_SCHEMA.COLUMNS中读取列元数据,生成 Apache Arrow 架构,然后将每个表下载到本地 Parquet 文件中。
  2. 扩充 (可选):提供挂钩(enrich_parquet),可在其中添加转换、派生列或联接,然后再上传。
  3. 上传:批量读取每个 Parquet 文件,使用从 Arrow 架构元数据生成的 DDL 重新创建目标数据库中的表,然后使用 cursor.bulkcopy() 进行高性能大容量插入。

后续步骤

访问mssql-python 驱动程序的 GitHub 存储库以获取更多示例,并贡献想法或报告问题。