Partager via


Démarrage rapide : Copie en bloc avec le pilote mssql-python pour Python

Dans ce guide rapide de démarrage, vous utilisez le mssql-python pilote pour copier en bloc des données entre des bases de données. L’application télécharge des tables à partir d’un schéma de base de données source vers des fichiers Parquet locaux à l’aide d’Apache Arrow, puis les charge dans une base de données de destination à l’aide de la méthode hautes performances bulkcopy . Vous pouvez utiliser ce modèle pour migrer, répliquer ou transformer des données entre SQL Server, Azure SQL Database et SQL Database dans Fabric.

Le mssql-python pilote ne nécessite aucune dépendance externe sur les machines Windows. Le pilote installe tout ce dont il a besoin avec une seule pip installation, ce qui vous permet d’utiliser la dernière version du pilote pour les nouveaux scripts sans interrompre d’autres scripts que vous n’avez pas le temps de mettre à niveau et de tester.

documentation | code | Package (PyPI) | Uv

Prerequisites

  • Python 3

    • Si vous n’avez pas encore Python, installez le runtime Python et le gestionnaire de package pip à partir de python.org.

    • Ne souhaitez pas utiliser votre propre environnement ? Ouvrez en tant que devcontainer à l’aide de GitHub Codespaces.

  • Visual Studio Code avec les extensions suivantes :

  • Azure Command-Line Interface (CLI) pour l’authentification sans mot de passe sur macOS et Linux.

  • Si vous n’avez uvpas encore, suivez les instructions d’installation.

  • Base de données source sur SQL Server, Azure SQL Database ou base de données SQL dans Fabric avec l’exemple AdventureWorks2025 de schéma et une chaîne de connexion valide.

  • Base de données de destination sur SQL Server, Azure SQL Database ou base de données SQL dans Fabric avec une chaîne de connexion valide. L’utilisateur doit avoir l’autorisation de créer et d’écrire dans des tables. Si vous n’avez pas de deuxième base de données, vous pouvez modifier la chaîne de connexion de destination pour qu’elle pointe vers la même base de données et utiliser un schéma différent pour les tables de destination.

  • Installez les prérequis ponctuels spécifiques au système d'exploitation.

    apk add libtool krb5-libs krb5-dev
    

Créer une base de données SQL

Ce guide de démarrage rapide nécessite le schéma AdventureWorks2025 Lightweight comme base de données source.

Créer le projet et exécuter le code

  1. Créer un projet
  2. Ajouter des dépendances
  3. Lancer Visual Studio Code
  4. Mettre à jour pyproject.toml
  5. Mettre à jour main.py
  6. Enregistrer les chaînes de connexion
  7. Utilisez uv run pour exécuter le script

Créer un projet

  1. Ouvrez une invite de commandes dans votre répertoire de développement. Si vous n’en avez pas, créez un répertoire appelé python, scriptsetc. Évitez les dossiers sur votre OneDrive, la synchronisation peut interférer avec la gestion de votre environnement virtuel.

  2. Créez un projet avec uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Ajout de dépendances

Dans le même répertoire, installez les packages mssql-python, python-dotenv et pyarrow.

uv add mssql-python python-dotenv pyarrow

Lancer Visual Studio Code

Dans le même répertoire, exécutez la commande suivante.

code .

Mettre à jour pyproject.toml

  1. Pyproject.toml contient les métadonnées de votre projet. Ouvrez le fichier dans votre éditeur favori.

  2. Passez en revue le contenu du fichier. Il doit être similaire à cet exemple. Notez la version et la dépendance Python car mssql-python utilise >= pour définir une version minimale. Si vous préférez une version exacte, changez le >= avant le numéro de version en ==. Les versions résolues de chaque package sont ensuite stockées dans uv.lock. Le fichier de verrouillage garantit que les développeurs travaillant sur le projet utilisent des versions de package cohérentes. Il garantit également que le même ensemble de versions de package est utilisé lors de la distribution de votre package aux utilisateurs finaux. Vous ne devez pas modifier le uv.lock fichier.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Mettez à jour la description pour être plus descriptive.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Enregistrez et fermez le fichier.

Mettre à jour main.py

  1. Ouvrez le fichier nommé main.py. Il doit être similaire à cet exemple.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Remplacez le contenu de main.py par les blocs de code suivants. Chaque bloc s'appuie sur le précédent et doit être placé dans l'ordre main.py.

    Conseil / Astuce

    Si Visual Studio Code rencontre des difficultés pour résoudre les packages, vous devez mettre à jour l’interpréteur pour utiliser l’environnement virtuel.

  3. En haut de main.py, ajoutez les importations et les constantes. Le script utilise mssql_python pour la connectivité de base de données et pyarrowpyarrow.parquet pour la gestion des données en colonnes et les E/S de fichier Parquet, python-dotenv pour le chargement de chaînes de connexion à partir d’un .env fichier et un modèle regex compilé qui valide les identificateurs SQL pour empêcher l’injection.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. En dessous des importations, ajoutez le mappage de type SQL-to-Arrow. Ce dictionnaire traduit les types de colonnes SQL Server en leurs équivalents Apache Arrow afin que la fidélité des données soit conservée lors de l’écriture dans Parquet. Les deux fonctions d’assistance créent des chaînes de type SQL exactes (par exemple, NVARCHAR(100) ou DECIMAL(18,2)) à partir de INFORMATION_SCHEMA métadonnées et résolvent le type de flèche correspondant pour chaque colonne.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Ajoutez les fonctions d’introspection de schéma et de génération DDL. _get_arrow_schema requêtes INFORMATION_SCHEMA.COLUMNS utilisant des requêtes paramétrables, génère un schéma de flèche et stocke le type SQL d’origine en tant que métadonnées de champ afin que la table de destination puisse être recréée avec des définitions de colonnes exactes. _create_table_ddl lit ces métadonnées pour générer DROP/CREATE TABLE DDL. Le type timestamp (rowversion) est remappé en VARBINARY(8) parce qu'il est généré automatiquement et ne peut pas être inséré directement.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Ajoutez la fonction de téléchargement. download_table diffuse des lignes d'une table source en lots à l’aide de fetchmany(), convertit chaque valeur en un type Python compatible avec Arrow, et écrit des lots d'enregistrements de manière incrémentielle dans un fichier Parquet local à l’aide de ParquetWriter. Cela conserve la mémoire limitée, quelle que soit la taille de la table. La fonction utilise deux curseurs distincts : un pour lire les métadonnées de colonne et un autre pour diffuser en continu les données.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Ajoutez le hook d’enrichissement. enrich_parquet est un espace réservé où vous pouvez ajouter des transformations, des colonnes dérivées ou des jointures aux données avant leur chargement. Dans ce guide de prise en main rapide, c'est un no-op qui renvoie le chemin d'accès du fichier inchangé.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Ajoutez la fonction de chargement. upload_parquet lit le schéma Arrow à partir du fichier Parquet, génère et exécute DROP/CREATE TABLE DDL pour préparer la destination, puis lit le fichier par lots et appelle cursor.bulkcopy() pour une insertion en bloc haute performance. L’option table_lock=True améliore le débit en réduisant la contention de verrou. Une fois le chargement terminé, la fonction exécute un SELECT COUNT(*) pour vérifier que le nombre de lignes correspond.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Ajoutez la fonction d’orchestration. transfer_tables lie les trois phases ensemble. Il se connecte à la base de données source, découvre toutes les tables de base dans le schéma donné via INFORMATION_SCHEMA.TABLES, télécharge chacun dans un fichier Parquet local, exécute le hook d’enrichissement, puis se connecte à la base de données de destination et charge chaque fichier.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Enfin, ajoutez le point d’entrée main . Il charge le fichier .env, appelle transfer_tables avec les chaînes de connexion source et de destination, et affiche le temps total écoulé.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Enregistrez et fermez main.py.

Enregistrer les chaînes de connexion

  1. Ouvrez le .gitignore fichier et ajoutez une exclusion pour .env les fichiers. Votre fichier doit être similaire à cet exemple. Veillez à l’enregistrer et à le fermer lorsque vous avez terminé.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Dans le répertoire actif, créez un fichier nommé .env.

  3. Dans le .env fichier, ajoutez des entrées pour vos chaînes de connexion source et de destination. Remplacez les valeurs d’espace réservé par vos noms de serveur et de base de données réels.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Conseil / Astuce

    La chaîne de connexion utilisée ici dépend en grande partie du type de base de données SQL à laquelle vous vous connectez. Si vous vous connectez à une base de données Azure SQL ou à une base de données SQL dans Fabric, utilisez la chaîne de connexion ODBC à partir de l’onglet chaînes de connexion. Vous devrez peut-être ajuster le type d’authentification en fonction de votre scénario. Pour plus d’informations sur les chaînes de connexion et leur syntaxe, consultez la référence de la syntaxe des chaînes de connexion.

Conseil / Astuce

Sur macOS, les deux ActiveDirectoryInteractive et ActiveDirectoryDefault fonctionnent pour l’authentification Microsoft Entra. ActiveDirectoryInteractive vous invite à vous connecter chaque fois que vous exécutez le script. Pour éviter les invites de connexion répétées, connectez-vous une fois via Azure CLI en exécutant az login, puis utilisez ActiveDirectoryDefault, qui réutilise les informations d’identification mises en cache.

Utilisez uv run pour exécuter le script

  1. Dans la fenêtre de terminal à partir de l’avant, ou une nouvelle fenêtre de terminal ouverte au même répertoire, exécutez la commande suivante.

     uv run main.py
    

    Voici la sortie attendue lorsque le script est terminé.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Connectez-vous à la base de données de destination à l’aide de SQL Server Management Studio (SSMS) ou de l’extension MSSQL pour VS Code et vérifiez que les tables et données ont été créées avec succès.

  3. Pour déployer votre script sur un autre ordinateur, copiez tous les fichiers à l’exception du .venv dossier sur l’autre ordinateur. L’environnement virtuel est recréé avec la première exécution.

Fonctionnement du code

L’application effectue un transfert complet de données aller-retour en trois phases :

  1. Télécharger : se connecte à la base de données source, lit les métadonnées de colonne à partir de INFORMATION_SCHEMA.COLUMNS, génère un schéma Apache Arrow, puis télécharge chaque table dans un fichier Parquet local.
  2. Enrichir (facultatif) : fournit un hook (enrich_parquet) dans lequel vous pouvez ajouter des transformations, des colonnes dérivées ou des jointures avant le chargement.
  3. Chargement : lit chaque fichier Parquet par lots, recrée la table de la base de données de destination à l’aide de DDL générée à partir des métadonnées de schéma Arrow, puis utilise cursor.bulkcopy() pour une insertion en bloc en haute performance.

Étape suivante

Pour obtenir d’autres exemples, consultez le dépôt GitHub du mssql-python pilote pour contribuer à des idées ou signaler des problèmes.