Sdílet prostřednictvím


Rychlý start: Hromadné kopírování pomocí ovladače mssql-python pro Python

V tomto rychlém průvodci použijete mssql-python ovladač k hromadnému kopírování dat mezi databázemi. Aplikace stáhne tabulky ze zdrojového schématu databáze do místních souborů Parquet pomocí Apache Arrow a pak je nahraje do cílové databáze pomocí vysoce výkonné bulkcopy metody. Tento vzor můžete použít k migraci, replikaci nebo transformaci dat mezi SQL Serverem, Azure SQL Database a SQL databází v prostředí Fabric.

Ovladač mssql-python nevyžaduje žádné externí závislosti na počítačích s Windows. Ovladač nainstaluje všechno, co potřebuje, s jednou pip instalací, což vám umožní používat nejnovější verzi ovladače pro nové skripty bez přerušení jiných skriptů, které nemáte čas upgradovat a testovat.

Dokumentace mssql-python, zdrojový kód mssql-python, Balíček (PyPI) uv

Předpoklady

  • Python 3

    • Pokud ještě nemáte Python, nainstalujte Python runtime a správce balíčků pip z python.org.

    • Nechcete používat vlastní prostředí? Otevřete jako devcontainer pomocí GitHub Codespaces.

  • Visual Studio Code s následujícími rozšířeními:

  • Azure Command-Line Interface (CLI) pro ověřování bez hesla v systému macOS a Linux.

  • Pokud ještě uvnemáte, postupujte podle pokynů k instalaci.

  • Zdrojová databáze na SQL Serveru, Azure SQL Database nebo SQL databázi ve Fabric s AdventureWorks2025 ukázkovým schématem a platným připojovacím řetězcem.

  • Cílová databáze na serveru SQL Server, Azure SQL Database nebo databázi SQL ve Fabric s platným připojovacím řetězcem. Uživatel musí mít oprávnění k vytváření a zápisu do tabulek. Pokud nemáte druhou databázi, můžete změnit cílový připojovací řetězec tak, aby odkazovat na stejnou databázi, a použít jiné schéma pro cílové tabulky.

  • Nainstalujte požadavky specifické pro jednorázový operační systém.

    apk add libtool krb5-libs krb5-dev
    

Vytvoření databáze SQL

Tento rychlý start vyžaduje jako zdrojovou databázi schéma AdventureWorks2025 Lightweight .

Vytvoření projektu a spuštění kódu

  1. Vytvoření nového projektu
  2. Přidání závislostí
  3. Spuštění editoru Visual Studio Code
  4. Aktualizace pyproject.toml
  5. Aktualizace main.py
  6. Uložte připojovací řetězce
  7. Použij uv run pro spuštění skriptu

Vytvoření nového projektu

  1. Otevřete příkazový řádek ve vývojovém adresáři. Pokud ho nemáte, vytvořte nový adresář s názvem python, scriptsatd. Vyhněte se složkám na OneDrivu, synchronizace může narušit správu vašeho virtuálního prostředí.

  2. Vytvořte nový projekt pomocí uvpříkazu .

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Přidejte závislosti

Ve stejném adresáři nainstalujte balíčky mssql-python, python-dotenv a pyarrow.

uv add mssql-python python-dotenv pyarrow

Spusťte Visual Studio Code

Ve stejném adresáři spusťte následující příkaz.

code .

Aktualizace pyproject.toml

  1. Soubor pyproject.toml obsahuje metadata pro váš projekt. Otevřete soubor v oblíbeném editoru.

  2. Zkontrolujte obsah souboru. Měl by se podobat tomuto příkladu. Všimněte si verze Pythonu a závislosti pro mssql-python, které používá >= k definování minimální verze. Pokud dáváte přednost přesné verzi, změňte >= před číslem verze na ==. Vyřešené verze každého balíčku jsou pak uloženy v uv.lock. Soubor lockfile zajišťuje, aby vývojáři pracující na projektu používali konzistentní verze balíčků. Také zajišťuje, aby se při distribuci balíčku koncovým uživatelům používala úplně stejná sada verzí balíčků. Soubor uv.lock byste neměli upravovat.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Aktualizujte popis, aby byl popisnější.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Uložte a zavřete soubor.

Aktualizace main.py

  1. Otevřete soubor s názvem main.py. Měl by se podobat tomuto příkladu.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. main.py Obsah nahraďte následujícími bloky kódu. Každý blok vychází z předchozího bloku a měl by být v main.py daném pořadí.

    Návod

    Pokud má Visual Studio Code potíže s řešením balíčků, musíte interpreta aktualizovat tak, aby používal virtuální prostředí.

  3. V horní části souboru main.pypřidejte importy a konstanty. Skript používá mssql_python pro připojení k databázi, pyarrow a pyarrow.parquet pro zpracování sloupcových dat a vstupně-výstupní operace souborů Parquet, python-dotenv pro načítání připojovacích řetězců ze souboru .env, a zkompilovaný vzor regulárního výrazu, který ověřuje identifikátory SQL, aby se zabránilo injection.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Pod importy přidejte mapování typu SQL-to-Arrow. Tento slovník překládá typy sloupců SQL Serveru na jejich ekvivalenty Apache Arrow, aby se při zápisu do Parquet zachovala věrnost dat. Tyto dvě pomocné funkce vytvářejí přesné řetězce typu SQL (například NVARCHAR(100) nebo DECIMAL(18,2)) z INFORMATION_SCHEMA metadat a určí odpovídající Arrow typ pro každý sloupec.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Přidejte introspekci schématu a funkce generování DDL. _get_arrow_schema INFORMATION_SCHEMA.COLUMNS provádí dotazy využívající parametrizované dotazy, sestaví Arrow schéma a uloží původní typ SQL jako metadata polí, aby bylo možné cílovou tabulku znovu vytvořit s přesnými definicemi sloupců. _create_table_ddl čte zpět tato metadata k generování DROP/CREATE TABLE DDL. Typ timestamp (rowversion) je namapován na VARBINARY(8) , protože se automaticky generuje a není možné ho přímo vložit.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Přidejte funkci pro stažení. download_table streamuje řádky ze zdrojové tabulky v dávkách pomocí fetchmany(), převede každou hodnotu na typ Python kompatibilní se šipkou a zapíše dávky záznamů přírůstkově do místního souboru Parquet pomocí ParquetWriter. To udržuje paměť ohraničenou bez ohledu na velikost tabulky. Funkce používá dva samostatné kurzory: jeden ke čtení metadat sloupců a druhý pro streamování dat.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Přidejte háček pro rozšiřování. enrich_parquet je zástupný symbol, ve kterém můžete před nahráním přidat transformace, odvozené sloupce nebo spojení k datům. V tomto rychlém startu se jedná o no-op, který vrátí cestu k souboru beze změny.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Přidejte funkci pro nahrání. upload_parquet přečte Arrow schema ze souboru Parquet, vygeneruje a spustí DROP/CREATE TABLE DDL pro přípravu cíle, poté načte soubor v dávkách a využije cursor.bulkcopy() pro vysoce výkonné hromadné vložení. Tato table_lock=True možnost zlepšuje propustnost minimalizací kolizí zámků. Po dokončení nahrávání spustí funkce SELECT COUNT(*), která ověří, že se shoduje počet řádků.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Přidejte funkci orchestrace. transfer_tables spojí tři fáze dohromady. Připojí se ke zdrojové databázi, zjistí všechny základní tabulky v daném schématu prostřednictvím INFORMATION_SCHEMA.TABLES, stáhne každý z nich do místního souboru Parquet, spustí háček rozšiřování, pak se připojí k cílové databázi a nahraje každý soubor.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Nakonec přidejte vstupní bod main. Načte .env soubor, volá transfer_tables se zdrojovými a cílovými připojovacími řetězci a vytiskne celkový uplynulý čas.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Uložte a zavřete main.py.

Uložte připojovací řetězce

  1. .gitignore Otevřete soubor a přidejte vyloučení souborů.env. Soubor by měl být podobný tomuto příkladu. Až budete hotovi, nezapomeňte ho uložit a zavřít.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. V aktuálním adresáři vytvořte nový soubor s názvem .env.

  3. .env V souboru přidejte položky pro zdrojové a cílové připojovací řetězce. Nahraďte zástupné hodnoty skutečnými názvy serverů a databází.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Návod

    Připojovací řetězec použitý zde do značné míry závisí na typu databáze SQL, ke které se připojujete. Pokud se připojujete k Azure SQL Database nebo k SQL databázi ve Fabric, použijte připojovací řetězec ODBC z karty Připojovací řetězce. Možná budete muset upravit typ ověřování v závislosti na vašem scénáři. Další informace o připojovacích řetězcích a jejich syntaxi najdete v referenčních informacích k syntaxi připojovacího řetězce.

Návod

V systému macOS funguje ověřování pomocí Microsoft Entra jak s ActiveDirectoryInteractive, tak s ActiveDirectoryDefault. ActiveDirectoryInteractive vás vyzve k přihlášení při každém spuštění skriptu. Pokud se chcete vyhnout opakovaným výzvám k přihlášení, přihlaste se jednou přes Azure CLI spuštěním az logina pak použijte ActiveDirectoryDefaultpříkaz , který znovu použije přihlašovací údaje uložené v mezipaměti.

Použijte 'uv run' ke spuštění skriptu

  1. V okně terminálu před nebo v novém okně terminálu, které se otevře ve stejném adresáři, spusťte následující příkaz.

     uv run main.py
    

    Tady je očekávaný výstup po dokončení skriptu.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Připojte se k cílové databázi pomocí aplikace SQL Server Management Studio (SSMS) nebo rozšíření MSSQL pro VS Code a ověřte, že se tabulky a data úspěšně vytvořily.

  3. Pokud chcete skript nasadit do jiného počítače, zkopírujte všechny soubory s výjimkou .venv složky na druhý počítač. Virtuální prostředí se znovu vytvoří při prvním spuštění.

Jak kód funguje

Aplikace provádí kompletní přenos dat během zpáteční cesty ve třech fázích:

  1. Ke stažení: Připojí se ke zdrojové databázi, přečte metadata sloupců z INFORMATION_SCHEMA.COLUMNS, sestaví schéma Apache Arrow a pak stáhne každou tabulku do místního souboru Parquet.
  2. Obohacení (volitelné): Poskytuje háček (enrich_parquet), kde můžete před nahráním přidat transformace, odvozené sloupce nebo spojení.
  3. Nahrání: Načte každý soubor Parquet v dávkách, znovu vytvoří tabulku v cílové databázi pomocí DDL generovaného z metadat schématu Arrow a použije cursor.bulkcopy() k výkonnému hromadnému vložení.

Další krok

Další příklady najdete v mssql-python úložišti GitHubu ovladače, kde můžete přispívat nápady nebo hlásit problémy.