Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans ce guide rapide de démarrage, vous utilisez le mssql-python pilote pour copier en bloc des données entre des bases de données. L’application télécharge des tables à partir d’un schéma de base de données source vers des fichiers Parquet locaux à l’aide d’Apache Arrow, puis les charge dans une base de données de destination à l’aide de la méthode hautes performances bulkcopy . Vous pouvez utiliser ce modèle pour migrer, répliquer ou transformer des données entre SQL Server, Azure SQL Database et SQL Database dans Fabric.
Le mssql-python pilote ne nécessite aucune dépendance externe sur les machines Windows. Le pilote installe tout ce dont il a besoin avec une seule pip installation, ce qui vous permet d’utiliser la dernière version du pilote pour les nouveaux scripts sans interrompre d’autres scripts que vous n’avez pas le temps de mettre à niveau et de tester.
documentation | code | Package (PyPI) | Uv
Prerequisites
Python 3
Si vous n’avez pas encore Python, installez le runtime Python et le gestionnaire de package pip à partir de python.org.
Ne souhaitez pas utiliser votre propre environnement ? Ouvrez en tant que devcontainer à l’aide de GitHub Codespaces.
Visual Studio Code avec les extensions suivantes :
- Extension Python pour Visual Studio Code
Azure Command-Line Interface (CLI) pour l’authentification sans mot de passe sur macOS et Linux.
Si vous n’avez
uvpas encore, suivez les instructions d’installation.Base de données source sur SQL Server, Azure SQL Database ou base de données SQL dans Fabric avec l’exemple
AdventureWorks2025de schéma et une chaîne de connexion valide.Base de données de destination sur SQL Server, Azure SQL Database ou base de données SQL dans Fabric avec une chaîne de connexion valide. L’utilisateur doit avoir l’autorisation de créer et d’écrire dans des tables. Si vous n’avez pas de deuxième base de données, vous pouvez modifier la chaîne de connexion de destination pour qu’elle pointe vers la même base de données et utiliser un schéma différent pour les tables de destination.
Installez les prérequis ponctuels spécifiques au système d'exploitation.
Créer une base de données SQL
Ce guide de démarrage rapide nécessite le schéma AdventureWorks2025 Lightweight comme base de données source.
Créer le projet et exécuter le code
- Créer un projet
- Ajouter des dépendances
- Lancer Visual Studio Code
- Mettre à jour pyproject.toml
- Mettre à jour main.py
- Enregistrer les chaînes de connexion
- Utilisez uv run pour exécuter le script
Créer un projet
Ouvrez une invite de commandes dans votre répertoire de développement. Si vous n’en avez pas, créez un répertoire appelé
python,scriptsetc. Évitez les dossiers sur votre OneDrive, la synchronisation peut interférer avec la gestion de votre environnement virtuel.Créez un projet avec
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Ajout de dépendances
Dans le même répertoire, installez les packages mssql-python, python-dotenv et pyarrow.
uv add mssql-python python-dotenv pyarrow
Lancer Visual Studio Code
Dans le même répertoire, exécutez la commande suivante.
code .
Mettre à jour pyproject.toml
Pyproject.toml contient les métadonnées de votre projet. Ouvrez le fichier dans votre éditeur favori.
Passez en revue le contenu du fichier. Il doit être similaire à cet exemple. Notez la version et la dépendance Python car
mssql-pythonutilise>=pour définir une version minimale. Si vous préférez une version exacte, changez le>=avant le numéro de version en==. Les versions résolues de chaque package sont ensuite stockées dans uv.lock. Le fichier de verrouillage garantit que les développeurs travaillant sur le projet utilisent des versions de package cohérentes. Il garantit également que le même ensemble de versions de package est utilisé lors de la distribution de votre package aux utilisateurs finaux. Vous ne devez pas modifier leuv.lockfichier.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Mettez à jour la description pour être plus descriptive.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Enregistrez et fermez le fichier.
Mettre à jour main.py
Ouvrez le fichier nommé
main.py. Il doit être similaire à cet exemple.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Remplacez le contenu de
main.pypar les blocs de code suivants. Chaque bloc s'appuie sur le précédent et doit être placé dans l'ordremain.py.Conseil / Astuce
Si Visual Studio Code rencontre des difficultés pour résoudre les packages, vous devez mettre à jour l’interpréteur pour utiliser l’environnement virtuel.
En haut de
main.py, ajoutez les importations et les constantes. Le script utilisemssql_pythonpour la connectivité de base de données etpyarrowpyarrow.parquetpour la gestion des données en colonnes et les E/S de fichier Parquet,python-dotenvpour le chargement de chaînes de connexion à partir d’un.envfichier et un modèle regex compilé qui valide les identificateurs SQL pour empêcher l’injection."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameEn dessous des importations, ajoutez le mappage de type SQL-to-Arrow. Ce dictionnaire traduit les types de colonnes SQL Server en leurs équivalents Apache Arrow afin que la fidélité des données soit conservée lors de l’écriture dans Parquet. Les deux fonctions d’assistance créent des chaînes de type SQL exactes (par exemple,
NVARCHAR(100)ouDECIMAL(18,2)) à partir deINFORMATION_SCHEMAmétadonnées et résolvent le type de flèche correspondant pour chaque colonne._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAjoutez les fonctions d’introspection de schéma et de génération DDL.
_get_arrow_schemarequêtesINFORMATION_SCHEMA.COLUMNSutilisant des requêtes paramétrables, génère un schéma de flèche et stocke le type SQL d’origine en tant que métadonnées de champ afin que la table de destination puisse être recréée avec des définitions de colonnes exactes._create_table_ddllit ces métadonnées pour générerDROP/CREATE TABLEDDL. Le typetimestamp(rowversion) est remappé enVARBINARY(8)parce qu'il est généré automatiquement et ne peut pas être inséré directement.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Ajoutez la fonction de téléchargement.
download_tablediffuse des lignes d'une table source en lots à l’aide defetchmany(), convertit chaque valeur en un type Python compatible avec Arrow, et écrit des lots d'enregistrements de manière incrémentielle dans un fichier Parquet local à l’aide deParquetWriter. Cela conserve la mémoire limitée, quelle que soit la taille de la table. La fonction utilise deux curseurs distincts : un pour lire les métadonnées de colonne et un autre pour diffuser en continu les données.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAjoutez le hook d’enrichissement.
enrich_parquetest un espace réservé où vous pouvez ajouter des transformations, des colonnes dérivées ou des jointures aux données avant leur chargement. Dans ce guide de prise en main rapide, c'est un no-op qui renvoie le chemin d'accès du fichier inchangé.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAjoutez la fonction de chargement.
upload_parquetlit le schéma Arrow à partir du fichier Parquet, génère et exécuteDROP/CREATE TABLEDDL pour préparer la destination, puis lit le fichier par lots et appellecursor.bulkcopy()pour une insertion en bloc haute performance. L’optiontable_lock=Trueaméliore le débit en réduisant la contention de verrou. Une fois le chargement terminé, la fonction exécute unSELECT COUNT(*)pour vérifier que le nombre de lignes correspond.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAjoutez la fonction d’orchestration.
transfer_tableslie les trois phases ensemble. Il se connecte à la base de données source, découvre toutes les tables de base dans le schéma donné viaINFORMATION_SCHEMA.TABLES, télécharge chacun dans un fichier Parquet local, exécute le hook d’enrichissement, puis se connecte à la base de données de destination et charge chaque fichier.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Enfin, ajoutez le point d’entrée
main. Il charge le fichier.env, appelletransfer_tablesavec les chaînes de connexion source et de destination, et affiche le temps total écoulé.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Enregistrez et fermez
main.py.
Enregistrer les chaînes de connexion
Ouvrez le
.gitignorefichier et ajoutez une exclusion pour.envles fichiers. Votre fichier doit être similaire à cet exemple. Veillez à l’enregistrer et à le fermer lorsque vous avez terminé.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envDans le répertoire actif, créez un fichier nommé
.env.Dans le
.envfichier, ajoutez des entrées pour vos chaînes de connexion source et de destination. Remplacez les valeurs d’espace réservé par vos noms de serveur et de base de données réels.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Conseil / Astuce
La chaîne de connexion utilisée ici dépend en grande partie du type de base de données SQL à laquelle vous vous connectez. Si vous vous connectez à une base de données Azure SQL ou à une base de données SQL dans Fabric, utilisez la chaîne de connexion ODBC à partir de l’onglet chaînes de connexion. Vous devrez peut-être ajuster le type d’authentification en fonction de votre scénario. Pour plus d’informations sur les chaînes de connexion et leur syntaxe, consultez la référence de la syntaxe des chaînes de connexion.
Conseil / Astuce
Sur macOS, les deux ActiveDirectoryInteractive et ActiveDirectoryDefault fonctionnent pour l’authentification Microsoft Entra.
ActiveDirectoryInteractive vous invite à vous connecter chaque fois que vous exécutez le script. Pour éviter les invites de connexion répétées, connectez-vous une fois via Azure CLI en exécutant az login, puis utilisez ActiveDirectoryDefault, qui réutilise les informations d’identification mises en cache.
Utilisez uv run pour exécuter le script
Dans la fenêtre de terminal à partir de l’avant, ou une nouvelle fenêtre de terminal ouverte au même répertoire, exécutez la commande suivante.
uv run main.pyVoici la sortie attendue lorsque le script est terminé.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sConnectez-vous à la base de données de destination à l’aide de SQL Server Management Studio (SSMS) ou de l’extension MSSQL pour VS Code et vérifiez que les tables et données ont été créées avec succès.
Pour déployer votre script sur un autre ordinateur, copiez tous les fichiers à l’exception du
.venvdossier sur l’autre ordinateur. L’environnement virtuel est recréé avec la première exécution.
Fonctionnement du code
L’application effectue un transfert complet de données aller-retour en trois phases :
-
Télécharger : se connecte à la base de données source, lit les métadonnées de colonne à partir de
INFORMATION_SCHEMA.COLUMNS, génère un schéma Apache Arrow, puis télécharge chaque table dans un fichier Parquet local. -
Enrichir (facultatif) : fournit un hook (
enrich_parquet) dans lequel vous pouvez ajouter des transformations, des colonnes dérivées ou des jointures avant le chargement. -
Chargement : lit chaque fichier Parquet par lots, recrée la table de la base de données de destination à l’aide de DDL générée à partir des métadonnées de schéma Arrow, puis utilise
cursor.bulkcopy()pour une insertion en bloc en haute performance.
Étape suivante
Pour obtenir d’autres exemples, consultez le dépôt GitHub du mssql-python pilote pour contribuer à des idées ou signaler des problèmes.