Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом кратком руководстве вы используете mssql-python драйвер для пакетного копирования данных между базами данных. Приложение загружает таблицы из схемы исходной базы данных в локальные файлы Parquet с помощью Apache Arrow, а затем отправляет их в целевую базу данных с помощью высокопроизводительного bulkcopy метода. Этот шаблон можно использовать для переноса, репликации или преобразования данных между SQL Server, базой данных SQL Azure и базой данных SQL в Fabric.
Драйвер mssql-python не требует внешних зависимостей на компьютерах Windows. Драйвер устанавливает все, что требуется с одной pip установкой, что позволяет использовать самую последнюю версию драйвера в новых сценариях без нарушения работы других сценариев, для обновления и тестирования которых у вас нет времени.
Документация | Исходный код mssql-python | Пакет (PyPI) | uv
Необходимые условия
Python 3
Если у вас еще нет Python, установите среду выполнения Python и диспетчер пакетов pip из python.org.
Не хотите использовать собственную среду? Откройте в качестве контейнера для разработки с помощью GitHub Codespaces.
Visual Studio Code со следующими расширениями:
Интерфейс azure Command-Line (CLI) для проверки подлинности без пароля в macOS и Linux.
Если у вас еще нет
uv, следуйте инструкциям по установке.Исходная база данных в SQL Server, базе данных SQL Azure или базе данных SQL в Fabric с примером
AdventureWorks2025схемы и допустимой строкой подключения.Целевая база данных в SQL Server, базе данных SQL Azure или базе данных SQL в Fabric с допустимой строкой подключения. Пользователь должен иметь разрешение на создание и запись в таблицы. Если у вас нет второй базы данных, можно изменить строку подключения назначения, чтобы указать на ту же базу данных и использовать другую схему для целевых таблиц.
Установите единовременные предварительные условия для операционной системы.
apk add libtool krb5-libs krb5-dev
Создание базы данных SQL
В этом кратком руководстве требуется упрощенная схема AdventureWorks2025 в качестве исходной базы данных.
Создание проекта и запуск кода
- Создание нового проекта
- Добавление зависимостей
- Запуск Visual Studio Code
- Обновление pyproject.toml
- Обновление main.py
- Сохранение строк подключения
- Использование uv run для выполнения скрипта
Создание нового проекта
Откройте командную строку в каталоге разработки. Если у вас нет одного, создайте новый каталог с именем
python,scriptsи т. д. Избегайте папок в OneDrive, синхронизация может препятствовать управлению виртуальной средой.Создайте новый проект с
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Добавление зависимостей
В том же каталоге установите mssql-pythonи python-dotenvpyarrow пакеты.
uv add mssql-python python-dotenv pyarrow
Запустите Visual Studio Code.
В том же каталоге выполните следующую команду.
code .
Обновление pyproject.toml
Pyproject.toml содержит метаданные проекта. Откройте файл в избранном редакторе.
Просмотрите содержимое файла. Он должен быть похож на этот пример. Обратите внимание на версию Python и зависимость:
mssql-pythonиспользует>=для определения минимальной версии. Если вы предпочитаете точную версию, измените>=на==перед номером версии. Затем разрешенные версии каждого пакета хранятся в uv.lock. Lockfile гарантирует, что разработчики, работающие над проектом, используют согласованные версии пакетов. Он также гарантирует, что при распространении пакета пользователям используется тот же набор версий пакетов. Не следует изменятьuv.lockфайл.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Обновите описание, чтобы быть более описательным.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Сохраните и закройте файл.
Обновление main.py
Откройте файл с именем
main.py. Он должен быть похож на этот пример.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Замените содержимое
main.pyследующими блоками кода. Каждый блок строится на предыдущем и должен быть размещен вmain.pyпоследовательно.Подсказка
Если в Visual Studio Code возникли проблемы с разрешением пакетов, необходимо обновить интерпретатор для использования виртуальной среды.
В верхней части
main.pyдобавьте импорт и константы. Скрипт используетmssql_pythonдля подключения к базе данных,pyarrowиpyarrow.parquetдля обработки данных в колонках и ввода-вывода файлов Parquet,python-dotenvдля загрузки строк подключения из.envфайла, а также скомпилированный шаблон regex, который проверяет идентификаторы SQL, чтобы предотвратить внедрение."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameНиже импортов добавьте сопоставление типов SQL-to-Arrow. Этот словарь преобразует типы столбцов SQL Server в эквивалентные типы Apache Arrow, чтобы сохранялась точность данных при записи в Parquet. Две вспомогательные функции создают точные строки типа SQL (например,
NVARCHAR(100)илиDECIMAL(18,2)) изINFORMATION_SCHEMAметаданных и разрешают соответствующий тип стрелки для каждого столбца._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vДобавьте функции интроспекции схемы и генерации DDL.
_get_arrow_schemaзапросыINFORMATION_SCHEMA.COLUMNSс использованием параметризованных запросов, создание схемы Arrow и сохранение исходного типа SQL в качестве метаданных поля, чтобы целевая таблица могла быть восстановлена с точными определениями столбцов._create_table_ddlсчитывает эти метаданные обратно для созданияDROP/CREATE TABLEDDL. Типtimestamp(rowversion) переконвертируется вVARBINARY(8), так как он создается автоматически и не может быть вставлен напрямую.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Добавьте функцию скачивания.
download_tableпоток строк из исходной таблицы обрабатывает пакетами с помощьюfetchmany(), преобразует каждое значение в тип Python, совместимый с Arrow, и записывает пакеты записей постепенно в локальный файл Parquet с помощьюParquetWriter. При этом память ограничивается независимо от размера таблицы. Функция использует два отдельных курсора: один для чтения метаданных столбца, а другой — для потоковой передачи данных.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countДобавьте крючок обогащения.
enrich_parquet— это заполнитель, в котором можно добавлять преобразования, производные столбцы или объединение данных перед отправкой. В этом кратком руководстве это операция без действий, которая возвращает путь к файлу без изменений.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileДобавьте функцию отправки.
upload_parquetсчитывает схему Arrow из файла Parquet, создает и выполняетDROP/CREATE TABLEDDL для подготовки назначения, затем считывает файл пакетами и вызываетcursor.bulkcopy()для высокопроизводительной массовой вставки. Этотtable_lock=Trueпараметр повышает пропускную способность путем минимизации конфликтов блокировки. После завершения загрузки функция запускаетSELECT COUNT(*)для проверки совпадения подсчёта строк.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedДобавьте функцию оркестрации.
transfer_tablesсвязывает три этапа вместе. Он подключается к исходной базе данных, обнаруживает все базовые таблицы в данной схеме черезINFORMATION_SCHEMA.TABLES, загружает каждую из них в локальный файл Parquet, запускает модуль обогащения, после чего подключается к целевой базе данных и отправляет каждый файл.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Наконец, добавьте точку
mainвхода. Он загружает.envфайл, вызываетtransfer_tablesс строками подключения источника и назначения и выводит общее время выполнения.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Сохраните и закройте
main.py.
Сохранение строк подключения
.gitignoreОткройте файл и добавьте исключение для.envфайлов. Файл должен быть похож на этот пример. Не забудьте сохранить и закрыть его после завершения.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envВ текущем каталоге создайте новый файл с именем
.env.В файле
.envдобавьте записи для строк подключения источника и назначения. Замените значения заполнителей фактическими именами сервера и базы данных.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Подсказка
Строка подключения, используемая здесь, в значительной степени зависит от типа базы данных SQL, к которой вы подключаетесь. Если вы подключаетесь к базе данных SQL Azure или базе данных SQL в Fabric, используйте строку подключения ODBC на вкладке строк подключения. Возможно, вам потребуется настроить тип проверки подлинности в зависимости от вашего сценария. Дополнительные сведения о строках подключения и их синтаксисе см. в справочнике по синтаксису строки подключения.
Подсказка
На macOS, оба ActiveDirectoryInteractive и ActiveDirectoryDefault работают для проверки подлинности Microsoft Entra.
ActiveDirectoryInteractive запрашивает вход при каждом запуске скрипта. Чтобы избежать повторяющихся запросов на вход, выполните вход один раз с помощью Azure CLIaz login, а затем используйте ActiveDirectoryDefault, который повторно использует кэшированные учетные данные.
Использование uv run для выполнения скрипта
В окне терминала до или в новом окне терминала, открываемом в том же каталоге, выполните следующую команду.
uv run main.pyНиже приведены ожидаемые выходные данные при завершении скрипта.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sПодключитесь к целевой базе данных с помощью SQL Server Management Studio (SSMS) или расширения MSSQL для VS Code и убедитесь, что таблицы и данные были созданы успешно.
Чтобы развернуть скрипт на другом компьютере, скопируйте все файлы, кроме
.venvпапки на другой компьютер. Виртуальная среда повторно создается при первом запуске.
Как работает код
Приложение выполняет полную передачу данных по пути в три этапа:
-
Скачать: подключается к исходной базе данных, считывает метаданные столбца из
INFORMATION_SCHEMA.COLUMNS, создает схему Apache Arrow, а затем загружает каждую таблицу в локальный файл Parquet. -
Обогащение (необязательно): предоставляет точку расширения (
enrich_parquet), где можно добавлять преобразования, производные столбцы и объединения перед отправкой. -
Отправка: считывает каждый файл Parquet пакетами, воссоздает таблицу в целевой базе данных с использованием DDL, созданной из метаданных схемы Apache Arrow, а затем использует
cursor.bulkcopy()для высокой производительности массовой вставки.
Следующий шаг
Посетите репозиторий mssql-python драйверов GitHub, чтобы получить дополнительные примеры, чтобы внести идеи или сообщить о проблемах.