A more comprehensive tuning guide for memory related options #949

Open
Kontinuation opened this issue Sep 18, 2024 · 9 comments
Labels
enhancement New feature or request performance

@Kontinuation
Member

Kontinuation commented Sep 18, 2024

What is the problem the feature request solves?

The DataFusion Comet documentation gained a memory tuning section in the tuning guide after #595 was addressed. It looks simple at first glance, but I found that the actual behavior is more complex than I had thought.

  1. spark.comet.memory.overhead.factor and spark.comet.memoryOverhead set a per-operator limit, not a per-executor, per-worker, or per-core limit. When a comet plan is created, it creates its own memory pool of size spark.comet.memoryOverhead. Usually, spark.executor.cores equals the number of vCPUs, so the actual amount of memory allocated for comet in the worker instance will be (at least) spark.executor.cores * spark.comet.memoryOverhead.
  2. We have CometPlugin for configuring comet memory overhead automatically, but CometPlugin does not account for multiple executor cores. The actual per-instance comet memory consumption will exceed the configured memory overhead when spark.executor.cores > 1.
  3. Even assuming spark.executor.cores = 1 and only a single task running on each executor instance, multiple comet executors may still allocate multiple memory pools, so the actual memory limit will be a multiple of spark.comet.memoryOverhead. The following figure shows the DAG of a Spark job. Stage 205 has 3 CometSort nodes, and each node may consume up to spark.comet.memoryOverhead of memory. This is a conservative estimate, since we assume that all other nodes in this stage won't reserve a significant amount of memory.
    [Figure: DAG of the Spark job; Stage 205 contains 3 CometSort nodes]

The conclusion is that the actual memory limit for comet depends on:

  • spark.comet.memoryOverhead
  • Number of cores per executor, as well as related configurations such as spark.task.cpus
  • The maximum number of memory-intensive Comet nodes in one stage
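
As a rough illustration of how these three factors combine, here is a minimal sketch. The function name and the example numbers are hypothetical, and the model assumes every concurrently running task has the same number of memory-intensive nodes, each filling its own pool of size spark.comet.memoryOverhead. Real usage depends on the actual queries.

```python
GIB = 1024 ** 3

def estimate_comet_memory_limit(memory_overhead, executor_cores,
                                task_cpus=1, memory_intensive_nodes=1):
    """Rough effective limit (in bytes) on comet memory for one executor.

    Hypothetical helper for illustration; not part of Comet or Spark.
    """
    # Spark runs at most executor_cores // task_cpus tasks at once per executor.
    concurrent_tasks = max(1, executor_cores // task_cpus)
    # Assumption: each memory-intensive node gets its own pool of size
    # memory_overhead, and every concurrent task runs the same stage.
    return memory_overhead * concurrent_tasks * memory_intensive_nodes

# Example: 2 GiB overhead, 8 cores, spark.task.cpus = 1, 3 CometSort nodes.
limit = estimate_comet_memory_limit(2 * GIB, executor_cores=8,
                                    memory_intensive_nodes=3)
print(limit // GIB)  # 48
```

With these assumed numbers, a 2 GiB setting could translate into up to 48 GiB of native memory on one executor, which is why the configured value alone is a poor predictor.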

This makes comet hard to tune and its behavior hard to estimate, since it depends on the actual queries. We should either make this clear in the tuning guide or revamp the memory-related configurations so that they are easier to tune and reason about.

Describe the potential solution

Ideally, spark.comet.memory.overhead.factor and spark.comet.memoryOverhead would configure the memory limit per executor instance. I have the following ideas to achieve this:

  1. Use the unified memory manager introduced by #83 ("feat: Introduce CometTaskMemoryManager and native side memory pool"). This requires enabling off-heap memory in Spark. I'm not sure why it does not appear in the tuning guide (perhaps because of its maturity). The downside is that comet operators cannot trigger the spilling of other memory consumers, which makes it easy to run into issues similar to #886 ("SparkOutOfMemoryError happens when running CometColumnarExchange") due to its greedy/unfair nature.
  2. Make all comet operators in the same task share the same FairSpillPool. The memory limit of the fair spill pool can be spark.comet.memoryOverhead / numTaskSlots. This ensures that each operator can get a minimum amount of memory, which matters especially when we only support self-spilling. The downside is memory under-utilization when the memory requirements of the operators are very uneven (see datafusion#2829, "Memory manager triggers unnecessary spills").
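
To make idea 2 concrete, the following toy model shows the per-task sizing and the under-utilization downside. It is a simplified sketch, not DataFusion's FairSpillPool implementation: the class, method names, and numbers are hypothetical, and it assumes every registered consumer is spillable and capped at an equal share of the pool.

```python
MIB = 1024 ** 2

def per_task_pool_size(memory_overhead, num_task_slots):
    """Split the executor-wide overhead evenly across task slots."""
    return memory_overhead // num_task_slots

class ToyFairPool:
    """Toy model: each registered consumer may hold at most pool_size / N."""

    def __init__(self, pool_size):
        self.pool_size = pool_size
        self.reserved = {}  # consumer name -> bytes currently reserved

    def register(self, consumer):
        self.reserved[consumer] = 0

    def try_grow(self, consumer, additional):
        fair_share = self.pool_size // len(self.reserved)
        if self.reserved[consumer] + additional > fair_share:
            # Denied: the consumer has to spill its own data (self-spilling).
            return False
        self.reserved[consumer] += additional
        return True

# Assumed numbers: 2400 MiB overhead, 8 task slots -> 300 MiB per task.
pool = ToyFairPool(per_task_pool_size(2400 * MIB, num_task_slots=8))
for name in ("sort_a", "sort_b", "sort_c"):
    pool.register(name)

print(pool.try_grow("sort_a", 100 * MIB))  # True: exactly the 100 MiB share
print(pool.try_grow("sort_a", 1 * MIB))    # False: 200 MiB idle, still denied
```

The second request is rejected even though two thirds of the pool is unused, which is the uneven-workload case described above.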

I'm not sure if it is feasible to implement non-self-spill memory reclaiming on top of 1 or 2, but I think it will help a lot to handle various kinds of workloads efficiently.

Additional context

No response

@Kontinuation Kontinuation added the enhancement New feature or request label Sep 18, 2024
@andygrove andygrove added this to the 0.3.0 milestone Sep 18, 2024
@viirya
Member

viirya commented Sep 18, 2024

> The downside is that comet operators cannot trigger the spilling of other memory consumers

This is actually a limitation of the DataFusion memory consumer API design, if I remember correctly.

@Kontinuation
Member Author

Yes, the initial memory management proposal and implementation did support cooperative spilling. However, a later simplification removed that feature.

I believe using a shared FairSpillPool for comet operators within the same task is a reasonable approach, especially since the memory manager currently only supports self-spilling. Still, I would love to see cooperative spilling return to DataFusion.

@andygrove andygrove modified the milestones: 0.3.0, 0.4.0 Sep 24, 2024
@Kontinuation
Member Author

I've experimented with approach 2 (per-task FairSpillPool) and it worked pretty well. I've also tried a per-worker FairSpillPool, but it worked poorly; I'm still trying to figure out why.

I've also noticed that all the tests were run with off-heap memory enabled (for instance, the TPC-DS suite), which enables CometTaskMemoryManager and hooks into Spark's unified memory manager, while the benchmarking guide suggests Spark configurations with off-heap memory disabled. Which is the recommended way to configure comet? Is enabling off-heap memory preferred, or are both configurations well supported?

@andygrove andygrove self-assigned this Oct 2, 2024
@andygrove
Member

I am going to start working on this.

@andygrove
Member

The "unified" approach certainly seems much safer and simpler.

I have been benchmarking locally with spark.memory.offHeap.enabled=true, but the benchmarking guide does not reflect this.

I am going to start out with a PR just to correctly document how things work today.
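
For readers who want to try the off-heap ("unified") setup, here is a minimal sketch that builds the corresponding spark-submit arguments. spark.memory.offHeap.enabled and spark.memory.offHeap.size are standard Spark settings; the helper name and the 4g size are assumptions for illustration only, and the right size depends on the workload.

```python
def off_heap_conf_args(off_heap_size="4g"):
    """Build spark-submit --conf arguments that enable off-heap memory.

    Hypothetical helper; the default size is an arbitrary placeholder.
    """
    conf = {
        "spark.memory.offHeap.enabled": "true",
        # Spark requires a positive size when off-heap memory is enabled.
        "spark.memory.offHeap.size": off_heap_size,
    }
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args

print(" ".join(off_heap_conf_args()))
```

Any Comet-specific settings would be added alongside these and are not shown here.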

@Kontinuation
Member Author

Thank you so much for declaring the recommended setup! It's given us a great direction, especially towards enhancing the memory manager in "unified" mode. Currently, I think the absence of cooperative spilling seems to be a significant shortcoming. Do you think it is appropriate to add this to DataFusion or Comet?

@andygrove
Member

> Thank you so much for declaring the recommended setup! It's given us a great direction, especially towards enhancing the memory manager in "unified" mode. Currently, I think the absence of cooperative spilling seems to be a significant shortcoming. Do you think it is appropriate to add this to DataFusion or Comet?

If we want to explore cooperative spilling, I think that it would be better to have that conversation in DataFusion.

@andygrove
Member

I created a separate issue for improving the "native memory management" approach.

#996

@andygrove
Member

> The downside (to using unified memory) is that comet operators cannot trigger the spilling of other memory consumers, which makes it easy to run into issues similar to #886 due to its greedy/unfair nature.

Issue #886 is related to Comet columnar shuffle, which currently has its own memory management, separate from the unified and native memory management features. PR #1063 makes it use unified memory instead.

We are now leaning towards always using unified memory, and there is a PR to make this the only approach: #1062

3 participants