Configure your ADF pipelines with the required activities, and make sure the names of the pipelines you want to log start with "pip_premium_".
Enable diagnostic logging in ADF to capture pipeline run status, and configure it to send events to Azure Event Grid when pipelines complete.
Then create an Event Grid topic to receive the events from ADF.
Create an Event Hub to capture the events from Event Grid, and set up an Event Grid subscription on the topic to forward events to that Event Hub.
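If you want to script that subscription instead of using the portal, a minimal sketch with the azure-mgmt-eventgrid and azure-identity packages could look like the following; the subscription ID, resource group, topic name, and Event Hub resource ID are placeholders to replace with your own values:

from azure.identity import DefaultAzureCredential
from azure.mgmt.eventgrid import EventGridManagementClient
from azure.mgmt.eventgrid.models import EventSubscription, EventHubEventSubscriptionDestination

# Placeholder identifiers -- replace with your own resources
subscription_id = "<subscription-id>"
topic_scope = (
    "/subscriptions/<subscription-id>/resourceGroups/<resource-group>"
    "/providers/Microsoft.EventGrid/topics/<your-topic-name>"
)
event_hub_id = (
    "/subscriptions/<subscription-id>/resourceGroups/<resource-group>"
    "/providers/Microsoft.EventHub/namespaces/<namespace>/eventhubs/<event-hub-name>"
)

client = EventGridManagementClient(DefaultAzureCredential(), subscription_id)

# Subscribe to the custom topic and deliver its events to the Event Hub
subscription = EventSubscription(
    destination=EventHubEventSubscriptionDestination(resource_id=event_hub_id)
)
client.event_subscriptions.begin_create_or_update(
    topic_scope, "adf-premium-to-eventhub", subscription
).result()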
In Azure Databricks, set up a cluster (with the Azure Event Hubs Spark connector, azure-eventhubs-spark, installed as a cluster library) and configure it to consume events from Azure Event Hubs using Spark Structured Streaming.
Write a Databricks notebook or job to process the incoming events and filter them so that only events whose pipeline names start with "pip_premium_" are captured.
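For reference, the notebook below assumes each event body is a JSON document shaped roughly as follows; the field values here are purely illustrative and only mirror the schema defined in the code:

# Hypothetical event body (illustrative values only), matching the schema used below
sample_event = {
    "run_name": "pip_premium_sales_load",
    "params": "{\"triggerType\": \"ScheduleTrigger\"}",
    "run_id": 1234567890,
    "run_duration": 152.7,
    "start_time": "2024-05-01T08:00:00Z",
    "end_time": "2024-05-01T08:02:33Z",
    "state_message_error": None,
    "state": "Succeeded"
}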
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Schema of the JSON payload carried in each event body
schema = StructType([
    StructField("run_name", StringType(), True),
    StructField("params", StringType(), True),
    StructField("run_id", LongType(), True),
    StructField("run_duration", DoubleType(), True),
    StructField("start_time", StringType(), True),
    StructField("end_time", StringType(), True),
    StructField("state_message_error", StringType(), True),
    StructField("state", StringType(), True)
])

# Read from Event Hubs. The connection string should include the Event Hub name
# (";EntityPath=your_event_hub_name"), and recent versions of the azure-eventhubs-spark
# connector expect it to be passed in encrypted form via EventHubsUtils.encrypt.
event_hub_connection_string = "your_event_hub_connection_string"  # include ;EntityPath=your_event_hub_name
encrypted_connection_string = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(event_hub_connection_string)
)

df = (spark.readStream
      .format("eventhubs")
      .option("eventhubs.connectionString", encrypted_connection_string)
      .load())

# Parse the binary event body as JSON and flatten the parsed struct
df_parsed = (df
             .select(from_json(col("body").cast("string"), schema).alias("data"))
             .select("data.*"))

# Keep only events for pipelines whose names start with "pip_premium_"
df_filtered = df_parsed.filter(col("run_name").startswith("pip_premium_"))

# Write the filtered events to a Delta table
(df_filtered.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint/dir")
    .start("/path/to/delta/table"))
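Once the stream is running, you can sanity-check what has been captured by reading the Delta table back, for example (same placeholder path as above):

# Read back the Delta table of captured "pip_premium_" runs and show the latest ones
from pyspark.sql.functions import col

premium_runs = spark.read.format("delta").load("/path/to/delta/table")
premium_runs.orderBy(col("end_time").desc()).show(20, truncate=False)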
For the retention policy, rely on Delta Lake's retention settings (which also bound how far back time travel can go) and schedule a Databricks job to VACUUM old data periodically:
VACUUM delta.`/path/to/delta/table` RETAIN 72 HOURS;
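Note that VACUUM rejects retention intervals below the default of 7 days (168 hours) unless the retention duration check is disabled, so a scheduled notebook cell running the statement above could look like this (same placeholder path as before):

# The default safety check blocks retention below 168 hours; disable it only if you are
# sure no reader or writer still needs the older snapshots.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Remove files older than 72 hours from the table populated by the stream above
spark.sql("VACUUM delta.`/path/to/delta/table` RETAIN 72 HOURS")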