Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Neste guia de início rápido, utilizarás o driver mssql-python para copiar dados em massa entre bases de dados. A aplicação descarrega tabelas de um esquema de base de dados de origem para ficheiros Parquet locais usando o Apache Arrow, e depois carrega-as para uma base de dados de destino usando o método de alto desempenho bulkcopy . Pode usar este padrão para migrar, replicar ou transformar dados entre SQL Server, Azure SQL Database e SQL Database no Fabric.
O mssql-python driver não requer nenhuma dependência externa em máquinas Windows. O driver instala tudo o que precisa com uma única pip instalação, permitindo que você use a versão mais recente do driver para novos scripts sem quebrar outros scripts que você não tem tempo para atualizar e testar.
Documentação | Código-fonte | Pacote (PyPI) | UV
Pré-requisitos
Python 3
Se ainda não tens o Python, instala o runtime do Python e o gestor de pacotes pip a partir de python.org.
Não quer usar o seu próprio ambiente? Abra como um devcontainer usando o GitHub Codespaces.
Visual Studio Code com as seguintes extensões:
Azure Command-Line Interface (CLI) para autenticação sem palavra-passe no macOS e Linux.
Se ainda não tiver
uv, siga as instruções de instalação.Uma base de dados de origem no SQL Server, Azure SQL Database ou base de dados SQL em Fabric com o
AdventureWorks2025esquema de exemplo e uma cadeia de ligação válida.Uma base de dados de destino no SQL Server, Azure SQL Database ou base de dados SQL em Fabric com uma string de ligação válida. O utilizador deve ter permissão para criar e escrever em tabelas. Se não tiveres uma segunda base de dados, podes alterar a cadeia de ligação de destino para apontar para a mesma base de dados e usar um esquema diferente para as tabelas de destino.
Instale pré-requisitos que devem ser instalados uma única vez específicos do sistema operacional.
Criar um banco de dados SQL
Este início rápido requer o esquema AdventureWorks2025 Lightweight como base de dados de origem.
Crie o projeto e execute o código
- Criar um novo projeto
- Adicionar dependências
- Iniciar o Visual Studio Code
- Atualizar pyproject.toml
- Atualizar main.py
- Guardar as cadeias de ligação
- Use uv run para executar o script
Criar um novo projeto
Abra um prompt de comando no diretório de desenvolvimento. Se você não tiver um, crie um novo diretório chamado
python,scripts, etc. Evite pastas no seu OneDrive, a sincronização pode interferir no gerenciamento do seu ambiente virtual.Crie um novo projeto com
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Adicionar dependências
No mesmo diretório, instale os pacotes mssql-python, python-dotenv e pyarrow.
uv add mssql-python python-dotenv pyarrow
Abra o Visual Studio Code.
No mesmo diretório, execute o seguinte comando.
code .
Atualizar pyproject.toml
O pyproject.toml contém os metadados para o seu projeto. Abra o arquivo em seu editor favorito.
Revise o conteúdo do arquivo. Deve ser semelhante a este exemplo. Nota a versão do Python e a dependência para
mssql-pythonusa>=para definir uma versão mínima. Se preferir uma versão exata, altere o>=para==antes do número da versão. As versões resolvidas de cada pacote são então armazenadas no uv.lock. O arquivo de bloqueio garante que os desenvolvedores que trabalham no projeto estejam usando versões de pacote consistentes. Ele também garante que o mesmo conjunto exato de versões de pacote seja usado ao distribuir seu pacote para usuários finais. Você não deve editar ouv.lockarquivo.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Atualize a descrição para ser mais descritiva.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Salve e feche o arquivo.
Atualizar main.py
Abra o arquivo chamado
main.py. Deve ser semelhante a este exemplo.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Substitua o conteúdo de
main.pypelos seguintes blocos de código. Cada bloco constrói-se sobre o anterior e deve ser colocadomain.pyna ordem correta.Sugestão
Se o Visual Studio Code estiver tendo problemas para resolver pacotes, você precisará atualizar o intérprete para usar o ambiente virtual.
No topo de
main.py, adicione as importações e as constantes. O script utilizamssql_pythonpara conectividade a bases de dados,pyarrowepyarrow.parquetpara o tratamento de dados colunares e operações de I/O de ficheiros Parquet,python-dotenvpara carregar cadeias de ligação a partir de um ficheiro.env, e um padrão de expressão regular compilado que valida identificadores de SQL para evitar a injeção de SQL."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return namePor baixo das importações, adicione o mapeamento do tipo SQL-para-Arrow. Este dicionário traduz os tipos de colunas do SQL Server para os seus equivalentes no Apache Arrow, de modo a preservar a fidelidade dos dados ao escrever no Parquet. As duas funções auxiliares constroem cadeias exatas de tipos SQL (por exemplo,
NVARCHAR(100)ouDECIMAL(18,2)) a partir deINFORMATION_SCHEMAmetadados e resolvem o tipo de seta correspondente para cada coluna._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAdiciona as funções de introspeção de esquemas e geração DDL.
_get_arrow_schemaconsultasINFORMATION_SCHEMA.COLUMNSusando consultas parametrizadas, constrói um esquema Arrow e armazena o tipo SQL original como metadados de campo para que a tabela de destino possa ser recriada com definições exatas de colunas._create_table_ddllê esses metadados para gerarDROP/CREATE TABLEDDL. O tipotimestamp(rowversion) é remapeado paraVARBINARY(8)porque é gerado automaticamente e não é inserido diretamente.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Adiciona a função de download.
download_tabletransmite linhas de uma tabela de origem em lotes usandofetchmany(), converte cada valor para um tipo Python compatível com Arrow e escreve lotes de registos incrementalmente para um ficheiro Parquet local usandoParquetWriter. Isto mantém a memória limitada independentemente do tamanho da tabela. A função utiliza dois cursores separados: um para ler metadados das colunas e outro para transmitir os dados.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAdicione o gancho de enriquecimento.
enrich_parqueté um marcador onde podes adicionar transformações, colunas derivadas ou junções aos dados antes de serem carregados. Neste quickstart, consiste numa operação nula que devolve o caminho do ficheiro inalterado.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAdiciona a função de upload.
upload_parquetlê o esquema Arrow do ficheiro Parquet, gera e executaDROP/CREATE TABLEDDL para preparar o destino, após lê o ficheiro em lotes e chamacursor.bulkcopy()para uma inserção em massa de alto desempenho. Atable_lock=Trueopção melhora o rendimento ao minimizar a contenção de bloqueios. Depois de concluído o upload, a função é executadaSELECT COUNT(*)para verificar se a contagem de linhas corresponde.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAdiciona a função de orquestração.
transfer_tablesLiga as três fases. Liga-se à base de dados de origem, descobre todas as tabelas base no esquema dado através deINFORMATION_SCHEMA.TABLES, descarrega cada uma para um ficheiro Parquet local, executa o gancho de enriquecimento, depois liga-se à base de dados de destino e carrega cada ficheiro.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Por fim, adiciona o ponto de entrada
main. Carrega o.envficheiro, chamatransfer_tablescom as cadeias de ligação de origem e destino, e imprime o tempo total decorrido.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Salve e feche
main.py.
Guardar as cadeias de ligação
Abra o
.gitignorearquivo e adicione uma exclusão para.envarquivos. Seu arquivo deve ser semelhante a este exemplo. Certifique-se de salvá-lo e fechá-lo quando terminar.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envNo diretório atual, crie um novo arquivo chamado
.env.Dentro do
.envficheiro, adiciona entradas para as cadeias de ligação de origem e destino. Substitui os valores provisórios pelos nomes reais do teu servidor e base de dados.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Sugestão
A cadeia de conexão usada aqui depende em grande parte do tipo de banco de dados SQL ao qual você está se conectando. Se você estiver se conectando a um Banco de Dados SQL do Azure ou a um banco de dados SQL na Malha, use a cadeia de conexão ODBC na guia Cadeias de conexão. Talvez seja necessário ajustar o tipo de autenticação dependendo do cenário. Para obter mais informações sobre cadeias de conexão e sua sintaxe, consulte Referência de sintaxe de cadeia de conexão.
Sugestão
No macOS, tanto ActiveDirectoryInteractive como ActiveDirectoryDefault funcionam para a autenticação do Microsoft Entra.
ActiveDirectoryInteractive Solicita-te para iniciar sessão sempre que executas o script. Para evitar repetidos prompts de início de sessão, inicie sessão uma vez através do Azure CLI executando az login, depois use ActiveDirectoryDefault, que reutiliza a credencial em cache.
Use uv run para executar o script
Na janela do terminal anterior ou em uma nova janela do terminal aberta no mesmo diretório, execute o seguinte comando.
uv run main.pyAqui está a saída esperada quando o script for concluído.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sLigue-se à base de dados de destino usando SQL Server Management Studio (SSMS) ou a extensão MSSQL para o VS Code e verifique se as tabelas e dados foram criados com sucesso.
Para implantar o script em outra máquina, copie todos os arquivos, exceto a
.venvpasta, para a outra máquina. O ambiente virtual é recriado com a primeira execução.
Como funciona o código
A aplicação realiza uma transferência completa de dados de ida e volta em três fases:
-
Download: Liga-se à base de dados de origem, lê metadados das colunas de
INFORMATION_SCHEMA.COLUMNS, constrói um esquema Apache Arrow e depois descarrega cada tabela para um ficheiro Parquet local. -
Enriquecer (opcional): Fornece um gancho (
enrich_parquet) onde pode adicionar transformações, colunas derivadas ou junções antes de carregar. -
Upload: Lê cada ficheiro Parquet em lotes, recria a tabela na base de dados de destino usando DDL gerado a partir dos metadados do esquema Arrow, e depois usa
cursor.bulkcopy()para inserção massiva de alto desempenho.
Próximo passo
Visite o mssql-python repositório GitHub do driver para obter mais exemplos, para contribuir com ideias ou relatar problemas.