
Work with unstructured data in volumes

This page shows you how to store, query, and process unstructured data files using Unity Catalog volumes. You'll learn how to upload files, query metadata, process files with AI functions, apply access control, and share volumes with other organizations. Where possible, this tutorial includes instructions for the Catalog Explorer UI. If no Catalog Explorer option is shown, use the provided Python or SQL commands.

For a complete overview of volume capabilities and use cases, see What are Unity Catalog volumes?.

Requirements

  • An Azure Databricks workspace with Unity Catalog enabled.
  • CREATE CATALOG privilege on the metastore. See Create catalogs. If you can't create a catalog, ask your admin for access or use an existing catalog where you have the CREATE SCHEMA privilege.
  • Databricks Runtime 14.3 LTS or above.
  • For AI functions: A workspace in a supported region.
  • For Delta Sharing: CREATE SHARE and CREATE RECIPIENT privileges on the metastore. See Share data and AI assets securely.

Step 1: Create a volume

Create a catalog, schema, and volume to store your files. For detailed volume management instructions, see Create and manage Unity Catalog volumes.

Step 1.1: Create a catalog and schema

SQL

-- Create a catalog
CREATE CATALOG IF NOT EXISTS unstructured_data_lab;
USE CATALOG unstructured_data_lab;

-- Create a schema
CREATE SCHEMA IF NOT EXISTS raw;
USE SCHEMA raw;

Python

spark.sql("CREATE CATALOG IF NOT EXISTS unstructured_data_lab")
spark.sql("USE CATALOG unstructured_data_lab")
spark.sql("CREATE SCHEMA IF NOT EXISTS raw")
spark.sql("USE SCHEMA raw")

Catalog Explorer

  1. Click Catalog in the sidebar.
  2. Click Create > Create a catalog.
  3. Enter unstructured_data_lab as the Catalog name.
  4. Click Create.
  5. Click View catalog.

On the catalog page:

  1. Click Create schema.
  2. Enter raw as the Schema name.
  3. Click Create.

Step 1.2: Create a managed volume

SQL

CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files';

Python

spark.sql("""
    CREATE VOLUME IF NOT EXISTS files_volume
    COMMENT 'Volume for storing unstructured data files'
""")

Catalog Explorer

On the schema page:

  1. Click Create > Volume.
  2. Enter files_volume as the Volume name.
  3. Verify that Managed volume is selected.
  4. Click Create.

Step 2: Upload files

Upload files to your volume. For comprehensive file management examples, see Work with files in Unity Catalog volumes.

Step 2.1: Upload files

You can use sample files from databricks-datasets for this tutorial, or upload your own files using the Catalog Explorer UI.

Note

You can use the Python commands to copy files from databricks-datasets to your volume even if you're not familiar with Python. See Manage notebooks for instructions on running commands in notebooks.

Python

# Upload a single image file
dbutils.fs.cp(
    "dbfs:/databricks-datasets/flower_photos/roses/10090824183_d02c613f10_m.jpg",
    "/Volumes/unstructured_data_lab/raw/files_volume/rose.jpg"
)

# Upload a single PDF file
dbutils.fs.cp(
    "dbfs:/databricks-datasets/COVID/CORD-19/2020-03-13/COVID.DATA.LIC.AGMT.pdf",
    "/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf"
)

# Upload a directory
local_dir = "dbfs:/databricks-datasets/samples/data/mllib"
volume_path = "/Volumes/unstructured_data_lab/raw/files_volume/sample_files"

for file_info in dbutils.fs.ls(local_dir):
    source = file_info.path
    dest = f"{volume_path}/{file_info.name}"
    dbutils.fs.cp(source, dest, recurse=True)
    print(f"Uploaded: {file_info.name}")

Catalog Explorer

The code in the Python tab uploads two files (a JPG and a PDF) and a directory that includes .txt and .csv files. To upload files using Catalog Explorer:

  1. From the volume page, click Upload to this volume.
  2. In the Upload files dialog, under Files, click browse or drag and drop files into the drop zone.
  3. Under Destination volume, verify that the volume you created in the previous step is selected.

Step 2.2: Verify the upload

SQL

LIST '/Volumes/unstructured_data_lab/raw/files_volume/';

Python

files = dbutils.fs.ls("/Volumes/unstructured_data_lab/raw/files_volume/")
for f in files:
    print(f"{f.name}\t{f.size} bytes")

Catalog Explorer

When files are uploaded, they appear on the volume page. Click a file name to see a preview, or click a directory to view individual files.

Alternative: Use the %fs magic command

You can also list volume contents in a notebook cell:

%fs ls /Volumes/unstructured_data_lab/raw/files_volume/

Step 3: Query file metadata

Query file information to understand what's in your volume. For more querying patterns, see List and query files in volumes with SQL.

Step 3.1: Show file metadata

SQL

SELECT
  path,
  _metadata.file_name,
  _metadata.file_size,
  _metadata.file_modification_time
FROM read_files(
  '/Volumes/unstructured_data_lab/raw/files_volume/',
  format => 'binaryFile'
);

Python

df = (
    spark.read
    .format("binaryFile")
    .option("recursiveFileLookup", "true")
    .load("/Volumes/unstructured_data_lab/raw/files_volume/")
)

df.select("path", "modificationTime", "length").show(truncate=False)

Catalog Explorer

The volume page in Catalog Explorer shows each file's Name (including the extension), Size, and Last modified date.
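Beyond the per-file listing, it can be useful to roll metadata up by file extension to see which file types dominate the volume. A minimal pure-Python sketch (the helper and sample listing are illustrative; in a notebook you would feed it `[(f.name, f.size) for f in dbutils.fs.ls(...)]`, since the `FileInfo` objects returned by `dbutils.fs.ls` expose `name` and `size` attributes):

```python
from collections import defaultdict
from pathlib import PurePosixPath

def summarize_by_extension(files):
    """Group (name, size) pairs by file extension, totaling counts and bytes."""
    totals = defaultdict(lambda: {"count": 0, "bytes": 0})
    for name, size in files:
        # Directory entries (names ending in '/') have no suffix
        ext = PurePosixPath(name).suffix or "<none>"
        totals[ext]["count"] += 1
        totals[ext]["bytes"] += size
    return dict(totals)

# Illustrative listing, matching the files uploaded earlier in this tutorial
listing = [("rose.jpg", 24500), ("covid.pdf", 81000), ("sample_files/", 0)]
print(summarize_by_extension(listing))
```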

Step 4: Query and process files

Use Azure Databricks AI functions to extract content from documents and analyze images. For a complete overview of AI function capabilities, see Apply AI on data using Azure Databricks AI Functions.

Note

AI functions require a workspace in a supported region. See Apply AI on data using Azure Databricks AI Functions.

If you don't have access to AI functions, use standard Python libraries instead. Expand the Alternative sections below for examples.

Step 4.1: Parse documents

SQL

SELECT
  path AS file_path,
  ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
  '/Volumes/unstructured_data_lab/raw/files_volume/',
  format => 'binaryFile',
  fileNamePattern => '*.pdf'
);

Python

result_df = spark.sql("""
    SELECT
      path AS file_path,
      ai_parse_document(content, map('version', '2.0')) AS parsed_content
    FROM read_files(
      '/Volumes/unstructured_data_lab/raw/files_volume/',
      format => 'binaryFile',
      fileNamePattern => '*.pdf'
    )
""")
display(result_df)

Alternative: Parse PDFs without AI functions

If AI functions aren't available in your region, use Python libraries:

%pip install PyPDF2

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from PyPDF2 import PdfReader
import io

@udf(returnType=StringType())
def extract_pdf_text(content):
    if content is None:
        return None
    try:
        reader = PdfReader(io.BytesIO(content))
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        return f"Error: {str(e)}"

df = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.pdf") \
    .load("/Volumes/unstructured_data_lab/raw/files_volume/")

result_df = df.withColumn("text_content", extract_pdf_text("content"))
display(result_df.select("path", "text_content"))

Step 4.2: Analyze images

SQL

SELECT
  path,
  ai_query(
    'databricks-llama-4-maverick',
    'Describe this image in one sentence:',
    files => content
  ) AS description
FROM read_files(
  '/Volumes/unstructured_data_lab/raw/files_volume/',
  format => 'binaryFile',
  fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000;

Python

result_df = spark.sql("""
    SELECT
      path,
      ai_query(
        'databricks-llama-4-maverick',
        'Describe this image in one sentence:',
        files => content
      ) AS description
    FROM read_files(
      '/Volumes/unstructured_data_lab/raw/files_volume/',
      format => 'binaryFile',
      fileNamePattern => '*.{jpg,jpeg,png}'
    )
    WHERE _metadata.file_size < 5000000
""")
display(result_df)

Alternative: Extract image metadata without AI functions

To extract image metadata without AI functions:

%pip install pillow

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from PIL import Image
import io

image_schema = StructType([
    StructField("width", IntegerType()),
    StructField("height", IntegerType()),
    StructField("format", StringType())
])

@udf(returnType=image_schema)
def get_image_info(content):
    if content is None:
        return None
    try:
        img = Image.open(io.BytesIO(content))
        return {"width": img.width, "height": img.height, "format": img.format}
    except Exception:
        return None

df = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.{jpg,jpeg,png}") \
    .load("/Volumes/unstructured_data_lab/raw/files_volume/")

result_df = df.withColumn("image_info", get_image_info("content"))
display(result_df.select("path", "image_info.*"))

Step 4.3: Filter and analyze by filename

This example filters for image files with the substring "rose" in their filename.

SQL

SELECT
  path AS file_path,
  ai_query(
    'databricks-llama-4-maverick',
    'Describe this image in one sentence:',
    files => content
  ) AS description
FROM read_files(
  '/Volumes/unstructured_data_lab/raw/files_volume/',
  format => 'binaryFile',
  fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%';

Python

result_df = spark.sql("""
    SELECT
      path AS file_path,
      ai_query(
        'databricks-llama-4-maverick',
        'Describe this image in one sentence:',
        files => content
      ) AS description
    FROM read_files(
      '/Volumes/unstructured_data_lab/raw/files_volume/',
      format => 'binaryFile',
      fileNamePattern => '*.{jpg,jpeg,png}'
    )
    WHERE _metadata.file_name ILIKE '%rose%'
""")
display(result_df)

Step 4.4: Join files with structured tables

This example uses row numbers to pair files with taxi trips for demonstration purposes. In production, join on meaningful business keys.

SQL

-- This example demonstrates joining file metadata with structured data
-- by pairing files with taxi trips using row numbers
WITH files_with_row AS (
  SELECT
    path,
    SPLIT(path, '/')[SIZE(SPLIT(path, '/')) - 1] AS file_name,
    length,
    ROW_NUMBER() OVER (ORDER BY path) AS file_row
  FROM read_files(
    '/Volumes/unstructured_data_lab/raw/files_volume/',
    format => 'binaryFile'
  )
),
trips_with_row AS (
  SELECT
    tpep_pickup_datetime,
    pickup_zip,
    dropoff_zip,
    fare_amount,
    ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime) AS trip_row
  FROM samples.nyctaxi.trips
  WHERE pickup_zip IS NOT NULL
  LIMIT 5
)
SELECT
  f.path,
  f.file_name,
  f.length,
  t.pickup_zip,
  t.dropoff_zip,
  t.fare_amount,
  t.tpep_pickup_datetime
FROM files_with_row f
INNER JOIN trips_with_row t ON f.file_row = t.trip_row;

Python

from pyspark.sql.functions import col, row_number, element_at, split
from pyspark.sql.window import Window

# Read files and add row numbers
files_df = spark.read.format("binaryFile") \
    .load("/Volumes/unstructured_data_lab/raw/files_volume/") \
    .withColumn("file_name", element_at(split(col("path"), "/"), -1))

files_with_row = files_df.alias("files") \
    .withColumn("file_row", row_number().over(Window.orderBy("path")))

# Get trips and add row numbers
trips_df = spark.table("samples.nyctaxi.trips") \
    .filter(col("pickup_zip").isNotNull()) \
    .limit(5)

trips_with_row = trips_df.alias("trips") \
    .withColumn("trip_row", row_number().over(Window.orderBy("tpep_pickup_datetime")))

# Join on row numbers
result_df = files_with_row \
    .join(trips_with_row, col("file_row") == col("trip_row"), "inner") \
    .select(
        "files.path",
        "files.file_name",
        "files.length",
        "trips.pickup_zip",
        "trips.dropoff_zip",
        "trips.fare_amount",
        "trips.tpep_pickup_datetime"
    )

display(result_df)
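As noted above, production joins should use a meaningful business key rather than row numbers. If your file names encode a real identifier (say, a hypothetical trip_<id>.jpg naming convention), you could extract that key and join on it. A sketch of the extraction logic, assuming that naming convention:

```python
import re

def extract_trip_id(file_name):
    """Return the numeric ID from names like 'trip_12345.jpg', or None.

    The trip_<id> naming convention is hypothetical -- adapt the pattern
    to whatever key your files actually encode.
    """
    match = re.search(r"trip_(\d+)\.", file_name)
    return int(match.group(1)) if match else None

print(extract_trip_id("trip_12345.jpg"))  # 12345
print(extract_trip_id("rose.jpg"))        # None
```

Wrapped in a Spark UDF, this function would produce a `trip_id` column on the files DataFrame that you can join directly against a trips table keyed on the same ID.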

Step 5: Apply access control

Control who can read and write files in your volumes. To learn more about managing privileges in Unity Catalog, see Manage privileges in Unity Catalog.

Step 5.1: Grant access

SQL

-- Replace <user-or-group-name> with your workspace group or user name

-- Grant read access
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

-- Grant read and write access
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

-- Grant all privileges
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;

Python

# Replace <user-or-group-name> with your workspace group or user name
spark.sql("""
    GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
    TO `<user-or-group-name>`
""")

spark.sql("""
    GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
    TO `<user-or-group-name>`
""")

spark.sql("""
    GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
    TO `<user-or-group-name>`
""")

Catalog Explorer

  1. Go to the Permissions tab on the volume page.
  2. Click Grant.
  3. Enter the email address for a user or the name of a group.
  4. Select the permissions to grant.
  5. Click Confirm.

Step 5.2: View current privileges

SQL

SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume;

Python

display(spark.sql("SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume"))

Catalog Explorer

The Permissions tab on the volume page shows which users and groups have access to the volume.
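To remove access, use REVOKE with the same privilege names — shown here for the read-only grant from Step 5.1:

```sql
REVOKE READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
FROM `<user-or-group-name>`;
```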

Step 6: Set up incremental ingestion

Use Auto Loader to automatically process new files as they arrive in your volume. This pattern is useful for continuous data ingestion workflows. For more ingestion patterns, see Common data loading patterns.

Step 6.1: Create a streaming table

SQL

CREATE OR REFRESH STREAMING TABLE document_ingestion
SCHEDULE EVERY 1 HOUR
AS SELECT
  path,
  modificationTime,
  length,
  content,
  _metadata,
  current_timestamp() AS ingestion_time
FROM STREAM(read_files(
  '/Volumes/unstructured_data_lab/raw/files_volume/incoming/',
  format => 'binaryFile'
));

Python

from pyspark.sql.functions import current_timestamp, col

dbutils.fs.mkdirs("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")

df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "binaryFile") \
    .option("pathGlobFilter", "*.pdf") \
    .load("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")

df_enriched = df \
    .withColumn("ingestion_time", current_timestamp()) \
    .withColumn("source_file", col("_metadata.file_path"))

query = df_enriched.writeStream \
    .option("checkpointLocation",
            "/Volumes/unstructured_data_lab/raw/files_volume/_checkpoints/docs") \
    .trigger(availableNow=True) \
    .toTable("document_ingestion")

query.awaitTermination()

Step 7: Share files with Delta Sharing

Share volumes securely with users in other organizations using Delta Sharing. You must create a recipient before sharing. A recipient represents an external organization or user who can access your shared data. See Create and manage data recipients for Delta Sharing (Databricks-to-Databricks sharing) for recipient setup.

Step 7.1: Create and configure a share

SQL

-- Create a share
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners';

-- Add the volume
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume;

-- Create a recipient
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>';

-- Grant access
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>;

Python

spark.sql("""
    CREATE SHARE IF NOT EXISTS unstructured_data_share
    COMMENT 'Document files for partners'
""")

spark.sql("""
    ALTER SHARE unstructured_data_share
    ADD VOLUME unstructured_data_lab.raw.files_volume
""")

spark.sql("""
    CREATE RECIPIENT IF NOT EXISTS <partner_org>
    USING ID '<recipient-sharing-identifier>'
""")

spark.sql("""
    GRANT SELECT ON SHARE unstructured_data_share
    TO RECIPIENT <partner_org>
""")

Step 7.2: Access shared data (as recipient)

SQL

-- View available shares
SHOW SHARES IN PROVIDER <provider_name>;

-- Create a catalog from the share
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share;

-- Query shared files
SELECT * EXCEPT (content), _metadata
FROM read_files(
  '/Volumes/shared_documents/raw/files_volume/',
  format => 'binaryFile'
)
LIMIT 10;

Python

spark.sql("SHOW SHARES IN PROVIDER <provider_name>").show()

spark.sql("""
    CREATE CATALOG IF NOT EXISTS shared_documents
    FROM SHARE <provider_name>.unstructured_data_share
""")

df = spark.read.format("binaryFile") \
    .load("/Volumes/shared_documents/raw/files_volume/")

df.select("path", "modificationTime", "length").show(10)

Step 8: Clean up files

Remove files when they're no longer needed.

Python

# Delete a single file
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")

# Delete a directory recursively
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/", recurse=True)

CLI

# Delete a single file
databricks fs rm dbfs:/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf

# Delete a directory recursively
databricks fs rm -r dbfs:/Volumes/unstructured_data_lab/raw/files_volume/sample_files/

Alternative: Use standard Python

import os
os.remove("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")

import shutil
shutil.rmtree("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/")
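To tear down everything this tutorial created, you can also drop the volume, schema, and catalog (assuming nothing else depends on them — dropping a managed volume deletes its stored files):

```sql
DROP VOLUME IF EXISTS unstructured_data_lab.raw.files_volume;
DROP SCHEMA IF EXISTS unstructured_data_lab.raw;
DROP CATALOG IF EXISTS unstructured_data_lab CASCADE;
```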

Next steps

  • Continue learning about volumes
  • SQL function references