I figured out why it was failing, and worked around to get unblocked. Basically, the conditional split component before the component which the aforementioned error was throwing did not have the default output stream specified. I specified one and the error was gone. However, why is it necessary to specify a default output stream, if we don't want to handle the rows not satisfying any of the conditions we specify in the conditional split component? And, if it is necessary, why is it not a mandatory field and can be removed? I'm coming from SSIS background and the SSIS equivalent of conditional split in the data flow has default output stream mandatory and it cannot be removed from the component UI.
I have another conditional split further down the data flow, however I didn't see the same error at this one despite not specifying the default output stream in it. I guess this is because there's no row reaching this component which does not satisfy any of the condition and would go on default output stream.
Per your request, here is the DSL script for your reference:
source(output(
DEPARTMENT_ID as decimal(19,0),
DEPARTMENT_NAME as string,
DEPARTMENT_DESC as string,
DELETE_FLAG as string,
CREATE_DATE as timestamp,
UPDATE_VERSION as binary
),
allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_COMMITTED',
format: 'table') ~> departmentDataFromSource
source(output(
ITEM_URN as integer,
ITEM_BIZ_URN as integer,
PKEY as integer,
DEPARTMENT_NAME as string,
DESCRIPTION as string,
DELETED as boolean,
EFFECTIVE_FROM as timestamp,
EFFECTIVE_TO as timestamp
),
allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_COMMITTED',
format: 'table') ~> departmentDataFromSink
source(output(
DEPARTMENT_ID as decimal(19,0),
DEPARTMENT_NAME as string,
DEPARTMENT_DESC as string,
DELETE_FLAG as string,
CREATE_DATE as timestamp,
UPDATE_VERSION as binary
),
allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_COMMITTED',
format: 'table') ~> departmentDataFromSource2
departmentDataFromSink split(EFFECTIVE_TO == toTimestamp('2050-12-31 23:59:59.000'),
disjoint: false) ~> CurrentActiveRecords@(CurrentActiveRecords, Default)
DerivedColumn2, CurrentActiveRecords@CurrentActiveRecords exists(DEPARTMENT_ID_INT == PKEY,
negate:true,
broadcast: 'auto')~> NewDepartments
DerivedColumn1, CurrentActiveRecords@CurrentActiveRecords join(DEPARTMENT_ID_INT == PKEY,
joinType:'inner',
broadcast: 'auto')~> JoinWithPreimportedDimensions
Select1 split(DELETE_FLAG == 'y' && DELETED == false(),
SOURCE_DEPARTMENT_NAME != SINK_DEPARTMENT_NAME,
disjoint: false) ~> SCDType1Or2@(Type1, Type2)
NewDepartments derive(PKEY = toInteger(DEPARTMENT_ID),
DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false()),
EFFECTIVE_FROM = CREATE_DATE,
EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC')) ~> DerivedColumns
Type1ToBeUpdated alterRow(updateIf(DEPARTMENT_ID==PKEY&&DELETED!=iif(DELETE_FLAG=='y'||DELETE_FLAG=='Y',true(),false()))) ~> AlterRow
SCDType1Or2@Type1 select(mapColumn(
DEPARTMENT_ID,
DELETE_FLAG,
PKEY,
DELETED,
ITEM_URN
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> Type1ToBeUpdated
AlterRow derive(EFFECTIVE_TO = currentTimestamp(),
PKEY = toInteger(DEPARTMENT_ID)) ~> EffectiveTo
SCDType1Or2@Type2 select(mapColumn(
DEPARTMENT_ID,
DEPARTMENT_NAME = SOURCE_DEPARTMENT_NAME,
DEPARTMENT_DESC,
DELETE_FLAG,
ITEM_BIZ_URN
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> ToBeInserted
SCDType1Or2@Type2 select(mapColumn(
ITEM_URN,
PKEY
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> ToBeUpdated
ToBeInserted derive(EFFECTIVE_FROM = currentTimestamp(),
EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC'),
PKEY = toInteger(DEPARTMENT_ID),
DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false())) ~> DerivedColumnToBeInserted
ToBeUpdated derive(EFFECTIVE_TO = currentTimestamp()) ~> DerivedColumnToBeUpdated
DerivedColumnToBeUpdated alterRow(updateIf(true())) ~> AlterRow1
JoinWithPreimportedDimensions select(mapColumn(
DEPARTMENT_ID,
SOURCE_DEPARTMENT_NAME = departmentDataFromSource@DEPARTMENT_NAME,
DEPARTMENT_DESC,
DELETE_FLAG,
CREATE_DATE,
UPDATE_VERSION,
ITEM_URN,
ITEM_BIZ_URN,
PKEY,
SINK_DEPARTMENT_NAME = CurrentActiveRecords@CurrentActiveRecords@DEPARTMENT_NAME,
DESCRIPTION,
DELETED,
EFFECTIVE_FROM,
EFFECTIVE_TO
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> Select1
departmentDataFromSource derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn1
departmentDataFromSource2 derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn2
DerivedColumns sink(allowSchemaDrift: true,
validateSchema: false,
input(
ITEM_URN as integer,
ITEM_BIZ_URN as integer,
PKEY as integer,
DEPARTMENT_NAME as string,
DESCRIPTION as string,
DELETED as boolean,
EFFECTIVE_FROM as timestamp,
EFFECTIVE_TO as timestamp
),
deletable:false,
insertable:true,
updateable:false,
upsertable:false,
format: 'table',
skipDuplicateMapInputs: true,
errorHandlingOption: 'stopOnFirstError',
mapColumn(
PKEY,
DEPARTMENT_NAME,
DESCRIPTION = DEPARTMENT_DESC,
DELETED,
EFFECTIVE_FROM,
EFFECTIVE_TO
)) ~> InsertNewDimensions
EffectiveTo sink(allowSchemaDrift: true,
validateSchema: false,
input(
ITEM_URN as integer,
ITEM_BIZ_URN as integer,
PKEY as integer,
DEPARTMENT_NAME as string,
DESCRIPTION as string,
DELETED as boolean,
EFFECTIVE_FROM as timestamp,
EFFECTIVE_TO as timestamp
),
deletable:false,
insertable:false,
updateable:true,
upsertable:false,
keys:['PKEY','ITEM_URN'],
skipKeyWrites:true,
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
errorHandlingOption: 'stopOnFirstError',
mapColumn(
DELETED,
EFFECTIVE_TO,
PKEY,
ITEM_URN
)) ~> DeletedDepartments
DerivedColumnToBeInserted sink(allowSchemaDrift: true,
validateSchema: false,
input(
ITEM_URN as integer,
ITEM_BIZ_URN as integer,
PKEY as integer,
DEPARTMENT_NAME as string,
DESCRIPTION as string,
DELETED as boolean,
EFFECTIVE_FROM as timestamp,
EFFECTIVE_TO as timestamp
),
deletable:false,
insertable:true,
updateable:false,
upsertable:false,
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
errorHandlingOption: 'stopOnFirstError',
mapColumn(
ITEM_BIZ_URN,
PKEY,
DEPARTMENT_NAME,
DESCRIPTION = DEPARTMENT_DESC,
DELETED,
EFFECTIVE_FROM,
EFFECTIVE_TO
)) ~> UpdatedDepartmentsType2Insertion
AlterRow1 sink(allowSchemaDrift: true,
validateSchema: false,
input(
ITEM_URN as integer,
ITEM_BIZ_URN as integer,
PKEY as integer,
DEPARTMENT_NAME as string,
DESCRIPTION as string,
DELETED as boolean,
EFFECTIVE_FROM as timestamp,
EFFECTIVE_TO as timestamp
),
deletable:false,
insertable:false,
updateable:true,
upsertable:false,
keys:['PKEY','ITEM_URN'],
skipKeyWrites:true,
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
errorHandlingOption: 'stopOnFirstError',
mapColumn(
ITEM_URN,
PKEY,
EFFECTIVE_TO
)) ~> UpdatedDepartmentsType2Update