DataverseClient Class
High-level client for Microsoft Dataverse operations.
This client provides a simple, stable interface for interacting with Dataverse environments through the Web API. It handles authentication via Azure Identity and delegates HTTP operations to an internal <xref:PowerPlatform.Dataverse.data._odata._ODataClient>.
Key capabilities:
OData CRUD operations: create, read, update, delete records
SQL queries: execute read-only SQL via Web API
?sqlparameterTable metadata: create, inspect, and delete custom tables; create and delete columns
File uploads: upload files to file columns with chunking support
Note
The client lazily initializes its internal OData client on first use, allowing lightweight construction without immediate network calls.
Constructor
DataverseClient(base_url: str, credential: TokenCredential, config: DataverseConfig | None = None)
Parameters
| Name | Description |
|---|---|
|
base_url
Required
|
Your Dataverse environment URL, for example
|
|
credential
Required
|
Azure Identity credential for authentication. |
|
config
|
Optional configuration for language, timeouts, and retries. If not provided, defaults are loaded from from_env. Default value: None
|
Examples
Create a client and perform basic operations:
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = InteractiveBrowserCredential()
client = DataverseClient(
"https://org.crm.dynamics.com",
credential
)
# Create a record
record_ids = client.create("account", {"name": "Contoso Ltd"})
print(f"Created account: {record_ids[0]}")
# Update a record
client.update("account", record_ids[0], {"telephone1": "555-0100"})
# Query records
for batch in client.get("account", filter="name eq 'Contoso Ltd'"):
for account in batch:
print(account["name"])
# Delete a record
client.delete("account", record_ids[0])
Methods
| create |
Create one or more records by table name. |
| create_columns |
Create one or more columns on an existing table using a schema-style mapping. |
| create_table |
Create a simple custom table with specified columns. |
| delete |
Delete one or more records by GUID. |
| delete_columns |
Delete one or more columns from a table. |
| delete_table |
Delete a custom table by name. Warning This operation is irreversible and will delete all records in the table along with the table definition. Use with caution. |
| flush_cache |
Flush cached client metadata or state. |
| get |
Fetch a single record by ID or query multiple records. When |
| get_table_info |
Get basic metadata for a table if it exists. |
| list_tables |
List all custom tables in the Dataverse environment. |
| query_sql |
Execute a read-only SQL query using the Dataverse Web API The SQL query must follow the supported subset: a single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM. Note The SQL support is limited to read-only queries. Complex joins, subqueries, and certain SQL functions may not be supported. Consult the Dataverse documentation for the current feature set. |
| update |
Update one or more records. This method supports three usage patterns:
Note Single updates discard the response representation for better performance. For broadcast or paired updates, the method delegates to the internal client's batch update logic. |
| upload_file |
Upload a file to a Dataverse file column. Note Large files are automatically chunked to avoid request size limits. The chunk mode performs multiple requests with resumable upload support. |
create
Create one or more records by table name.
create(table_schema_name: str, records: Dict[str, Any] | List[Dict[str, Any]]) -> List[str]
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
records
Required
|
A single record dictionary or a list of record dictionaries. Each dictionary should contain column schema names as keys. |
Returns
| Type | Description |
|---|---|
|
List of created record GUIDs. Returns a single-element list for a single input. |
Exceptions
| Type | Description |
|---|---|
|
If |
Examples
Create a single record:
client = DataverseClient(base_url, credential)
ids = client.create("account", {"name": "Contoso"})
print(f"Created: {ids[0]}")
Create multiple records:
records = [
{"name": "Contoso"},
{"name": "Fabrikam"}
]
ids = client.create("account", records)
print(f"Created {len(ids)} accounts")
create_columns
Create one or more columns on an existing table using a schema-style mapping.
create_columns(table_schema_name: str, columns: Dict[str, Any]) -> List[str]
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
columns
Required
|
Mapping of column schema names (with customization prefix value) to supported types. All custom column names must include the customization prefix value** (e.g. |
Returns
| Type | Description |
|---|---|
|
Schema names for the columns that were created. |
Examples
Create two columns on the custom table:
created = client.create_columns(
"new_MyTestTable",
{
"new_Scratch": "string",
"new_Flags": "bool",
},
)
print(created) # ['new_Scratch', 'new_Flags']
create_table
Create a simple custom table with specified columns.
create_table(table_schema_name: str, columns: Dict[str, Any], solution_unique_name: str | None = None, primary_column_schema_name: str | None = None) -> Dict[str, Any]
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table with customization prefix value (e.g. |
|
columns
Required
|
Dictionary mapping column names (with customization prefix value) to their types. All custom column names must include the customization prefix value (e.g.
|
|
solution_unique_name
|
Optional solution unique name that should own the new table. When omitted the table is created in the default solution. Default value: None
|
|
primary_column_schema_name
|
Optional primary name column schema name with customization prefix value (e.g. Default value: None
|
Returns
| Type | Description |
|---|---|
|
Dictionary containing table metadata including |
Exceptions
| Type | Description |
|---|---|
|
If table creation fails or the schema is invalid. |
Examples
Create a table with simple columns:
from enum import IntEnum
class ItemStatus(IntEnum):
ACTIVE = 1
INACTIVE = 2
columns = {
"new_Title": "string", # Note: includes 'new_' customization prefix value
"new_Quantity": "int",
"new_Price": "decimal",
"new_Available": "bool",
"new_Status": ItemStatus
}
result = client.create_table("new_MyTestTable", columns)
print(f"Created table: {result['table_schema_name']}")
print(f"Columns: {result['columns_created']}")
Create a table with a custom primary column name:
result = client.create_table(
"new_Product",
{"new_Price": "decimal"},
primary_column_schema_name="new_ProductName"
)
delete
Delete one or more records by GUID.
delete(table_schema_name: str, ids: str | List[str], use_bulk_delete: bool = True) -> str | None
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
ids
Required
|
Single GUID string or list of GUID strings to delete. |
|
use_bulk_delete
|
When Default value: True
|
Returns
| Type | Description |
|---|---|
|
str,
|
BulkDelete job ID when deleting multiple records via BulkDelete; otherwise |
Exceptions
| Type | Description |
|---|---|
|
If |
|
|
If the underlying Web API delete request fails. |
Examples
Delete a single record:
client.delete("account", account_id)
Delete multiple records:
job_id = client.delete("account", [id1, id2, id3])
delete_columns
Delete one or more columns from a table.
delete_columns(table_schema_name: str, columns: str | List[str]) -> List[str]
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
columns
Required
|
Column name or list of column names to remove. Must include customization prefix value (e.g. |
Returns
| Type | Description |
|---|---|
|
Schema names for the columns that were removed. |
Examples
Remove two custom columns by schema name:
removed = client.delete_columns( "new_MyTestTable", ["new_Scratch", "new_Flags"],
) print(removed) # ['new_Scratch', 'new_Flags']
delete_table
Delete a custom table by name.
Warning
This operation is irreversible and will delete all records in the table along
with the table definition. Use with caution.
delete_table(table_schema_name: str) -> None
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
Exceptions
| Type | Description |
|---|---|
|
If the table does not exist or deletion fails. |
Examples
Delete a custom table:
client.delete_table("new_MyTestTable")
flush_cache
Flush cached client metadata or state.
flush_cache(kind) -> int
Parameters
| Name | Description |
|---|---|
|
kind
Required
|
Cache kind to flush. Currently supported values:
Future kinds (e.g. |
Returns
| Type | Description |
|---|---|
|
Number of cache entries removed. |
Examples
Clear the picklist cache:
removed = client.flush_cache("picklist")
print(f"Cleared {removed} cached picklist entries")
get
Fetch a single record by ID or query multiple records.
When record_id is provided, returns a single record dictionary.
When record_id is None, returns a generator yielding batches of records.
get(table_schema_name: str, record_id: str | None = None, select: List[str] | None = None, filter: str | None = None, orderby: List[str] | None = None, top: int | None = None, expand: List[str] | None = None, page_size: int | None = None) -> Dict[str, Any] | Iterable[List[Dict[str, Any]]]
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
record_id
|
Optional GUID to fetch a specific record. If None, queries multiple records. Default value: None
|
|
select
|
Optional list of attribute logical names to retrieve. Column names are case-insensitive and automatically lowercased (e.g. Default value: None
|
|
filter
|
Optional OData filter string, e.g. Default value: None
|
|
orderby
|
Optional list of attributes to sort by, e.g. Default value: None
|
|
top
|
Optional maximum number of records to return. Default value: None
|
|
expand
|
Optional list of navigation properties to expand, e.g. Default value: None
|
|
page_size
|
Optional number of records per page for pagination. Default value: None
|
Returns
| Type | Description |
|---|---|
|
dict,
|
Single record dict if |
Exceptions
| Type | Description |
|---|---|
|
If |
Examples
Fetch a single record:
record = client.get("account", record_id=account_id, select=["name", "telephone1"])
print(record["name"])
Query multiple records with filtering (note: exact logical names in filter):
for batch in client.get(
"account",
filter="statecode eq 0 and name eq 'Contoso'", # Must use exact logical names (lower-case)
select=["name", "telephone1"]
):
for account in batch:
print(account["name"])
Query with navigation property expansion (note: case-sensitive property name):
for batch in client.get(
"account",
select=["name"],
expand=["primarycontactid"], # Case-sensitive! Check metadata for exact name
filter="statecode eq 0"
):
for account in batch:
print(f"{account['name']} - Contact: {account.get('primarycontactid', {}).get('fullname')}")
Query with sorting and pagination:
for batch in client.get(
"account",
orderby=["createdon desc"],
top=100,
page_size=50
):
print(f"Batch size: {len(batch)}")
get_table_info
Get basic metadata for a table if it exists.
get_table_info(table_schema_name: str) -> Dict[str, Any] | None
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
Returns
| Type | Description |
|---|---|
|
dict,
|
Dictionary containing table metadata with keys |
Examples
Retrieve table metadata:
info = client.get_table_info("new_MyTestTable")
if info:
print(f"Logical name: {info['table_logical_name']}")
print(f"Entity set: {info['entity_set_name']}")
list_tables
query_sql
Execute a read-only SQL query using the Dataverse Web API ?sql capability.
The SQL query must follow the supported subset: a single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM.
Note
The SQL support is limited to read-only queries. Complex joins, subqueries, and certain SQL functions may not be supported. Consult the Dataverse documentation for the current feature set.
query_sql(sql: str)
Parameters
| Name | Description |
|---|---|
|
sql
Required
|
Supported SQL SELECT statement. |
Returns
| Type | Description |
|---|---|
|
List of result row dictionaries. Returns an empty list if no rows match. |
Exceptions
| Type | Description |
|---|---|
|
If the SQL query uses unsupported syntax. |
|
|
If the Web API returns an error. |
Examples
Basic SQL query:
sql = "SELECT TOP 10 accountid, name FROM account WHERE name LIKE 'C%' ORDER BY name"
results = client.query_sql(sql)
for row in results:
print(row["name"])
Query with alias:
sql = "SELECT a.name, a.telephone1 FROM account AS a WHERE a.statecode = 0"
results = client.query_sql(sql)
update
Update one or more records.
This method supports three usage patterns:
Single record update:
update("account", "guid", {"name": "New Name"})Broadcast update:
update("account", [id1, id2], {"status": 1})- applies same changes to all IDsPaired updates:
update("account", [id1, id2], [changes1, changes2])- one-to-one mapping
Note
Single updates discard the response representation for better performance. For broadcast or paired updates, the method delegates to the internal client's batch update logic.
update(table_schema_name: str, ids: str | List[str], changes: Dict[str, Any] | List[Dict[str, Any]]) -> None
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table (e.g. |
|
ids
Required
|
Single GUID string or list of GUID strings to update. |
|
changes
Required
|
Dictionary of changes for single/broadcast mode, or list of dictionaries
for paired mode. When |
Exceptions
| Type | Description |
|---|---|
|
If |
Examples
Single record update:
client.update("account", account_id, {"telephone1": "555-0100"})
Broadcast same changes to multiple records:
client.update("account", [id1, id2, id3], {"statecode": 1})
Update multiple records with different values:
ids = [id1, id2]
changes = [
{"name": "Updated Name 1"},
{"name": "Updated Name 2"}
]
client.update("account", ids, changes)
upload_file
Upload a file to a Dataverse file column.
Note
Large files are automatically chunked to avoid request size limits. The chunk mode performs multiple requests with resumable upload support.
upload_file(table_schema_name: str, record_id: str, file_name_attribute: str, path: str, mode: str | None = None, mime_type: str | None = None, if_none_match: bool = True) -> None
Parameters
| Name | Description |
|---|---|
|
table_schema_name
Required
|
Schema name of the table, e.g. |
|
record_id
Required
|
GUID of the target record. |
|
file_name_attribute
Required
|
Logical name of the file column attribute. |
|
path
Required
|
Local filesystem path to the file. The stored filename will be the basename of this path. |
|
mode
|
Upload strategy: Default value: None
|
|
mime_type
|
Explicit MIME type to store with the file (e.g. Default value: None
|
|
if_none_match
|
When True (default), sends Default value: True
|
Exceptions
| Type | Description |
|---|---|
|
If the upload fails or the file column is not empty
when |
|
|
If the specified file path does not exist. |
Examples
Upload a PDF file:
client.upload_file(
table_schema_name="account",
record_id=account_id,
file_name_attribute="new_contract",
path="/path/to/contract.pdf",
mime_type="application/pdf"
)
Upload with auto mode selection:
client.upload_file(
table_schema_name="email",
record_id=email_id,
file_name_attribute="new_attachment",
path="/path/to/large_file.zip",
mode="auto"
)