In this quickstart, you use the mssql-python driver to bulk copy data between databases. The application downloads tables from a source database schema to local Parquet files using Apache Arrow, then uploads them to a destination database using the high-performance bulkcopy method. You can use this pattern to migrate, replicate, or transform data between SQL Server, Azure SQL Database, and SQL database in Fabric.
The mssql-python driver doesn't require any external dependencies on Windows machines. The driver installs everything that it needs with a single pip install, allowing you to use the latest version of the driver for new scripts without breaking other scripts that you don't have time to upgrade and test.
mssql-python documentation | mssql-python source code | Package (PyPI) | uv
Prerequisites

- Python 3. If you don't already have Python, install the Python runtime and pip package manager from python.org. Don't want to use your own environment? Open as a devcontainer using GitHub Codespaces.
- Visual Studio Code with the following extensions:
  - Azure Command-Line Interface (CLI) for passwordless authentication on macOS and Linux.
- uv. If you don't already have uv, follow the installation instructions.
- A source database on SQL Server, Azure SQL Database, or SQL database in Fabric with the AdventureWorks2025 sample schema and a valid connection string.
- A destination database on SQL Server, Azure SQL Database, or SQL database in Fabric with a valid connection string. The user must have permission to create and write to tables. If you don't have a second database, you can change the destination connection string to point to the same database and use a different schema for the destination tables.
- Install one-time, operating-system-specific prerequisites.
Create a SQL database
This quickstart requires the AdventureWorks2025 Lightweight schema as the source database.
Create the project and run the code
- Create a new project
- Add dependencies
- Launch Visual Studio Code
- Update pyproject.toml
- Update main.py
- Save the connection strings
- Use uv run to execute the script
Create a new project
Open a command prompt in your development directory. If you don't have one, create a new directory such as `python` or `scripts`. Avoid folders on OneDrive; the synchronization can interfere with managing your virtual environment.

Create a new project with uv.

```shell
uv init mssql-python-bcp-qs
cd mssql-python-bcp-qs
```
Add dependencies
In the same directory, install the mssql-python, python-dotenv, and pyarrow packages.
```shell
uv add mssql-python python-dotenv pyarrow
```
Launch Visual Studio Code
In the same directory, run the following command.
```shell
code .
```
Update pyproject.toml
The pyproject.toml contains the metadata for your project. Open the file in your favorite editor.
Review the contents of the file. It should be similar to this example. Note that the Python version and the `mssql-python` dependency use `>=` to define a minimum version. If you prefer an exact version, change the `>=` before the version number to `==`. The resolved versions of each package are then stored in `uv.lock`. The lockfile ensures that developers working on the project use consistent package versions. It also ensures that the exact same set of package versions is used when distributing your package to end users. You shouldn't edit the `uv.lock` file.

```toml
[project]
name = "mssql-python-bcp-qs"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "mssql-python>=1.4.0",
    "python-dotenv>=1.1.1",
    "pyarrow>=19.0.0",
]
```

Update the description to be more descriptive.

```toml
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
```

Save and close the file.
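For example, to pin the driver to the exact release shown in the sample file above (the version numbers here simply mirror that sample), the dependency entry would change like this:

```toml
dependencies = [
    "mssql-python==1.4.0",
    "python-dotenv>=1.1.1",
    "pyarrow>=19.0.0",
]
```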
Update main.py
Open the file named `main.py`. It should be similar to this example.

```python
def main():
    print("Hello from mssql-python-bcp-qs!")


if __name__ == "__main__":
    main()
```

Replace the contents of `main.py` with the following code blocks. Each block builds on the previous one and should be placed in `main.py` in order.

Tip

If Visual Studio Code is having trouble resolving packages, you need to update the interpreter to use the virtual environment.
At the top of `main.py`, add the imports and constants. The script uses `mssql_python` for database connectivity, `pyarrow` and `pyarrow.parquet` for columnar data handling and Parquet file I/O, `python-dotenv` for loading connection strings from a `.env` file, and a compiled regex pattern that validates SQL identifiers to prevent injection.

```python
"""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
import os
import re
import time
from uuid import UUID

import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv

import mssql_python

BATCH_SIZE = 64_000
_SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")


def _validate_ident(name: str) -> str:
    if not _SAFE_IDENT.match(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name
```

Below the imports, add the SQL-to-Arrow type mapping. This dictionary translates SQL Server column types to their Apache Arrow equivalents so that data fidelity is preserved when writing to Parquet. The two helper functions build exact SQL type strings (for example, `NVARCHAR(100)` or `DECIMAL(18,2)`) from `INFORMATION_SCHEMA` metadata and resolve the matching Arrow type for each column.

```python
_SQL_TO_ARROW = {
    "bit": pa.bool_(),
    "tinyint": pa.uint8(),
    "smallint": pa.int16(),
    "int": pa.int32(),
    "bigint": pa.int64(),
    "float": pa.float64(),
    "real": pa.float32(),
    "smallmoney": pa.decimal128(10, 4),
    "money": pa.decimal128(19, 4),
    "date": pa.date32(),
    "datetime": pa.timestamp("ms"),
    "datetime2": pa.timestamp("us"),
    "smalldatetime": pa.timestamp("s"),
    "uniqueidentifier": pa.string(),
    "xml": pa.string(),
    "image": pa.binary(),
    "binary": pa.binary(),
    "varbinary": pa.binary(),
    "timestamp": pa.binary(),
}


def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
    """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
    dt = data_type.lower()
    if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
        length = "MAX" if max_length == -1 else str(max_length)
        return f"{dt.upper()}({length})"
    if dt in ("decimal", "numeric"):
        return f"{dt.upper()}({precision},{scale})"
    return dt.upper()


def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
    sql_type = sql_type.lower()
    if sql_type in _SQL_TO_ARROW:
        return _SQL_TO_ARROW[sql_type]
    if sql_type in ("decimal", "numeric"):
        return pa.decimal128(precision, scale)
    if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
        return pa.string()
    return pa.string()


def _convert_value(v):
    """Convert a SQL value to an Arrow-compatible Python type."""
    if isinstance(v, UUID):
        return str(v)
    return v
```

Add the schema introspection and DDL generation functions.
`_get_arrow_schema` queries `INFORMATION_SCHEMA.COLUMNS` using parameterized queries, builds an Arrow schema, and stores the original SQL type as field metadata so the destination table can be recreated with exact column definitions. `_create_table_ddl` reads that metadata back to generate `DROP`/`CREATE TABLE` DDL. The `timestamp` (rowversion) type is remapped to `VARBINARY(8)` because it's auto-generated and not directly insertable.

```python
def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
    """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.

    Stores the original SQL type as field metadata so the round-trip
    CREATE TABLE can reproduce exact column definitions.
    """
    cursor.execute(
        "SELECT COLUMN_NAME, DATA_TYPE, "
        "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
        "COALESCE(NUMERIC_PRECISION, 0), "
        "COALESCE(NUMERIC_SCALE, 0), "
        "IS_NULLABLE "
        "FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
        "ORDER BY ORDINAL_POSITION",
        (schema_name, table_name),
    )
    rows = cursor.fetchall()
    if not rows:
        raise ValueError(f"No columns found for {schema_name}.{table_name}")
    fields = []
    for col_name, data_type, max_len, precision, scale, nullable in rows:
        arrow_t = _arrow_type(data_type, precision, scale)
        sql_t = _sql_type_str(data_type, max_len, precision, scale)
        fields.append(
            pa.field(
                col_name,
                arrow_t,
                nullable=(nullable == "YES"),
                metadata={"sql_type": sql_t},
            )
        )
    return pa.schema(fields)


def _create_table_ddl(target: str, schema: pa.Schema) -> str:
    """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
    col_defs = []
    for f in schema:
        sql_t = f.metadata[b"sql_type"].decode()
        # timestamp/rowversion is auto-generated and not insertable
        if sql_t == "TIMESTAMP":
            sql_t = "VARBINARY(8)"
        null = "" if f.nullable else " NOT NULL"
        col_defs.append(f"[{f.name}] {sql_t}{null}")
    col_defs_str = ",\n    ".join(col_defs)
    return (
        f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
        f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
    )
```

Add the download function. `download_table` streams rows from a source table in batches using `fetchmany()`, converts each value to an Arrow-compatible Python type, and writes record batches incrementally to a local Parquet file using `ParquetWriter`. This keeps memory bounded regardless of table size. The function uses two separate cursors: one to read column metadata, and another to stream the data.

```python
def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
    """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
    _validate_ident(table_name)
    source = f"{schema_name}.[{table_name}]"

    with conn.cursor() as cursor:
        schema = _get_arrow_schema(cursor, schema_name, table_name)
    n_cols = len(schema)

    row_count = 0
    t0 = time.perf_counter()
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {source}")
        writer = None
        try:
            while True:
                rows = cursor.fetchmany(BATCH_SIZE)
                if not rows:
                    break
                columns = [[] for _ in range(n_cols)]
                for row in rows:
                    for i in range(n_cols):
                        columns[i].append(_convert_value(row[i]))
                arrays = [
                    pa.array(columns[i], type=schema.field(i).type)
                    for i in range(n_cols)
                ]
                batch = pa.record_batch(arrays, schema=schema)
                if writer is None:
                    writer = pq.ParquetWriter(parquet_file, schema)
                writer.write_batch(batch)
                row_count += len(rows)
        finally:
            if writer is not None:
                writer.close()

    if row_count == 0:
        return 0

    elapsed = time.perf_counter() - t0
    rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
    print(
        f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
        f"in {elapsed:.2f}s ({rate})"
    )
    return row_count
```

Add the enrichment hook.
`enrich_parquet` is a placeholder where you can add transformations, derived columns, or joins to data before it's uploaded. In this quickstart, it's a no-op that returns the file path unchanged.

```python
def enrich_parquet(parquet_file: str) -> str:
    """Enrich a parquet file before upload. Returns the (possibly new) file path."""
    # TODO: add transformations, derived columns, joins, etc.
    print(f"Enriching {parquet_file} (no-op)")
    return parquet_file
```

Add the upload function.
`upload_parquet` reads the Arrow schema from the Parquet file, generates and executes `DROP`/`CREATE TABLE` DDL to prepare the destination, then reads the file in batches and calls `cursor.bulkcopy()` for high-performance bulk insert. The `table_lock=True` option improves throughput by minimizing lock contention. After the upload completes, the function runs a `SELECT COUNT(*)` to verify the row count matches.

```python
def upload_parquet(conn, parquet_file: str, target: str) -> int:
    """Upload a parquet file into a SQL table via BCP. Returns row count."""
    # ── Create target table from parquet schema ──
    pf_schema = pq.read_schema(parquet_file)
    with conn.cursor() as cursor:
        cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()

    # ── Bulk insert ──
    uploaded = 0
    t0 = time.perf_counter()
    with pq.ParquetFile(parquet_file) as pf:
        with conn.cursor() as cursor:
            for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                rows = zip(*(col.to_pylist() for col in batch.columns))
                cursor.bulkcopy(
                    target,
                    rows,
                    batch_size=BATCH_SIZE,
                    table_lock=True,
                    timeout=3600,
                )
                uploaded += batch.num_rows
    elapsed = time.perf_counter() - t0

    # ── Verify ──
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT COUNT(*) FROM {target}")
        count = cursor.fetchone()[0]

    rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
    print(
        f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
        f"in {elapsed:.2f}s ({rate}) "
        f"| verified: {count:,}"
    )
    return uploaded
```

Add the orchestration function.
`transfer_tables` ties the three phases together. It connects to the source database, discovers all base tables in the given schema via `INFORMATION_SCHEMA.TABLES`, downloads each one to a local Parquet file, runs the enrichment hook, then connects to the destination database and uploads each file.

```python
def transfer_tables(
    source_conn_str: str,
    dest_conn_str: str,
    source_schema: str,
    dest_schema: str,
) -> None:
    """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
    _validate_ident(source_schema)
    _validate_ident(dest_schema)
    parquet_dir = source_schema
    os.makedirs(parquet_dir, exist_ok=True)

    # ── Download from source ──
    with mssql_python.connect(source_conn_str) as src_conn:
        with src_conn.cursor() as cursor:
            cursor.execute(
                "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                "ORDER BY TABLE_NAME",
                (source_schema,),
            )
            tables = [row[0] for row in cursor.fetchall()]
        print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")

        parquet_files = []
        for table_name in tables:
            parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
            row_count = download_table(src_conn, source_schema, table_name, parquet_file)
            if row_count == 0:
                print(f"{source_schema}.{table_name}: empty, skipping")
            else:
                parquet_files.append((table_name, parquet_file))

    # ── Enrich parquet files ──
    enriched = []
    for table_name, parquet_file in parquet_files:
        enriched.append((table_name, enrich_parquet(parquet_file)))

    # ── Upload to destination ──
    with mssql_python.connect(dest_conn_str) as dest_conn:
        for table_name, parquet_file in enriched:
            target = f"{dest_schema}.[{table_name}]"
            upload_parquet(dest_conn, parquet_file, target)
```

Finally, add the `main` entry point. It loads the `.env` file, calls `transfer_tables` with the source and destination connection strings, and prints the total elapsed time.

```python
def main():
    load_dotenv()
    t_start = time.perf_counter()
    transfer_tables(
        source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
        dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
        source_schema="SalesLT",
        dest_schema="dbo",
    )
    print(f"Total: {time.perf_counter() - t_start:.2f}s")


if __name__ == "__main__":
    main()
```

Save and close `main.py`.
Save the connection strings
Open the `.gitignore` file and add an exclusion for `.env` files. Your file should be similar to this example. Be sure to save and close it when you're done.

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

# Connection strings and secrets
.env
```

In the current directory, create a new file named `.env`.

Within the `.env` file, add entries for your source and destination connection strings. Replace the placeholder values with your actual server and database names.

```
SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
```

Tip
The connection string used here largely depends on the type of SQL database you're connecting to. If you're connecting to an Azure SQL Database or a SQL database in Fabric, use the ODBC connection string from the connection strings tab. You might need to adjust the authentication type depending on your scenario. For more information on connection strings and their syntax, see connection string syntax reference.
Tip
On macOS, both ActiveDirectoryInteractive and ActiveDirectoryDefault work for Microsoft Entra authentication. ActiveDirectoryInteractive prompts you to sign in every time you run the script. To avoid repeated sign-in prompts, sign in once via the Azure CLI by running az login, then use ActiveDirectoryDefault, which reuses the cached credential.
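For example, after signing in once with `az login`, the `.env` entries can switch to the cached credential (server and database names remain placeholders to replace with your own):

```
SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryDefault"
DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryDefault"
```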
Use uv run to execute the script
In the terminal window from before, or a new terminal window open to the same directory, run the following command.

```shell
uv run main.py
```

Here's the expected output when the script completes.

```
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
...
SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
...
Total: 2.35s
```

Connect to the destination database using SQL Server Management Studio (SSMS) or the MSSQL extension for VS Code and verify that the tables and data were created successfully.
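If you prefer a query over browsing the object explorer, a check like the following lists the uploaded tables. It assumes the `dbo` destination schema used in `main.py`; adjust the schema name if you changed it.

```sql
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'dbo' AND TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME;
```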
To deploy your script to another machine, copy all files except for the `.venv` folder to the other machine. The virtual environment is recreated on the first run.
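If you want to script that copy step, a minimal sketch using only the Python standard library could look like this. The `deploy_project` helper and the paths in the comment are hypothetical, not part of the quickstart.

```python
import shutil


def deploy_project(src: str, dst: str) -> None:
    """Copy a project tree to a deployment target, skipping the virtual
    environment and Python caches; uv recreates .venv on the first run.

    Example (hypothetical paths):
        deploy_project("mssql-python-bcp-qs", "/mnt/share/mssql-python-bcp-qs")
    """
    shutil.copytree(src, dst, ignore=shutil.ignore_patterns(".venv", "__pycache__"))
```

Note that this copies the `.env` file along with everything else, so keep the destination as secure as the source.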
How the code works
The application performs a full round-trip data transfer in three phases:
- Download: Connects to the source database, reads column metadata from `INFORMATION_SCHEMA.COLUMNS`, builds an Apache Arrow schema, then downloads each table into a local Parquet file.
- Enrich (optional): Provides a hook (`enrich_parquet`) where you can add transformations, derived columns, or joins before uploading.
- Upload: Reads each Parquet file in batches, recreates the table in the destination database using DDL generated from Arrow schema metadata, then uses `cursor.bulkcopy()` for high-performance bulk insert.
Next step
Visit the mssql-python driver GitHub repository for more examples, to contribute ideas, or to report issues.