Error "Stream #default not found" in an Azure Data Factory data flow

Ajay Narvekar 1 Reputation point
2021-06-05T17:19:51.863+00:00

I'm getting the following error from whichever component I use after a specific join component in a Data Factory data flow:

{"StatusCode":"DFExecutorUserError","Message":"Job failed due to reason: at Select 'Select1'(Line 97/Col 25): Stream #default not found","Details":"at Select 'Select1'(Line 97/Col 25): Stream #default not found"}

This is one of the run IDs where I got this error:

d2f77396-c7a7-451d-910a-a3c2a669808f

Azure Data Factory

1 answer

  1. Ajay Narvekar 1 Reputation point
    2021-06-07T03:20:17.327+00:00

    @Kiran-MSFT

    I figured out why it was failing and found a workaround to get unblocked. The conditional split component before the component that was throwing the aforementioned error did not have a default output stream specified. I specified one and the error went away. However, why is it necessary to specify a default output stream if we don't want to handle the rows that satisfy none of the conditions in the conditional split? And if it is necessary, why is it not a mandatory field, and why can it be removed? I come from an SSIS background, and in the SSIS equivalent of the conditional split the default output stream is mandatory and cannot be removed from the component UI.

    I have another conditional split further down the data flow, but I didn't see the same error there despite not specifying a default output stream for it. I guess this is because no row reaching that component fails all of its conditions, so nothing would go to the default output stream.
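
    For illustration, and judging only by the split syntax in the script in this post, a default output stream appears to be one extra stream name listed after the names that pair with the conditions: the `CurrentActiveRecords` split has one condition and two stream names, while the `SCDType1Or2` split has two conditions and only two names. A minimal sketch of the second split with a default stream added follows. The stream name `Unmatched` is hypothetical and not taken from the actual data flow, and this is not necessarily the exact change that was made:

    ```
    Select1 split(DELETE_FLAG == 'y' && DELETED == false(),
    	SOURCE_DEPARTMENT_NAME != SINK_DEPARTMENT_NAME,
    	disjoint: false) ~> SCDType1Or2@(Type1, Type2, Unmatched)
    ```

    With this form, rows that match neither condition would be routed to `SCDType1Or2@Unmatched` instead of to a default stream that does not exist.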

    Per your request, here is the data flow script (DSL) for reference:

    source(output(  
    		DEPARTMENT_ID as decimal(19,0),  
    		DEPARTMENT_NAME as string,  
    		DEPARTMENT_DESC as string,  
    		DELETE_FLAG as string,  
    		CREATE_DATE as timestamp,  
    		UPDATE_VERSION as binary  
    	),  
    	allowSchemaDrift: true,  
    	validateSchema: false,  
    	isolationLevel: 'READ_COMMITTED',  
    	format: 'table') ~> departmentDataFromSource  
    source(output(  
    		ITEM_URN as integer,  
    		ITEM_BIZ_URN as integer,  
    		PKEY as integer,  
    		DEPARTMENT_NAME as string,  
    		DESCRIPTION as string,  
    		DELETED as boolean,  
    		EFFECTIVE_FROM as timestamp,  
    		EFFECTIVE_TO as timestamp  
    	),  
    	allowSchemaDrift: true,  
    	validateSchema: false,  
    	isolationLevel: 'READ_COMMITTED',  
    	format: 'table') ~> departmentDataFromSink  
    source(output(  
    		DEPARTMENT_ID as decimal(19,0),  
    		DEPARTMENT_NAME as string,  
    		DEPARTMENT_DESC as string,  
    		DELETE_FLAG as string,  
    		CREATE_DATE as timestamp,  
    		UPDATE_VERSION as binary  
    	),  
    	allowSchemaDrift: true,  
    	validateSchema: false,  
    	isolationLevel: 'READ_COMMITTED',  
    	format: 'table') ~> departmentDataFromSource2  
    departmentDataFromSink split(EFFECTIVE_TO == toTimestamp('2050-12-31 23:59:59.000'),  
    	disjoint: false) ~> CurrentActiveRecords@(CurrentActiveRecords, Default)  
    DerivedColumn2, CurrentActiveRecords@CurrentActiveRecords exists(DEPARTMENT_ID_INT == PKEY,  
    	negate:true,  
    	broadcast: 'auto')~> NewDepartments  
    DerivedColumn1, CurrentActiveRecords@CurrentActiveRecords join(DEPARTMENT_ID_INT == PKEY,  
    	joinType:'inner',  
    	broadcast: 'auto')~> JoinWithPreimportedDimensions  
    Select1 split(DELETE_FLAG == 'y' && DELETED == false(),  
    	SOURCE_DEPARTMENT_NAME != SINK_DEPARTMENT_NAME,  
    	disjoint: false) ~> SCDType1Or2@(Type1, Type2)  
    NewDepartments derive(PKEY = toInteger(DEPARTMENT_ID),  
    		DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false()),  
    		EFFECTIVE_FROM = CREATE_DATE,  
    		EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC')) ~> DerivedColumns  
    Type1ToBeUpdated alterRow(updateIf(DEPARTMENT_ID==PKEY&&DELETED!=iif(DELETE_FLAG=='y'||DELETE_FLAG=='Y',true(),false()))) ~> AlterRow  
    SCDType1Or2@Type1 select(mapColumn(  
    		DEPARTMENT_ID,  
    		DELETE_FLAG,  
    		PKEY,  
    		DELETED,  
    		ITEM_URN  
    	),  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true) ~> Type1ToBeUpdated  
    AlterRow derive(EFFECTIVE_TO = currentTimestamp(),  
    		PKEY = toInteger(DEPARTMENT_ID)) ~> EffectiveTo  
    SCDType1Or2@Type2 select(mapColumn(  
    		DEPARTMENT_ID,  
    		DEPARTMENT_NAME = SOURCE_DEPARTMENT_NAME,  
    		DEPARTMENT_DESC,  
    		DELETE_FLAG,  
    		ITEM_BIZ_URN  
    	),  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true) ~> ToBeInserted  
    SCDType1Or2@Type2 select(mapColumn(  
    		ITEM_URN,  
    		PKEY  
    	),  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true) ~> ToBeUpdated  
    ToBeInserted derive(EFFECTIVE_FROM = currentTimestamp(),  
    		EFFECTIVE_TO = toTimestamp('2050-12-31 23:59:59.000', 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC'),  
    		PKEY = toInteger(DEPARTMENT_ID),  
    		DELETED = iif(DELETE_FLAG == 'y' || DELETE_FLAG == 'Y', true(), false())) ~> DerivedColumnToBeInserted  
    ToBeUpdated derive(EFFECTIVE_TO = currentTimestamp()) ~> DerivedColumnToBeUpdated  
    DerivedColumnToBeUpdated alterRow(updateIf(true())) ~> AlterRow1  
    JoinWithPreimportedDimensions select(mapColumn(  
    		DEPARTMENT_ID,  
    		SOURCE_DEPARTMENT_NAME = departmentDataFromSource@DEPARTMENT_NAME,  
    		DEPARTMENT_DESC,  
    		DELETE_FLAG,  
    		CREATE_DATE,  
    		UPDATE_VERSION,  
    		ITEM_URN,  
    		ITEM_BIZ_URN,  
    		PKEY,  
    		SINK_DEPARTMENT_NAME = CurrentActiveRecords@CurrentActiveRecords@DEPARTMENT_NAME,  
    		DESCRIPTION,  
    		DELETED,  
    		EFFECTIVE_FROM,  
    		EFFECTIVE_TO  
    	),  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true) ~> Select1  
    departmentDataFromSource derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn1  
    departmentDataFromSource2 derive(DEPARTMENT_ID_INT = toInteger(DEPARTMENT_ID)) ~> DerivedColumn2  
    DerivedColumns sink(allowSchemaDrift: true,  
    	validateSchema: false,  
    	input(  
    		ITEM_URN as integer,  
    		ITEM_BIZ_URN as integer,  
    		PKEY as integer,  
    		DEPARTMENT_NAME as string,  
    		DESCRIPTION as string,  
    		DELETED as boolean,  
    		EFFECTIVE_FROM as timestamp,  
    		EFFECTIVE_TO as timestamp  
    	),  
    	deletable:false,  
    	insertable:true,  
    	updateable:false,  
    	upsertable:false,  
    	format: 'table',  
    	skipDuplicateMapInputs: true,  
    	errorHandlingOption: 'stopOnFirstError',  
    	mapColumn(  
    		PKEY,  
    		DEPARTMENT_NAME,  
    		DESCRIPTION = DEPARTMENT_DESC,  
    		DELETED,  
    		EFFECTIVE_FROM,  
    		EFFECTIVE_TO  
    	)) ~> InsertNewDimensions  
    EffectiveTo sink(allowSchemaDrift: true,  
    	validateSchema: false,  
    	input(  
    		ITEM_URN as integer,  
    		ITEM_BIZ_URN as integer,  
    		PKEY as integer,  
    		DEPARTMENT_NAME as string,  
    		DESCRIPTION as string,  
    		DELETED as boolean,  
    		EFFECTIVE_FROM as timestamp,  
    		EFFECTIVE_TO as timestamp  
    	),  
    	deletable:false,  
    	insertable:false,  
    	updateable:true,  
    	upsertable:false,  
    	keys:['PKEY','ITEM_URN'],  
    	skipKeyWrites:true,  
    	format: 'table',  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true,  
    	errorHandlingOption: 'stopOnFirstError',  
    	mapColumn(  
    		DELETED,  
    		EFFECTIVE_TO,  
    		PKEY,  
    		ITEM_URN  
    	)) ~> DeletedDepartments  
    DerivedColumnToBeInserted sink(allowSchemaDrift: true,  
    	validateSchema: false,  
    	input(  
    		ITEM_URN as integer,  
    		ITEM_BIZ_URN as integer,  
    		PKEY as integer,  
    		DEPARTMENT_NAME as string,  
    		DESCRIPTION as string,  
    		DELETED as boolean,  
    		EFFECTIVE_FROM as timestamp,  
    		EFFECTIVE_TO as timestamp  
    	),  
    	deletable:false,  
    	insertable:true,  
    	updateable:false,  
    	upsertable:false,  
    	format: 'table',  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true,  
    	errorHandlingOption: 'stopOnFirstError',  
    	mapColumn(  
    		ITEM_BIZ_URN,  
    		PKEY,  
    		DEPARTMENT_NAME,  
    		DESCRIPTION = DEPARTMENT_DESC,  
    		DELETED,  
    		EFFECTIVE_FROM,  
    		EFFECTIVE_TO  
    	)) ~> UpdatedDepartmentsType2Insertion  
    AlterRow1 sink(allowSchemaDrift: true,  
    	validateSchema: false,  
    	input(  
    		ITEM_URN as integer,  
    		ITEM_BIZ_URN as integer,  
    		PKEY as integer,  
    		DEPARTMENT_NAME as string,  
    		DESCRIPTION as string,  
    		DELETED as boolean,  
    		EFFECTIVE_FROM as timestamp,  
    		EFFECTIVE_TO as timestamp  
    	),  
    	deletable:false,  
    	insertable:false,  
    	updateable:true,  
    	upsertable:false,  
    	keys:['PKEY','ITEM_URN'],  
    	skipKeyWrites:true,  
    	format: 'table',  
    	skipDuplicateMapInputs: true,  
    	skipDuplicateMapOutputs: true,  
    	errorHandlingOption: 'stopOnFirstError',  
    	mapColumn(  
    		ITEM_URN,  
    		PKEY,  
    		EFFECTIVE_TO  
    	)) ~> UpdatedDepartmentsType2Update