Ingest data from RabbitMQ

Important

This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Azure Databricks previews.

This page shows how to create a managed RabbitMQ ingestion pipeline using Databricks Lakeflow Connect.

Requirements

  • To create an ingestion pipeline, you must first meet the following requirements:

    • Your workspace must be enabled for Unity Catalog.

    • Serverless compute must be enabled for your workspace. See Serverless compute requirements.

    • If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore. See Manage privileges in Unity Catalog.

      If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.

    • If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.

    • You must have USE CATALOG privileges on the target catalog.

    • You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

  • To ingest from RabbitMQ, you must first complete the steps in Connect to RabbitMQ.

Create an ingestion pipeline

Each RabbitMQ classic queue is ingested into a streaming table. For a list of supported data and limitations, see Supported data.

Note

UI-based pipeline authoring is not supported for the RabbitMQ connector in Beta. Use Declarative Automation Bundles or a Azure Databricks notebook to create your pipeline.

Declarative Automation Bundles

Use Declarative Automation Bundles to manage RabbitMQ pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.

  1. Create a new bundle using the Databricks CLI:

    databricks bundle init
    
  2. Add a pipeline definition file to the bundle (for example, resources/rabbitmq_pipeline.yml). See pipeline.ingestion_definition and Examples.

  3. Deploy the bundle using the Databricks CLI:

    databricks bundle deploy
    

Databricks notebook

  1. Import the following notebook into your Azure Databricks workspace:

    Get notebook

  2. Leave cell one as-is. Do not modify the channel field — it must remain PREVIEW.

  3. Modify the cell with your pipeline configuration details, including your queue name. See pipeline.ingestion_definition and Examples.

  4. Click Run all.

Examples

Use these examples to configure your pipeline.

Pipeline with single queue

This example ingests a single RabbitMQ classic queue into a streaming table:

Declarative Automation Bundles

variables:
  connection_name:
    default: my-rabbitmq-connection
  dest_catalog:
    default: main
  dest_schema:
    default: rabbitmq_ingest
resources:
  pipelines:
    rabbitmq_pipeline:
      name: rabbitmq-ingestion-pipeline
      serverless: true
      continuous: true
      channel: PREVIEW
      catalog: ${var.dest_catalog}
      target: ${var.dest_schema}
      ingestion_definition:
        connection_name: ${var.connection_name}
        objects:
          - table:
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
              destination_table: orders
              connector_options:
                rabbitmq_options:
                  queue: orders

Pipeline with multiple queues

This example ingests two RabbitMQ classic queues, each into its own destination table. Define one table entry per queue:

Declarative Automation Bundles

variables:
  connection_name:
    default: my-rabbitmq-connection
  dest_catalog:
    default: main
  dest_schema:
    default: rabbitmq_ingest
resources:
  pipelines:
    rabbitmq_pipeline:
      name: rabbitmq-ingestion-pipeline
      serverless: true
      continuous: true
      channel: PREVIEW
      catalog: ${var.dest_catalog}
      target: ${var.dest_schema}
      ingestion_definition:
        connection_name: ${var.connection_name}
        objects:
          - table:
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
              destination_table: orders
              connector_options:
                rabbitmq_options:
                  queue: orders
          - table:
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
              destination_table: shipments
              connector_options:
                rabbitmq_options:
                  queue: shipments

Common patterns

For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.

Next steps

The RabbitMQ connector runs continuous pipelines. Next, start your pipeline and set alerts on it. See Common pipeline maintenance tasks.

Additional resources