Update Policy Function

Bexy Morgan 260 Reputation points
2023-10-17T17:49:41.7166667+00:00

Hey @Sander van de Velde | MVP, my previous question unfortunately got reported; I hope this one is clearer. How should I write the update policy to ingest the data from the three functions below, sta(), stb(), and stc(), into the target table? How do I write the update policy function for this? The final table is named Target, and its schema is also included in the code snippet below.

.create table SourceTableA (PK:string,A1:string,A2:int)
.create table SourceTableB (PK:string,B1:string,B2:int)
.create table SourceTableC (PK:string,C1:string)

.ingest inline into table ["SourceTableA"] <|
"ABC","Q",88
"DEF","W",11

.ingest inline into table ["SourceTableB"] <|
"ABC","Al",100
"DEF","Md",1111
"FGH","Bd",9824

.ingest inline into table ["SourceTableC"] <|
"ABC","Quad"
"FGH","Tex"


.create function
 with (docstring = 'STA', folder = 'UpdatePolicyFunctions')
     sta()  
{
SourceTableA
| extend extraAcolumn1 = A2+2
| extend extraAcolumn2 = strcat("anonymous", A1)
}

.create function
 with (docstring = 'STB', folder = 'UpdatePolicyFunctions')
     stb()  
{
SourceTableB
| extend extraBcolumn = 1 + B2
}


.create function
 with (docstring = 'STC', folder = 'UpdatePolicyFunctions')
     stc()  
{
SourceTableC
| extend extraCcolumn = strcat("anonymous", C1) 
}

.create table Target (PK:string,A1:string,A2:int,B1:string,B2:int,C1:string,extraAcolumn1:int,extraAcolumn2:string,
extraBcolumn:int,extraCcolumn:string)

Target
Azure Data Explorer

Accepted answer
  1. Sander van de Velde | MVP 35,971 Reputation points MVP
    2023-10-17T21:13:28.2866667+00:00

    Hello Bexy,

    I finally found some time to write the script.

    Here you see three source tables filling the same (one) target table:

    .create table SourceTableA (PK:string, A1:string, A2:int)
    
    .create table SourceTableB (PK:string, B1:string, B2:int)
    
    .create table SourceTableC (PK:string, C1:string)
    
    .alter table SourceTableA policy streamingingestion enable
    
    .alter table SourceTableB policy streamingingestion enable
    
    .alter table SourceTableC policy streamingingestion enable
    
    .create table Target (PK:string, A1:string, A2:int, B1:string, B2:int, C1:string, extraAcolumn1:int, extraAcolumn2:string, extraBcolumn:int, extraCcolumn:string)
    
    .create function SourceAToTarget()  
    {
    SourceTableA
    | project PK , A1 , A2
    | extend B1 = '', B2 = toint(1) , C1 = '', extraAcolumn1 = toint(1), extraAcolumn2 = '', extraBcolumn = toint(1), extraCcolumn = ''
    }
    
    .create function SourceBToTarget()  
    {
    SourceTableB
    | project PK, B1 , B2 
    | extend A1 = '', A2 = toint(1) 
    | extend  C1 = '', extraAcolumn1 = toint(1), extraAcolumn2 = '', extraBcolumn = toint(1), extraCcolumn = ''
    | project-reorder PK, A1, A2, B1, B2
    }
    
    .create function SourceCToTarget()  
    {
    SourceTableC
    | project PK , C1
    | extend A1 = '', A2 = toint(1), B1 = '', B2 = toint(1), extraAcolumn1 = toint(1), extraAcolumn2 = '', extraBcolumn = toint(1), extraCcolumn = ''
    | project-reorder PK, A1, A2, B1, B2, C1
    }
    
    .alter table Target policy update  @'[{"Source": "SourceTableA", "Query": "SourceAToTarget", "IsEnabled" : true, "IsTransactional": true },{"Source": "SourceTableB", "Query": "SourceBToTarget", "IsEnabled" : true, "IsTransactional": true },{"Source": "SourceTableC", "Query": "SourceCToTarget", "IsEnabled" : true, "IsTransactional": true }]'
    
    .show table Target policy update 
    
    .ingest inline into table ["SourceTableA"] <|
    "pk1a","a1",1
    "pk2a","a2",2
    
    SourceTableA
    
    Target
    
    .ingest inline into table ["SourceTableB"] <|
    "pk1b","b1",3
    "pk2b","b2",4
    
    SourceTableB
    
    Target
    
    
    .ingest inline into table ["SourceTableC"] <|
    "pk1c","c1"
    "pk2c","c2"
    
    SourceTableC
    
    Target
    
    

    First, look at '.alter table Target policy update': it expects a JSON array of policy objects, here one per source table.

    Finally, the order and types of the columns output by each query must match the schema of the Target table.
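
    One way to check the column order and types before attaching a policy is to compare the schema of a function's output with the schema of the target table. This is only a sketch that reuses the names from the script above; run the two queries separately and compare the results row by row.

    ```kusto
    // Schema produced by the update policy function (ordinal, name, type).
    SourceAToTarget()
    | getschema
    | project ColumnOrdinal, ColumnName, ColumnType

    // Schema of the target table, for a side-by-side comparison.
    Target
    | getschema
    | project ColumnOrdinal, ColumnName, ColumnType
    ```

    If a name, position, or type differs between the two results, adjusting the function (for example with 'project-reorder' or 'toint()') is probably easier than changing the table.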

    Now you know how to combine multiple update policies.
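
    The functions in the script fill the extra columns with placeholder values. If the target should instead hold the values calculated in sta(), stb(), and stc() from the question, the functions could look roughly like the sketch below. It makes a few assumptions that are not part of the original script: the calculations are wrapped in 'toint()' so the result matches the int columns of Target, columns that do not belong to a source are left empty ('' or 'int(null)'), and '.create-or-alter' is used so the existing functions are overwritten.

    ```kusto
    // Assumed variant: calculate the extra A columns as in sta().
    .create-or-alter function SourceAToTarget()
    {
    SourceTableA
    | extend extraAcolumn1 = toint(A2 + 2), extraAcolumn2 = strcat("anonymous", A1)
    | extend B1 = '', B2 = int(null), C1 = '', extraBcolumn = int(null), extraCcolumn = ''
    | project PK, A1, A2, B1, B2, C1, extraAcolumn1, extraAcolumn2, extraBcolumn, extraCcolumn
    }

    // Assumed variant: calculate the extra B column as in stb().
    .create-or-alter function SourceBToTarget()
    {
    SourceTableB
    | extend extraBcolumn = toint(1 + B2)
    | extend A1 = '', A2 = int(null), C1 = '', extraAcolumn1 = int(null), extraAcolumn2 = '', extraCcolumn = ''
    | project PK, A1, A2, B1, B2, C1, extraAcolumn1, extraAcolumn2, extraBcolumn, extraCcolumn
    }

    // Assumed variant: calculate the extra C column as in stc().
    .create-or-alter function SourceCToTarget()
    {
    SourceTableC
    | extend extraCcolumn = strcat("anonymous", C1)
    | extend A1 = '', A2 = int(null), B1 = '', B2 = int(null), extraAcolumn1 = int(null), extraAcolumn2 = '', extraBcolumn = int(null)
    | project PK, A1, A2, B1, B2, C1, extraAcolumn1, extraAcolumn2, extraBcolumn, extraCcolumn
    }
    ```

    The final 'project' lists all ten columns in the order of the Target table, so no separate 'project-reorder' is needed. The update policy itself can stay as it is, because it refers to the functions by name.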


    If the response helped, please click "Accept Answer". If it doesn't work, please let us know how far you got. All community members with similar issues will benefit from this. Your contribution is highly appreciated.


1 additional answer

  1. Sander van de Velde | MVP 35,971 Reputation points MVP
    2023-10-17T18:01:44.89+00:00

    Hello Bexy,

    You want to ingest data from three different (source) tables into one (target) table.

    A (target) table can have multiple update policies.

    Follow this blog post regarding adding a (one) update policy.

    Just give your target table three separate update policies, one for each source table, each with its own custom update policy function.

    So you probably need three functions.

    All (three) functions receive messages from their source table and need to honor the target table format.

    Each function and policy would look a bit like this:

    .create function
    with (docstring = 'Ingest raw source A data and project to target table', folder='ingestprojection') 
    ParseSourceA ()
    {
      MySourceTableA
        | project 
          counter = toint(rawpayload.counter),
          ts = todatetime(rawpayload.ts),
          IotHubDeviceId = tostring(IotHubDeviceId)
    }
     
    .alter table
    TargetTable 
    policy update @'[{"Source": "MySourceTableA", "Query": "ParseSourceA", "IsEnabled" : true, "IsTransactional": true }]'
    
    

    Notice that the function appears to query the whole table 'MySourceTableA'. When it runs as part of an update policy, however, it only reads the newly ingested rows each time it is executed.

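    So please repeat this for each of the three source tables. Each function should output rows in the same format as the target table columns. Keep in mind that '.alter table ... policy update' replaces the existing policy, so the final policy array must contain all three entries.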

    In the end, you have three separate functions and a target table having three update policies.
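
    As a sketch of how the second and third policies could be added without overwriting the first one, the '.alter-merge' variant of the command appends entries to the existing policy array. The names 'MySourceTableB', 'MySourceTableC', 'ParseSourceB', and 'ParseSourceC' are hypothetical and only follow the naming of the example above.

    ```kusto
    // Assumed names: add the policy for source B to the existing array.
    .alter-merge table TargetTable policy update
    @'[{"Source": "MySourceTableB", "Query": "ParseSourceB", "IsEnabled": true, "IsTransactional": true}]'

    // Assumed names: add the policy for source C in the same way.
    .alter-merge table TargetTable policy update
    @'[{"Source": "MySourceTableC", "Query": "ParseSourceC", "IsEnabled": true, "IsTransactional": true}]'

    // Check that the target table now lists all three policies.
    .show table TargetTable policy update
    ```

    Alternatively, a single '.alter table TargetTable policy update' command with all three objects in one array should give the same end result.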



