Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Neste início rápido, você usará o mssql-python driver para copiar dados em massa entre bancos de dados SQL no Fabric. O aplicativo baixa tabelas de um esquema de banco de dados de origem para arquivos Parquet locais usando o Apache Arrow e, em seguida, carrega-as em um banco de dados de destino usando o método de alto desempenho bulkcopy . Você pode usar esse padrão para migrar, replicar ou transformar dados entre bancos de dados SQL no Fabric.
O mssql-python driver não requer nenhuma dependência externa em computadores Windows. O driver instala tudo o que precisa com uma única pip instalação, permitindo que você use a versão mais recente do driver para novos scripts sem quebrar outros scripts que você não tem tempo para atualizar e testar.
Documentação do mssql-python | Código-fonte do mssql-python | Pacote (PyPI) | UV
Pré-requisitos
Carregue dados de exemplo do AdventureWorks no banco de dados SQL como o banco de dados de origem.
(Opcional) Um segundo banco de dados SQL no Fabric a ser usado como destino. O usuário deve ter permissão para criar e gravar em tabelas. Se você não tiver um segundo banco de dados, poderá alterar a cadeia de conexão de destino para apontar para o mesmo banco de dados e usar um esquema diferente para as tabelas de destino.
Python 3
Se você ainda não tiver o Python, instale o runtime do Python e o gerenciador de pacotes pip de python.org.
Não quer usar seu próprio ambiente? Abra como um devcontainer usando codespaces do GitHub.
Visual Studio Code com as seguintes extensões:
- extensão Python para Visual Studio Code
CLI (Interface Command-Line do Azure) – Necessária para autenticação sem senha no macOS e linux.
Se você ainda não tiver
uv, instaleuvseguindo as instruções de https://docs.astral.sh/uv/getting-started/installation/.Instale pré-requisitos específicos do sistema operacional uma vez.
Criar o projeto e executar o código
- Criar um novo projeto
- Adicionar dependências
- Iniciar Visual Studio Code
- Atualizar pyproject.toml
- Atualizar main.py
- Salvar as cadeias de conexão
- Use o comando uv run para executar o script
Crie um novo projeto
Abra um prompt de comando no diretório de desenvolvimento. Se você não tiver um, crie um novo diretório chamado
python,scriptsetc. Evite pastas em seu OneDrive, a sincronização pode interferir no gerenciamento do seu ambiente virtual.Crie um novo projeto com
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Adicionar dependências
No mesmo diretório, instale os pacotes mssql-python, python-dotenv e pyarrow.
uv add mssql-python python-dotenv pyarrow
Iniciar o Visual Studio Code
No mesmo diretório, execute o comando a seguir.
code .
Atualizar pyproject.toml
O pyproject.toml contém os metadados do projeto. Abra o arquivo em seu editor favorito.
Examine o conteúdo do arquivo. Deve ser semelhante a este exemplo. Observe a versão e a dependência do Python para que
mssql-pythonuse>=para definir uma versão mínima. Se preferir uma versão exata, altere o>=para==antes do número da versão. As versões resolvidas de cada pacote são armazenadas no uv.lock. O arquivo de bloqueio garante que os desenvolvedores que trabalham no projeto estejam usando versões de pacote consistentes. Ele também garante que exatamente o mesmo conjunto de versões de pacote seja usado ao distribuir seu pacote para os usuários finais. Você não deve editar ouv.lockarquivo.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Atualize a descrição para ser mais descritiva.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Salve e feche o arquivo.
Atualizar main.py
Abra o arquivo chamado
main.py. Deve ser semelhante a este exemplo.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Substitua o conteúdo do arquivo pelos seguintes blocos de código. Cada bloco se baseia no anterior e deve ser adicionado ao
main.pyem ordem.Dica
Se o Visual Studio Code estiver tendo problemas para resolver pacotes, você precisará atualizar o interpretador para usar o ambiente virtual.
Na parte superior,
main.pyadicione as importações e as constantes. O script usamssql_pythonpara conectividade de banco de dados,pyarrowepyarrow.parquetpara processamento de dados em formato colunar e leitura/gravação de arquivos Parquet,python-dotenvpara carregar strings de conexão de um arquivo.env, e um padrão regex compilado que valida identificadores SQL para evitar injeções."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameAbaixo das importações, adicione o mapeamento de tipos SQL para Arrow. Esse dicionário converte os tipos de coluna do SQL Server nos seus equivalentes Apache Arrow, de modo que a fidelidade dos dados seja preservada ao gravar no Parquet. As duas funções auxiliares criam cadeias de caracteres de tipo SQL exatas (por exemplo,
NVARCHAR(100)ouDECIMAL(18,2)) deINFORMATION_SCHEMAmetadados e resolvem o tipo de seta correspondente para cada coluna._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAdicione funções para introspecção de esquemas e geração de DDL.
_get_arrow_schemaconsulta usandoINFORMATION_SCHEMA.COLUMNSconsultas parametrizadas, cria um esquema Arrow e armazena o tipo SQL original como metadados de campo, permitindo que a tabela de destino seja recriada com definições exatas de coluna._create_table_ddllê os metadados de volta para gerarDROP/CREATE TABLEDDL. O tipotimestamp(rowversion) é remapeado paraVARBINARY(8)porque é gerado automaticamente e não pode ser inserido diretamente.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Adicione a função de download.
download_tablelê linhas de uma tabela de origem em lotes usandofetchmany, converte cada valor em um tipo compatível com Python do Arrow e grava lotes de registros incrementalmente em um arquivo Parquet compq.ParquetWriter. Essa abordagem evita carregar a tabela inteira na memória. A função usa dois cursores separados: um para ler metadados de coluna e outro para transmitir os dados. Se a tabela estiver vazia, ela retornará mais cedo.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") with pq.ParquetWriter(parquet_file, schema) as writer: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) writer.write_batch(batch) row_count += len(rows) if row_count == 0: os.remove(parquet_file) return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAdicione o gancho de enriquecimento.
enrich_parqueté um espaço reservado em que você pode adicionar transformações, colunas derivadas ou junções aos dados antes de serem carregados. Neste início rápido, é um no-op que retorna o caminho do arquivo inalterado.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAdicione a função de upload.
upload_parquetlê o esquema Arrow do arquivo Parquet, gera e executaDROP/CREATE TABLEDDL para preparar o destino, em seguida lê o arquivo em lotes e chamacursor.bulkcopy()para inserção em massa de alto desempenho. A opçãotable_lock=Truemelhora a taxa de transferência ao minimizar a contenção de bloqueio. Após a conclusão do upload, a função executa umSELECT COUNT(*)para verificar se a contagem de linhas corresponde.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows conn.commit() elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAdicione a função de orquestração.
transfer_tablesune as três fases. Ele se conecta ao banco de dados de origem, descobre todas as tabelas base no esquema especificado viaINFORMATION_SCHEMA.TABLES, baixa cada uma delas em um arquivo Parquet local, executa o hook de enriquecimento e, em seguida, conecta-se ao banco de dados de destino e carrega cada arquivo.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched_files = [] for table_name, parquet_file in parquet_files: enriched_file = enrich_parquet(parquet_file) enriched_files.append((table_name, enriched_file)) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched_files: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Por fim, adicione o ponto de entrada
main. Ele carrega o arquivo.env, chamatransfer_tablescom as cadeias de conexão de origem e destino, e imprime o tempo total decorrido.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Salve e feche
main.py.
Salvar as strings de conexão
Abra o
.gitignorearquivo e adicione uma exclusão para.envarquivos. Seu arquivo deve ser semelhante a este exemplo. Salve-o e feche-o quando terminar.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envNo diretório atual, crie um novo arquivo chamado
.env.No arquivo
.env, adicione entradas para as strings de conexão de origem e destino. Substitua os valores de placeholders pelos nomes reais do servidor e banco de dados.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Dica
Para o banco de dados SQL no Fabric, use a cadeia de conexão ODBC da guia de cadeias de conexão sem as informações do DRIVER. Para obter mais informações, consulte Conectar-se ao banco de dados SQL no Fabric.
Dica
No macOS, tanto ActiveDirectoryInteractive quanto ActiveDirectoryDefault funcionam para autenticação do Microsoft Entra.
ActiveDirectoryInteractive solicita que você faça login toda vez que executar o script. Para evitar prompts de entrada repetidos, faça logon uma vez por meio da CLI do Azure executando az login, em seguida, use ActiveDirectoryDefault, que reutiliza a credencial armazenada em cache.
Use o comando uv run para executar o script
Na janela do terminal de antes ou em uma nova janela de terminal aberta para o mesmo diretório, execute o comando a seguir.
uv run main.pyEsta é a saída esperada quando o script é concluído.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sConecte-se ao banco de dados de destino e verifique se as tabelas e os dados foram criados com êxito. Para obter mais opções sobre como se conectar, consulte Conectar-se ao banco de dados SQL no Fabric.
Para implantar seu script em outro computador, copie todos os arquivos, exceto a
.venvpasta para o outro computador. O ambiente virtual é recriado com a primeira execução.
Como o código funciona
O aplicativo executa uma transferência completa de dados de ida e volta em três fases:
-
Download: Conecta-se ao banco de dados de origem, lê os metadados da coluna
INFORMATION_SCHEMA.COLUMNS, cria um esquema Apache Arrow e baixa cada tabela em lotes para um arquivo Parquet local usandopq.ParquetWriter. -
Enriquecer (opcional): fornece um gancho (
enrich_parquet) em que você pode adicionar transformações, colunas derivadas ou junções antes de carregar. -
Upload: lê cada arquivo Parquet em blocos, recria a tabela no banco de dados de destino usando DDL gerado a partir dos metadados de esquema do Arrow e, em seguida, utiliza
cursor.bulkcopy()para uma inserção em massa de alto desempenho.
Próxima etapa
Visite o repositório GitHub do mssql-python driver para obter mais exemplos, contribuir com ideias ou relatar problemas.