Have you tried testing this by hardcoding the query string in the source?
pass select query in filter adf

I am trying to pass select query in my filter .I am getting {"StatusCode":"DF-Executor-StoreIsNotDefined","Message":"Job failed due to reason: The store configuration is not defined. This error is potentially caused by invalid parameter assignement in the pipeline.","Details":""}
parameters{
srcTable as string,
trgTable as string,
filter as string,
unique_columns as string,
cdc_columns as string,
partition_columns as string,
load_type as string,
max_cdc_column as timestamp
}
source(output(
PERSONID as decimal(38,0),
NAME as string,
LASTMODIFYTIME as timestamp
),
allowSchemaDrift: true,
validateSchema: false,
query: (concat('select * from ',$srcTable,' where ',$filter)),
format: 'query') ~> source
Filter1 alterRow(insertIf(equalsIgnoreCase($load_type,'insert')),
updateIf(equalsIgnoreCase($load_type,'update')),
upsertIf(equalsIgnoreCase($load_type,'merge'))) ~> AlterRow1
source filter($cdc_columns> 'select WATERMARKVALUE from watermarktable where TABLENAME = $srcTable') ~> Filter1
AlterRow1 sink(allowSchemaDrift: true,
validateSchema: false,
deletable:false,
insertable:true,
updateable:true,
upsertable:false,
keys:(split($unique_columns,',')),
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
saveOrder: 1,
stageInsert: true) ~> sink1
Hi @Aditya Raj ,
Did you get chance to try above comment mentioned ask? Kindly share updates, so that we can try to repro and help in resolution. Thank you.
Hi @Aditya Raj ,
Tried above comment mentioned ask? Kindly share updates, so that we can try to repro and help in resolution. Thank you.
I tried it and its working
Hi @Aditya Raj Thank you for confirming and accepting answer.
Sign in to comment