Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Vonatkozik:
Azure Data Factory
Azure Synapse Analytics
Tipp
Ebben az oktatóanyagban létrehoz egy Azure Data Factory-t egy folyamattal, amely az SQL Server adatbázis több táblájából tölti be a változásokat az Azure SQL Database-be.
Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:
- Forrás- és céladattárak előkészítése.
- Adat-előállító létrehozása
- Saját üzemeltetésű integrációs modul létrehozása.
- Telepítse az integrációs futtatókörnyezetet.
- Társított szolgáltatások létrehozása.
- Forrás-, fogadó- és küszöbadatkészletek létrehozása.
- Folyamat létrehozása, futtatása és figyelése.
- Tekintse át az eredményeket.
- Adatok hozzáadása vagy frissítése a forrástáblákban.
- Futtassa újra és figyelje a csővezetéket.
- Tekintse át a végső eredményeket.
Overview
Az alábbiak a megoldás kialakításának leglényegesebb lépései:
Válassza ki a vízjel oszlopot.
Jelöljön ki egy oszlopot a forrásadattár minden táblájához, amely minden futtatás új vagy frissített rekordjait azonosíthatja. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.
Egy adatraktár előkészítése a küszöbértékek tárolására.
Ebben az oktatóanyagban a küszöbértékeket egy SQL-adatbázisban tároljuk.
Hozzon létre egy folyamatot a következő tevékenységekkel:
Hozzon létre egy ForEach-tevékenységet, amely a folyamat paramétereként átadott forrástáblanevek listáján keresztül halad végig. Az egyes forrástáblák esetében az alábbi tevékenységek kerülnek meghívásra a tábla delta betöltésének végrehajtására.
Hozzon létre két keresési tevékenységet. Az első lekérdezési tevékenység az utolsó vízjel értéket kéri le. Használja a második lekérdezési tevékenységet az új vízjel érték lekéréséhez. Ezeket a vízjelértékeket a Copy tevékenység-nek adják át.
Hozzon létre egy Másolási tevékenységet, amely a forrásadattárból másolja azokat a sorokat, amelyeknek a vízjol oszlopának értéke nagyobb a régi vízjel értékénél, és kisebb vagy egyenlő az új vízjel értékével. Ezután átmásolja a deltaadatokat a forrásadattárból új fájlként Azure Blob Storage-ba.
Egy StoredProcedure tevékenység létrehozása, amely frissíti a küszöbértékeket a folyamat következő futtatásához.
Itt látható a megoldás összefoglaló jellegű ábrája:
Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy free fiókot.
Prerequisites
- SQL Server. Ebben az oktatóanyagban egy SQL Server adatbázist használ forrásadattárként.
- Azure SQL Database. Adatbázist használ a Azure SQL Database fogadóadattárként. Ha még nincs SQL-adatbázisa, tekintse meg a Adatbázis létrehozása az Azure SQL Database-ben, hogy megtekintse a létrehozás lépéseit.
Forrástáblák létrehozása a SQL Server-adatbázisban
Nyissa meg SQL Server Management Studio (SSMS) vagy Visual Studio Code, és csatlakozzon a SQL Server-adatbázishoz.
A Kiszolgálókezelő (SSMS) vagy a Kapcsolódások panelen (Visual Studio Code) kattintson a jobb gombbal az adatbázisra, és válassza a Új lekérdezés lehetőséget.
Futtassa a következő SQL-parancsot az adatbázison a következő nevű táblák
customer_tableproject_tablelétrehozásához:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Céltáblák létrehozása a Azure SQL Database
Nyissa meg SQL Server Management Studio (SSMS) vagy Visual Studio Code, és csatlakozzon a SQL Server-adatbázishoz.
A Kiszolgálókezelő (SSMS) vagy a Kapcsolódások panelen (Visual Studio Code) kattintson a jobb gombbal az adatbázisra, és válassza a Új lekérdezés lehetőséget.
Futtassa a következő SQL-parancsot az adatbázison a következő nevű táblák
customer_tableproject_tablelétrehozásához:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Hozzon létre egy másik táblát az Azure SQL-adatbázisban a magas vízjel értékének tárolásához.
Futtassa a következő SQL-parancsot az adatbázisán egy
watermarktablenevű tábla létrehozásához, amely a vízjel értékét fogja tárolni:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );Szúrja be mindkét forrástábla kezdeti vízjelértékét a vízjeltáblába.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Tárolt eljárás létrehozása a Azure SQL Database
Futtassa a következő parancsot egy tárolt eljárás létrehozásához az adatbázisban. Ez a tárolt eljárás minden csővezeték futtatása után frissíti a vízjel értékét.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Azure SQL Database adattípusok és további tárolt eljárások létrehozása
Futtassa a következő lekérdezést két tárolt eljárás és két adattípus létrehozásához az adatbázisban. A forrástáblákból származó adatok céltáblákba való egyesítése.
Annak érdekében, hogy az út könnyen kezdődjön, közvetlenül ezeket a tárolt eljárásokat használjuk, amelyek egy táblaváltozón keresztül adják át a deltaadatokat, majd egyesítjük őket a céltárolóba. Legyen óvatos, nem javasolt, hogy "nagy" számú (100-nál több) delta sort tároljanak a táblaváltozóban.
Ha nagy számú deltasort kell egyesítenie a céltárolóba, javasoljuk, hogy másolási tevékenység használatával másolja az összes delta-adatot egy ideiglenes "átmeneti" táblába a céltárolóban, majd a táblaváltozó használata nélkül saját tárolt eljárást épített ki az "előkészítési" táblából a "végleges" táblába való egyesítéshez.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Telepítse a legújabb Azure PowerShell modulokat a Installban található utasításokat követve, és konfigurálja a Azure PowerShell.
Adat-előállító létrehozása
Adjon meg egy olyan változót, amelyet később a PowerShell-parancsokban az erőforráscsoport neveként fog használni. Másolja a következő parancsszöveget a PowerShellbe, adja meg a Azure erőforráscsoport nevét dupla idézőjelek között, majd futtassa a parancsot. Például:
"adfrg".$resourceGroupName = "ADFTutorialResourceGroup";Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a
$resourceGroupNameváltozóhoz, majd futtassa újra a parancsot.Adjon meg egy változót az adat-előállító helyéhez.
$location = "East US"A Azure erőforráscsoport létrehozásához futtassa a következő parancsot:
New-AzResourceGroup $resourceGroupName $locationHa az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a
$resourceGroupNameváltozóhoz, majd futtassa újra a parancsot.Adjon meg egy változót az adat-előállító nevéhez.
Fontos
Frissítse az adat-előállító nevét, hogy globálisan egyedi legyen. Ilyen például az ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";Az adat-előállító létrehozásához futtassa a következő Set-AzDataFactoryV2 parancsmagot:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Jegyezze fel a következő pontokat:
Az adat-előállító nevének globálisan egyedinek kell lennie. Ha a következő hibaüzenetet kapja, módosítsa a nevet, majd próbálkozzon újra:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.Data Factory-példányok létrehozásához a Azure való bejelentkezéshez használt felhasználói fióknak közreműködői vagy tulajdonosi szerepkörök tagjának vagy a Azure-előfizetés rendszergazdájának kell lennie.
Azoknak a Azure régióknak a listájához, amelyekben a Data Factory jelenleg elérhető, jelölje ki az Önt érdeklő régiókat az alábbi lapon, majd bontsa ki a Analytics elemet a Data Factory: Régiók szerint elérhető termékek. Az adat-előállító által használt adattárak (Azure Storage, SQL Database, SQL Managed Instance stb.) és számításai (Azure HDInsight stb.) más régiókban is lehetnek.
Saját üzemeltetésű integrációs modul létrehozása
Ebben a szakaszban létrehoz egy saját üzemeltetésű integrációs modult, és társítja azt egy helyszíni géppel a SQL Server adatbázishoz. A saját üzemeltetésű integrációs modul az az összetevő, amely adatokat másol a gépéről a SQL Serverről az Azure SQL Database-be.
Hozzon létre egy változót az integrációs modul nevéhez. Használjon egyedi nevet, és jegyezze fel. Ezt az oktatóanyag későbbi részében fogja használni.
$integrationRuntimeName = "ADFTutorialIR"Saját üzemeltetésű integrációs modul létrehozása.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupNameItt látható a minta kimenete:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIRA létrehozott integrációs modul állapotának lekéréséhez futtassa a következő parancsot. Győződjön meg arról, hogy az Állam tulajdonság értéke NeedRegistration értékre van állítva.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -StatusItt látható a minta kimenete:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>A saját üzemeltetésű integrációs modul felhőbeli Azure Data Factory szolgáltatással való regisztrálásához használt hitelesítési kulcsok lekéréséhez futtassa a következő parancsot:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-JsonItt látható a minta kimenete:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }Másolja ki az önálló üzemeltetésű integrációs futtatókörnyezet regisztrálásához használt kulcsok egyikét (a kettős idézőjelek mellőzésével) a következő lépések során, miután telepítette azt a számítógépére.
Az integrációs modul telepítése
Ha már rendelkezik az integrációs modullal a számítógépen, távolítsa el azt a Programok hozzáadása vagy eltávolítása funkcióval.
Töltse le a helyileg üzemeltetett integrációs futtatókörnyezetet egy Windows gépre. Futassa a telepítést.
Az Microsoft Integration Runtime telepítő lapon válassza a Tovább lehetőséget.
A End-User Licencszerződés lapon fogadja el a feltételeket és a licencszerződést, és válassza a Tovább gombot.
A Célmappa lapon válassza a Tovább gombot.
A Microsoft Integration Runtime telepítésére kész oldalon válassza a Telepítés lehetőséget.
A Microsoft Integration Runtime telepítésének befejezése lapon válassza a Befejezés lehetőséget.
A Regisztrátor Integration Runtime (saját üzemeltetésű) lapon illessze be az előző szakaszban mentett kulcsot, és válassza a Register lehetőséget.
A Új Integration Runtime (saját üzemeltetésű) csomópont lapon válassza a Finish lehetőséget.
A saját üzemeltetésű integrációs modul sikeres regisztrálása után a következő üzenet jelenik meg:
A Regisztráció Integration Runtime (saját üzemeltetésű) lapon válassza az Indítsa el a Konfigurációkezelő-t lehetőséget.
Amikor a csomópont csatlakozik a felhőszolgáltatáshoz, a következő oldal jelenik meg:
Most tesztelje a SQL Server adatbázishoz való kapcsolatot.
a. A Konfigurációkezelő lapon lépjen a Diagnostics lapra.
b. Válassza ki az SqlServert az adatforrás típusához.
c. Adja meg a kiszolgáló nevét.
d. Adja meg az adatbázis nevét.
e. Válassza ki a hitelesítési módot.
f. Adja meg a felhasználónevet.
g. Adja meg a felhasználónévhez társított jelszót.
h. Válassza a Test lehetőséget annak ellenőrzéséhez, hogy az integrációs modul képes-e csatlakozni SQL Server. Ha a kapcsolat sikeres, zöld pipa jelenik meg. Ha a kapcsolat nem sikerült, hibaüzenet jelenik meg. Javítsa ki a problémákat, és győződjön meg arról, hogy az integrációs modul képes csatlakozni SQL Server.
Note
Jegyezze fel a hitelesítési típus, a kiszolgáló, az adatbázis, a felhasználó és a jelszó értékeit. Ezeket az oktatóanyag későbbi részében használhatja.
Társított szolgáltatások létrehozása
Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a szakaszban társított szolgáltatásokat hoz létre az SQL Server-adatbázishoz és az Azure SQL Database-ben lévő adatbázishoz.
A SQL Server társított szolgáltatás létrehozása
Ebben a lépésben a SQL Server adatbázist az adat-előállítóhoz csatolja.
Hozzon létre egy SqlServerLinkedService.json nevű JSON-fájlt a C:\ADFTutorials\IncCopyMultiTableTutorial mappában (hozza létre a helyi mappákat, ha még nem léteznek) a következő tartalommal. Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.
Fontos
Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.
HA SQL-hitelesítést használ, másolja a következő JSON-definíciót:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Ha Windows authentication használ, másolja a következő JSON-definíciót:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Fontos
- Válassza ki a megfelelő szakaszt a SQL Server való csatlakozáshoz használt hitelesítés alapján.
- Cserélje le <az integrációs futtatási környezet nevét> a saját integrációs futtatási környezet nevére.
- Cserélje le <servername>, <databasename>, <username> és <password> a SQL Server adatbázis értékeivel a fájl mentése előtt.
- Ha perjel karaktert (
\) kell használnia a felhasználói fiókjában vagy a kiszolgáló nevében, használja a feloldó karaktert (\). Például:mydomain\\myuser.
A PowerShellben futtassa a következő parancsmagot a C:\ADFTutorials\IncCopyMultiTableTutorial mappára való váltáshoz.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureStorageLinkedService társított szolgáltatás létrehozásához. A következő példában a ResourceGroupName és a DataFactoryName paraméter értékeit fogja megadni:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"Itt látható a minta kimenete:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Az SQL Database társított szolgáltatásának létrehozása
Hozzon létre egy AzureSQLDatabaseLinkedService.json nevű JSON-fájlt a C:\ADFTutorials\IncCopyMultiTableTutorial mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) Cserélje le <servername>, <adatbázis neve>, <user neve> és <password> a fájl mentése előtt az SQL Server adatbázis nevét, felhasználónevét és jelszavát.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }A PowerShellben futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureSQLDatabaseLinkedService társított szolgáltatás létrehozásához.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"Itt látható a minta kimenete:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Adatkészletek létrehozása
Ebben a lépésben olyan adatkészleteket hoz létre, amelyek az adatforrást, az adatcélt és a vízjel tárolásának helyét jelölik.
Forrásadatkészlet létrehozása
Hozzon létre egy SourceDataset.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }A folyamat "Copy tevékenység" egy SQL-lekérdezés használatával tölti be az adatokat, ahelyett hogy a teljes táblát töltené be.
Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SourceDataset adatkészlet létrehozásához.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"Itt látható a parancsmag mintakimenete:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Adatkimenet adatkészlet létrehozása
Hozzon létre egy SinkDataset.json nevű JSON-fájlt ugyanabban a mappában az alábbi tartalommal. A tableName elemet a folyamat futásidőben dinamikusan állítja be. A folyamat ForEach-tevékenysége végighalad a táblanevek listáján, és minden iterációban átadja a tábla nevét ennek az adatkészletnek.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SinkDataset adathalmaz létrehozásához.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"Itt látható a parancsmag mintakimenete:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Adatkészlet létrehozása egy vízjelhez
Ebben a lépésben egy adatkészletet hozunk létre a felső küszöbértékek tárolására.
Hozzon létre egy WatermarkDataset.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a WatermarkDataset adathalmaz létrehozásához.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"Itt látható a parancsmag mintakimenete:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Folyamat létrehozása
A folyamat paraméterként a táblanevek listáját veszi fel. A ForEach-tevékenység végigvezeti a táblanevek listáján, és a következő műveleteket hajtja végre:
A Keresési tevékenység használatával kérje le a régi vízjel értékét (a kezdeti értéket vagy az utolsó iterációban használt értéket).
A Keresési tevékenység használatával kérje le az új vízjel értékét (a forrástábla vízjeloszlopának maximális értékét).
A Copy tevékenység használatával adatokat másolhat a forrásadatbázisból a céladatbázisba a két vízjelérték között.
A StoredProcedure tevékenység használatával frissítse a régi vízjel értékét, amelyet a következő iteráció első lépésében kell használni.
A csővezeték létrehozása
Hozzon létre egy IncrementalCopyPipeline.json nevű JSON-fájlt ugyanabban a mappában a következő tartalommal:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }Futtassa a Set-AzDataFactoryV2Pipeline parancsmagot az "IncrementalCopyPipeline" kötet létrehozásához.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"Itt látható a minta kimenete:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Csővezeték futtatása
Hozzon létre egy Parameters.json nevű paraméterfájlt ugyanabban a mappában a következő tartalommal:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }Futtassa az IncrementalCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
A folyamat monitorozása
Jelentkezzen be a Azure portálra.
Válassza a Minden szolgáltatás lehetőséget, keressen az Adat-előállítók kulcsszóval, és válassza az Adat-előállítók lehetőséget.
Keresse meg az adat-előállítót az adat-előállítók listájában, és válassza ki a Data Factory lap megnyitásához.
A Adat-előállító lapon válassza a Open lehetőséget a Open Azure Data Factory Studio csempén a Azure Data Factory külön lapon való elindításához.
A Azure Data Factory kezdőlapon válassza a bal oldalon található Monitor lehetőséget.
A képernyőkép az Azure Data Factory kezdőlapját mutatja. Láthatja az összes pipeline futást és állapotukat. Kérjük, vegye észre, hogy a következő példában a pipeline futtatásának állapota Sikeres. A folyamatnak átadott paraméterek ellenőrzéséhez válassza ki a hivatkozást a Paraméterek oszlopban. Hiba esetén megjelenik egy hivatkozás a Hiba oszlopban.
Amikor kiválasztja a hivatkozást a Műveletek oszlopban, megjelenik a folyamat összes tevékenységfuttatása.
Ha vissza szeretne lépni a Folyamatfuttatások nézetre, válassza az Összes folyamatfuttatás lehetőséget.
Az eredmények áttekintése
A SQL Server Management Studio futtassa a következő lekérdezéseket a cél SQL-adatbázison annak ellenőrzéséhez, hogy az adatok át lettek-e másolva a forrástáblákból a céltáblákba:
Query
select * from customer_table
Output
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Query
select * from project_table
Output
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Query
select * from watermarktable
Output
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Figyelje meg, hogy mindkét tábla vízjelértékeinek frissítése megtörtént.
További adatok hozzáadása a forrástáblákhoz
Futtassa a következő lekérdezést a forrás SQL Server adatbázison egy meglévő sor frissítésére a customer_table táblában. Szúrjon be egy új sort a project_table-táblába.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
A folyamat újrafuttatása
Most futtassa újra a folyamatot a következő PowerShell-parancs végrehajtásával:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"A folyamatfuttatások figyelése a folyamat figyelése szakasz utasításait követve. Ha a folyamat állapota folyamatban van, a Műveletek területen egy másik művelethivatkozás jelenik meg a folyamatfuttatás megszakításához.
Válassza a Frissítés lehetőséget a lista frissítéséhez, amíg a folyamat futtatása sikeres nem lesz.
Ha szeretné, válassza a Tevékenységfuttatások megtekintése hivatkozást a Műveletek területen a folyamatfuttatáshoz társított összes tevékenységfuttatás megtekintéséhez.
A végleges eredmények áttekintése
A SQL Server Management Studio futtassa a következő lekérdezéseket a céladatbázison annak ellenőrzéséhez, hogy a frissített/új adatok át lettek-e másolva a forrástáblákból a céltáblákba.
Query
select * from customer_table
Output
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Figyelje meg a 3- os számhoz tartozó PersonIDnév és LastModifytime új értékeit.
Query
select * from project_table
Output
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Vegye észre, hogy a NewProject bejegyzést hozzáadták a project_table táblához.
Query
select * from watermarktable
Output
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Figyelje meg, hogy mindkét tábla vízjelértékeinek frissítése megtörtént.
Kapcsolódó tartalom
Az oktatóanyagban az alábbi lépéseket hajtotta végre:
- Forrás- és céladattárak előkészítése.
- Adat-előállító létrehozása
- Saját üzemeltetésű integrációs modul (IR) létrehozása.
- Telepítse az integrációs futtatókörnyezetet.
- Társított szolgáltatások létrehozása.
- Forrás-, fogadó- és küszöbadatkészletek létrehozása.
- Folyamat létrehozása, futtatása és figyelése.
- Tekintse át az eredményeket.
- Adatok hozzáadása vagy frissítése a forrástáblákban.
- Futtassa újra és figyelje a csővezetéket.
- Tekintse át a végső eredményeket.
Folytassa a következő oktatóanyaggal, amelyből megtudhatja, hogyan alakíthat át adatokat egy Spark-fürt használatával az Azure-en.