Kopiera och transformera data till och från SQL Server med hjälp av Azure Data Factory eller Azure Synapse Analytics
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Den här artikeln beskriver hur du använder kopieringsaktiviteten i Azure Data Factory och Azure Synapse pipelines för att kopiera data från och till att SQL Server databas och använda Dataflöde för att transformera data i SQL Server databas. Mer information finns i den inledande artikeln om Azure Data Factory eller Azure Synapse Analytics.
Funktioner som stöds
Den här SQL Server-anslutningsappen stöds för följande funktioner:
Funktioner som stöds | IR |
---|---|
aktiviteten Kopiera (källa/mottagare) | ① ② |
Mappa dataflöde (källa/mottagare) | ① |
Sökningsaktivitet | ① ② |
GetMetadata-aktivitet | ① ② |
Skriptaktivitet | ① ② |
Lagrad proceduraktivitet | ① ② |
(1) Azure Integration Runtime (2) Lokalt installerad integrationskörning
En lista över datalager som stöds som källor eller mottagare av kopieringsaktiviteten finns i tabellen Datalager som stöds .
Mer specifikt stöder den här SQL Server-anslutningsappen:
- SQL Server version 2005 och senare.
- Kopiera data med hjälp av SQL- eller Windows-autentisering.
- Som källa hämtar du data med hjälp av en SQL-fråga eller en lagrad procedur. Du kan också välja att kopiera parallellt från SQL Server källa. Mer information finns i avsnittet Parallellkopiering från SQL-databas.
- Som mottagare skapar du automatiskt måltabellen om den inte finns baserat på källschemat. lägga till data i en tabell eller anropa en lagrad procedur med anpassad logik under kopieringen.
SQL Server Express LocalDB stöds inte.
Förutsättningar
Om ditt datalager finns i ett lokalt nätverk, ett virtuellt Azure-nätverk eller Amazon Virtual Private Cloud måste du konfigurera en integrationskörning med egen värd för att ansluta till det.
Om ditt datalager är en hanterad molndatatjänst kan du använda Azure-Integration Runtime. Om åtkomsten är begränsad till IP-adresser som är godkända i brandväggsreglerna kan du lägga till Azure Integration Runtime IP-adresser i listan över tillåtna.
Du kan också använda funktionen integreringskörning för hanterade virtuella nätverk i Azure Data Factory för att få åtkomst till det lokala nätverket utan att installera och konfigurera en lokalt installerad integrationskörning.
Mer information om de nätverkssäkerhetsmekanismer och alternativ som stöds av Data Factory finns i Strategier för dataåtkomst.
Kom igång
Om du vill utföra aktiviteten Kopiera med en pipeline kan du använda något av följande verktyg eller SDK:er:
- Verktyget Kopiera data
- Azure-portalen
- .NET SDK
- The Python SDK
- Azure PowerShell
- REST-API:et
- Azure Resource Manager-mallen
Skapa en SQL Server länkad tjänst med hjälp av användargränssnittet
Använd följande steg för att skapa en SQL Server länkad tjänst i användargränssnittet för Azure Portal.
Bläddra till fliken Hantera i din Azure Data Factory- eller Synapse-arbetsyta och välj Länkade tjänster och klicka sedan på Nytt:
Sök efter SQL och välj anslutningsappen SQL Server.
Konfigurera tjänstinformationen, testa anslutningen och skapa den nya länkade tjänsten.
Konfigurationsinformation för anslutningsprogram
Följande avsnitt innehåller information om egenskaper som används för att definiera Data Factory- och Synapse-pipelineentiteter som är specifika för SQL Server-databasanslutningsappen.
Egenskaper för länkad tjänst
Den här SQL Server-anslutningsappen stöder följande autentiseringstyper. Mer information finns i motsvarande avsnitt.
Tips
Om du stöter på ett fel med felkoden "UserErrorFailedToConnectToSqlServer" och ett meddelande som "Sessionsgränsen för databasen är XXX och har nåtts" lägger du till Pooling=false
i anslutningssträngen och försöker igen.
SQL-autentisering
Följande egenskaper stöds för att använda SQL-autentisering:
Egenskap | Beskrivning | Krävs |
---|---|---|
typ | Typegenskapen måste anges till SqlServer. | Yes |
Connectionstring | Ange connectionString-information som behövs för att ansluta till SQL Server-databasen. Ange ett inloggningsnamn som användarnamn och se till att databasen som du vill ansluta är mappad till den här inloggningen. Se följande exempel. | Yes |
password | Om du vill lägga till ett lösenord i Azure Key Vault hämtar du konfigurationen password från anslutningssträngen. Mer information finns i JSON-exemplet som följer tabellen och Lagra autentiseringsuppgifter i Azure Key Vault. |
No |
alwaysEncryptedSettings | Ange alwaysencryptedsettings-information som behövs för att aktivera Always Encrypted för att skydda känsliga data som lagras i SQL Server med hjälp av antingen hanterad identitet eller tjänstens huvudnamn. Mer information finns i JSON-exemplet som följer tabellen och avsnittet Using Always Encrypted (Använda Always Encrypted). Om inget anges inaktiveras standardinställningen Always Encrypted. | No |
connectVia | Den här integrationskörningen används för att ansluta till datalagret. Läs mer i avsnittet Förutsättningar . Om inget anges används standardkörningen för Azure-integrering. | No |
Exempel: Använda SQL-autentisering
{
"name": "SqlServerLinkedService",
"properties": {
"type": "SqlServer",
"typeProperties": {
"connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Exempel: Använda SQL-autentisering med ett lösenord i Azure Key Vault
{
"name": "SqlServerLinkedService",
"properties": {
"type": "SqlServer",
"typeProperties": {
"connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Exempel: Använd Always Encrypted
{
"name": "SqlServerLinkedService",
"properties": {
"type": "SqlServer",
"typeProperties": {
"connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
},
"alwaysEncryptedSettings": {
"alwaysEncryptedAkvAuthType": "ServicePrincipal",
"servicePrincipalId": "<service principal id>",
"servicePrincipalKey": {
"type": "SecureString",
"value": "<service principal key>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Windows-autentisering
Följande egenskaper stöds för att använda Windows-autentisering:
Egenskap | Beskrivning | Krävs |
---|---|---|
typ | Typegenskapen måste anges till SqlServer. | Yes |
Connectionstring | Ange connectionString-information som behövs för att ansluta till SQL Server-databasen. Se följande exempel. | |
userName | Ange ett användarnamn. Ett exempel är domännamn\användarnamn. | Yes |
password | Ange ett lösenord för det användarkonto som du angav för användarnamnet. Markera det här fältet som SecureString för att lagra det på ett säkert sätt. Eller så kan du referera till en hemlighet som lagras i Azure Key Vault. | Yes |
alwaysEncryptedSettings | Ange alwaysencryptedsettings-information som behövs för att aktivera Always Encrypted för att skydda känsliga data som lagras i SQL Server med hjälp av antingen hanterad identitet eller tjänstens huvudnamn. Mer information finns i avsnittet Använda Always Encrypted. Om inget anges inaktiveras standardinställningen Always Encrypted. | No |
connectVia | Den här integrationskörningen används för att ansluta till datalagret. Läs mer i avsnittet Förutsättningar . Om inget anges används standardkörningen för Azure-integrering. | No |
Anteckning
Windows-autentisering stöds inte i dataflödet.
Exempel: Använda Windows-autentisering
{
"name": "SqlServerLinkedService",
"properties": {
"type": "SqlServer",
"typeProperties": {
"connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=True;",
"userName": "<domain\\username>",
"password": {
"type": "SecureString",
"value": "<password>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Exempel: Använda Windows-autentisering med ett lösenord i Azure Key Vault
{
"name": "SqlServerLinkedService",
"properties": {
"annotations": [],
"type": "SqlServer",
"typeProperties": {
"connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=True;",
"userName": "<domain\\username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Egenskaper för datamängd
En fullständig lista över avsnitt och egenskaper som är tillgängliga för att definiera datauppsättningar finns i artikeln datauppsättningar . Det här avsnittet innehåller en lista över egenskaper som stöds av SQL Server datauppsättning.
Följande egenskaper stöds för att kopiera data från och till en SQL Server databas:
Egenskap | Beskrivning | Krävs |
---|---|---|
typ | Typegenskapen för datauppsättningen måste anges till SqlServerTable. | Yes |
schema | Namnet på schemat. | Nej för källa, Ja för mottagare |
bord | Namnet på tabellen/vyn. | Nej för källa, Ja för mottagare |
tableName | Namnet på tabellen/vyn med schemat. Den här egenskapen stöds för bakåtkompatibilitet. För ny arbetsbelastning använder du schema och table . |
Nej för källa, Ja för mottagare |
Exempel
{
"name": "SQLServerDataset",
"properties":
{
"type": "SqlServerTable",
"linkedServiceName": {
"referenceName": "<SQL Server linked service name>",
"type": "LinkedServiceReference"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"typeProperties": {
"schema": "<schema_name>",
"table": "<table_name>"
}
}
}
Kopiera egenskaper för aktivitet
En fullständig lista över avsnitt och egenskaper som är tillgängliga för att definiera aktiviteter finns i artikeln Pipelines . Det här avsnittet innehåller en lista över egenskaper som stöds av SQL Server källa och mottagare.
SQL Server som källa
Tips
Om du vill läsa in data från SQL Server effektivt med hjälp av datapartitionering kan du läsa mer från Parallell kopiering från SQL-databas.
Om du vill kopiera data från SQL Server anger du källtypen i kopieringsaktiviteten till SqlSource. Följande egenskaper stöds i avsnittet kopieringsaktivitetskälla:
Egenskap | Beskrivning | Krävs |
---|---|---|
typ | Typegenskapen för kopieringsaktivitetskällan måste anges till SqlSource. | Yes |
sqlReaderQuery | Använd den anpassade SQL-frågan för att läsa data. Ett exempel är select * from MyTable . |
No |
sqlReaderStoredProcedureName | Den här egenskapen är namnet på den lagrade proceduren som läser data från källtabellen. Den sista SQL-instruktionen måste vara en SELECT-instruktion i den lagrade proceduren. | No |
storedProcedureParameters | De här parametrarna är för den lagrade proceduren. Tillåtna värden är namn- eller värdepar. Parametrarnas namn och hölje måste matcha namnen och höljet för parametrarna för den lagrade proceduren. |
No |
isolationLevel | Anger transaktionslåsningsbeteendet för SQL-källan. De tillåtna värdena är: ReadCommitted, ReadUncommitted, RepeatableRead, Serializable, Snapshot. Om inget anges används databasens standardisoleringsnivå. Mer information finns i det här dokumentet . | No |
partitionOptions | Anger de datapartitioneringsalternativ som används för att läsa in data från SQL Server. Tillåtna värden är: Ingen (standard), PhysicalPartitionsOfTable och DynamicRange. När ett partitionsalternativ är aktiverat (dvs. inte None ), styrs graden av parallellitet för samtidig inläsning av data från SQL Server av parallelCopies inställningen för kopieringsaktiviteten. |
No |
partitionSettings | Ange gruppen med inställningarna för datapartitionering. Använd när partitionsalternativet inte None är . |
No |
Under partitionSettings : |
||
partitionColumnName | Ange namnet på källkolumnen i heltal eller datum/datetime-typ (int , smallint , bigint , date , smalldatetime , datetime , datetime2 eller datetimeoffset ) som ska användas av intervallpartitionering för parallellkopiering. Om inget anges identifieras indexet eller primärnyckeln i tabellen automatiskt och används som partitionskolumn.Använd när partitionsalternativet är DynamicRange . Om du använder en fråga för att hämta källdata kopplar ?AdfDynamicRangePartitionCondition du in WHERE-satsen. Ett exempel finns i avsnittet Parallell kopia från SQL Database . |
No |
partitionUpperBound | Det maximala värdet för partitionskolumnen för partitionsintervalldelning. Det här värdet används för att bestämma partitionssteget, inte för att filtrera raderna i tabellen. Alla rader i tabellen eller frågeresultatet partitioneras och kopieras. Om inget anges identifierar kopieringsaktivitet automatiskt värdet. Använd när partitionsalternativet är DynamicRange . Ett exempel finns i avsnittet Parallell kopia från SQL Database . |
No |
partitionLowerBound | Minimivärdet för partitionskolumnen för partitionsintervalldelning. Det här värdet används för att bestämma partitionssteget, inte för att filtrera raderna i tabellen. Alla rader i tabellen eller frågeresultatet partitioneras och kopieras. Om inget anges identifierar kopieringsaktivitet automatiskt värdet. Använd när partitionsalternativet är DynamicRange . Ett exempel finns i avsnittet Parallell kopia från SQL Database . |
No |
Observera följande punkter:
- Om sqlReaderQuery har angetts för SqlSource kör kopieringsaktiviteten den här frågan mot den SQL Server källan för att hämta data. Du kan också ange en lagrad procedur genom att ange sqlReaderStoredProcedureName och storedProcedureParameters om den lagrade proceduren tar parametrar.
- När du använder lagrad procedur i källan för att hämta data bör du tänka på att om den lagrade proceduren är utformad som att returnera ett annat schema när ett annat parametervärde skickas in, kan det uppstå fel eller oväntade resultat när du importerar schemat från användargränssnittet eller när du kopierar data till SQL Database med automatisk tabellskapande.
Exempel: Använda SQL-fråga
"activities":[
{
"name": "CopyFromSQLServer",
"type": "Copy",
"inputs": [
{
"referenceName": "<SQL Server input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT * FROM MyTable"
},
"sink": {
"type": "<sink type>"
}
}
}
]
Exempel: Använd en lagrad procedur
"activities":[
{
"name": "CopyFromSQLServer",
"type": "Copy",
"inputs": [
{
"referenceName": "<SQL Server input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
"storedProcedureParameters": {
"stringData": { "value": "str3" },
"identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Definitionen för lagrad procedur
CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
@stringData varchar(20),
@identifier int
)
AS
SET NOCOUNT ON;
BEGIN
select *
from dbo.UnitTestSrcTable
where dbo.UnitTestSrcTable.stringData != stringData
and dbo.UnitTestSrcTable.identifier != identifier
END
GO
SQL Server som handfat
Tips
Läs mer om skrivbeteenden, konfigurationer och metodtips som stöds från Metodtips för att läsa in data i SQL Server.
Om du vill kopiera data till SQL Server anger du mottagartypen i kopieringsaktiviteten till SqlSink. Följande egenskaper stöds i avsnittet för kopieringsaktivitetens mottagare:
Egenskap | Beskrivning | Krävs |
---|---|---|
typ | Typegenskapen för kopieringsaktivitetens mottagare måste anges till SqlSink. | Yes |
preCopyScript | Den här egenskapen anger en SQL-fråga för kopieringsaktiviteten som ska köras innan data skrivs till SQL Server. Den anropas bara en gång per kopieringskörning. Du kan använda den här egenskapen för att rensa förinstallerade data. | No |
tableOption | Anger om mottagartabellen ska skapas automatiskt om den inte finns baserat på källschemat. Automatisk tabellskapande stöds inte när mottagaren anger lagrad procedur. Tillåtna värden är: none (standard), autoCreate . |
No |
sqlWriterStoredProcedureName | Namnet på den lagrade proceduren som definierar hur källdata ska tillämpas i en måltabell. Den här lagrade proceduren anropas per batch. För åtgärder som bara körs en gång och som inte har något att göra med källdata, till exempel ta bort eller trunkera, använder du preCopyScript egenskapen .Se exempel från Anropa en lagrad procedur från en SQL-mottagare. |
No |
storedProcedureTableTypeParameterName | Parameternamnet för den tabelltyp som anges i den lagrade proceduren. | No |
sqlWriterTableType | Tabelltypnamnet som ska användas i den lagrade proceduren. Kopieringsaktiviteten gör data som flyttas tillgängliga i en temporär tabell med den här tabelltypen. Lagrad procedurkod kan sedan sammanfoga de data som kopieras med befintliga data. | No |
storedProcedureParameters | Parametrar för den lagrade proceduren. Tillåtna värden är namn- och värdepar. Namn och hölje för parametrar måste matcha namnen och höljet för parametrarna för den lagrade proceduren. |
No |
writeBatchSize | Antal rader som ska infogas i SQL-tabellen per batch. Tillåtna värden är heltal för antalet rader. Som standard avgör tjänsten dynamiskt lämplig batchstorlek baserat på radstorleken. |
No |
writeBatchTimeout | Den här egenskapen anger väntetiden för batchinfogningsåtgärden som ska slutföras innan tidsgränsen uppnås. Tillåtna värden är för tidsintervallet. Ett exempel är "00:30:00" i 30 minuter. Om inget värde anges är tidsgränsen som standard "02:00:00". |
No |
maxConcurrentConnections | Den övre gränsen för samtidiga anslutningar som upprättas till datalagret under aktivitetskörningen. Ange endast ett värde när du vill begränsa samtidiga anslutningar. | No |
WriteBehavior | Ange skrivbeteendet för kopieringsaktiviteten för att läsa in data till SQL Server Database. Det tillåtna värdet är Insert och Upsert. Som standard använder tjänsten insert för att läsa in data. |
No |
upsertSettings | Ange gruppen med inställningarna för skrivbeteende. Använd när alternativet WriteBehavior är Upsert . |
No |
Under upsertSettings : |
||
useTempDB | Ange om du vill använda en global tillfällig tabell eller fysisk tabell som interimtabell för upsert. Som standard använder tjänsten global tillfällig tabell som interimstabell. värdet är true . |
No |
interimSchemaName | Ange interimschemat för att skapa interimtabell om fysisk tabell används. Obs! Användaren måste ha behörighet att skapa och ta bort tabellen. Som standard delar interimtabellen samma schema som mottagartabellen. Använd när alternativet useTempDB är False . |
No |
keys | Ange kolumnnamnen för unik radidentifiering. Antingen kan en enda nyckel eller en serie nycklar användas. Om inget anges används primärnyckeln. | No |
Exempel 1: Lägga till data
"activities":[
{
"name": "CopyToSQLServer",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<SQL Server output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SqlSink",
"tableOption": "autoCreate",
"writeBatchSize": 100000
}
}
}
]
Exempel 2: Anropa en lagrad procedur under kopieringen
Läs mer i Anropa en lagrad procedur från en SQL-mottagare.
"activities":[
{
"name": "CopyToSQLServer",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<SQL Server output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SqlSink",
"sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
"storedProcedureTableTypeParameterName": "MyTable",
"sqlWriterTableType": "MyTableType",
"storedProcedureParameters": {
"identifier": { "value": "1", "type": "Int" },
"stringData": { "value": "str1" }
}
}
}
}
]
Exempel 3: Upsert-data
"activities":[
{
"name": "CopyToSQLServer",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<SQL Server output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SqlSink",
"tableOption": "autoCreate",
"writeBehavior": "upsert",
"upsertSettings": {
"useTempDB": true,
"keys": [
"<column name>"
]
},
}
}
}
]
Parallellkopiering från SQL-databas
Den SQL Server anslutningsappen i kopieringsaktiviteten ger inbyggd datapartitionering för att kopiera data parallellt. Du hittar alternativ för datapartitionering på fliken Källa för kopieringsaktiviteten.
När du aktiverar partitionerad kopiering kör kopieringsaktiviteten parallella frågor mot din SQL Server källa för att läsa in data efter partitioner. Den parallella graden styrs av parallelCopies
inställningen för kopieringsaktiviteten. Om du till exempel anger parallelCopies
till fyra genererar och kör tjänsten samtidigt fyra frågor baserat på det angivna partitionsalternativet och inställningarna, och varje fråga hämtar en del av data från din SQL Server.
Du rekommenderas att aktivera parallell kopiering med datapartitionering, särskilt när du läser in stora mängder data från SQL Server. Följande är föreslagna konfigurationer för olika scenarier. När du kopierar data till ett filbaserat datalager rekommenderar vi att du skriver till en mapp som flera filer (ange endast mappnamn), vilket innebär att prestandan är bättre än att skriva till en enda fil.
Scenario | Inställningar för förslag |
---|---|
Full belastning från en stor tabell med fysiska partitioner. | Partitionsalternativ: Fysiska partitioner i tabellen. Under körningen identifierar tjänsten automatiskt de fysiska partitionerna och kopierar data efter partitioner. Om du vill kontrollera om tabellen har fysisk partition eller inte kan du referera till den här frågan. |
Full belastning från en stor tabell, utan fysiska partitioner, med ett heltal eller en datetime-kolumn för datapartitionering. | Partitionsalternativ: Dynamisk intervallpartition. Partitionskolumn (valfritt): Ange den kolumn som används för att partitionera data. Om inget anges används primärnyckelkolumnen. Partitionens övre gräns och partitionens nedre gräns (valfritt): Ange om du vill fastställa partitionssteget. Detta gäller inte för filtrering av raderna i tabellen. Alla rader i tabellen partitioneras och kopieras. Om det inte anges identifierar kopieringsaktiviteten automatiskt värdena och det kan ta lång tid beroende på MIN- och MAX-värden. Vi rekommenderar att du anger övre gräns och nedre gräns. Om partitionskolumnen "ID" till exempel har värden mellan 1 och 100 och du anger den nedre gränsen som 20 och den övre gränsen till 80, med parallellkopiering som 4, hämtar tjänsten data med 4 partitioner – ID:n i intervallet <=20, [21, 50], [51, 80] >respektive =81. |
Läs in en stor mängd data med hjälp av en anpassad fråga, utan fysiska partitioner, med ett heltal eller en date/datetime-kolumn för datapartitionering. | Partitionsalternativ: Dynamisk intervallpartition. Fråga: SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause> .Partitionskolumn: Ange den kolumn som används för att partitionera data. Partitionens övre gräns och partitionens nedre gräns (valfritt): Ange om du vill fastställa partitionssteget. Detta gäller inte för filtrering av raderna i tabellen. Alla rader i frågeresultatet partitioneras och kopieras. Om inget anges identifierar kopieringsaktivitet automatiskt värdet. Under körningen ersätter ?AdfRangePartitionColumnName tjänsten med det faktiska kolumnnamnet och värdeintervallen för varje partition och skickar till SQL Server. Om partitionskolumnen "ID" till exempel har värden mellan 1 och 100 och du anger den nedre gränsen som 20 och den övre gränsen till 80, med parallellkopiering som 4, hämtar tjänsten data med 4 partitioner – ID:n i intervallet <=20, [21, 50], [51, 80] respektive >=81. Här är fler exempelfrågor för olika scenarier: 1. Fråga hela tabellen: SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition 2. Fråga från en tabell med kolumnval och ytterligare where-clause-filter: SELECT <column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause> 3. Fråga med underfrågor: SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause> 4. Fråga med partition i underfråga: SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition) AS T |
Metodtips för att läsa in data med partitionsalternativ:
- Välj distinkt kolumn som partitionskolumn (till exempel primärnyckel eller unik nyckel) för att undvika snedställning av data.
- Om tabellen har en inbyggd partition använder du partitionsalternativet "Fysiska partitioner av tabellen" för att få bättre prestanda.
- Om du använder Azure Integration Runtime för att kopiera data kan du ange större "Data Integration Units (DIU)" (>4) för att använda fler beräkningsresurser. Kontrollera tillämpliga scenarier där.
- "Grad av kopieringsparallellitet" styr partitionsnumren och ställer in det här antalet för stort ibland skadar prestandan, rekommenderar att du anger det här talet som (DIU eller antalet lokalt installerade IR-noder) * (2 till 4).
Exempel: fullständig belastning från stor tabell med fysiska partitioner
"source": {
"type": "SqlSource",
"partitionOption": "PhysicalPartitionsOfTable"
}
Exempel: fråga med dynamisk intervallpartition
"source": {
"type": "SqlSource",
"query": "SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>",
"partitionOption": "DynamicRange",
"partitionSettings": {
"partitionColumnName": "<partition_column_name>",
"partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
"partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
}
}
Exempelfråga för att kontrollera fysisk partition
SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id
WHERE s.name='[your schema]' AND t.name = '[your table name]'
Om tabellen har en fysisk partition ser du "HasPartition" som "ja" som följande.
Bästa praxis för att läsa in data i SQL Server
När du kopierar data till SQL Server kan du behöva olika skrivbeteenden:
- Lägg till: Mina källdata har bara nya poster.
- Upsert: Mina källdata har både infogningar och uppdateringar.
- Skriv över: Jag vill läsa in hela dimensionstabellen varje gång.
- Skriv med anpassad logik: Jag behöver extra bearbetning innan den slutliga infogningen i måltabellen.
Se respektive avsnitt för hur du konfigurerar och metodtips.
Lägga till data
Att lägga till data är standardbeteendet för den här anslutningsappen för SQL Server mottagare. tjänsten gör en massinfogning för att skriva till tabellen effektivt. Du kan konfigurera källan och mottagaren i enlighet med kopieringsaktiviteten.
Upserta data
aktiviteten Kopiera stöder nu inbyggt inläsning av data i en tillfällig databastabell och uppdaterar sedan data i mottagartabellen om nyckeln finns och infogar på annat sätt nya data. Mer information om upsert-inställningar i kopieringsaktiviteter finns i SQL Server som mottagare.
Skriv över hela tabellen
Du kan konfigurera preCopyScript-egenskapen i en kopieringsaktivitetsmottagare. I det här fallet kör tjänsten skriptet först för varje kopieringsaktivitet som körs. Sedan körs kopian för att infoga data. Om du till exempel vill skriva över hela tabellen med de senaste data anger du ett skript för att först ta bort alla poster innan du massinläser nya data från källan.
Skriva data med anpassad logik
Stegen för att skriva data med anpassad logik liknar de som beskrivs i avsnittet Upsert-data . När du behöver tillämpa extra bearbetning innan källdata slutligen infogas i måltabellen kan du läsa in till en mellanlagringstabell och sedan anropa lagrad proceduraktivitet eller anropa en lagrad procedur i kopieringsaktivitetens mottagare för att tillämpa data.
Anropa en lagrad procedur från en SQL-mottagare
När du kopierar data till SQL Server databas kan du också konfigurera och anropa en användardefinierad lagrad procedur med ytterligare parametrar i varje batch i källtabellen. Funktionen för lagrad procedur drar nytta av tabellvärdesparametrar. Observera att tjänsten automatiskt omsluter den lagrade proceduren i sin egen transaktion, så alla transaktioner som skapas i den lagrade proceduren blir kapslade transaktioner och kan påverka undantagshanteringen.
Du kan använda en lagrad procedur när inbyggda kopieringsmekanismer inte tjänar syftet. Ett exempel är när du vill tillämpa extra bearbetning innan den slutliga infogningen av källdata i måltabellen. Några extra bearbetningsexempel är när du vill slå samman kolumner, leta upp ytterligare värden och infoga i mer än en tabell.
Följande exempel visar hur du använder en lagrad procedur för att göra en upsert till en tabell i SQL Server-databasen. Anta att indata och marknadsföringstabellen för mottagare har tre kolumner vardera: ProfileID, State och Category. Gör upsert baserat på kolumnen ProfileID och tillämpa den bara för en specifik kategori som heter "ProductA".
I databasen definierar du tabelltypen med samma namn som sqlWriterTableType. Schemat för tabelltypen är samma som schemat som returneras av dina indata.
CREATE TYPE [dbo].[MarketingType] AS TABLE( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )
I databasen definierar du den lagrade proceduren med samma namn som sqlWriterStoredProcedureName. Den hanterar indata från din angivna källa och sammanfogas till utdatatabellen. Parameternamnet för tabelltypen i den lagrade proceduren är samma som tableName som definierats i datauppsättningen.
CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
Definiera avsnittet SQL-mottagare i kopieringsaktiviteten enligt följande:
"sink": { "type": "SqlSink", "sqlWriterStoredProcedureName": "spOverwriteMarketing", "storedProcedureTableTypeParameterName": "Marketing", "sqlWriterTableType": "MarketingType", "storedProcedureParameters": { "category": { "value": "ProductA" } } }
Mappa dataflödesegenskaper
När du transformerar data i dataflödet för mappning kan du läsa och skriva till tabeller från SQL Server Database. Mer information finns i källomvandlingen och mottagaromvandlingen i mappning av dataflöden.
Anteckning
För att komma åt lokala SQL Server måste du använda Azure Data Factory eller Synapse-arbetsytans hanterade Virtual Network med hjälp av en privat slutpunkt. I den här självstudien finns detaljerade steg.
Källtransformering
I tabellen nedan visas de egenskaper som stöds av SQL Server källa. Du kan redigera dessa egenskaper på fliken Källalternativ .
Name | Beskrivning | Krävs | Tillåtna värden | Skriptegenskap för dataflöde |
---|---|---|---|---|
Tabell | Om du väljer Tabell som indata hämtar dataflödet alla data från tabellen som anges i datauppsättningen. | No | - | - |
Söka i data | Om du väljer Fråga som indata anger du en SQL-fråga för att hämta data från källan, vilket åsidosätter alla tabeller som du anger i datauppsättningen. Att använda frågor är ett bra sätt att minska antalet rader för testning eller sökningar. Order By-satsen stöds inte, men du kan ange en fullständig SELECT FROM-instruktion. Du kan också använda användardefinierade tabellfunktioner. select * from udfGetData() är en UDF i SQL som returnerar en tabell som du kan använda i dataflödet. Frågeexempel: Select * from MyTable where customerId > 1000 and customerId < 2000 |
Nej | Sträng | query |
Batchstorlek | Ange en batchstorlek för att segmentera stora data i läsningar. | No | Integer | batchSize |
Isoleringsnivå | Välj någon av följande isoleringsnivåer: – Läs incheckad – Läs ej obligatoriskt (standard) - Repeterbar läsning -Serialiseras – Ingen (ignorera isoleringsnivå) |
No | READ_COMMITTED READ_UNCOMMITTED REPEATABLE_READ SERIALISERAS INGEN |
isolationLevel |
Aktivera inkrementellt extrahering | Använd det här alternativet om du vill att ADF endast ska bearbeta rader som har ändrats sedan den senaste gången pipelinen kördes. | No | - | - |
Inkrementell datumkolumn | När du använder funktionen för inkrementell extrahering måste du välja den datum/tid-kolumn som du vill använda som vattenstämpel i källtabellen. | No | - | - |
Aktivera intern insamling av ändringsdata (förhandsversion) | Använd det här alternativet för att be ADF att endast bearbeta deltadata som samlats in av SQL-teknik för datainsamling sedan den senaste gången pipelinen kördes. Med det här alternativet läses deltadata inklusive radinfogning, uppdatering och borttagning in automatiskt utan att någon inkrementell datumkolumn krävs. Du måste aktivera insamling av ändringsdata på SQL Server innan du använder det här alternativet i ADF. Mer information om det här alternativet i ADF finns i intern insamling av ändringsdata. | No | - | - |
Börja läsa från början | Om du anger det här alternativet med inkrementellt extrahering instrueras ADF att läsa alla rader vid första körningen av en pipeline med inkrementellt extrahering aktiverat. | No | - | - |
Tips
Det gemensamma tabelluttrycket (CTE) i SQL stöds inte i frågeläget för mappning av dataflöde, eftersom förutsättningen för att använda det här läget är att frågor kan användas i SQL-fråge-FROM-satsen, men det går inte att göra detta. Om du vill använda CTE:er måste du skapa en lagrad procedur med hjälp av följande fråga:
CREATE PROC CTESP @query nvarchar(max)
AS
BEGIN
EXECUTE sp_executesql @query;
END
Använd sedan läget Lagrad procedur i källomvandlingen av mappningsdataflödet och ange exemplet @query
with CTE as (select 'test' as a) select * from CTE
. Sedan kan du använda ctes som förväntat.
SQL Server exempel på källskript
När du använder SQL Server som källtyp är det associerade dataflödesskriptet:
source(allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_UNCOMMITTED',
query: 'select * from MYTABLE',
format: 'query') ~> SQLSource
Transformering av mottagare
I tabellen nedan visas de egenskaper som stöds av SQL Server mottagare. Du kan redigera dessa egenskaper på fliken Alternativ för mottagare .
Name | Beskrivning | Krävs | Tillåtna värden | Skriptegenskap för dataflöde |
---|---|---|---|---|
Uppdateringsmetod | Ange vilka åtgärder som tillåts på databasmålet. Standardvärdet är att endast tillåta infogningar. För att uppdatera, uppdatera eller ta bort rader krävs en Alter-radtransformering för att tagga rader för dessa åtgärder. |
Yes | true eller false |
Deletable infogningsbar Uppdaterbar upsertable |
Nyckelkolumner | För uppdateringar, upserts och borttagningar måste nyckelkolumner anges för att avgöra vilken rad som ska ändras. Kolumnnamnet som du väljer som nyckel används som en del av den efterföljande uppdateringen, upsert, delete. Därför måste du välja en kolumn som finns i mappningen mottagare. |
No | Matris | keys |
Hoppa över att skriva nyckelkolumner | Om du inte vill skriva värdet till nyckelkolumnen väljer du "Hoppa över att skriva nyckelkolumner". | No | true eller false |
skipKeyWrites |
Tabellåtgärd | Avgör om du vill återskapa eller ta bort alla rader från måltabellen innan du skriver. - Ingen: Ingen åtgärd utförs i tabellen. - Återskapa: Tabellen tas bort och återskapas. Krävs om du skapar en ny tabell dynamiskt. - Trunkera: Alla rader från måltabellen tas bort. |
No | true eller false |
Återskapa truncate |
Batchstorlek | Ange hur många rader som skrivs i varje batch. Större batchstorlekar förbättrar komprimering och minnesoptimering, men riskerar att få slut på minnesfel vid cachelagring av data. | No | Integer | batchSize |
Sql-skript före och efter | Ange SQL-skript med flera rader som ska köras före (förbearbetning) och efter att (efterbearbetning) data har skrivits till mottagardatabasen. | Nej | Sträng | preSQLs postSQLs |
Tips
- Vi rekommenderar att du bryter enskilda batchskript med flera kommandon i flera batchar.
- Det går endast att köra instruktioner av typen Data Definition Language (DDL) och Data Manipulation Language (DML), som returnerar ett enda uppdateringsvärde, som del av en batch. Läs mer i Utföra batchåtgärder
SQL Server exempel på mottagarskript
När du använder SQL Server som mottagartyp är det associerade dataflödesskriptet:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:false,
insertable:true,
updateable:true,
upsertable:true,
keys:['keyColumn'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SQLSink
Datatypsmappning för SQL Server
När du kopierar data från och till SQL Server används följande mappningar från SQL Server datatyper till Azure Data Factory mellanliggande datatyper. Synapse-pipelines, som implementerar Data Factory, använder samma mappningar. Information om hur kopieringsaktiviteten mappar källschemat och datatypen till mottagaren finns i Mappningar för schema och datatyp.
SQL Server datatyp | Data Factory interimsdatatyp |
---|---|
bigint | Int64 |
binary | Byte[] |
bit | Boolesk |
char | Sträng, tecken[] |
datum | DateTime |
Datumtid | DateTime |
datetime2 | DateTime |
Datetimeoffset | DateTimeOffset |
Decimal | Decimal |
FILESTREAM-attribut (varbinary(max)) | Byte[] |
Float | Double |
image | Byte[] |
int | Int32 |
money | Decimal |
nchar | Sträng, tecken[] |
ntext | Sträng, tecken[] |
numeric | Decimal |
nvarchar | Sträng, tecken[] |
real | Enkel |
Rowversion | Byte[] |
smalldatetime | DateTime |
smallint | Int16 |
smallmoney | Decimal |
Sql_variant | Objekt |
text | Sträng, tecken[] |
time | TimeSpan |
timestamp | Byte[] |
tinyint | Int16 |
uniqueidentifier | GUID |
varbinary | Byte[] |
varchar | Sträng, tecken[] |
xml | Sträng |
Anteckning
För datatyper som mappar till interimtypen Decimal stöder aktiviteten Kopiera för närvarande precision upp till 28. Om du har data som kräver en precision som är större än 28 bör du överväga att konvertera till en sträng i en SQL-fråga.
När du kopierar data från SQL Server med hjälp av Azure Data Factory mappas bitdatatypen till den booleska interimdatatypen. Om du har data som måste behållas som bitdatatyp använder du frågor med T-SQL CAST eller CONVERT.
Egenskaper för uppslagsaktivitet
Mer information om egenskaperna finns i Sökningsaktivitet.
Egenskaper för GetMetadata-aktivitet
Mer information om egenskaperna finns i GetMetadata-aktiviteten
Använda Always Encrypted
När du kopierar data från/till SQL Server med Always Encrypted följer du stegen nedan:
Lagra kolumnhuvudnyckeln (CMK) i en Azure-Key Vault. Läs mer om hur du konfigurerar Always Encrypted med hjälp av Azure Key Vault
Se till att bevilja åtkomst till nyckelvalvet där kolumnhuvudnyckeln (CMK) lagras. I den här artikeln finns nödvändiga behörigheter.
Skapa en länkad tjänst för att ansluta till din SQL-databas och aktivera funktionen "Always Encrypted" med hjälp av antingen hanterad identitet eller tjänstens huvudnamn.
Anteckning
SQL Server Always Encrypted stöder följande scenarier:
- Datalager för antingen källa eller mottagare använder hanterad identitet eller tjänstens huvudnamn som autentiseringstyp för nyckelprovider.
- Både käll- och mottagardatalager använder hanterad identitet som nyckelproviderautentiseringstyp.
- Både käll- och mottagardatalager använder samma tjänsthuvudnamn som nyckelproviderns autentiseringstyp.
Anteckning
För närvarande stöds SQL Server Always Encrypted endast för källomvandling vid mappning av dataflöden.
Intern insamling av ändringsdata
Azure Data Factory har stöd för inbyggda funktioner för insamling av ändringsdata för SQL Server, Azure SQL DB och Azure SQL MI. Ändrade data, inklusive radinfogning, uppdatering och borttagning i SQL-lager, kan identifieras och extraheras automatiskt av ADF-mappningsdataflödet. Utan kod i mappning av dataflöde kan användarna enkelt uppnå datareplikeringsscenario från SQL-lagringsplatser genom att lägga till en databas som mållager. Dessutom kan användarna också skapa valfri datatransformeringslogik däremellan för att uppnå ett inkrementellt ETL-scenario från SQL-lagringsplatser.
Se till att behålla pipelinen och aktivitetsnamnet oförändrade så att kontrollpunkten kan registreras av ADF så att du kan hämta ändrade data från den senaste körningen automatiskt. Om du ändrar ditt pipelinenamn eller aktivitetsnamn återställs kontrollpunkten, vilket leder till att du börjar från början eller hämtar ändringar från och med nu i nästa körning. Om du vill ändra pipelinenamnet eller aktivitetsnamnet men ändå behålla kontrollpunkten för att hämta ändrade data från den senaste körningen automatiskt använder du din egen kontrollpunktsnyckel i dataflödesaktiviteten för att uppnå det.
När du felsöker pipelinen fungerar den här funktionen på samma sätt. Tänk på att kontrollpunkten återställs när du uppdaterar webbläsaren under felsökningskörningen. När du är nöjd med pipelineresultatet från felsökningskörningen kan du publicera och utlösa pipelinen. När du första gången utlöser din publicerade pipeline startas den automatiskt om från början eller hämtar ändringar från och med nu.
I övervakningsavsnittet har du alltid möjlighet att köra en pipeline igen. När du gör det hämtas alltid ändrade data från den tidigare kontrollpunkten för den valda pipelinekörningen.
Exempel 1:
När du direkt kedjar en källtransformering som refereras till en SQL CDC-aktiverad datauppsättning med en mottagartransformering som refereras till en databas i ett mappningsdataflöde, tillämpas ändringarna på SQL-källan automatiskt på måldatabasen, så att du enkelt kan hämta datareplikeringsscenariot mellan databaser. Du kan använda uppdateringsmetoden i mottagartransformering för att välja om du vill tillåta infogning, tillåta uppdatering eller tillåta borttagning i måldatabasen. Exempelskriptet i mappning av dataflödet är som nedan.
source(output(
id as integer,
name as string
),
allowSchemaDrift: true,
validateSchema: false,
enableNativeCdc: true,
netChanges: true,
skipInitialLoad: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:true,
keys:['id'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
errorHandlingOption: 'stopOnFirstError') ~> sink1
Exempel 2:
Om du vill aktivera ETL-scenario i stället för datareplikering mellan databasen via SQL CDC kan du använda uttryck i mappning av dataflöde, inklusive isInsert(1), isUpdate(1) och isDelete(1) för att särskilja raderna med olika åtgärdstyper. Följande är ett av exempelskripten för att mappa dataflödet när en kolumn härleds med värdet: 1 för att ange infogade rader, 2 för att ange uppdaterade rader och 3 för att ange borttagna rader för underordnade transformeringar för att bearbeta deltadata.
source(output(
id as integer,
name as string
),
allowSchemaDrift: true,
validateSchema: false,
enableNativeCdc: true,
netChanges: true,
skipInitialLoad: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
validateSchema: false,
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1
Känd begränsning:
- Endast nettoändringar från SQL CDC läses in av ADF via cdc.fn_cdc_get_net_changes_.
Felsöka anslutningsproblem
Konfigurera din SQL Server-instans för att acceptera fjärranslutningar. Starta SQL Server Management Studio, högerklicka på servern och välj Egenskaper. Välj Anslutningar i listan och markera kryssrutan Tillåt fjärranslutningar till den här servern .
Detaljerade anvisningar finns i Konfigurera konfigurationsalternativet för fjärråtkomstservern.
Starta Konfigurationshanteraren för SQL Server. Expandera SQL Server Nätverkskonfiguration för den instans du vill använda och välj Protokoll för MSSQLSERVER. Protokoll visas i den högra rutan. Aktivera TCP/IP genom att högerklicka på TCP/IP och välja Aktivera.
Mer information och alternativa sätt att aktivera TCP/IP-protokoll finns i Aktivera eller inaktivera ett servernätverksprotokoll.
I samma fönster dubbelklickar du på TCP/IP för att starta fönstret TCP/IP-egenskaper .
Växla till fliken IP-adresser . Rulla ned för att se avsnittet IPAll . Skriv ned TCP-porten. Standardvärdet är 1433.
Skapa en regel för Windows-brandväggen på datorn för att tillåta inkommande trafik via den här porten.
Verifiera anslutningen: Om du vill ansluta till SQL Server med ett fullständigt kvalificerat namn använder du SQL Server Management Studio från en annan dator. Ett exempel är
"<machine>.<domain>.corp.<company>.com,1433"
.
Nästa steg
En lista över datalager som stöds som källor och mottagare av kopieringsaktiviteten finns i Datalager som stöds.