Azureml: Metadata mismatch for dask dataframe after using filter()
I noticed weird behaviour when filtering an azureml TabularDataset instance using filter() and converting it to a dask dataframe afterwards. Here is my code to recreate the issue:
Imports:
from azureml.core import Dataset
from azureml.data import TabularDataset
import dask.dataframe as ddf
import pandas as pd
Register a dask dataframe to the datastore and load it as a TabularDataset:
# `datastore` and `workspace` refer to a pre-existing Datastore and Workspace
test_df = pd.DataFrame({"id": [3, 4, 5], "price": [199, 98, 50]})
test_dask = ddf.from_pandas(test_df, chunksize=1)
Dataset.Tabular.register_dask_dataframe(test_dask, datastore, name='bug_test')
dataset = TabularDataset.get_by_name(workspace, name='bug_test')
Printing the loaded dataset after converting it back to a dask dataframe works almost as expected (almost, because the indexing is weird: the index 0 appears twice):
loaded_dask = dataset.to_dask_dataframe()
print(loaded_dask.compute())
>>
   id  price  __null_dask_index__
0   3    199                    0
0   4     98                    1
1   5     50                    2
We now want to filter for rows where id equals 5, which works perfectly when filtering after the conversion to a dask dataframe with loaded_dask[loaded_dask.id == 5].compute(), as shown below.
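For reference, here is the dask-side filter that behaves as expected (the printed row is inferred from the frame shown above):
# Filtering the already-converted dask dataframe works fine:
print(loaded_dask[loaded_dask.id == 5].compute())
>>
   id  price  __null_dask_index__
1   5     50                    2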
Computing the dask dataframe after filtering with the filter() method, however, throws the following exception, which states that either no data or no datatype was found (for the full error message see below):
filtered_ds = dataset.filter(dataset["id"] == 5)
filtered_ds.to_dask_dataframe().compute()
>>
ValueError: Metadata mismatch found in `from_delayed`.
Partition type: `pandas.core.frame.DataFrame`
+-----------------------+-------+----------+
| Column | Found | Expected |
+-----------------------+-------+----------+
| '__null_dask_index__' | - | int32 |
| 'id' | - | int32 |
| 'price' | - | int32 |
+-----------------------+-------+----------+
Note: when filtering for values that are not present, e.g. dataset["id"] == 6, an empty dataframe is correctly returned, as in the sketch below.
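A minimal sketch of that call (no exception is raised here):
# Filtering for a value that does not exist in the data:
# instead of the metadata mismatch, this correctly yields an empty dataframe.
empty_ds = dataset.filter(dataset["id"] == 6)
print(empty_ds.to_dask_dataframe().compute())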
Another weird behaviour occurs when playing around with the dtypes parameter of to_dask_dataframe(): when a type is specified for only one column, datatypes can suddenly be found:
filtered_ds.to_dask_dataframe(dtypes={"id": "object"}).compute()
>>
ValueError: Metadata mismatch found in `from_delayed`.
Partition type: `pandas.core.frame.DataFrame`
+-----------------------+-------+----------+
| Column | Found | Expected |
+-----------------------+-------+----------+
| '__null_dask_index__' | int64 | - |
| 'id' | int64 | object |
| 'price' | int64 | - |
+-----------------------+-------+----------+
but setting dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"} leads again to the same ValueError as before, in which neither data nor datatypes are found (full error output):
filtered_ds.to_dask_dataframe(dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"}).compute()
>>
Traceback (most recent call last):
File "\bug_analysis.py", line 117, in <module>
filtered_ds.to_dask_dataframe(dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"}).compute()
File "\venv\lib\site-packages\dask\base.py", line 290, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "\envs\venv\lib\site-packages\dask\base.py", line 573, in compute
results = schedule(dsk, keys, **kwargs)
File "\venv\lib\site-packages\dask\threaded.py", line 81, in get
results = get_async(
File "\venv\lib\site-packages\dask\local.py", line 506, in get_async
raise_exception(exc, tb)
File "\venv\lib\site-packages\dask\local.py", line 314, in reraise
raise exc
File "\venv\lib\site-packages\dask\local.py", line 219, in execute_task
result = _execute_task(task, data)
File "\venv\lib\site-packages\dask\core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "\venv\lib\site-packages\dask\dataframe\utils.py", line 407, in check_meta
raise ValueError(
ValueError: Metadata mismatch found in `from_delayed`.
Partition type: `pandas.core.frame.DataFrame`
+-----------------------+-------+----------+
| Column | Found | Expected |
+-----------------------+-------+----------+
| '__null_dask_index__' | - | int64 |
| 'id' | - | int64 |
| 'price' | - | int64 |
+-----------------------+-------+----------+
The exceptions are raised only when the dask dataframes are actually computed with compute().
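A minimal sketch to make that explicit (reusing filtered_ds from above):
# Building the dask dataframe is lazy and raises no exception;
# the ValueError only surfaces once the task graph is executed:
lazy_df = filtered_ds.to_dask_dataframe()  # fine
lazy_df.compute()                          # ValueError raised here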
I am aware that I am using two experimental methods (TabularDataset.to_dask_dataframe() and TabularDataset.filter()), so is this a bug, or am I using the methods incorrectly at some point?