Esercizio - Progettare e implementare una dimensione a modifica lenta di tipo 1 con flussi di dati per mapping
In questo esercizio viene creato un flusso di dati per una dimensione a modifica lenta di tipo 1 usando il pool SQL dedicato di Azure Synapse come origine e destinazione. Questo flusso di dati può quindi essere aggiunto a un'istanza di Synapse Pipelines ed eseguito come parte del processo ETL (estrazione, trasformazione e caricamento).
Configurazione dei dati di origine e della tabella delle dimensioni
Per questo esercizio si vuole caricare una tabella delle dimensioni in Azure Synapse da dati di origine che possono provenire da molti tipi di sistemi diversi, ad esempio SQL di Azure, Archiviazione di Azure e così via. L'esempio viene mantenuto semplice creando i dati di origine nel database di Azure Synapse.
In Synapse Studio passare all'hub Dati.
Selezionare la scheda Area di lavoro(1), espandere Database, quindi fare clic con il pulsante destro del mouse su SQLPool01 (2). Selezionare Nuovo script SQL (3), quindi Script vuoto (4).
Incollare lo script seguente nella finestra Script vuoto, quindi selezionare Esegui o premere
F5
per eseguire la query:CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
Creare un flusso di dati per mapping
I flussi di dati per mapping sono attività della pipeline che consentono di specificare in modo visivo come trasformare i dati, tramite un'esperienza che non richiede la scrittura di codice. Di seguito viene creato un flusso di dati per mapping per creare una dimensione a modifica lenta di tipo 1.
Passare all'hub Sviluppo.
Selezionare +, quindi selezionare Flusso di dati.
Nel riquadro delle proprietà del nuovo flusso di dati immettere
UpdateCustomerDimension
nel campo Nome(1), quindi selezionare il pulsante Proprietà(2) per nascondere il riquadro delle proprietà.Selezionare Aggiungi origine nell'area di disegno.
In
Source settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
SourceDB
- Tipo di origine: selezionare
Dataset
- Opzioni: selezionare
Allow schema drift
e lasciare deselezionate le altre opzioni - Campionamento: selezionare
Disable
- Set di dati: selezionare + Nuovo per creare un nuovo set di dati
- Nome flusso di output: immettere
Nella finestra di dialogo Nuovo set di dati di integrazione selezionare Azure Synapse Analyticse quindi Continua.
Nelle proprietà del set di dati configurare le impostazioni seguenti:
- Nome: immettere
CustomerSource
- Servizio collegato: selezionare il servizio collegato dell'area di lavoro di Synapse
- Nome tabella: selezionare il pulsante Aggiorna accanto all'elenco a discesa
- Nome: immettere
Nel campo Valore immettere il nome del pool SQL, quindi selezionare OK.
Selezionare
dbo.CustomerSource
in Nome tabella, selezionareFrom connection/store
in Importa schema, quindi fare clic su OK per creare il set di dati.Selezionare Apri accanto al set di dati
CustomerSource
aggiunto.Immettere il nome del pool SQL nel campo Valore accanto a
DBName
.Nell'editor del flusso di dati selezionare la casella Aggiungi origine sotto l'attività SourceDB. Configurare questa origine come tabella DimCustomer seguendo la stessa procedura usata per CustomerSource.
- Nome flusso di output: immettere
DimCustomer
- Tipo di origine: selezionare
Dataset
- Opzioni: selezionare
Allow schema drift
e lasciare deselezionate le altre opzioni - Campionamento: selezionare
Disable
- Set di dati: selezionare + Nuovo per creare un nuovo set di dati. Usare il servizio collegato Azure Synapse e scegliere la tabella DimCustomer. Assicurarsi di impostare DBName sul nome del pool SQL.
- Nome flusso di output: immettere
Aggiungere trasformazioni al flusso di dati
Selezionare + a destra dell'origine
SourceDB
nell'area di disegno e quindi selezionare Colonna derivata.In
Derived column's settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
CreateCustomerHash
- Incoming stream (Flusso in ingresso): selezionare
SourceDB
- Colonne: immettere i valori seguenti:
Column Expression Descrizione Digitare HashKey
sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
Crea un hash SHA256 dei valori della tabella. Questo viene usato per rilevare le modifiche alle righe confrontando l'hash dei record in ingresso con il valore hash dei record di destinazione, corrispondente al valore CustomerID
. La funzioneiifNull
sostituisce i valori Null con stringhe vuote. In caso contrario, i valori hash tendono a essere duplicati quando sono presenti voci Null.- Nome flusso di output: immettere
Selezionare + a destra della colonna derivata
CreateCustomerHash
nell'area di disegno, quindi selezionare Esiste.In
Exists settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
Exists
- Flusso di sinistra: selezionare
CreateCustomerHash
- Flusso di destra: selezionare
SynapseDimCustomer
- Tipo esistente: selezionare
Doesn't exist
- Condizioni Exists: impostare i valori seguenti per i flussi destro e sinistro:
Sinistro: colonna di CreateCustomerHash Destro: colonna di SynapseDimCustomer HashKey
HashKey
- Nome flusso di output: immettere
Selezionare + a destra di
Exists
nell'area di disegno, quindi selezionare Ricerca.In
Lookup settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
LookupCustomerID
- Primary stream (Flusso primario): selezionare
Exists
- Lookup stream (Flusso di ricerca): selezionare
SynapseDimCustomer
- Corrispondenza in più righe: deselezionata
- Corrispondenza in: selezionare
Any row
- Lookup conditions (Condizioni di ricerca): impostare i valori seguenti per i flussi destro e sinistro:
Sinistro: colonna di Exists Destro: colonna di SynapseDimCustomer CustomerID
CustomerID
- Nome flusso di output: immettere
Selezionare + a destra di
LookupCustomerID
nell'area di disegno, quindi selezionare Colonna derivata.In
Derived column's settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
SetDates
- Incoming stream (Flusso in ingresso): selezionare
LookupCustomerID
- Colonne: immettere i valori seguenti:
Column Expression Descrizione Selezionare InsertedDate
iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
Se il valore di InsertedDate
è Null, inserire il timestamp corrente. In caso contrario, usare il valore diInsertedDate
.Selezionare ModifiedDate
currentTimestamp()
Aggiornare sempre il valore di ModifiedDate
con il timestamp corrente.Nota
Per inserire la seconda colonna, selezionare + Aggiungi sopra l'elenco Colonne, quindi selezionare Aggiungi colonna.
- Nome flusso di output: immettere
Selezionare + a destra del passaggio della colonna derivata
SetDates
nell'area di disegno, quindi selezionare Modifica riga.In
Alter row settings
configurare le proprietà seguenti:- Nome flusso di output: immettere
AllowUpserts
- Incoming stream (Flusso in ingresso): selezionare
SetDates
- Modifica le condizioni di riga: immettere i valori seguenti:
Condizione Expression Descrizione Selezionare Upsert if
true()
Impostare la condizione su true()
nella condizioneUpsert if
per consentire le operazioni di upsert. In questo modo, si garantisce che tutti i dati che attraversano i passaggi nel flusso di dati per mapping vengano inseriti o aggiornati nel sink.- Nome flusso di output: immettere
Selezionare + a destra del passaggio di modifica riga
AllowUpserts
nell'area di disegno, quindi selezionare Sink.In
Sink
configurare le proprietà seguenti:- Nome flusso di output: immettere
Sink
- Incoming stream (Flusso in ingresso): selezionare
AllowUpserts
- Tipo di sink: selezionare
Dataset
- Set di dati: selezionare
DimCustomer
- Opzioni: selezionare
Allow schema drift
e deselezionareValidate schema
- Nome flusso di output: immettere
Selezionare la scheda Impostazioni e configurare le proprietà seguenti:
- Metodo di aggiornamento: selezionare
Allow upsert
e deselezionare tutte le altre opzioni - Colonne chiave: selezionare
List of columns
, quindi selezionareCustomerID
nell'elenco - Azione tabella: selezionare
None
- Abilita staging: deselezionata
- Metodo di aggiornamento: selezionare
Selezionare la scheda Mapping, quindi deselezionare Auto mapping. Configurare il mapping delle colonne di input come descritto di seguito:
Colonne di input Colonne di output SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
Il flusso per mapping completo sarà simile al seguente. Selezionare Pubblica tutti per salvare le modifiche.
Selezionare Pubblica.
Come testare il flusso di dati
Il flusso di dati per la dimensione a modifica lenta di tipo 1 è stato completato. Se si sceglie di testare il flusso di dati, è possibile aggiungerlo a una pipeline di integrazione di Synapse. È quindi possibile eseguire la pipeline una volta per effettuare il caricamento iniziale dei dati di origine del cliente nella destinazione DimCustomer.
Ogni esecuzione aggiuntiva della pipeline confronterà i dati nella tabella di origine con quelli già presenti nella tabella delle dimensioni (usando HashKey) e aggiornerà solo i record modificati. Per eseguire il test, è possibile aggiornare un record nella tabella di origine e quindi eseguire di nuovo la pipeline e verificare gli aggiornamenti dei record nella tabella delle dimensioni.
Esaminare il cliente Janet Gates come esempio. Il caricamento iniziale mostra che LastName
è Gates e CustomerId
è 4.
Qui è mostrata un'istruzione di esempio che aggiorna il cognome del cliente nella tabella di origine.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
Dopo aver aggiornato il record e aver eseguito di nuovo la pipeline, DimCustomer mostrerà questi dati aggiornati.
Il record del cliente ha aggiornato il valore di LastName
in modo che corrisponda al record di origine e ha aggiornato ModifiedDate
, senza tenere traccia del valore precedente di LastName
. Questo è il comportamento previsto per una dimensione a modifica lenta di tipo 1. Se per il campo LastName
è necessaria la cronologia, modificare la tabella e il flusso di dati in modo che corrispondano agli altri tipi di dimensione a modifica lenta appresi qui.