Fokozatos adatok betöltése több táblából az SQL Serverből az Azure SQL Database-be a PowerShell használatával

Vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp

Data Factory a Microsoft Fabric a Azure Data Factory következő generációja, egyszerűbb architektúrával, beépített AI-vel és új funkciókkal. Ha még nem ismerkedik az adatintegrációval, kezdje a Fabric Data Factoryvel. A meglévő ADF-számítási feladatok frissíthetők Fabric használatával, hogy elérjék az adatkutatás, a valós idejű elemzés és a jelentéskészítés új képességeit.

Ebben az oktatóanyagban létrehoz egy Azure Data Factory-t egy folyamattal, amely az SQL Server adatbázis több táblájából tölti be a változásokat az Azure SQL Database-be.

Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:

  • Forrás- és céladattárak előkészítése.
  • Adat-előállító létrehozása
  • Saját üzemeltetésű integrációs modul létrehozása.
  • Telepítse az integrációs futtatókörnyezetet.
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása, futtatása és figyelése.
  • Tekintse át az eredményeket.
  • Adatok hozzáadása vagy frissítése a forrástáblákban.
  • Futtassa újra és figyelje a csővezetéket.
  • Tekintse át a végső eredményeket.

Overview

Az alábbiak a megoldás kialakításának leglényegesebb lépései:

  1. Válassza ki a vízjel oszlopot.

    Jelöljön ki egy oszlopot a forrásadattár minden táblájához, amely minden futtatás új vagy frissített rekordjait azonosíthatja. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.

  2. Egy adatraktár előkészítése a küszöbértékek tárolására.

    Ebben az oktatóanyagban a küszöbértékeket egy SQL-adatbázisban tároljuk.

  3. Hozzon létre egy folyamatot a következő tevékenységekkel:

    1. Hozzon létre egy ForEach-tevékenységet, amely a folyamat paramétereként átadott forrástáblanevek listáján keresztül halad végig. Az egyes forrástáblák esetében az alábbi tevékenységek kerülnek meghívásra a tábla delta betöltésének végrehajtására.

    2. Hozzon létre két keresési tevékenységet. Az első lekérdezési tevékenység az utolsó vízjel értéket kéri le. Használja a második lekérdezési tevékenységet az új vízjel érték lekéréséhez. Ezeket a vízjelértékeket a Copy tevékenység-nek adják át.

    3. Hozzon létre egy Másolási tevékenységet, amely a forrásadattárból másolja azokat a sorokat, amelyeknek a vízjol oszlopának értéke nagyobb a régi vízjel értékénél, és kisebb vagy egyenlő az új vízjel értékével. Ezután átmásolja a deltaadatokat a forrásadattárból új fájlként Azure Blob Storage-ba.

    4. Egy StoredProcedure tevékenység létrehozása, amely frissíti a küszöbértékeket a folyamat következő futtatásához.

    Itt látható a megoldás összefoglaló jellegű ábrája:

    Adatok növekményes betöltése

Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy free fiókot.

Prerequisites

  • SQL Server. Ebben az oktatóanyagban egy SQL Server adatbázist használ forrásadattárként.
  • Azure SQL Database. Adatbázist használ a Azure SQL Database fogadóadattárként. Ha még nincs SQL-adatbázisa, tekintse meg a Adatbázis létrehozása az Azure SQL Database-ben, hogy megtekintse a létrehozás lépéseit.

Forrástáblák létrehozása a SQL Server-adatbázisban

  1. Nyissa meg SQL Server Management Studio (SSMS) vagy Visual Studio Code, és csatlakozzon a SQL Server-adatbázishoz.

  2. A Kiszolgálókezelő (SSMS) vagy a Kapcsolódások panelen (Visual Studio Code) kattintson a jobb gombbal az adatbázisra, és válassza a Új lekérdezés lehetőséget.

  3. Futtassa a következő SQL-parancsot az adatbázison a következő nevű táblák customer_tableproject_tablelétrehozásához:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Céltáblák létrehozása a Azure SQL Database

  1. Nyissa meg SQL Server Management Studio (SSMS) vagy Visual Studio Code, és csatlakozzon a SQL Server-adatbázishoz.

  2. A Kiszolgálókezelő (SSMS) vagy a Kapcsolódások panelen (Visual Studio Code) kattintson a jobb gombbal az adatbázisra, és válassza a Új lekérdezés lehetőséget.

  3. Futtassa a következő SQL-parancsot az adatbázison a következő nevű táblák customer_tableproject_tablelétrehozásához:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Hozzon létre egy másik táblát az Azure SQL-adatbázisban a magas vízjel értékének tárolásához.

  1. Futtassa a következő SQL-parancsot az adatbázisán egy watermarktable nevű tábla létrehozásához, amely a vízjel értékét fogja tárolni:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Szúrja be mindkét forrástábla kezdeti vízjelértékét a vízjeltáblába.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Tárolt eljárás létrehozása a Azure SQL Database

Futtassa a következő parancsot egy tárolt eljárás létrehozásához az adatbázisban. Ez a tárolt eljárás minden csővezeték futtatása után frissíti a vízjel értékét.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Azure SQL Database adattípusok és további tárolt eljárások létrehozása

Futtassa a következő lekérdezést két tárolt eljárás és két adattípus létrehozásához az adatbázisban. A forrástáblákból származó adatok céltáblákba való egyesítése.

Annak érdekében, hogy az út könnyen kezdődjön, közvetlenül ezeket a tárolt eljárásokat használjuk, amelyek egy táblaváltozón keresztül adják át a deltaadatokat, majd egyesítjük őket a céltárolóba. Legyen óvatos, nem javasolt, hogy "nagy" számú (100-nál több) delta sort tároljanak a táblaváltozóban.

Ha nagy számú deltasort kell egyesítenie a céltárolóba, javasoljuk, hogy másolási tevékenység használatával másolja az összes delta-adatot egy ideiglenes "átmeneti" táblába a céltárolóban, majd a táblaváltozó használata nélkül saját tárolt eljárást épített ki az "előkészítési" táblából a "végleges" táblába való egyesítéshez.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Telepítse a legújabb Azure PowerShell modulokat a Installban található utasításokat követve, és konfigurálja a Azure PowerShell.

Adat-előállító létrehozása

  1. Adjon meg egy olyan változót, amelyet később a PowerShell-parancsokban az erőforráscsoport neveként fog használni. Másolja a következő parancsszöveget a PowerShellbe, adja meg a Azure erőforráscsoport nevét dupla idézőjelek között, majd futtassa a parancsot. Például: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  2. Adjon meg egy változót az adat-előállító helyéhez.

    $location = "East US"
    
  3. A Azure erőforráscsoport létrehozásához futtassa a következő parancsot:

    New-AzResourceGroup $resourceGroupName $location
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  4. Adjon meg egy változót az adat-előállító nevéhez.

    Fontos

    Frissítse az adat-előállító nevét, hogy globálisan egyedi legyen. Ilyen például az ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Az adat-előállító létrehozásához futtassa a következő Set-AzDataFactoryV2 parancsmagot:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Jegyezze fel a következő pontokat:

  • Az adat-előállító nevének globálisan egyedinek kell lennie. Ha a következő hibaüzenetet kapja, módosítsa a nevet, majd próbálkozzon újra:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Data Factory-példányok létrehozásához a Azure való bejelentkezéshez használt felhasználói fióknak közreműködői vagy tulajdonosi szerepkörök tagjának vagy a Azure-előfizetés rendszergazdájának kell lennie.

  • Azoknak a Azure régióknak a listájához, amelyekben a Data Factory jelenleg elérhető, jelölje ki az Önt érdeklő régiókat az alábbi lapon, majd bontsa ki a Analytics elemet a Data Factory: Régiók szerint elérhető termékek. Az adat-előállító által használt adattárak (Azure Storage, SQL Database, SQL Managed Instance stb.) és számításai (Azure HDInsight stb.) más régiókban is lehetnek.

Saját üzemeltetésű integrációs modul létrehozása

Ebben a szakaszban létrehoz egy saját üzemeltetésű integrációs modult, és társítja azt egy helyszíni géppel a SQL Server adatbázishoz. A saját üzemeltetésű integrációs modul az az összetevő, amely adatokat másol a gépéről a SQL Serverről az Azure SQL Database-be.

  1. Hozzon létre egy változót az integrációs modul nevéhez. Használjon egyedi nevet, és jegyezze fel. Ezt az oktatóanyag későbbi részében fogja használni.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Saját üzemeltetésű integrációs modul létrehozása.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Itt látható a minta kimenete:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. A létrehozott integrációs modul állapotának lekéréséhez futtassa a következő parancsot. Győződjön meg arról, hogy az Állam tulajdonság értéke NeedRegistration értékre van állítva.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Itt látható a minta kimenete:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. A saját üzemeltetésű integrációs modul felhőbeli Azure Data Factory szolgáltatással való regisztrálásához használt hitelesítési kulcsok lekéréséhez futtassa a következő parancsot:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Itt látható a minta kimenete:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Másolja ki az önálló üzemeltetésű integrációs futtatókörnyezet regisztrálásához használt kulcsok egyikét (a kettős idézőjelek mellőzésével) a következő lépések során, miután telepítette azt a számítógépére.

Az integrációs modul telepítése

  1. Ha már rendelkezik az integrációs modullal a számítógépen, távolítsa el azt a Programok hozzáadása vagy eltávolítása funkcióval.

  2. Töltse le a helyileg üzemeltetett integrációs futtatókörnyezetet egy Windows gépre. Futassa a telepítést.

  3. Az Microsoft Integration Runtime telepítő lapon válassza a Tovább lehetőséget.

  4. A End-User Licencszerződés lapon fogadja el a feltételeket és a licencszerződést, és válassza a Tovább gombot.

  5. A Célmappa lapon válassza a Tovább gombot.

  6. A Microsoft Integration Runtime telepítésére kész oldalon válassza a Telepítés lehetőséget.

  7. A Microsoft Integration Runtime telepítésének befejezése lapon válassza a Befejezés lehetőséget.

  8. A Regisztrátor Integration Runtime (saját üzemeltetésű) lapon illessze be az előző szakaszban mentett kulcsot, és válassza a Register lehetőséget.

    Az integrációs modul regisztrálása

  9. A Új Integration Runtime (saját üzemeltetésű) csomópont lapon válassza a Finish lehetőséget.

  10. A saját üzemeltetésű integrációs modul sikeres regisztrálása után a következő üzenet jelenik meg:

    Sikeresen regisztrálva

  11. A Regisztráció Integration Runtime (saját üzemeltetésű) lapon válassza az Indítsa el a Konfigurációkezelő-t lehetőséget.

  12. Amikor a csomópont csatlakozik a felhőszolgáltatáshoz, a következő oldal jelenik meg:

    A csomópont csatlakoztatva van

  13. Most tesztelje a SQL Server adatbázishoz való kapcsolatot.

    Diagnosztikai fül

    a. A Konfigurációkezelő lapon lépjen a Diagnostics lapra.

    b. Válassza ki az SqlServert az adatforrás típusához.

    c. Adja meg a kiszolgáló nevét.

    d. Adja meg az adatbázis nevét.

    e. Válassza ki a hitelesítési módot.

    f. Adja meg a felhasználónevet.

    g. Adja meg a felhasználónévhez társított jelszót.

    h. Válassza a Test lehetőséget annak ellenőrzéséhez, hogy az integrációs modul képes-e csatlakozni SQL Server. Ha a kapcsolat sikeres, zöld pipa jelenik meg. Ha a kapcsolat nem sikerült, hibaüzenet jelenik meg. Javítsa ki a problémákat, és győződjön meg arról, hogy az integrációs modul képes csatlakozni SQL Server.

    Note

    Jegyezze fel a hitelesítési típus, a kiszolgáló, az adatbázis, a felhasználó és a jelszó értékeit. Ezeket az oktatóanyag későbbi részében használhatja.

Társított szolgáltatások létrehozása

Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a szakaszban társított szolgáltatásokat hoz létre az SQL Server-adatbázishoz és az Azure SQL Database-ben lévő adatbázishoz.

A SQL Server társított szolgáltatás létrehozása

Ebben a lépésben a SQL Server adatbázist az adat-előállítóhoz csatolja.

  1. Hozzon létre egy SqlServerLinkedService.json nevű JSON-fájlt a C:\ADFTutorials\IncCopyMultiTableTutorial mappában (hozza létre a helyi mappákat, ha még nem léteznek) a következő tartalommal. Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.

    Fontos

    Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.

    HA SQL-hitelesítést használ, másolja a következő JSON-definíciót:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Ha Windows authentication használ, másolja a következő JSON-definíciót:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Fontos

    • Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.
    • Cserélje le <az integrációs futtatási környezet nevét> a saját integrációs futtatási környezet nevére.
    • Cserélje le <servername>, <databasename>, <username> és <password> a SQL Server adatbázis értékeivel a fájl mentése előtt.
    • Ha perjel karaktert (\) kell használnia a felhasználói fiókjában vagy a kiszolgáló nevében, használja a feloldó karaktert (\). Például: mydomain\\myuser.
  2. A PowerShellben futtassa a következő parancsmagot a C:\ADFTutorials\IncCopyMultiTableTutorial mappára való váltáshoz.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureStorageLinkedService társított szolgáltatás létrehozásához. A következő példában a ResourceGroupName és a DataFactoryName paraméter értékeit fogja megadni:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Az SQL Database társított szolgáltatásának létrehozása

  1. Hozzon létre egy AzureSQLDatabaseLinkedService.json nevű JSON-fájlt a C:\ADFTutorials\IncCopyMultiTableTutorial mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) Cserélje le <servername>, <adatbázis neve>, <user neve> és <password> a fájl mentése előtt az SQL Server adatbázis nevét, felhasználónevét és jelszavát.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. A PowerShellben futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureSQLDatabaseLinkedService társított szolgáltatás létrehozásához.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Adatkészletek létrehozása

Ebben a lépésben olyan adatkészleteket hoz létre, amelyek az adatforrást, az adatcélt és a vízjel tárolásának helyét jelölik.

Forrásadatkészlet létrehozása

  1. Hozzon létre egy SourceDataset.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    A folyamat "Copy tevékenység" egy SQL-lekérdezés használatával tölti be az adatokat, ahelyett hogy a teljes táblát töltené be.

  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SourceDataset adatkészlet létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Adatkimenet adatkészlet létrehozása

  1. Hozzon létre egy SinkDataset.json nevű JSON-fájlt ugyanabban a mappában az alábbi tartalommal. A tableName elemet a folyamat futásidőben dinamikusan állítja be. A folyamat ForEach-tevékenysége végighalad a táblanevek listáján, és minden iterációban átadja a tábla nevét ennek az adatkészletnek.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SinkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Adatkészlet létrehozása egy vízjelhez

Ebben a lépésben egy adatkészletet hozunk létre a felső küszöbértékek tárolására.

  1. Hozzon létre egy WatermarkDataset.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a WatermarkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Folyamat létrehozása

A folyamat paraméterként a táblanevek listáját veszi fel. A ForEach-tevékenység végigvezeti a táblanevek listáján, és a következő műveleteket hajtja végre:

  1. A Keresési tevékenység használatával kérje le a régi vízjel értékét (a kezdeti értéket vagy az utolsó iterációban használt értéket).

  2. A Keresési tevékenység használatával kérje le az új vízjel értékét (a forrástábla vízjeloszlopának maximális értékét).

  3. A Copy tevékenység használatával adatokat másolhat a forrásadatbázisból a céladatbázisba a két vízjelérték között.

  4. A StoredProcedure tevékenység használatával frissítse a régi vízjel értékét, amelyet a következő iteráció első lépésében kell használni.

A csővezeték létrehozása

  1. Hozzon létre egy IncrementalCopyPipeline.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Futtassa a Set-AzDataFactoryV2Pipeline parancsmagot az "IncrementalCopyPipeline" kötet létrehozásához.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Itt látható a minta kimenete:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Csővezeték futtatása

  1. Hozzon létre egy Parameters.json nevű paraméterfájlt ugyanabban a mappában a következő tartalommal:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Futtassa az IncrementalCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

A folyamat monitorozása

  1. Jelentkezzen be a Azure portálra.

  2. Válassza a Minden szolgáltatás lehetőséget, keressen az Adat-előállítók kulcsszóval, és válassza az Adat-előállítók lehetőséget.

  3. Keresse meg az adat-előállítót az adat-előállítók listájában, és válassza ki a Data Factory lap megnyitásához.

  4. A Adat-előállító lapon válassza a Open lehetőséget a Open Azure Data Factory Studio csempén a Azure Data Factory külön lapon való elindításához.

  5. A Azure Data Factory kezdőlapon válassza a bal oldalon található Monitor lehetőséget.

    A képernyőkép az Azure Data Factory kezdőlapját mutatja.

  6. Láthatja az összes pipeline futást és állapotukat. Kérjük, vegye észre, hogy a következő példában a pipeline futtatásának állapota Sikeres. A folyamatnak átadott paraméterek ellenőrzéséhez válassza ki a hivatkozást a Paraméterek oszlopban. Hiba esetén megjelenik egy hivatkozás a Hiba oszlopban.

    Képernyőkép egy adatgyár csővezetékeinek futtatásairól, beleértve a te csővezetékeidet is.

  7. Amikor kiválasztja a hivatkozást a Műveletek oszlopban, megjelenik a folyamat összes tevékenységfuttatása.

  8. Ha vissza szeretne lépni a Folyamatfuttatások nézetre, válassza az Összes folyamatfuttatás lehetőséget.

Az eredmények áttekintése

A SQL Server Management Studio futtassa a következő lekérdezéseket a cél SQL-adatbázison annak ellenőrzéséhez, hogy az adatok át lettek-e másolva a forrástáblákból a céltáblákba:

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Figyelje meg, hogy mindkét tábla vízjelértékeinek frissítése megtörtént.

További adatok hozzáadása a forrástáblákhoz

Futtassa a következő lekérdezést a forrás SQL Server adatbázison egy meglévő sor frissítésére a customer_table táblában. Szúrjon be egy új sort a project_table-táblába.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

A folyamat újrafuttatása

  1. Most futtassa újra a folyamatot a következő PowerShell-parancs végrehajtásával:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. A folyamatfuttatások figyelése a folyamat figyelése szakasz utasításait követve. Ha a folyamat állapota folyamatban van, a Műveletek területen egy másik művelethivatkozás jelenik meg a folyamatfuttatás megszakításához.

  3. Válassza a Frissítés lehetőséget a lista frissítéséhez, amíg a folyamat futtatása sikeres nem lesz.

  4. Ha szeretné, válassza a Tevékenységfuttatások megtekintése hivatkozást a Műveletek területen a folyamatfuttatáshoz társított összes tevékenységfuttatás megtekintéséhez.

A végleges eredmények áttekintése

A SQL Server Management Studio futtassa a következő lekérdezéseket a céladatbázison annak ellenőrzéséhez, hogy a frissített/új adatok át lettek-e másolva a forrástáblákból a céltáblákba.

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Figyelje meg a 3- os számhoz tartozó PersonIDnév és LastModifytime új értékeit.

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Vegye észre, hogy a NewProject bejegyzést hozzáadták a project_table táblához.

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Figyelje meg, hogy mindkét tábla vízjelértékeinek frissítése megtörtént.

Az oktatóanyagban az alábbi lépéseket hajtotta végre:

  • Forrás- és céladattárak előkészítése.
  • Adat-előállító létrehozása
  • Saját üzemeltetésű integrációs modul (IR) létrehozása.
  • Telepítse az integrációs futtatókörnyezetet.
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása, futtatása és figyelése.
  • Tekintse át az eredményeket.
  • Adatok hozzáadása vagy frissítése a forrástáblákban.
  • Futtassa újra és figyelje a csővezetéket.
  • Tekintse át a végső eredményeket.

Folytassa a következő oktatóanyaggal, amelyből megtudhatja, hogyan alakíthat át adatokat egy Spark-fürt használatával az Azure-en.