Example: Combine multiple bronze tables into one silver table

Important

Some or all of this functionality is available as part of a preview release. The content and the functionality are subject to change.

This example shows a general pattern for combining multiple bronze tables into one new silver table.

The scenario in this article creates a new silver OutreachTouchpoint table from two Dynamics 365 Sales activity tables:

  • email
  • phonecall

When to use this pattern

Use this pattern when multiple bronze tables represent one analytical concept and can be normalized to one shared silver grain, such as one row per outreach activity.

Add the silver schema

Open the Fundraising_SL_CreateSchema notebook and add the schema for the new table as a static method on the NonprofitSilverModel class.

@staticmethod
def get_outreachtouchpoint_schema() -> StructType:
    return StructType([
        StructField("OutreachTouchpointId", StringType(), nullable=False, metadata={ "primaryKey": True, "pkType": "guid" }),
        StructField("CampaignId", StringType()),
        StructField("ChannelId", StringType()),
        StructField("ActivityType", StringType(), nullable=False),
        StructField("ActivityDate", TimestampType()),
        StructField("Subject", StringType()),
        StructField("Description", StringType()),
        StructField("CreatedDate", TimestampType()),
        StructField("ModifiedDate", TimestampType()),
        StructField("SourceId", StringType(), nullable=False),
        StructField("SourceSystemId", StringType())
    ])

Register the table in get_entities().

def get_entities(self) -> list[tuple[StructType, str, bool]]:
    return [
        # ... existing tables ...
        (NonprofitSilverModel.get_outreachtouchpoint_schema(), "OutreachTouchpoint", True),
    ]

Use a union-safe SourceSystemId value, such as email:<Id> or phonecall:<Id>, so that IDs from the two source tables can't collide after the union.
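The collision risk can be illustrated with a short, self-contained sketch. The helper name make_stage_id and the sample GUID are hypothetical and aren't part of the solution notebooks; only the prefix format follows the convention described here.

```python
def make_stage_id(source_table: str, native_id: str) -> str:
    """Build a union-safe key by prefixing the native ID with its source table name."""
    if not native_id:
        raise ValueError("native_id must be a non-empty string")
    return f"{source_table}:{native_id}"


# Hypothetical sample: the same GUID appears in both bronze tables.
shared_guid = "11111111-2222-3333-4444-555555555555"

email_key = make_stage_id("email", shared_guid)
phonecall_key = make_stage_id("phonecall", shared_guid)

# The raw IDs are identical, but the prefixed keys stay distinct after the union.
print(email_key)
print(phonecall_key)
```

Without the prefix, both rows would carry the same key, and the sync step would have no way to tell them apart.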

Build a staging view from both bronze tables

Open the Fundraising_D365_Transform notebook and create a temporary view that normalizes both tables to the same shape.

spark.sql(f"""
CREATE OR REPLACE TEMPORARY VIEW outreach_touchpoint_stage_union AS
SELECT
    concat('email:', cast(Id as string)) AS StageId,
    'Email' AS ActivityType,
    Id AS NativeActivityId,
    subject,
    description,
    coalesce(actualend, scheduledend, createdon) AS ActivityDate,
    regardingobjectid,
    regardingobjectid_entitytype,
    createdon,
    modifiedon,
    IsDelete,
    SinkModifiedOn
FROM {bronze_lakehouse_name}.email
WHERE Id IS NOT NULL

UNION ALL

SELECT
    concat('phonecall:', cast(Id as string)) AS StageId,
    'Phonecall' AS ActivityType,
    Id AS NativeActivityId,
    subject,
    description,
    coalesce(actualend, actualstart, createdon) AS ActivityDate,
    regardingobjectid,
    regardingobjectid_entitytype,
    createdon,
    modifiedon,
    IsDelete,
    SinkModifiedOn
FROM {bronze_lakehouse_name}.phonecall
WHERE Id IS NOT NULL
""")

Transform and sync the new silver table

Create a transform function that enriches both activity types with campaign and channel values. Then pass that function to data_sync.sync_table to sync the staged rows into the new table.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_outreach_touchpoint(df: DataFrame) -> DataFrame:
    # Add CampaignId to each staged activity.
    with_campaign = resolve_campaign_for_activity(df)

    # Look up the channel for each activity type. Each lookup is expected
    # to return a single row, so the cross joins below don't multiply rows.
    email_channel = lookup_channel_by_name(CHANNEL_EMAIL)
    phone_channel = lookup_channel_by_name(CHANNEL_PHONE_CALL)

    # Attach the email channel to the email rows.
    email_rows = (
        with_campaign
        .filter(col("ActivityType") == "Email")
        .crossJoin(email_channel)
    )

    # Attach the phone call channel to the phone call rows.
    phone_rows = (
        with_campaign
        .filter(col("ActivityType") == "Phonecall")
        .crossJoin(phone_channel)
    )

    # Recombine both activity types, matching columns by name.
    unioned = email_rows.unionByName(phone_rows)

    # Map the staged columns to the silver column names and types.
    return unioned.select(
        col("StageId").alias("SourceSystemId"),
        col("CampaignId"),
        col("ChannelId"),
        col("ActivityType"),
        col("ActivityDate").cast("timestamp").alias("ActivityDate"),
        col("subject").alias("Subject"),
        col("description").alias("Description"),
        col("createdon").cast("timestamp").alias("CreatedDate"),
        col("modifiedon").cast("timestamp").alias("ModifiedDate")
    )

data_sync.sync_table(
    source_table="outreach_touchpoint_stage_union",
    source_primary_key="StageId",
    source_columns=[
        "StageId", "ActivityType", "NativeActivityId", "subject", "description",
        "ActivityDate", "regardingobjectid", "regardingobjectid_entitytype",
        "createdon", "modifiedon"
    ],
    target_table="OutreachTouchpoint",
    target_primary_key="OutreachTouchpointId",
    transform_func=transform_outreach_touchpoint,
    source_table_lakehouse=""
)

Use source_table_lakehouse="" because the source is a temporary view rather than a lakehouse table.

Verify the result

  1. Open the OutreachTouchpoint table.
  2. Confirm that rows exist for both Email and Phonecall in ActivityType.
  3. Confirm that SourceSystemId values are prefixed, such as email:<guid> and phonecall:<guid>.
  4. Confirm that ChannelId is populated from the existing channel dimension.
  5. Confirm that CampaignId is populated for campaign-related activities.
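The checks above can also be sketched in code. The following example runs on plain Python dictionaries so it stays self-contained. The function name summarize_touchpoints and the sample rows are hypothetical, and the comment about loading rows from Spark is an assumption about how you might feed it real data.

```python
from collections import Counter

# Prefix expected for each ActivityType, following the email:<guid> and
# phonecall:<guid> convention.
EXPECTED_PREFIX = {"Email": "email:", "Phonecall": "phonecall:"}


def summarize_touchpoints(rows: list[dict]) -> dict:
    """Summarize the verification checks for a list of OutreachTouchpoint rows."""
    counts = Counter(row["ActivityType"] for row in rows)
    bad_prefix = []
    for row in rows:
        prefix = EXPECTED_PREFIX.get(row["ActivityType"])
        source_id = row.get("SourceSystemId") or ""
        # Flag rows with an unknown activity type or a missing or wrong prefix.
        if prefix is None or not source_id.startswith(prefix):
            bad_prefix.append(source_id)
    return {
        "activity_counts": dict(counts),
        "missing_activity_types": sorted(set(EXPECTED_PREFIX) - set(counts)),
        "bad_prefix_ids": bad_prefix,
        "rows_without_channel": sum(1 for row in rows if row.get("ChannelId") is None),
        "rows_without_campaign": sum(1 for row in rows if row.get("CampaignId") is None),
    }


# Hypothetical sample rows. In a notebook, the rows could come from something like
# [r.asDict() for r in spark.table("OutreachTouchpoint").collect()].
sample_rows = [
    {"ActivityType": "Email", "SourceSystemId": "email:a1",
     "ChannelId": "ch-1", "CampaignId": "cmp-1"},
    {"ActivityType": "Phonecall", "SourceSystemId": "phonecall:b2",
     "ChannelId": "ch-2", "CampaignId": None},
]

summary = summarize_touchpoints(sample_rows)
print(summary)
```

A nonzero rows_without_campaign count isn't necessarily an error, because only campaign-related activities are expected to have a CampaignId.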

Considerations

  • Keep the staging view to the columns that all source tables share, plus any column the silver table needs that a missing side can fill with a placeholder.
  • Include SinkModifiedOn and IsDelete in the staging view so incremental processing still works.
  • If source IDs can collide, prefix them before the union.
  • If one side is missing a column, add a NULL placeholder and cast it to the shared type before the union.
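As a minimal sketch of the NULL placeholder approach, the following helper builds a SELECT list that covers every shared column and emits a typed NULL for any column that a source table lacks. The function name build_select_list, the column subset, and the assumption that one table lacks scheduledend are illustrative only.

```python
def build_select_list(shared_columns: dict[str, str], available: set[str]) -> str:
    """Return a SELECT list covering every shared column.

    Columns that the source table doesn't have are replaced by a NULL
    that is cast to the shared type, so both union branches keep the
    same shape and column types.
    """
    expressions = []
    for name, spark_type in shared_columns.items():
        if name in available:
            expressions.append(name)
        else:
            expressions.append(f"cast(NULL AS {spark_type}) AS {name}")
    return ",\n    ".join(expressions)


# Hypothetical shared shape: column name -> Spark SQL type.
shared = {"subject": "string", "actualend": "timestamp", "scheduledend": "timestamp"}

# Assume, for illustration only, that this source table has no scheduledend column.
select_list = build_select_list(shared, available={"subject", "actualend"})
print(select_list)
```

Because the dictionary preserves insertion order, every branch built from the same shared definition lists its columns in the same order, which a positional union requires.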
