Important
Some or all of this functionality is available as part of a preview release. The content and the functionality are subject to change.
This example shows a general pattern for combining multiple bronze tables into one new silver table.
The scenario in this article creates a new silver OutreachTouchpoint table from two Dynamics 365 Sales activity tables:
- email
- phonecall
When to use this pattern
Use this pattern when multiple bronze tables represent one analytical concept and can be normalized to one shared silver grain.
Add the silver schema
Open the Fundraising_SL_CreateSchema notebook and add the schema for the new table.
@staticmethod
def get_outreachtouchpoint_schema() -> StructType:
    return StructType([
        StructField("OutreachTouchpointId", StringType(), nullable=False, metadata={ "primaryKey": True, "pkType": "guid" }),
        StructField("CampaignId", StringType()),
        StructField("ChannelId", StringType()),
        StructField("ActivityType", StringType(), nullable=False),
        StructField("ActivityDate", TimestampType()),
        StructField("Subject", StringType()),
        StructField("Description", StringType()),
        StructField("CreatedDate", TimestampType()),
        StructField("ModifiedDate", TimestampType()),
        StructField("SourceId", StringType(), nullable=False),
        StructField("SourceSystemId", StringType())
    ])
Register the table in get_entities().
def get_entities(self) -> list[tuple[StructType, str, bool]]:
    return [
        # ... existing tables ...
        (NonprofitSilverModel.get_outreachtouchpoint_schema(), "OutreachTouchpoint", True),
    ]
Use a union-safe SourceSystemId value, such as email:<Id> and phonecall:<Id>, so that IDs from the two source tables can't collide.
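As a rough illustration of why the prefix matters, the following plain-Python sketch builds prefixed keys for two rows that share a native ID. The helper name make_source_system_id and the sample ID are hypothetical and aren't part of the solution notebooks.

```python
def make_source_system_id(source_table: str, native_id: str) -> str:
    """Hypothetical helper: prefix a native ID with its source table name."""
    return f"{source_table}:{native_id}"


# Illustrative sample: the same native ID appears in both bronze tables.
shared_id = "00000000-0000-0000-0000-000000000001"

# Without a prefix, the two rows collapse to a single key.
unprefixed_keys = {shared_id, shared_id}

# With a prefix, each row keeps a distinct key.
prefixed_keys = {
    make_source_system_id("email", shared_id),
    make_source_system_id("phonecall", shared_id),
}

print(len(unprefixed_keys))  # 1
print(len(prefixed_keys))    # 2
```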
Build a staging view from both bronze tables
Open the Fundraising_D365_Transform notebook and create a temporary view that normalizes both tables to the same shape.
spark.sql(f"""
    CREATE OR REPLACE TEMPORARY VIEW outreach_touchpoint_stage_union AS
    SELECT
        concat('email:', cast(Id as string)) AS StageId,
        'Email' AS ActivityType,
        Id AS NativeActivityId,
        subject,
        description,
        coalesce(actualend, scheduledend, createdon) AS ActivityDate,
        regardingobjectid,
        regardingobjectid_entitytype,
        createdon,
        modifiedon,
        IsDelete,
        SinkModifiedOn
    FROM {bronze_lakehouse_name}.email
    WHERE Id IS NOT NULL

    UNION ALL

    SELECT
        concat('phonecall:', cast(Id as string)) AS StageId,
        'Phonecall' AS ActivityType,
        Id AS NativeActivityId,
        subject,
        description,
        coalesce(actualend, actualstart, createdon) AS ActivityDate,
        regardingobjectid,
        regardingobjectid_entitytype,
        createdon,
        modifiedon,
        IsDelete,
        SinkModifiedOn
    FROM {bronze_lakehouse_name}.phonecall
    WHERE Id IS NOT NULL
""")
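The coalesce calls in the view pick the first non-null timestamp for each row, so ActivityDate falls back to createdon when no end or start time is recorded. The following is a minimal plain-Python sketch of that fallback order, assuming None stands in for SQL NULL. The function first_non_null and the sample timestamps are illustrative only.

```python
from datetime import datetime
from typing import Optional


def first_non_null(*values: Optional[datetime]) -> Optional[datetime]:
    """Mimic SQL coalesce: return the first argument that is not None."""
    for value in values:
        if value is not None:
            return value
    return None


created_on = datetime(2024, 1, 10, 9, 0)
scheduled_end = datetime(2024, 1, 12, 17, 0)

# Email row with no actualend: falls back to scheduledend.
email_activity_date = first_non_null(None, scheduled_end, created_on)

# Phone call row with neither actualend nor actualstart: falls back to createdon.
phonecall_activity_date = first_non_null(None, None, created_on)
```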
Transform and sync the new silver table
Create a transform function that enriches both activity types and then syncs them into the new table.
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

def transform_outreach_touchpoint(df: DataFrame) -> DataFrame:
    with_campaign = resolve_campaign_for_activity(df)
    email_channel = lookup_channel_by_name(CHANNEL_EMAIL)
    phone_channel = lookup_channel_by_name(CHANNEL_PHONE_CALL)

    # Email activities, cross-joined with the email channel lookup result.
    email_rows = (
        with_campaign
        .filter(col("ActivityType") == lit("Email"))
        .crossJoin(email_channel)
    )

    # Phone call activities, cross-joined with the phone call channel lookup result.
    phone_rows = (
        with_campaign
        .filter(col("ActivityType") == lit("Phonecall"))
        .crossJoin(phone_channel)
    )

    # Recombine the two subsets, matching columns by name rather than by position.
    unioned = reduce(lambda left, right: left.unionByName(right), [email_rows, phone_rows])

    # Project to the silver column names and types.
    return unioned.select(
        col("StageId").alias("SourceSystemId"),
        col("CampaignId"),
        col("ChannelId"),
        col("ActivityType"),
        col("ActivityDate").cast("timestamp").alias("ActivityDate"),
        col("subject").alias("Subject"),
        col("description").alias("Description"),
        col("createdon").cast("timestamp").alias("CreatedDate"),
        col("modifiedon").cast("timestamp").alias("ModifiedDate")
    )

data_sync.sync_table(
    source_table="outreach_touchpoint_stage_union",
    source_primary_key="StageId",
    source_columns=[
        "StageId", "ActivityType", "NativeActivityId", "subject", "description",
        "ActivityDate", "regardingobjectid", "regardingobjectid_entitytype",
        "createdon", "modifiedon"
    ],
    target_table="OutreachTouchpoint",
    target_primary_key="OutreachTouchpointId",
    transform_func=transform_outreach_touchpoint,
    source_table_lakehouse=""
)
Use source_table_lakehouse="" because the source is a temporary view rather than a lakehouse table.
Verify the result
- Open the OutreachTouchpoint table.
- Confirm that rows exist for both Email and Phonecall in ActivityType.
- Confirm that SourceSystemId values are prefixed, such as email:<guid> and phonecall:<guid>.
- Confirm that ChannelId is populated from the existing channel dimension.
- Confirm that CampaignId is populated for campaign-related activities.
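Some of these checks can also be scripted. The following sketch is one possible approach, not part of the solution notebooks: it runs the ActivityType, SourceSystemId, and ChannelId checks over rows represented as plain dictionaries. The function check_outreach_rows and the sample rows are hypothetical. CampaignId isn't checked for nulls, because only campaign-related activities are expected to have one.

```python
from typing import Any

EXPECTED_PREFIXES = {"Email": "email:", "Phonecall": "phonecall:"}


def check_outreach_rows(rows: list[dict[str, Any]]) -> list[str]:
    """Return a list of problems found; an empty list means every check passed."""
    problems = []

    # Both activity types should be present in the table.
    found_types = {row["ActivityType"] for row in rows}
    missing = set(EXPECTED_PREFIXES) - found_types
    if missing:
        problems.append(f"No rows for ActivityType: {sorted(missing)}")

    for row in rows:
        prefix = EXPECTED_PREFIXES.get(row["ActivityType"])
        source_id = row.get("SourceSystemId") or ""
        # SourceSystemId should carry the prefix that matches its activity type.
        if prefix is None or not source_id.startswith(prefix):
            problems.append(f"Unexpected SourceSystemId: {source_id!r}")
        # ChannelId should be populated for every row.
        if row.get("ChannelId") is None:
            problems.append(f"ChannelId is null for {source_id!r}")

    return problems


sample_rows = [
    {"ActivityType": "Email", "SourceSystemId": "email:a1", "ChannelId": "c1", "CampaignId": "k1"},
    {"ActivityType": "Phonecall", "SourceSystemId": "phonecall:b2", "ChannelId": "c2", "CampaignId": None},
]
print(check_outreach_rows(sample_rows))  # []
```

In a notebook, the rows might come from something like [row.asDict() for row in spark.table("OutreachTouchpoint").collect()], assuming the silver lakehouse is the default lakehouse and the table is small enough to collect to the driver.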
Considerations
- Keep only the columns that are shared across all source tables.
- Include SinkModifiedOn and IsDelete in the staging view so incremental processing still works.
- If source IDs can collide, prefix them before the union.
- If one side is missing a column, add a NULL placeholder and cast it to the shared type before the union.
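
To illustrate the NULL placeholder consideration, the following sketch generates a SELECT list in which any column that a source table lacks is replaced by a typed NULL. The helper build_select_list, the column-to-type mapping, and the scenario of a source without scheduledend are all hypothetical. The generated text would still need to be placed into the staging view's SQL by hand or by string formatting.

```python
def build_select_list(shared_columns: dict[str, str], available: set[str]) -> str:
    """Build a SELECT list that fills missing columns with typed NULL placeholders.

    shared_columns maps each shared column name to its Spark SQL type.
    available is the set of columns that the source table actually has.
    """
    parts = []
    for name, sql_type in shared_columns.items():
        if name in available:
            parts.append(name)
        else:
            # Cast the placeholder so both sides of the union share one type.
            parts.append(f"CAST(NULL AS {sql_type}) AS {name}")
    return ",\n    ".join(parts)


shared = {"subject": "STRING", "scheduledend": "TIMESTAMP", "createdon": "TIMESTAMP"}

# Hypothetical case: this source has no scheduledend column.
select_list = build_select_list(shared, available={"subject", "createdon"})
print(select_list)
```

Keeping the shared column list in one mapping means every branch of the union emits the same columns in the same order, which UNION ALL relies on because it matches columns by position.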