Delen via


Quickstart: Bulksgewijs kopiëren met het mssql-python-stuurprogramma voor Python

In deze quickstart gebruikt u het mssql-python stuurprogramma om gegevens bulksgewijs tussen databases te kopiëren. De toepassing downloadt tabellen van een brondatabaseschema naar lokale Parquet-bestanden met behulp van Apache Arrow en uploadt ze vervolgens naar een doeldatabase met behulp van de krachtige bulkcopy methode. U kunt dit patroon gebruiken om gegevens te migreren, repliceren of transformeren tussen SQL Server, Azure SQL Database en SQL Database in Fabric.

Het mssql-python stuurprogramma vereist geen externe afhankelijkheden op Windows-computers. Het stuurprogramma installeert alles wat het nodig heeft met één pip installatie, zodat u de nieuwste versie van het stuurprogramma voor nieuwe scripts kunt gebruiken zonder dat andere scripts die u niet hoeft te upgraden en te testen, worden onderbroken.

documentatie | mssql-python-broncode | Pakket (PyPI) | Uv

Vereiste voorwaarden

  • Python 3

    • Als u Nog geen Python hebt, installeert u de Python-runtime en pip-pakketbeheer vanuit python.org.

    • Wilt u niet uw eigen omgeving gebruiken? Open als een devcontainer met GitHub Codespaces.

  • Visual Studio Code met de volgende extensies:

  • Azure Command-Line Interface (CLI) voor verificatie zonder wachtwoord in macOS en Linux.

  • Als u dat nog niet hebt uv, volgt u de installatie-instructies.

  • Een brondatabase op SQL Server, Azure SQL Database of SQL-database in Fabric met het AdventureWorks2025 voorbeeldschema en een geldige verbindingsreeks.

  • Een doeldatabase in SQL Server, Azure SQL Database of SQL Database in Fabric met een geldige verbindingsreeks. De gebruiker moet gemachtigd zijn om tabellen te maken en naar tabellen te schrijven. Als u geen tweede database hebt, kunt u de doelverbindingsreeks wijzigen zodat deze verwijst naar dezelfde database en een ander schema voor de doeltabellen gebruikt.

  • Installeer eenmalige vereisten voor het besturingssysteem.

    apk add libtool krb5-libs krb5-dev
    

Een SQL-database maken

Voor deze quickstart is het Lightweight-schema AdventureWorks2025 vereist als brondatabase.

Het project maken en de code uitvoeren

  1. Een nieuw project maken
  2. Afhankelijkheden toevoegen
  3. Visual Studio Code starten
  4. Pyproject.toml bijwerken
  5. Main.py bijwerken
  6. De verbindingsreeksen opslaan
  7. Uv-uitvoering gebruiken om het script uit te voeren

Een nieuw project maken

  1. Open een opdrachtprompt in uw ontwikkelingsmap. Als u geen map hebt, maakt u een nieuwe map met de naam python, scriptsenzovoort. Vermijd mappen in uw OneDrive, dan kan de synchronisatie het beheer van uw virtuele omgeving verstoren.

  2. Maak een nieuw project met uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Afhankelijkheden toevoegen

Installeer in dezelfde map de mssql-python, python-dotenven pyarrow pakketten.

uv add mssql-python python-dotenv pyarrow

Visual Studio Code starten

Voer in dezelfde map de volgende opdracht uit.

code .

Pyproject.toml bijwerken

  1. Het pyproject.toml bevat de metagegevens voor uw project. Open het bestand in uw favoriete editor.

  2. Controleer de inhoud van het bestand. Deze moet vergelijkbaar zijn met dit voorbeeld. Noteer de Python-versie en -afhankelijkheid voor mssql-python gebruik >= om een minimale versie te definiëren. Als u de voorkeur geeft aan een exacte versie, wijzigt u de >= voor het versienummer in ==. De opgeloste versies van elk pakket worden vervolgens opgeslagen in de uv.lock. Het lockfile zorgt ervoor dat ontwikkelaars die aan het project werken, consistente pakketversies gebruiken. Het zorgt er ook voor dat dezelfde set pakketversies wordt gebruikt bij het distribueren van uw pakket naar eindgebruikers. U moet het uv.lock bestand niet bewerken.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Werk de beschrijving bij zodat deze meer beschrijvend is.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Sla het bestand op en sluit het.

Main.py bijwerken

  1. Open het bestand met de naam main.py. Deze moet vergelijkbaar zijn met dit voorbeeld.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Vervang de inhoud van main.py met de volgende codeblokken. Elk blok bouwt voort op de vorige en moet op main.py volgorde worden geplaatst.

    Aanbeveling

    Als Visual Studio Code problemen ondervindt bij het oplossen van pakketten, moet u de interpreter bijwerken om de virtuele omgeving te gebruiken.

  3. Voeg bovenaan main.pyde import- en constanten toe. Het script gebruikt mssql_python voor databaseconnectiviteit, pyarrow en pyarrow.parquet voor het verwerken van kolomgegevens en Parquet-bestands-I/O, én python-dotenv voor het laden van verbindingsreeksen uit een .env bestand, en een gecompileerd regex-patroon dat SQL-identifiers (id's) valideert om injectie te voorkomen.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Voeg onder de imports de SQL-naar-Arrow type mapping toe. Deze woordenlijst vertaalt SQL Server-kolomtypen naar hun Apache Arrow-equivalenten, zodat de betrouwbaarheid van gegevens behouden blijft bij het schrijven naar Parquet. De twee helperfuncties bouwen exacte SQL-typetekenreeksen (bijvoorbeeld NVARCHAR(100) of DECIMAL(18,2)) op basis van INFORMATION_SCHEMA metagegevens en lossen het overeenkomende pijltype voor elke kolom op.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Voeg de functies schema-introspectie en DDL-generatie toe. _get_arrow_schema INFORMATION_SCHEMA.COLUMNS-query’s met behulp van geparameteriseerde query's, bouwt een Arrow schema en slaat het oorspronkelijke SQL-type op als veldmetagegevens, zodat de doeltabel opnieuw kan worden gemaakt met exacte kolomdefinities. _create_table_ddl leest die metagegevens terug om DDL te genereren DROP/CREATE TABLE . Het timestamp-type (rowversion) wordt opnieuw gemapt naar VARBINARY(8) omdat het automatisch wordt gegenereerd en niet kan worden ingevoegd.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Voeg de downloadfunctie toe. download_table streamt rijen uit een brontabel in batches met behulp van fetchmany(), converteert elke waarde naar een python-type dat compatibel is met pijlen en schrijft recordbatches incrementeel naar een lokaal Parquet-bestand met behulp van ParquetWriter. Hierdoor blijft geheugen gebonden, ongeacht de tabelgrootte. De functie maakt gebruik van twee afzonderlijke cursors: een voor het lezen van kolommetagegevens en een andere om de gegevens te streamen.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Voeg de verrijkingshook toe. enrich_parquet is een tijdelijke aanduiding waar u transformaties, afgeleide kolommen of joins kunt toevoegen aan gegevens voordat deze worden geüpload. In deze snelstartgids is er sprake van een no-op die het bestandspad ongewijzigd teruggeeft.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Voeg de uploadfunctie toe. upload_parquet leest het Arrow-schema uit het Parquet-bestand, genereert en voert DROP/CREATE TABLE DDL uit om de bestemming voor te bereiden. Vervolgens wordt het bestand in batches gelezen en wordt cursor.bulkcopy() aangeroepen voor een bulkinvoer met hoge prestaties. De table_lock=True optie verbetert de doorvoer door vergrendelingsconflicten te minimaliseren. Nadat het uploaden is voltooid, wordt een SELECT COUNT(*) functie uitgevoerd om te controleren of het aantal rijen overeenkomt.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Voeg de orchestratiefunctie toe. transfer_tables koppelt de drie fasen aan elkaar. Het maakt verbinding met de brondatabase, detecteert alle basistabellen in het opgegeven schema via INFORMATION_SCHEMA.TABLES, downloadt elke tabel naar een lokaal Parquet-bestand, voert de verrijkingshook uit, maakt vervolgens verbinding met de doeldatabase en uploadt elk bestand.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Voeg tot slot het main toegangspunt toe. Het laadt het bestand .env, roept transfer_tables aan met de verbindingsreeksen voor de bron en het doel, en drukt de totale verstreken tijd af.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Opslaan en sluiten main.py.

De verbindingsreeksen opslaan

  1. Open het .gitignore bestand en voeg een uitsluiting toe voor .env bestanden. Het bestand moet er ongeveer uitzien als in dit voorbeeld. Zorg ervoor dat u deze opslaat en sluit wanneer u klaar bent.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Maak in de huidige map een nieuw bestand met de naam .env.

  3. Voeg in het .env bestand vermeldingen toe voor uw bron- en doelverbindingsreeksen. Vervang de tijdelijke aanduidingen door de werkelijke server- en databasenamen.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Aanbeveling

    De hier gebruikte verbindingsreeks is grotendeels afhankelijk van het type SQL-database waarmee u verbinding maakt. Als u verbinding maakt met een Azure SQL Database of een SQL-database in Fabric, gebruikt u de ODBC-verbindingsreeks op het tabblad Verbindingsreeksen. Mogelijk moet u het verificatietype aanpassen, afhankelijk van uw scenario. Zie de naslaginformatie over de syntaxis van de verbindingsreeks voor meer informatie over verbindingsreeksen en de bijbehorende syntaxis.

Aanbeveling

In macOS werken beide ActiveDirectoryInteractive en ActiveDirectoryDefault voor Microsoft Entra-verificatie. ActiveDirectoryInteractive u wordt gevraagd u aan te melden telkens wanneer u het script uitvoert. Als u herhaalde aanmeldingsprompts wilt voorkomen, meldt u zich eenmaal aan via de Azure CLI door uit te voeren az loginen vervolgens te gebruiken ActiveDirectoryDefault, waarmee de referenties in de cache opnieuw worden gebruikt.

Gebruik uv run om het script uit te voeren

  1. Voer in het terminalvenster van vóór, of een nieuw terminalvenster dat is geopend in dezelfde map, de volgende opdracht uit.

     uv run main.py
    

    Dit is de verwachte uitvoer wanneer het script is voltooid.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Maak verbinding met de doeldatabase met behulp van SQL Server Management Studio (SSMS) of de MSSQL-extensie voor VS Code en controleer of de tabellen en gegevens zijn gemaakt.

  3. Als u uw script op een andere computer wilt implementeren, kopieert u alle bestanden, met uitzondering van de .venv map naar de andere computer. De virtuele omgeving wordt opnieuw gemaakt met de eerste uitvoering.

Hoe de code werkt

De toepassing voert een volledige retourgegevensoverdracht uit in drie fasen:

  1. Download: Maakt verbinding met de brondatabase, leest kolommetagegevens van INFORMATION_SCHEMA.COLUMNS, bouwt een Apache Arrow-schema en downloadt vervolgens elke tabel in een lokaal Parquet-bestand.
  2. Verrijken (optioneel): Biedt een haak (enrich_parquet) waar u transformaties, afgeleide kolommen of joins kunt toevoegen voordat u uploadt.
  3. Uploaden: leest elk Parquet-bestand in batches, maakt de tabel opnieuw in de doeldatabase met behulp van DDL die is gegenereerd op basis van metagegevens van het pijlschema en gebruikt cursor.bulkcopy() vervolgens voor bulksgewijs invoegen met hoge prestaties.

Volgende stap

Ga naar de GitHub-opslagplaats van het mssql-python stuurprogramma voor meer voorbeelden, om ideeën bij te dragen of problemen te rapporteren.