Azureml: Metadata mismatch for dask dataframe after using filter()

Caterina K
2022-03-16T10:18:45.947+00:00

I noticed some odd behaviour when filtering an azureml TabularDataset instance using filter() and then converting it to a dask dataframe. Here is my code to reproduce the issue:

Imports:

from azureml.core import Dataset
from azureml.data import TabularDataset
import dask.dataframe as ddf
import pandas as pd

Register a dask dataframe to the datastore and load it as a TabularDataset (workspace and datastore are assumed to be set up beforehand, e.g. via Workspace.from_config() and workspace.get_default_datastore()):

test_df = pd.DataFrame({"id": [3,4,5], "price": [199, 98, 50]})
test_dask = ddf.from_pandas(test_df, chunksize=1)

Dataset.Tabular.register_dask_dataframe(test_dask, datastore, name='bug_test')

dataset = TabularDataset.get_by_name(workspace, name='bug_test')

Printing the loaded dataset after converting it to a dask dataframe now works almost as expected ("almost" because the index is odd: 0 appears twice):

loaded_dask = dataset.to_dask_dataframe()
print(loaded_dask.compute())

>> 

       id  price  __null_dask_index__
    0   3    199                    0
    0   4     98                    1
    1   5     50                    2

We now want to filter for rows where id equals 5. This works perfectly when the filtering is done after the conversion, with loaded_dask[loaded_dask.id == 5].compute().

Computing the dask dataframe after filtering with the filter() method, however, throws the following exception, saying that either no data or no datatype was found (for the full error message see below):

filtered_ds = dataset.filter(dataset["id"] == 5)
filtered_ds.to_dask_dataframe().compute()

>> 

    ValueError: Metadata mismatch found in `from_delayed`.

    Partition type: `pandas.core.frame.DataFrame`
    +-----------------------+-------+----------+
    | Column                | Found | Expected |
    +-----------------------+-------+----------+
    | '__null_dask_index__' | -     | int32    |
    | 'id'                  | -     | int32    |
    | 'price'               | -     | int32    |
    +-----------------------+-------+----------+

Note: when filtering for values that are not present, e.g. dataset["id"] == 6, it correctly returns an empty dataframe.

Another odd behaviour occurs when playing around with the dtypes parameter of to_dask_dataframe(). When a type is specified for only one column, datatypes are suddenly found for all columns:

filtered_ds.to_dask_dataframe(dtypes={"id": "object"}).compute()

>>

    ValueError: Metadata mismatch found in `from_delayed`.

    Partition type: `pandas.core.frame.DataFrame`
    +-----------------------+-------+----------+
    | Column                | Found | Expected |
    +-----------------------+-------+----------+
    | '__null_dask_index__' | int64 | -        |
    | 'id'                  | int64 | object   |
    | 'price'               | int64 | -        |
    +-----------------------+-------+----------+

but setting dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"} leads again to the same ValueError as before, that either no data or no datatype was found (full error output):

filtered_ds.to_dask_dataframe(dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"}).compute()

>>

    Traceback (most recent call last):
      File "\bug_analysis.py", line 117, in <module>
        filtered_ds.to_dask_dataframe(dtypes={"id": "int64", "price": "int64", "__null_dask_index__": "int64"}).compute()
      File "\venv\lib\site-packages\dask\base.py", line 290, in compute
        (result,) = compute(self, traverse=False, **kwargs)
      File "\envs\venv\lib\site-packages\dask\base.py", line 573, in compute
        results = schedule(dsk, keys, **kwargs)
      File "\venv\lib\site-packages\dask\threaded.py", line 81, in get
        results = get_async(
      File "\venv\lib\site-packages\dask\local.py", line 506, in get_async
        raise_exception(exc, tb)
      File "\venv\lib\site-packages\dask\local.py", line 314, in reraise
        raise exc
      File "\venv\lib\site-packages\dask\local.py", line 219, in execute_task
        result = _execute_task(task, data)
      File "\venv\lib\site-packages\dask\core.py", line 119, in _execute_task
        return func(*(_execute_task(a, cache) for a in args))
      File "\venv\lib\site-packages\dask\dataframe\utils.py", line 407, in check_meta
        raise ValueError(
    ValueError: Metadata mismatch found in `from_delayed`.

    Partition type: `pandas.core.frame.DataFrame`
    +-----------------------+-------+----------+
    | Column                | Found | Expected |
    +-----------------------+-------+----------+
    | '__null_dask_index__' | -     | int64    |
    | 'id'                  | -     | int64    |
    | 'price'               | -     | int64    |
    +-----------------------+-------+----------+

The exceptions are only raised once the dask dataframes are actually computed with compute().

I am aware that I used two experimental methods (TabularDataset.to_dask_dataframe() and TabularDataset.filter()), so is this a bug, or am I using the methods incorrectly at some point?

Azure Machine Learning
An Azure machine learning service for building and deploying models.

1 answer

1. Mohanmedvi
    2022-04-05T04:59:25.48+00:00

    I have the same problem, any solution?

