DataverseClient Class

High-level client for Microsoft Dataverse operations.

This client provides a simple, stable interface for interacting with Dataverse environments through the Web API. It handles authentication via Azure Identity and delegates HTTP operations to an internal _ODataClient (PowerPlatform.Dataverse.data._odata._ODataClient).

Key capabilities:

  • OData CRUD operations: create, read, update, delete records

  • SQL queries: execute read-only SQL via Web API ?sql parameter

  • Table metadata: create, inspect, and delete custom tables; create and delete columns

  • File uploads: upload files to file columns with chunking support

Note

The client lazily initializes its internal OData client on first use, allowing lightweight construction without immediate network calls.
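
The lazy initialization described in this note can be pictured with a minimal sketch. LazyClient and _FakeODataClient are hypothetical stand-ins invented for illustration, not the SDK's classes; the only point is that construction does no work until the first operation.

```python
class _FakeODataClient:
    """Hypothetical stand-in for an internal HTTP client."""

    instances = 0  # counts how many times the client was actually built

    def __init__(self, base_url: str) -> None:
        type(self).instances += 1
        self.base_url = base_url


class LazyClient:
    """Illustrative wrapper: builds its inner client on first use only."""

    def __init__(self, base_url: str) -> None:
        self._base_url = base_url.rstrip("/")  # drop any trailing slash
        self._inner = None  # nothing is constructed yet

    def _get_inner(self) -> _FakeODataClient:
        # Create the inner client once, then reuse it.
        if self._inner is None:
            self._inner = _FakeODataClient(self._base_url)
        return self._inner

    def ping(self) -> str:
        # Any operation goes through _get_inner(), triggering initialization.
        return self._get_inner().base_url


client = LazyClient("https://org.crm.dynamics.com/")
built_before = _FakeODataClient.instances  # construction built nothing
url = client.ping()                        # first use builds the inner client
client.ping()                              # second use reuses it
built_after = _FakeODataClient.instances
```

In this sketch the inner client is built exactly once, no matter how many operations follow.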

Constructor

DataverseClient(base_url: str, credential: TokenCredential, config: DataverseConfig | None = None)

Parameters

Name Description
base_url
Required
str

Your Dataverse environment URL, for example "https://org.crm.dynamics.com". Trailing slash is automatically removed.

credential
Required

Azure Identity credential for authentication.

config

Optional configuration for language, timeouts, and retries. If not provided, defaults are loaded via DataverseConfig.from_env.

Default value: None

Examples

Create a client and perform basic operations:


   from azure.identity import InteractiveBrowserCredential
   from PowerPlatform.Dataverse.client import DataverseClient

   credential = InteractiveBrowserCredential()
   client = DataverseClient(
       "https://org.crm.dynamics.com",
       credential
   )

   # Create a record
   record_ids = client.create("account", {"name": "Contoso Ltd"})
   print(f"Created account: {record_ids[0]}")

   # Update a record
   client.update("account", record_ids[0], {"telephone1": "555-0100"})

   # Query records
   for batch in client.get("account", filter="name eq 'Contoso Ltd'"):
       for account in batch:
           print(account["name"])

   # Delete a record
   client.delete("account", record_ids[0])

Methods

create

Create one or more records by table name.

create_columns

Create one or more columns on an existing table using a schema-style mapping.

create_table

Create a simple custom table with specified columns.

delete

Delete one or more records by GUID.

delete_columns

Delete one or more columns from a table.

delete_table

Delete a custom table by name.

Warning

This operation is irreversible and will delete all records in the table along with the table definition. Use with caution.

flush_cache

Flush cached client metadata or state.

get

Fetch a single record by ID or query multiple records.

When record_id is provided, returns a single record dictionary. When record_id is None, returns a generator yielding batches of records.

get_table_info

Get basic metadata for a table if it exists.

list_tables

List all custom tables in the Dataverse environment.

query_sql

Execute a read-only SQL query using the Dataverse Web API ?sql capability.

The SQL query must follow the supported subset: a single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM.

Note

The SQL support is limited to read-only queries. Complex joins, subqueries, and certain SQL functions may not be supported. Consult the Dataverse documentation for the current feature set.

update

Update one or more records.

This method supports three usage patterns:

  1. Single record update: update("account", "guid", {"name": "New Name"})

  2. Broadcast update: update("account", [id1, id2], {"status": 1}) - applies same changes to all IDs

  3. Paired updates: update("account", [id1, id2], [changes1, changes2]) - one-to-one mapping

Note

Single updates discard the response representation for better performance. For broadcast or paired updates, the method delegates to the internal client's batch update logic.

upload_file

Upload a file to a Dataverse file column.

Note

Large files are automatically chunked to avoid request size limits. The chunk mode performs multiple requests with resumable upload support.

create

Create one or more records by table name.

create(table_schema_name: str, records: Dict[str, Any] | List[Dict[str, Any]]) -> List[str]

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "account", "contact", or "new_MyTestTable").

records
Required
dict or list of dict

A single record dictionary or a list of record dictionaries. Each dictionary should contain column schema names as keys.

Returns

Type Description
list of str

List of created record GUIDs. Returns a single-element list for a single input.

Exceptions

Type Description

If records is not a dict or list[dict], or if the internal client returns an unexpected type.

Examples

Create a single record:


   client = DataverseClient(base_url, credential)
   ids = client.create("account", {"name": "Contoso"})
   print(f"Created: {ids[0]}")

Create multiple records:


   records = [
       {"name": "Contoso"},
       {"name": "Fabrikam"}
   ]
   ids = client.create("account", records)
   print(f"Created {len(ids)} accounts")

create_columns

Create one or more columns on an existing table using a schema-style mapping.

create_columns(table_schema_name: str, columns: Dict[str, Any]) -> List[str]

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "new_MyTestTable").

columns
Required
dict mapping str to Any

Mapping of column schema names (with customization prefix value) to supported types. All custom column names must include the customization prefix value (e.g. "new_Notes"). Primitive types include string, int, decimal, float, datetime, and bool. Enum subclasses (IntEnum preferred) generate a local option set and can specify localized labels via __labels__.

Returns

Type Description
list of str

Schema names for the columns that were created.

Examples

Create two columns on the custom table:


   created = client.create_columns(
       "new_MyTestTable",
       {
           "new_Scratch": "string",
           "new_Flags": "bool",
       },
   )
   print(created)  # ['new_Scratch', 'new_Flags']
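
Because every custom column name must carry the customization prefix, it can help to check a mapping before sending it. The helper below is a hypothetical pre-flight check, not part of the SDK, and it assumes the prefix is "new_" as in the examples on this page.

```python
from typing import Any, Dict, List


def columns_missing_prefix(columns: Dict[str, Any], prefix: str = "new_") -> List[str]:
    """Return the column names that do not start with the given prefix.

    Hypothetical helper. The comparison is case-insensitive, which is an
    assumption made here for convenience.
    """
    wanted = prefix.lower()
    return [name for name in columns if not name.lower().startswith(wanted)]


columns = {"new_Scratch": "string", "new_Flags": "bool", "Notes": "string"}
bad = columns_missing_prefix(columns)  # names that would need the prefix added
```

An empty result means every key in the mapping already carries the prefix.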

create_table

Create a simple custom table with specified columns.

create_table(table_schema_name: str, columns: Dict[str, Any], solution_unique_name: str | None = None, primary_column_schema_name: str | None = None) -> Dict[str, Any]

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table with customization prefix value (e.g. "new_MyTestTable").

columns
Required
dict mapping str to Any

Dictionary mapping column names (with customization prefix value) to their types. All custom column names must include the customization prefix value (e.g. "new_Title"). Supported types:

  • Primitive types: "string", "int", "decimal", "float", "datetime", "bool"

  • Enum subclass (IntEnum preferred): Creates a local option set. Optional multilingual labels can be provided via __labels__ class attribute, defined inside the Enum subclass:

    
       class ItemStatus(IntEnum):
           ACTIVE = 1
           INACTIVE = 2
           __labels__ = {
               1033: {"Active": "Active", "Inactive": "Inactive"},
               1036: {"Active": "Actif", "Inactive": "Inactif"}
           }
    
solution_unique_name
str

Optional solution unique name that should own the new table. When omitted the table is created in the default solution.

Default value: None
primary_column_schema_name
str

Optional primary name column schema name with customization prefix value (e.g. "new_ProductName"). If not provided, defaults to "{customization prefix value}_Name".

Default value: None

Returns

Type Description

Dictionary containing table metadata including table_schema_name, entity_set_name, table_logical_name, metadata_id, and columns_created.

Exceptions

Type Description

If table creation fails or the schema is invalid.

Examples

Create a table with simple columns:


   from enum import IntEnum

   class ItemStatus(IntEnum):
       ACTIVE = 1
       INACTIVE = 2

   columns = {
       "new_Title": "string",      # Note: includes 'new_' customization prefix value
       "new_Quantity": "int",
       "new_Price": "decimal",
       "new_Available": "bool",
       "new_Status": ItemStatus
   }

   result = client.create_table("new_MyTestTable", columns)
   print(f"Created table: {result['table_schema_name']}")
   print(f"Columns: {result['columns_created']}")

Create a table with a custom primary column name:


   result = client.create_table(
       "new_Product",
       {"new_Price": "decimal"},
       primary_column_schema_name="new_ProductName"
   )
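
The __labels__ attribute is an ordinary class attribute, so Python's enum machinery does not turn it into a member. The sketch below shows this with the ItemStatus example and a hypothetical helper, labels_for, that looks up the labels for one language code. How the SDK itself matches label keys to members is not stated on this page; the case-insensitive name match used here is an assumption.

```python
from enum import IntEnum
from typing import Dict


class ItemStatus(IntEnum):
    ACTIVE = 1
    INACTIVE = 2
    # Dunder names are left as ordinary class attributes, not members.
    __labels__ = {
        1033: {"Active": "Active", "Inactive": "Inactive"},
        1036: {"Active": "Actif", "Inactive": "Inactif"},
    }


def labels_for(enum_cls, language_code: int) -> Dict[int, str]:
    """Map each member value to its label for one language (hypothetical helper)."""
    raw = enum_cls.__labels__.get(language_code, {})
    by_name = {key.lower(): label for key, label in raw.items()}
    # Fall back to the member name when no label is defined.
    return {m.value: by_name.get(m.name.lower(), m.name) for m in enum_cls}


member_names = [m.name for m in ItemStatus]  # __labels__ is not among them
french = labels_for(ItemStatus, 1036)
```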

delete

Delete one or more records by GUID.

delete(table_schema_name: str, ids: str | List[str], use_bulk_delete: bool = True) -> str | None

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "account" or "new_MyTestTable").

ids
Required
str or list of str

Single GUID string or list of GUID strings to delete.

use_bulk_delete

When True (default) and ids is a list, execute the BulkDelete action and return its async job identifier. When False, each record is deleted sequentially.

Default value: True

Returns

Type Description
str or None

BulkDelete job ID when deleting multiple records via BulkDelete; otherwise None.

Exceptions

Type Description

If ids is not str or list[str].

If the underlying Web API delete request fails.

Examples

Delete a single record:


   client.delete("account", account_id)

Delete multiple records:


   job_id = client.delete("account", [id1, id2, id3])

delete_columns

Delete one or more columns from a table.

delete_columns(table_schema_name: str, columns: str | List[str]) -> List[str]

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "new_MyTestTable").

columns
Required
str or list of str

Column name or list of column names to remove. Must include customization prefix value (e.g. "new_TestColumn").

Returns

Type Description
list of str

Schema names for the columns that were removed.

Examples

Remove two custom columns by schema name:

   removed = client.delete_columns(
       "new_MyTestTable",
       ["new_Scratch", "new_Flags"],
   )
   print(removed)  # ['new_Scratch', 'new_Flags']

delete_table

Delete a custom table by name.

Warning

This operation is irreversible and will delete all records in the table along with the table definition. Use with caution.

delete_table(table_schema_name: str) -> None

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "new_MyTestTable" or "account").

Exceptions

Type Description

If the table does not exist or deletion fails.

Examples

Delete a custom table:


   client.delete_table("new_MyTestTable")

flush_cache

Flush cached client metadata or state.

flush_cache(kind) -> int

Parameters

Name Description
kind
Required
str

Cache kind to flush. Currently supported values:

  • "picklist": Clears picklist label cache used for label-to-integer conversion

Future kinds (e.g. "entityset", "primaryid") may be added without breaking this signature.

Returns

Type Description
int

Number of cache entries removed.

Examples

Clear the picklist cache:


   removed = client.flush_cache("picklist")
   print(f"Cleared {removed} cached picklist entries")

get

Fetch a single record by ID or query multiple records.

When record_id is provided, returns a single record dictionary. When record_id is None, returns a generator yielding batches of records.

get(table_schema_name: str, record_id: str | None = None, select: List[str] | None = None, filter: str | None = None, orderby: List[str] | None = None, top: int | None = None, expand: List[str] | None = None, page_size: int | None = None) -> Dict[str, Any] | Iterable[List[Dict[str, Any]]]

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "account" or "new_MyTestTable").

record_id
str

Optional GUID to fetch a specific record. If None, queries multiple records.

Default value: None
select
list of str

Optional list of attribute logical names to retrieve. Column names are case-insensitive and automatically lowercased (e.g. ["new_Title", "new_Amount"] becomes "new_title,new_amount").

Default value: None
filter
str

Optional OData filter string, e.g. "name eq 'Contoso'" or "new_quantity gt 5". Column names in filter expressions must use exact lowercase logical names (e.g. "new_quantity", not "new_Quantity"). The filter string is passed directly to the Dataverse Web API without transformation.

Default value: None
orderby
list of str

Optional list of attributes to sort by, e.g. ["name asc", "createdon desc"]. Column names are automatically lowercased.

Default value: None
top
int

Optional maximum number of records to return.

Default value: None
expand
list of str

Optional list of navigation properties to expand, e.g. ["primarycontactid"]. Navigation property names are case-sensitive and must match the server-defined names exactly. These are NOT automatically transformed. Consult entity metadata for correct casing.

Default value: None
page_size
int

Optional number of records per page for pagination.

Default value: None
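
Since filter strings are passed through unchanged, the caller is responsible for using lowercase logical names and for quoting string literals. Here is a minimal sketch of a hypothetical helper (not part of the SDK) that lowercases the column name and doubles embedded single quotes, as OData string literals require.

```python
def eq_filter(column: str, value: str) -> str:
    """Build an OData 'eq' filter for a string value (hypothetical helper)."""
    logical_name = column.lower()       # filters need lowercase logical names
    escaped = value.replace("'", "''")  # OData escapes ' by doubling it
    return f"{logical_name} eq '{escaped}'"


simple = eq_filter("new_Title", "Contoso")
quoted = eq_filter("name", "O'Brien Ltd")
```

The resulting string can then be supplied as the filter argument.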

Returns

Type Description
dict or
Iterable of list of dict

Single record dict if record_id is provided, otherwise a generator yielding lists of record dictionaries (one list per page).

Exceptions

Type Description

If record_id is provided but not a string.

Examples

Fetch a single record:


   record = client.get("account", record_id=account_id, select=["name", "telephone1"])
   print(record["name"])

Query multiple records with filtering (note: exact logical names in filter):


   for batch in client.get(
       "account",
       filter="statecode eq 0 and name eq 'Contoso'",  # Must use exact logical names (lower-case)
       select=["name", "telephone1"]
   ):
       for account in batch:
           print(account["name"])

Query with navigation property expansion (note: case-sensitive property name):


   for batch in client.get(
       "account",
       select=["name"],
       expand=["primarycontactid"],  # Case-sensitive! Check metadata for exact name
       filter="statecode eq 0"
   ):
       for account in batch:
           print(f"{account['name']} - Contact: {account.get('primarycontactid', {}).get('fullname')}")

Query with sorting and pagination:


   for batch in client.get(
       "account",
       orderby=["createdon desc"],
       top=100,
       page_size=50
   ):
       print(f"Batch size: {len(batch)}")
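
When page boundaries do not matter, the batches can be flattened into a single stream of records. The sketch below uses a stand-in generator, fake_pages, in place of client.get so that it runs without a Dataverse environment; itertools.chain.from_iterable from the standard library does the flattening.

```python
from itertools import chain
from typing import Any, Dict, Iterable, Iterator, List


def fake_pages() -> Iterator[List[Dict[str, Any]]]:
    """Stand-in for the generator returned when record_id is None."""
    yield [{"name": "Contoso"}, {"name": "Fabrikam"}]
    yield [{"name": "Adventure Works"}]


def iter_records(pages: Iterable[List[Dict[str, Any]]]) -> Iterator[Dict[str, Any]]:
    """Yield records one at a time, hiding page boundaries."""
    return chain.from_iterable(pages)


names = [record["name"] for record in iter_records(fake_pages())]
```

Because chain.from_iterable is lazy, pages would still be fetched only as the loop reaches them.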

get_table_info

Get basic metadata for a table if it exists.

get_table_info(table_schema_name: str) -> Dict[str, Any] | None

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "new_MyTestTable" or "account").

Returns

Type Description
dict or None

Dictionary containing table metadata with keys table_schema_name, table_logical_name, entity_set_name, and metadata_id. Returns None if the table is not found.

Examples

Retrieve table metadata:


   info = client.get_table_info("new_MyTestTable")
   if info:
       print(f"Logical name: {info['table_logical_name']}")
       print(f"Entity set: {info['entity_set_name']}")

list_tables

List all custom tables in the Dataverse environment.

list_tables() -> list[str]

Returns

Type Description
list of str

List of custom table names.

Examples

List all custom tables:


   tables = client.list_tables()
   for table in tables:
       print(table)

query_sql

Execute a read-only SQL query using the Dataverse Web API ?sql capability.

The SQL query must follow the supported subset: a single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM.

Note

The SQL support is limited to read-only queries. Complex joins, subqueries, and certain SQL functions may not be supported. Consult the Dataverse documentation for the current feature set.

query_sql(sql: str)

Parameters

Name Description
sql
Required
str

Supported SQL SELECT statement.

Returns

Type Description
list of dict

List of result row dictionaries. Returns an empty list if no rows match.

Exceptions

Type Description

If the SQL query uses unsupported syntax.

If the Web API returns an error.

Examples

Basic SQL query:


   sql = "SELECT TOP 10 accountid, name FROM account WHERE name LIKE 'C%' ORDER BY name"
   results = client.query_sql(sql)
   for row in results:
       print(row["name"])

Query with alias:


   sql = "SELECT a.name, a.telephone1 FROM account AS a WHERE a.statecode = 0"
   results = client.query_sql(sql)
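
A quick client-side sanity check can catch obviously unsupported statements before a round trip. This is a rough sketch of a hypothetical helper, not the validation that the SDK or the Web API performs; it only checks that the text is a single statement beginning with SELECT.

```python
def looks_like_single_select(sql: str) -> bool:
    """Rough pre-check: one statement, starting with SELECT (hypothetical helper).

    A semicolon inside a string literal is also rejected, so this check is
    deliberately conservative.
    """
    text = sql.strip().rstrip(";").strip()
    if ";" in text:
        return False  # more than one statement, or a stray semicolon
    return text[:6].upper() == "SELECT" and text[6:7].isspace()


ok = looks_like_single_select("SELECT TOP 10 accountid, name FROM account")
not_select = looks_like_single_select("DELETE FROM account")
two_statements = looks_like_single_select("SELECT name FROM account; SELECT 1")
```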

update

Update one or more records.

This method supports three usage patterns:

  1. Single record update: update("account", "guid", {"name": "New Name"})

  2. Broadcast update: update("account", [id1, id2], {"status": 1}) - applies same changes to all IDs

  3. Paired updates: update("account", [id1, id2], [changes1, changes2]) - one-to-one mapping

Note

Single updates discard the response representation for better performance. For broadcast or paired updates, the method delegates to the internal client's batch update logic.

update(table_schema_name: str, ids: str | List[str], changes: Dict[str, Any] | List[Dict[str, Any]]) -> None

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table (e.g. "account" or "new_MyTestTable").

ids
Required
str or list of str

Single GUID string or list of GUID strings to update.

changes
Required
dict or list of dict

Dictionary of changes for single/broadcast mode, or list of dictionaries for paired mode. When ids is a list and changes is a single dict, the same changes are broadcast to all records. When both are lists, they must have equal length for one-to-one mapping.

Exceptions

Type Description

If ids is not str or list[str], or if changes type doesn't match usage pattern.

Examples

Single record update:


   client.update("account", account_id, {"telephone1": "555-0100"})

Broadcast same changes to multiple records:


   client.update("account", [id1, id2, id3], {"statecode": 1})

Update multiple records with different values:


   ids = [id1, id2]
   changes = [
       {"name": "Updated Name 1"},
       {"name": "Updated Name 2"}
   ]
   client.update("account", ids, changes)
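
The three usage patterns reduce to one idea: turning (ids, changes) into a list of (id, change) pairs. The sketch below is a hypothetical helper showing that normalization, including the equal-length rule for paired updates. It is not the SDK's internal logic, and the choice of TypeError and ValueError is an assumption.

```python
from typing import Any, Dict, List, Tuple, Union

Changes = Dict[str, Any]


def pair_updates(
    ids: Union[str, List[str]],
    changes: Union[Changes, List[Changes]],
) -> List[Tuple[str, Changes]]:
    """Normalize single, broadcast, and paired inputs into (id, change) pairs."""
    if isinstance(ids, str):
        if not isinstance(changes, dict):
            raise TypeError("a single id requires a single dict of changes")
        return [(ids, changes)]
    if not isinstance(ids, list):
        raise TypeError("ids must be a str or a list of str")
    if isinstance(changes, dict):
        # Broadcast: the same changes apply to every id.
        return [(record_id, changes) for record_id in ids]
    if not isinstance(changes, list):
        raise TypeError("changes must be a dict or a list of dict")
    if len(ids) != len(changes):
        raise ValueError("paired updates need ids and changes of equal length")
    # Paired: one-to-one mapping by position.
    return list(zip(ids, changes))


single = pair_updates("id-1", {"name": "New Name"})
broadcast = pair_updates(["id-1", "id-2"], {"statecode": 1})
paired = pair_updates(["id-1", "id-2"], [{"name": "A"}, {"name": "B"}])
```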

upload_file

Upload a file to a Dataverse file column.

Note

Large files are automatically chunked to avoid request size limits. The chunk mode performs multiple requests with resumable upload support.

upload_file(table_schema_name: str, record_id: str, file_name_attribute: str, path: str, mode: str | None = None, mime_type: str | None = None, if_none_match: bool = True) -> None

Parameters

Name Description
table_schema_name
Required
str

Schema name of the table, e.g. "account" or "new_MyTestTable".

record_id
Required
str

GUID of the target record.

file_name_attribute
Required
str

Logical name of the file column attribute.

path
Required
str

Local filesystem path to the file. The stored filename will be the basename of this path.

mode
str

Upload strategy: "auto" (used when mode is None), "small", or "chunk". Auto mode selects small or chunked upload based on file size.

Default value: None
mime_type
str

Explicit MIME type to store with the file (e.g. "application/pdf"). If not provided, the MIME type may be inferred from the file extension.

Default value: None
if_none_match

When True (default), sends If-None-Match: null header to only succeed if the column is currently empty. Set False to always overwrite using If-Match: *. Used for small and chunk modes only.

Default value: True

Exceptions

Type Description

If the upload fails, or if the file column is not empty when if_none_match=True.

If the specified file path does not exist.

Examples

Upload a PDF file:


   client.upload_file(
       table_schema_name="account",
       record_id=account_id,
       file_name_attribute="new_contract",
       path="/path/to/contract.pdf",
       mime_type="application/pdf"
   )

Upload with auto mode selection:


   client.upload_file(
       table_schema_name="email",
       record_id=email_id,
       file_name_attribute="new_attachment",
       path="/path/to/large_file.zip",
       mode="auto"
   )
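
To make the chunking and MIME-type notes concrete, here is a sketch of the general technique using only the standard library. The chunk size, the size threshold, and the helper names are illustrative assumptions, not the values or functions the SDK uses.

```python
import mimetypes
import os
import tempfile
from typing import Iterator


def iter_chunks(path: str, chunk_size: int) -> Iterator[bytes]:
    """Yield the file's bytes in pieces of at most chunk_size."""
    with open(path, "rb") as handle:
        while True:
            block = handle.read(chunk_size)
            if not block:
                break
            yield block


def choose_mode(size_bytes: int, threshold: int) -> str:
    """Pick 'small' or 'chunk' by file size (threshold is an assumed value)."""
    return "small" if size_bytes <= threshold else "chunk"


# Demonstrate on a throwaway 10-byte file.
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
    tmp.write(b"0123456789")
    demo_path = tmp.name

chunks = list(iter_chunks(demo_path, chunk_size=4))
mode = choose_mode(os.path.getsize(demo_path), threshold=8)
mime_type, _ = mimetypes.guess_type("contract.pdf")  # inferred from the extension
os.remove(demo_path)
```

Reading in fixed-size blocks keeps memory use bounded regardless of file size, which is the usual motivation for a chunked upload path.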