Dela via


Läs in data stegvis från flera tabeller i SQL Server till Azure SQL Database med hjälp av PowerShell

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

I den här självstudien skapar du en Azure Data Factory med en pipeline som läser in deltadata från flera tabeller i en SQL Server-databas till Azure SQL Database.

I de här självstudierna går du igenom följande steg:

  • Förbereda käll- och måldatalager.
  • Skapa en datafabrik.
  • Skapa en lokalt installerad integrationskörning.
  • Installera Integration Runtime.
  • Skapa länkade tjänster.
  • Skapa datauppsättningar för källa, mottagare och vattenstämpel.
  • Skapa, köra och övervaka en pipeline.
  • Granska resultaten.
  • Lägga till eller uppdatera data i källtabeller.
  • Köra och övervaka pipelinen igen.
  • Granska de slutliga resultaten.

Översikt

Här är några viktiga steg för att skapa den här lösningen:

  1. Markera vattenstämpelkolumnen.

    Välj en kolumn för varje tabell i källdatalagret, som du kan identifiera de nya eller uppdaterade posterna för varje körning. Vanligtvis ökar data i den markerade kolumnen (till exempel last_modify_time elle ID) när rader skapas eller uppdateras. Det maximala värdet i den här kolumnen används som vattenstämpel.

  2. Förbered datalagringen för att lagra värdet för vattenstämpeln.

    I den här självstudien lagrar du storleksgränsen i en SQL-databas.

  3. Skapa en pipeline med följande aktiviteter:

    1. Skapa en ForEach-aktivitet som upprepas över en lista med namn på källtabeller och som skickas som en parameter till pipelinen. För varje källtabell anropas följande aktiviteter som utför deltainläsningen för tabellen.

    2. Skapa två sökningsaktiviteter. Använd den första sökningsaktiviteten för att hämta det sista vattenstämpelvärdet. Använd den andra sökningsaktiviteten för att hämta det nya vattenstämpelvärdet. Vattenstämpelvärdena skickas till kopieringsaktiviteten.

    3. Skapa en kopieringsaktivitet som kopierar rader från källdatalagret med värdet för vattenstämpelkolumnen större än det gamla vattenstämpelvärdet och mindre än eller lika med det nya vattenstämpelvärdet. Sedan kopieras deltadata från källdatalagringen till Azure Blob-lagring som en ny fil.

    4. Skapa en StoredProcedure-aktivitet som uppdaterar vattenstämpelvärdet för den pipeline som körs nästa gång.

    Här är det avancerade diagrammet:

    Läsa in data stegvis

Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.

Förutsättningar

  • SQL Server. Du använder en SQL Server-databas som källdatalager i den här självstudien.
  • Azure SQL Database. Du använder en databas i Azure SQL Database som datalager för mottagare. Om du inte har någon SQL-databas kan du läsa Skapa en databas i Azure SQL Database för steg för att skapa en.

Skapa källtabeller i din SQL Server-databas

  1. Öppna SQL Server Management Studio (SSMS) eller Azure Data Studio och anslut till din SQL Server-databas.

  2. I Server Explorer (SSMS) eller i fönstret Anslutningar (Azure Data Studio) högerklickar du på databasen och väljer Ny fråga.

  3. Kör följande SQL-kommando mot databasen för att skapa tabeller med namnen customer_table och project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Skapa måltabeller i din Azure SQL Database

  1. Öppna SQL Server Management Studio (SSMS) eller Azure Data Studio och anslut till din SQL Server-databas.

  2. I Server Explorer (SSMS) eller i fönstret Anslutningar (Azure Data Studio) högerklickar du på databasen och väljer Ny fråga.

  3. Kör följande SQL-kommando mot databasen för att skapa tabeller med namnen customer_table och project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Skapa en annan tabell i Azure SQL Database för att lagra värdet för högvattenstämpel

  1. Kör följande SQL-kommando mot databasen för att skapa en tabell med namnet watermarktable för att lagra vattenstämpelvärdet:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Infoga inledande värden för högvattenmärket för båda källtabellerna i vattenmärkestabellen.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Skapa en lagrad procedur i Azure SQL Database

Kör följande kommando för att skapa en lagrad procedur i databasen. Den här lagrade proceduren uppdaterar vattenmärkets värde efter varje pipelinekörning.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Skapa datatyper och ytterligare lagrade procedurer i Azure SQL Database

Kör följande fråga för att skapa två lagrade procedurer och två datatyper i databasen. De används för att slå samman data från källtabellerna till måltabellerna.

För att göra resan enkel att börja med använder vi direkt dessa lagrade procedurer som skickar deltadata via en tabellvariabel och sammanfogar dem sedan till målarkivet. Var försiktig så att det inte förväntar sig att ett "stort" antal deltarader (mer än 100) ska lagras i tabellvariabeln.

Om du behöver sammanfoga ett stort antal deltarader i målarkivet föreslår vi att du använder kopieringsaktivitet för att kopiera alla deltadata till en tillfällig "mellanlagringstabell" i mållagret först och sedan skapar en egen lagrad procedur utan att använda tabellvariabeln för att sammanfoga dem från tabellen "mellanlagring" till tabellen "final".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Installera de senaste Azure PowerShell-modulerna enligt instruktionerna i Installera och konfigurera Azure PowerShell.

Skapa en datafabrik

  1. Definiera en variabel för resursgruppens namn som du kan använda senare i PowerShell-kommandon. Kopiera följande kommandotext till PowerShell, ange ett namn för Azure-resursgruppen, sätt dubbla citattecken omkring namnet och kör sedan kommandot. Ett exempel är "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Om resursgruppen redan finns behöver du kanske inte skriva över den. Ge variabeln $resourceGroupName ett annat värde och kör kommandot igen.

  2. Definiera en variabel för datafabrikens plats.

    $location = "East US"
    
  3. Kör följande kommando för att skapa en Azure-resursgrupp:

    New-AzResourceGroup $resourceGroupName $location
    

    Om resursgruppen redan finns behöver du kanske inte skriva över den. Ge variabeln $resourceGroupName ett annat värde och kör kommandot igen.

  4. Definiera en variabel för datafabrikens namn.

    Viktigt!

    Uppdateringen av datafabrikens namn för att göra det unikt globalt. Ett exempel är ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Skapa datafabriken genom att köra följande Set-AzDataFactoryV2-cmdlet :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Observera följande:

  • Namnet på datafabriken måste vara globalt unikt. Om du får följande felmeddelande ändrar du namnet och försöker igen:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Om du vill skapa Data Factory-instanser måste det användarkonto du använder för att logga in på Azure vara medlem av rollerna deltagare eller ägare, eller vara administratör för Azure-prenumerationen.

  • Om du vill se en lista med Azure-regioner där Data Factory är tillgängligt för närvarande markerar du de regioner du är intresserad av på följande sida. Expandera sedan Analytics och leta rätt på Data Factory: Tillgängliga produkter per region. Datalager (Azure Storage, SQL Database, SQL Managed Instance och så vidare) och beräkningar (Azure HDInsight osv.) som används av datafabriken kan finnas i andra regioner.

Skapa en lokalt installerad integrationskörning

I det här avsnittet kan du skapa en lokal Integration Runtime och koppla den till en lokal dator med SQL Server-databasen. Den lokalt installerade integrationskörningen är komponenten som kopierar data från SQL Server på datorn till Azure SQL Database.

  1. Skapa en variabel för namnet på din Integration Runtime. Använd ett unikt namn och anteckna det. Du använder det senare i den här självstudien.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Skapa en lokalt installerad integrationskörning.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Här är exempel på utdata:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Kör följande kommando för att hämta statusen för din skapade Integration Runtime. Kontrollera att värdet för egenskapen State är inställd på NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Här är exempel på utdata:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Kör följande kommando som används för att hämta autentiseringsnycklarna för att registrera en lokal Integration Runtime med Azure Data Factory-tjänsten i molnet:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Här är exempel på utdata:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Kopiera en av nycklarna (uteslut de dubbla citattecknen) för att registrera den lokala installation av Integration Runtime som du installerar på datorn i följande steg.

Installera integration runtime-verktyget

  1. Om du redan har Integration Runtime på din dator ska du avinstallera det med Lägg till eller ta bort program.

  2. Ladda ned Integration Runtime med egen värd på en lokal Windows-dator. Kör installationen.

  3. På sidan Välkommen till Microsoft Integration Runtime klickar du på Nästa.

  4. På sidan med licensavtalet för slutanvändare godkänner du villkoren och väljer du Nästa.

  5. På sidan Målmapp väljer du Nästa.

  6. På sidan Klar att installera Microsoft Integration Runtime väljer du Installera.

  7. På sidan Slutfört installationen av Microsoft Integration Runtime väljer du Slutför.

  8. Klistra in den nyckel som du sparade i föregående avsnitt på sidan Registrera Integration Runtime (lokal installation) och välj Registrera.

    Registrera Integration Runtime

  9. På sidan Ny integrationskörningsnod (lokalt installerad) väljer du Slutför.

  10. När integration runtime med egen värd har registrerats ser du följande meddelande:

    Registered successfully (Registreringen lyckades)

  11. På sidan Registrera Integration Runtime (lokal installation) väljer du Starta konfigurationshanteraren.

  12. När noden är ansluten till molntjänsten ser du följande sida:

    Sidan Noden är ansluten

  13. Testa nu anslutningen till din SQL Server-databasen.

    Fliken Diagnostik

    a. I fönstret Konfigurationshanteraren växlar du till fliken Diagnostik.

    b. VäljSqlServer som typ av datakälla.

    c. Ange servernamnet.

    d. Ange namnet på databasen.

    e. Välj autentiseringsläge.

    f. Ange användarnamnet.

    g. Ange lösenordet som är associerat med för användarnamnet.

    h. Klicka på Test för att bekräfta att Integration Runtime kan ansluta till SQL Server. Du ser en grön bockmarkering om anslutningen är klar. Om anslutningen inte lyckats får du ett felmeddelande. Åtgärda eventuella problem och se till att Integration Runtime kan ansluta till SQL Server.

    Kommentar

    Anteckna värdena för autentiseringstyp, server, databas, användare och lösenord. Du använder det senare i den här självstudien.

Skapa länkade tjänster

Du kan skapa länkade tjänster i en datafabrik för att länka ditt datalager och beräkna datafabrik-tjänster. I det här avsnittet skapar du länkade tjänster till din SQL Server-databas och din databas i Azure SQL Database.

Skapa länkad tjänst till SQL Server

I det här steget länkar du SQL Server-databasen till datafabriken.

  1. Skapa en JSON-fil med namnet SqlServerLinkedService.json i mappen C:\ADFTutorials\IncCopyMultiTableTutorial (skapa de lokala mapparna om de inte redan finns) med följande innehåll. Välj rätt avsnitt baserat på vilken autentisering du använder när du ansluter till SQL Server.

    Viktigt!

    Välj rätt avsnitt baserat på vilken autentisering du använder när du ansluter till SQL Server.

    Om du använder SQL-autentisering kopierar du följande JSON-definition:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Om du använder Windows-autentisering kopierar du följande JSON-definition:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Viktigt!

    • Välj rätt avsnitt baserat på vilken autentisering du använder när du ansluter till SQL Server.
    • Ersätt <integration runtime-namnet> med namnet på din integrationskörning.
    • Ersätt <servernamn>, <databasnamn>, <användarnamn> och <lösenord> med värden för SQL Server-databasen innan du sparar filen.
    • Om du behöver använda ett snedstreck (\) i ditt användarkonto eller användarnamn använder du escape-tecknet (\). Ett exempel är mydomain\\myuser.
  2. I PowerShell kör du följande cmdlet för att växla till mappen C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Kör cmdleten Set-AzDataFactoryV2LinkedService för att skapa den länkade tjänsten AzureStorageLinkedService. I följande exempel skickar du värden för parametrarna ResourceGroupName och DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Här är exempel på utdata:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Skapa den länkade SQL Database-tjänsten

  1. Skapa en JSON-fil med namnet AzureSQLDatabaseLinkedService.json i mappen C:\ADFTutorials\IncCopyMultiTableTutorial med följande innehåll. (Skapa mappen ADF om den inte redan finns.) Ersätt <servernamn>, <databasnamn>, <användarnamn> och <lösenord> med namnet på din SQL Server-databas, namnet på databasen, användarnamnet och lösenordet innan du sparar filen.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. I PowerShell kör du cmdleten Set-AzDataFactoryV2LinkedService för att skapa den länkade tjänsten AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Här är exempel på utdata:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Skapa datauppsättningar

I det här steget skapar du datauppsättningar som representerar datakällan, datamålet och platsen för vattenstämpeln.

Skapa en källdatauppsättning

  1. Skapa en JSON-fil med namnet SourceDataset.json i samma mapp med följande innehåll:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Kopieringsaktivitet i pipelinen använder en SQL-fråga till att läsa in data, snarare än att läsa in hela tabellen.

  2. Kör cmdleten Set-AzDataFactoryV2Dataset för att skapa datauppsättningen SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Här är exempel på utdata från cmdleten:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Skapa en källdatauppsättning

  1. Skapa en JSON-fil med namnet SinkDataset.json i samma mapp med följande innehåll. Elementet tableName har angetts av pipelinen dynamiskt vid körning. ForEach-aktiviteten i pipelinen upprepas över en lista med tabellnamn och skickar tabellnamnet till datamängden i varje iteration.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Kör cmdleten Set-AzDataFactoryV2Dataset för att skapa datauppsättningen SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Här är exempel på utdata från cmdleten:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Skapa en datauppsättning för en vattenstämpel

I det här steget skapar du en datauppsättning för att lagra ett värde för ett högvattenmärke.

  1. Skapa en JSON-fil med namnet WatermarkDataset.json i samma mapp med följande innehåll:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Kör cmdleten Set-AzDataFactoryV2Dataset för att skapa datauppsättningen WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Här är exempel på utdata från cmdleten:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Skapa en pipeline

Den här pipelinen tar en lista med tabellnamn som en parameter. ForEach-aktiviteten itererar genom listan med tabellnamn och utför följande åtgärder:

  1. Använd uppslagsaktiviteten för att hämta det gamla vattenstämpelvärdet (det ursprungliga värdet eller det som användes i den senaste iterationen).

  2. Använd uppslagsaktiviteten för att hämta det nya vattenstämpelvärdet (det maximala värdet för vattenstämpelkolumnen i källtabellen).

  3. Använd aktiviteten Kopiera för att kopiera data mellan dessa två vattenstämpelvärden från källdatabasen till måldatabasen.

  4. Använd aktiviteten StoredProcedure för att uppdatera det gamla vattenstämpelvärdet som ska användas i det första steget i nästa iteration.

Skapa pipelinen

  1. Skapa en JSON-fil med namnet IncrementalCopyPipeline.json i samma mapp med följande innehåll:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Kör cmdleten Set-AzDataFactoryV2Pipeline för att skapa pipelinen IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Här är exempel på utdata:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Köra pipelinen

  1. Skapa en parameterfil med namnet Parameters.json i samma mapp med följande innehåll:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Kör pipelinen IncrementalCopyPipeline med hjälp av cmdleten Invoke-AzDataFactoryV2Pipeline . Ersätt platshållarna med din egen grupp och datafabrikens namn.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Övervaka pipelinen

  1. Logga in på Azure-portalen.

  2. Välj Alla tjänster, sök med nyckelordet Datafabriker och välj Datafabriker.

  3. Sök efter din datafabrik i listan med datafabriker och välj den så att du öppnar sidan Datafabrik.

  4. På sidan Datafabrik väljer du Öppnapanelen Öppna Azure Data Factory Studio för att starta Azure Data Factory på en separat flik.

  5. På startsidan för Azure Data Factory väljer du Övervaka till vänster.

    Skärmbild som visar startsidan för Azure Data Factory.

  6. Du kan se alla pipelinekörningar och deras status. Lägg i följande exempel märke till att statusen för pipelinekörningen är Lyckades. Du kan kontrollera parametrarna som skickats till pipelinen genom att klicka på länken i kolumnen Parametrar. Om det uppstod ett fel ser du en länk i kolumnen Fel.

    Skärmbild som visar pipelinekörningar för en datafabrik, inklusive din pipeline.

  7. När du väljer länken i kolumnen Åtgärder visas alla aktivitetskörningar för pipelinen.

  8. Om du vill gå tillbaka till vyn Pipelinekörningar väljer du Alla pipelinekörningar.

Granska resultaten

Kör följande frågor mot SQL-måldatabasen i SQL Server Management Studio för att verifiera att data har kopierats från källtabellerna till måltabellerna:

Fråga

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Fråga

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Fråga

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Observera att vattenstämpelvärdena för båda tabellerna har uppdaterats.

Lägga till mer data i källtabellerna

Kör följande fråga mot SQL Servers källdatabas för att uppdatera en befintlig rad i customer_table. Infoga en ny rad i project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Kör pipelinen igen

  1. Kör nu pipelinen igen genom att köra följande PowerShell-kommando:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Övervaka pipelinekörningarna genom att följa anvisningarna i avsnittet Övervaka pipelinen. När pipelinestatusen är Pågår visas en annan åtgärdslänk under Åtgärder för att avbryta pipelinekörningen.

  3. Välj Uppdatera om du vill uppdatera listan tills pipelinekörningen lyckas.

  4. Om du vill kan du välja Visa aktivitetskörningar under Åtgärder om du vill se alla aktivitetskörningar som är associerade med den här pipelinekörningen.

Granska de slutliga resultaten

Kör följande frågor mot måldatabasen i SQL Server Management Studio för att verifiera att nya/uppdaterade data har kopierats från källtabellerna till måltabellerna.

Fråga

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Lägg märke till de nya värdena för Name och LastModifytime för PersonID för nummer 3.

Fråga

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Observera att posten NewProject har lagts till i project_table.

Fråga

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Observera att vattenstämpelvärdena för båda tabellerna har uppdaterats.

I den här självstudiekursen fick du:

  • Förbereda käll- och måldatalager.
  • Skapa en datafabrik.
  • Skapa Integration Runtime (IR) med egen värd.
  • Installera Integration Runtime.
  • Skapa länkade tjänster.
  • Skapa datauppsättningar för källa, mottagare och vattenstämpel.
  • Skapa, köra och övervaka en pipeline.
  • Granska resultaten.
  • Lägga till eller uppdatera data i källtabeller.
  • Köra och övervaka pipelinen igen.
  • Granska de slutliga resultaten.

Fortsätt till följande självstudie och lär dig att transformera data med ett Spark-kluster på Azure: