Workflow that logs the completion of certain pipelines into a table

Hanna 220 Reputation points
2024-07-18T18:08:14.79+00:00

I'm having a lot of difficulty implementing the solutions I had in mind. Previously, I asked for help to create an architecture that would be efficient, easy to maintain, and cost-effective. I received various suggestions, but I can't decide which one to use.

The goal is to create a workflow that logs the completion of certain pipelines into a table. This table should have the following schema:

Column               Type
run_name             string
params               string
run_id               bigint
run_duration         double
start_time           string
end_time             string
state_message_error  string
state                string

My biggest challenge is deciding whether this monitoring should be real-time, which might be more expensive. I need to process around 180 executions per day, and I only want to log pipelines whose names have the prefix "pip_premium_".

In summary:

I need to capture the logs of completed pipelines with names starting with "pip_premium_", and insert them into a pre-created Delta table with a specified schema. Additionally, I want this table to have a retention period of 2 to 3 days, as it will be used for monitoring purposes only.
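
For reference, a Delta table matching this schema and retention requirement could be pre-created with something like the sketch below (the database/table name, location, and property values here are only placeholders, not my actual setup):

    # Hypothetical pre-created monitoring table; the name, location, and retention
    # properties below are placeholders.
    spark.sql("""
        CREATE TABLE IF NOT EXISTS monitoring.pipeline_runs (
            run_name STRING,
            params STRING,
            run_id BIGINT,
            run_duration DOUBLE,
            start_time STRING,
            end_time STRING,
            state_message_error STRING,
            state STRING
        )
        USING DELTA
        LOCATION '/path/to/delta/table'
        TBLPROPERTIES (
            'delta.logRetentionDuration' = 'interval 3 days',
            'delta.deletedFileRetentionDuration' = 'interval 3 days'
        )
    """)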

I want to use Azure tools exclusively and ensure that the architecture is clear and understandable to anyone.

Can you help me design this architecture? I would greatly appreciate it!

Tags: Azure Monitor, Azure Event Hubs, Azure Databricks, Azure Data Factory, Azure Event Grid

1 answer

  1. Amira Bedhiafi 19,946 Reputation points
    2024-07-19T19:35:43.0366667+00:00

    You can configure your ADF pipelines with the required activities and make sure that the pipelines you want to log have names starting with "pip_premium_".

    Enable diagnostic logging in ADF to capture pipeline run status, and configure ADF to send events to Azure Event Grid when pipelines complete.

    Then you need to create an Event Grid Topic to receive events from ADF.

    Create an Event Hub to capture the events from Event Grid, and set up an Event Grid subscription to forward the events to that Event Hub.

    In Azure Databricks, set up a cluster and configure it to consume the events from Azure Event Hubs using Spark Structured Streaming.

    You can write a Databricks notebook or job to process the incoming events, filtering them so that only pipelines whose names start with "pip_premium_" are captured:

    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

    # Schema matching the pre-created Delta monitoring table
    schema = StructType([
        StructField("run_name", StringType(), True),
        StructField("params", StringType(), True),
        StructField("run_id", LongType(), True),
        StructField("run_duration", DoubleType(), True),
        StructField("start_time", StringType(), True),
        StructField("end_time", StringType(), True),
        StructField("state_message_error", StringType(), True),
        StructField("state", StringType(), True)
    ])

    # Event Hub connection details; include the hub name as EntityPath.
    # Recent versions of the azure-eventhubs-spark connector expect the
    # connection string to be encrypted before it is passed as an option.
    connection_string = "your_event_hub_connection_string;EntityPath=your_event_hub_name"
    encrypted_connection_string = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)

    # Read the event stream from Event Hubs
    df = (spark.readStream
          .format("eventhubs")
          .option("eventhubs.connectionString", encrypted_connection_string)
          .load())

    # Parse the event body into the table schema and keep only premium pipelines
    df_parsed = df.select(from_json(col("body").cast("string"), schema).alias("data")).select("data.*")
    df_filtered = df_parsed.filter(col("run_name").startswith("pip_premium_"))

    # Append the filtered events to the Delta table
    (df_filtered.writeStream
     .format("delta")
     .outputMode("append")
     .option("checkpointLocation", "/path/to/checkpoint/dir")
     .start("/path/to/delta/table"))
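
    Since you only expect around 180 executions per day, an always-on stream may cost more than necessary. As a sketch of a cheaper variant (reusing df_filtered and the same paths from above, and assuming a recent Databricks Runtime), the write can use an availableNow trigger so the notebook runs as a scheduled job that drains whatever has accumulated in the Event Hub and then stops:

    # Batch-style variant: process everything currently available, then stop,
    # so the notebook can run as a scheduled Databricks job instead of 24/7.
    (df_filtered.writeStream
     .format("delta")
     .outputMode("append")
     .option("checkpointLocation", "/path/to/checkpoint/dir")
     .trigger(availableNow=True)
     .start("/path/to/delta/table"))

    On older runtimes, .trigger(once=True) behaves similarly for a single scheduled run.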
    

    For the retention policy, use Delta Lake's retention settings and time travel features to manage how long data is kept, and schedule a Databricks job to vacuum old data periodically:

    VACUUM delta.`/path/to/delta/table` RETAIN 72 HOURS;
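
    Note that VACUUM only removes data files that are no longer referenced by the table, so to keep just 2 to 3 days of rows you also need to delete the old rows first. Below is a minimal sketch of a scheduled cleanup notebook; the table path, the column used for the cutoff, and the retention-check setting are assumptions:

    # Hypothetical daily cleanup job: delete rows older than 3 days, then vacuum
    # the files they leave behind. end_time is a string in this schema, so it is
    # cast to a timestamp for the comparison. Vacuuming below the default 7-day
    # threshold requires disabling Delta's safety check.
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

    spark.sql("""
        DELETE FROM delta.`/path/to/delta/table`
        WHERE to_timestamp(end_time) < current_timestamp() - INTERVAL 3 DAYS
    """)

    spark.sql("VACUUM delta.`/path/to/delta/table` RETAIN 72 HOURS")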
    