Del via


Quickstart: Massekopier data mellem SQL-databaser i Fabric med mssql-python-driveren

I denne quickstart bruger du driveren mssql-python til at massekopiere data mellem SQL-databaser i Fabric. Applikationen downloader tabeller fra et kildedatabaseskema til lokale Parquet-filer ved hjælp af Apache Arrow og uploader dem derefter til en destinationsdatabase ved hjælp af højtydende bulkcopy metoden. Du kan bruge dette mønster til at migrere, replikere eller transformere data mellem SQL-databaser i Fabric.

Driveren mssql-python kræver ingen eksterne afhængigheder på Windows-maskiner. Driveren installerer alt, hvad den har brug for, med en enkelt pip installation, så du kan bruge den nyeste version af driveren til nye scripts uden at bryde andre scripts, som du ikke har tid til at opgradere og teste.

mssql-python dokumentation | mssql-python kildekode | Pakke (PyPI) | UV

Forudsætninger


Opret projektet, og kør koden

Opret et nyt projekt

  1. Åbn en kommandoprompt i din udviklingsmappe. Hvis du ikke har en, skal du oprette en ny mappe kaldet python, scriptsosv. Undgå mapper på dit OneDrive, synkroniseringen kan forstyrre administrationen af dit virtuelle miljø.

  2. Opret et nyt projekt med uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Tilføj afhængigheder

I den samme mappe skal du installere pakkerne mssql-python, python-dotenv, og pyarrow .

uv add mssql-python python-dotenv pyarrow

Start Visual Studio Code

Kør følgende kommando i samme mappe.

code .

Opdater pyproject.toml

  1. pyproject.toml indeholder metadataene for dit projekt. Åbn filen i din foretrukne editor.

  2. Gennemgå indholdet af filen. Det burde ligne dette eksempel. Bemærk Python-versionen og afhængigheden for anvendelser >= til mssql-python at definere en minimumsversion. Hvis du foretrækker en nøjagtig version, så ændr versionsnummeret >===før til . De løste versioner af hver pakke gemmes derefter i uv.lock. Låsefilen sikrer, at udviklerne, der arbejder på projektet, bruger ensartede pakkeversioner. Det sikrer også, at det samme sæt pakkeversioner bruges, når din pakke distribueres til slutbrugerne. Du bør ikke redigere filen uv.lock .

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Opdater beskrivelsen, så den er mere beskrivende.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Gem og luk filen.

Opdater main.py

  1. Åbn filen med navnet main.py. Det burde ligne dette eksempel.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Erstat indholdet af filen med følgende kodeblokke. Hver blok bygger videre på den forrige og bør tilføjes main.py i rækkefølge.

    Tips

    Hvis Visual Studio Code har problemer med at løse pakker, skal du opdatere fortolkeren for at bruge det virtuelle miljø.

  3. Øverst i main.py, tilføj importerne og konstanterne. Scriptet bruger mssql_python til databaseforbindelse, pyarrow til pyarrow.parquet håndtering af kolonnedata og Parquet-fil-I/O til python-dotenv indlæsning af forbindelsesstrenge fra en .env fil, samt et kompileret regex-mønster, der validerer SQL-identifikatorer for at forhindre injektion.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Under importene tilføjes SQL-til-Arrow-typemappingen. Denne ordbog oversætter SQL Server-kolonnetyper til deres Apache Arrow-ækvivalenter, så datanøjagtigheden bevares, når man skriver til Parquet. De to hjælpefunktioner bygger nøjagtige SQL-typestrenge (for eksempel NVARCHAR(100) eller DECIMAL(18,2)) ud fra INFORMATION_SCHEMA metadata og løser den matchende Arrow-type for hver kolonne.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Tilføj funktionerne til skema-introspektion og DDL-generering. _get_arrow_schema forespørgsler INFORMATION_SCHEMA.COLUMNS ved hjælp af parameteriserede forespørgsler, bygger et Arrow-skema og gemmer den oprindelige SQL-type som feltmetadata, så destinationstabellen kan genskabes med nøjagtige kolonnedefinitioner. _create_table_ddl læser metadataene tilbage for at generere DROP/CREATE TABLE DDL. Typen timestamp (rowversion) ommappes til VARBINARY(8) , fordi den er auto-genereret og ikke direkte indsættbar.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Tilføj downloadfunktionen. download_table læser rækker fra en kildetabel i batches ved hjælp af fetchmany, konverterer hver værdi til en Arrow-kompatibel Python-type, og skriver recordbatches inkrementelt til en Parquet-fil med pq.ParquetWriter. Denne tilgang undgår at indlæse hele tabellen i hukommelsen. Funktionen bruger to separate cursors: én til at læse kolonnemetadata og en anden til at streame dataene. Hvis bordet er tomt, vender det tilbage tidligt.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            with pq.ParquetWriter(parquet_file, schema) as writer:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
    
        if row_count == 0:
            os.remove(parquet_file)
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Tilføj berigelseskrogen. enrich_parquet er en pladsholder, hvor du kan tilføje transformationer, afledte kolonner eller joins til data, før de uploades. I denne quickstart er det en no-op, der returnerer filstien uændret.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Tilføj upload-funktionen. upload_parquet læser Arrow-skemaet fra Parquet-filen, genererer og udfører DROP/CREATE TABLE DDL for at forberede destinationen, læser derefter filen i batches og kalder cursor.bulkcopy() på højtydende bulkindsættelse. Muligheden table_lock=True forbedrer gennemstrømningen ved at minimere låskonkurrence. Når uploaden er færdig, kører funktionen en SELECT COUNT(*) for at verificere, at rækkeantallet stemmer overens.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        conn.commit()
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Tilføj orkestreringsfunktionen. transfer_tables binder de tre faser sammen. Den forbinder til kildedatabasen, opdager alle basetabeller i det givne skema via INFORMATION_SCHEMA.TABLES, downloader hver enkelt til en lokal Parquet-fil, kører berigelseskrogen, forbinder derefter til destinationsdatabasen og uploader hver fil.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched_files = []
        for table_name, parquet_file in parquet_files:
            enriched_file = enrich_parquet(parquet_file)
            enriched_files.append((table_name, enriched_file))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched_files:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Til sidst tilføj indgangspunktet main . Den indlæser filen .env , kalder transfer_tables med kilde- og destinationsforbindelsesstrengene og udskriver den samlede forløbne tid.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Gem og luk main.py.

Gem forbindelsesstrengene

  1. Åbn filen, .gitignore og tilføj en udeladelse for .env filer. Din fil skal ligne dette eksempel. Sørg for at gemme og lukke den, når du er færdig.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. I den aktuelle mappe skal du oprette en ny fil med navnet .env.

  3. Inden for .env filen tilføjes der poster for dine kilde- og destinationsforbindelser. Erstat pladsholderværdierne med dine faktiske server- og databasenavne.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Tips

    For SQL-database i Fabric skal du bruge ODBC-forbindelsesstrengen fra fanen Forbindelsesstrenge uden DRIVER-oplysningerne . For mere information, se Connect to your SQL-database i Fabric.

Tips

På macOS fungerer begge ActiveDirectoryInteractive til ActiveDirectoryDefault Microsoft Entra-autentificering. ActiveDirectoryInteractive opfordrer dig til at logge ind hver gang, du kører scriptet. For at undgå gentagne login-prompts, log ind én gang via Azure CLI ved at køre az login, og brug ActiveDirectoryDefaultderefter , som genbruger den cachede legitimationsoplysninger.

Brug uv run til at udføre scriptet

  1. I terminalvinduet fra før, eller et nyt terminalvindue, der er åbent i den samme mappe, skal du køre følgende kommando.

     uv run main.py
    

    Her er det forventede output, når scriptet er færdigt.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Forbind til destinationsdatabasen og verificér, at tabellerne og dataene er oprettet med succes. For flere muligheder for, hvordan du forbinder, se Connect to your SQL-database i Fabric.

  3. For at deploye dit script til en anden maskine, kopier alle filer undtagen .venv mappen til den anden maskine. Det virtuelle miljø genskabes ved første gennemspilning.

Hvordan koden fungerer

Applikationen udfører en fuld rundrejse dataoverførsel i tre faser:

  1. Download: Forbinder til kildedatabasen, læser kolonnemetadata fra INFORMATION_SCHEMA.COLUMNS, bygger et Apache Arrow-skema og downloader derefter hver tabel i batches til en lokal Parquet-fil ved hjælp af pq.ParquetWriter.
  2. Berik (valgfrit): Giver en hook (enrich_parquet), hvor du kan tilføje transformationer, afledte kolonner eller joins før upload.
  3. Upload: Læser hver Parquet-fil i batches, genskaber tabellen i destinationsdatabasen ved hjælp af DDL genereret fra Arrow-skemametadata, og bruger cursor.bulkcopy() derefter til højtydende bulk-insert.

Næste trin

Besøg driverens mssql-python GitHub-lager for at få flere eksempler for at bidrage med ideer eller rapportere problemer.