Megosztás a következőn keresztül:


Rövid útmutató: Tömeges másolás a Python mssql-python illesztőprogramjával

Ebben a rövid útmutatóban az mssql-python illesztővel tömegesen másolhat adatokat az adatbázisok között. Az alkalmazás letölti a táblákat egy forrásadatbázis-sémából a helyi Parquet-fájlokba az Apache Arrow használatával, majd feltölti őket egy céladatbázisba a nagy teljesítményű bulkcopy módszerrel. Ezzel a mintával migrálhatja, replikálhatja vagy átalakíthatja az adatokat az SQL Server, az Azure SQL Database és az SQL Database között a Fabricben.

Az mssql-python illesztőprogram nem igényel külső függőségeket a Windows rendszerű gépeken. Az illesztőprogram egyetlen pip telepítéssel mindent telepít, amire szüksége van, így az illesztőprogram legújabb verzióját használhatja az új szkriptekhez anélkül, hogy megszegi azokat a szkripteket, amelyek frissítésére és tesztelésére nincs ideje.

az mssql-python dokumentációja | mssql-python forráskód | Csomag (PyPI) | Uv

Előfeltételek

  • Python 3

    • Ha még nem rendelkezik Pythonnal, telepítse a Python futtatókörnyezetet és a pip csomagkezelőta python.org.

    • Nem szeretné használni a saját környezetét? Nyissa meg devcontainerként a GitHub Codespaces használatával.

  • Visual Studio Code a következő bővítményekkel:

  • Az Azure Command-Line Interface (CLI) a jelszó nélküli hitelesítéshez macOS és Linux rendszeren.

  • Ha még nem tette meg uv, kövesse a telepítési utasításokat.

  • Forrásadatbázis az SQL Serveren, az Azure SQL Database-en vagy az SQL Database-ben a Fabricben a AdventureWorks2025 mintasémával és egy érvényes kapcsolati sztringgel.

  • Egy céladatbázis az SQL Serveren, az Azure SQL-adatbázison vagy a Fabricben lévő SQL-adatbázison, érvényes kapcsolati karakterlánccal. A felhasználónak engedéllyel kell rendelkeznie a táblák létrehozásához és írásához. Ha nincs második adatbázisa, módosíthatja a célkapcsolati sztringet úgy, hogy ugyanarra az adatbázisra mutasson, és használjon másik sémát a céltáblákhoz.

  • Egyszeri operációs rendszerspecifikus előfeltételek telepítése.

    apk add libtool krb5-libs krb5-dev
    

SQL-adatbázis létrehozása

Ehhez a rövid útmutatóhoz az AdventureWorks2025 egyszerűsített sémája szükséges forrásadatbázisként.

A projekt létrehozása és a kód futtatása

  1. Új projekt létrehozása
  2. Függőségek hozzáadása
  3. A Visual Studio Code indítása
  4. Pyproject.toml frissítése
  5. Main.py frissítése
  6. A kapcsolati sztringek mentése
  7. A szkript végrehajtása uv-futtatás használatával

Új projekt létrehozása

  1. Nyisson meg egy parancssort a fejlesztői címtárban. Ha nem rendelkezik ilyennel, hozzon létre egy új , stb. nevű pythonscriptskönyvtárat. Kerülje a mappákat a OneDrive-on, a szinkronizálás megzavarhatja a virtuális környezet kezelését.

  2. Hozzon létre egy új projektet a uv segítségével.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Függőségek hozzáadása

Ugyanabban a könyvtárban telepítse a mssql-python, python-dotenvés pyarrow a csomagokat.

uv add mssql-python python-dotenv pyarrow

Indítsa el a Visual Studio Code-ot

Ugyanabban a könyvtárban futtassa a következő parancsot.

code .

Pyproject.toml frissítése

  1. A pyproject.toml a projekt metaadatait tartalmazza. Nyissa meg a fájlt a kedvenc szerkesztőjében.

  2. Tekintse át a fájl tartalmát. Ennek hasonlónak kell lennie ehhez a példához. Jegyezze fel a Python-verziót és a függőséget egy minimális verzió meghatározásához mssql-python>= . Ha pontos verziót szeretne, módosítsa a >= verziószám előtti értéket ==. Az egyes csomagok feloldott verzióit ezután az uv.lock tárolja. A lockfile biztosítja, hogy a projekten dolgozó fejlesztők egységes csomagverziókat használjanak. Azt is biztosítja, hogy pontosan ugyanazt a csomagverziót használja a csomag végfelhasználóknak való terjesztésekor. Ne szerkessze a uv.lock fájlt.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Módosítsa a leírást részletesebbre.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Mentse és zárja be a fájlt.

Main.py frissítése

  1. Nyissa meg a nevű main.pyfájlt. Ennek hasonlónak kell lennie ehhez a példához.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Cserélje le a következő kódblokkok tartalmát main.py . Minden blokk az előzőre épül, és main.py sorrendben kell elhelyezni.

    Jótanács

    Ha a Visual Studio Code-nak problémái vannak a csomagok megoldásával, frissítenie kell az értelmezőt a virtuális környezet használatára.

  3. A felső részen main.pyadja hozzá az importot és az állandókat. A szkript a mssql_python az adatbázis-kapcsolathoz, a pyarrow és pyarrow.parquet az oszlopos adatkezeléshez és a Parquet-fájl I/O-hoz, a python-dotenv a .env fájlból való kapcsolati sztringek betöltéséhez, valamint egy lefordított regex minta, amely az SQL-azonosítók ellenőrzésére szolgál az injektálás megakadályozása érdekében.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Az importálás alatt adja hozzá az SQL-to-Arrow típusú leképezést. Ez a szótár lefordítja az SQL Server-oszloptípusokat az Apache Arrow megfelelőire, így az adatok hűsége megmarad a Parquetbe való íráskor. A két segédfüggvény pontos SQL-típusú sztringeket hoz létre (például NVARCHAR(100) vagy DECIMAL(18,2)) a INFORMATION_SCHEMA metaadatokból, és feloldja az egyes oszlopokhoz tartozó megfelelő Arrow-típust.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Adja hozzá a sémabevezetési és DDL-létrehozási függvényeket. _get_arrow_schema paraméterezett lekérdezéseket futtat INFORMATION_SCHEMA.COLUMNS, Arrow sémát épít fel, és az eredeti SQL-típust mező metaadatként tárolja annak érdekében, hogy a céltábla pontos oszlopdefiníciókkal újra létrehozható legyen. _create_table_ddl visszaolvassa a metaadatokat a DDL létrehozásához DROP/CREATE TABLE . A(z) timestamp (rowversion) típus át van alakítva VARBINARY(8) típusra, mivel automatikus módon generálódik, és közvetlenül nem szúrható be.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Adja hozzá a letöltési funkciót. download_table sorokat streamel egy forrástáblából kötegekben fetchmany(), az egyes értékeket Arrow-kompatibilis Python-típussá alakítja, és fokozatosan egy helyi Parquet-fájlba ParquetWriter írja a rekordokat. Így a memória a tábla méretétől függetlenül korlátozott marad. A függvény két külön kurzort használ: az egyik az oszlop metaadatainak olvasására, a másik pedig az adatok streamelésére.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Adja hozzá a bővítőhorogot. enrich_parquet egy helyőrző, ahol átalakításokat, származtatott oszlopokat vagy illesztéseket adhat hozzá az adatokhoz a feltöltés előtt. Ebben a gyorsindító útmutatóban ez egy művelet nélküli funkció, amely változatlanul adja vissza a fájl elérési útját.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Adja hozzá a feltöltési függvényt. upload_parquetBeolvassa a Nyíl sémát a Parquet-fájlból, létrehozza és végrehajtja DROP/CREATE TABLE a DDL-t a cél előkészítéséhez, majd kötegekben olvassa be a fájlt, és nagy teljesítményű tömeges beszúrást kér.cursor.bulkcopy() A table_lock=True beállítás a zárolási versengés minimalizálásával javítja az átviteli sebességet. A feltöltés befejezése után a függvény futtat egy SELECT COUNT(*) parancsot a sorok számának ellenőrzésére.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Adjon hozzá egy orchestration funkciót. transfer_tables összekapcsolja a három fázist. Csatlakozik a forrásadatbázishoz, felderíti az adott séma INFORMATION_SCHEMA.TABLESösszes alaptábláját, letölti mindegyiket egy helyi Parquet-fájlba, futtatja a bővítési horgot, majd csatlakozik a céladatbázishoz, és feltölti az egyes fájlokat.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Végül adja hozzá a main belépési pontot. Betölti a .env fájlt, meghívja transfer_tables a forrás- és célkapcsolati sztringeket, és kinyomtatja az eltelt időt.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Mentés és bezárás main.py.

A kapcsolati sztringek mentése

  1. Nyissa meg a .gitignore fájlt, és adjon hozzá kizárást a fájlokhoz .env . A fájlnak hasonlónak kell lennie ehhez a példához. Mindenképpen mentse és zárja be, ha elkészült.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Az aktuális könyvtárban hozzon létre egy új fájlt..env

  3. A fájlban .env adjon hozzá bejegyzéseket a forrás- és célkapcsolati karakterláncokhoz. Cserélje le a helyőrző értékeket a tényleges kiszolgáló- és adatbázisnevekre.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Jótanács

    Az itt használt kapcsolati sztring nagyban függ attól, hogy milyen TÍPUSÚ SQL-adatbázishoz csatlakozik. Ha Azure SQL Database-hez vagy SQL-adatbázishoz csatlakozik a Fabricben, használja az ODBC kapcsolati sztringet a kapcsolati sztringek lapról. Előfordulhat, hogy a forgatókönyvtől függően módosítania kell a hitelesítési típust. A kapcsolati sztringekről és azok szintaxisáról további információt a kapcsolati sztring szintaxisának hivatkozásában talál.

Jótanács

macOS rendszeren mind a ActiveDirectoryInteractive, mind a ActiveDirectoryDefault használható a Microsoft Entra-hitelesítéshez. ActiveDirectoryInteractive minden alkalommal, amikor futtatja a szkriptet, arra kéri, hogy jelentkezzen be. Az ismétlődő bejelentkezési kérések elkerülése érdekében jelentkezzen be egyszer az Azure CLI segítségével a az login parancs végrehajtásával, majd használja a ActiveDirectoryDefault, amely újrahasználja a gyorsítótárazott hitelesítő adatokat.

Használja az "uv run" parancsot a szkript futtatásához

  1. A korábban megnyitott terminálablakban vagy egy új terminálablakban futtassa a következő parancsot.

     uv run main.py
    

    A szkript befejeződésekor a következő várható kimenet jelenik meg.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Csatlakozzon a céladatbázishoz az SQL Server Management Studio (SSMS) vagy a VS Code MSSQL-bővítménye használatával, és ellenőrizze, hogy a táblák és adatok sikeresen létrejöttek-e.

  3. Ha a szkriptet egy másik gépen szeretné üzembe helyezni, másolja az összes fájlt, kivéve a .venv mappát a másik gépre. A virtuális környezet az első futtatáskor újra létre lesz hozva.

A kód működése

Az alkalmazás három fázisban hajtja végre a teljes oda-vissza adatátvitelt:

  1. Letöltés: Csatlakozik a forrásadatbázishoz, beolvassa az oszlop metaadatait INFORMATION_SCHEMA.COLUMNS, létrehoz egy Apache Arrow-sémát, majd letölti az egyes táblákat egy helyi Parquet-fájlba.
  2. Bővítés (nem kötelező): Olyan horogot (enrich_parquet) biztosít, amelyben átalakításokat, származtatott oszlopokat vagy illesztéseket adhat hozzá feltöltés előtt.
  3. Feltöltés: Beolvassa az egyes Parquet-fájlokat kötegekben, újra létrehozza a táblát a céladatbázisban a Nyílséma metaadataiból létrehozott DDL használatával, majd nagy teljesítményű tömeges beszúráshoz használja cursor.bulkcopy() .

Következő lépés

Látogasson el az mssql-python illesztőprogram GitHub-adattárába további példákért, ötletek megosztásához, vagy problémák jelentéséhez.