We often use dask together with plateau to perform the following task in parallel:
- load a chunk of data
- do a transformation
- write the chunk of data to disk
We typically do this with datasets that don't fit into our machine's RAM. This used to work fine, but with the new query planner, we typically run out of memory, because dask now loads all the data before doing a transformation. Before, dask would operate chunk by chunk.
Minimal Complete Verifiable Example:
I tried to come up with a simple example that doesn't involve plateau (or a lot of data):
```python
import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster


# load the data from disk
@dask.delayed
def _load(x):
    print(f"Loading chunk {x}")
    return pd.DataFrame({"x": x, "y": [1, 2, 3]})


# transform and store
def _transform_and_store(df):
    # this example doesn't transform or store anything, but you get the idea
    x = df["x"].unique().item()
    print(f"Storing chunk {x}")


if __name__ == "__main__":
    ddf = dd.from_delayed([_load(x) for x in range(10)], meta={"x": "int64", "y": "int64"})
    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
        with Client(cluster):
            ddf.map_partitions(_transform_and_store, meta={}).compute()
```
Without the query planner, the "Loading chunk" and "Storing chunk" messages interleave: each chunk is loaded, transformed and stored before the next one is touched, so we don't run out of memory. With the query planner, all "Loading chunk" messages appear first: everything is loaded before anything is stored, and so we run out of memory.
Is there a way to tell the query planner to operate chunk by chunk when it encounters `map_partitions`?
Should I not be using `map_partitions` for something like this?
NB: setting `DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1` doesn't help.
Anything else that we should know?
Note that it's really tricky to turn off the query planner other than through environment variables. When using `dask.config` from within Python, the config must be set before the first import of `dask.dataframe`, which is difficult to control.
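For reference, this is roughly what the ordering constraint looks like in code. This is a sketch for dask 2024.x, where the `dataframe.query-planning` config key selects the implementation; the key only has an effect if it is set before `dask.dataframe` has ever been imported in the process:

```python
import sys

import dask

# In dask 2024.x, this key selects the legacy (non-query-planning)
# dataframe implementation -- but only if it is set before the first
# import of dask.dataframe anywhere in the process.
assert "dask.dataframe" not in sys.modules, "too late, the planner is already chosen"
dask.config.set({"dataframe.query-planning": False})

# ...only now: import dask.dataframe as dd
```

In a larger application it's hard to guarantee that no transitive dependency has already imported `dask.dataframe`, which is why the environment-variable route is the only reliable one.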
Environment:
- Dask version: 2024.4.2
- Python version: 3.12.3
- Operating System: linux
- Install method (conda, pip, source): conda
Thank you!
I already migrated `read_dataset_as_ddf` to `from_map` in data-engineering-collective/plateau#80. If there are any remaining `from_delayed` calls, I strongly recommend replacing them as well.