Skip to content

Commit

Permalink
[BACKPORT][Ray] Use main pool as owner when autoscale disabled (#2878) (
Browse files Browse the repository at this point in the history
#2903)

Co-authored-by: Shawn <[email protected]>
  • Loading branch information
wjsi and chaokunyang committed Apr 9, 2022
1 parent 2152931 commit ed300c5
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 26 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/cancel-prev.yml

This file was deleted.

4 changes: 4 additions & 0 deletions .github/workflows/core-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
pull_request:
types: ['opened', 'reopened', 'synchronize']

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
runs-on: ${{ matrix.os }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/docker-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
tags:
- '*'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/os-compat-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
pull_request:
types: ['opened', 'reopened', 'synchronize']

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
runs-on: ${{ matrix.os }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
pull_request:
types: ['opened', 'reopened', 'synchronize']

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
runs-on: ${{ matrix.os }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pypi-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on:
tags:
- '*'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
name: Build wheels on ${{ matrix.os }} for ${{ matrix.arch }}
Expand Down
11 changes: 0 additions & 11 deletions mars/services/storage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,17 +550,6 @@ async def _setup_storage(
):
backend = get_storage_backend(storage_backend)
storage_config = storage_config or dict()

from ..cluster import ClusterAPI

if backend.name == "ray":
try:
cluster_api = await ClusterAPI.create(self.address)
supervisor_address = (await cluster_api.get_supervisors())[0]
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
storage_config["owner"] = supervisor_address
except mo.ActorNotExist:
pass
init_params, teardown_params = await backend.setup(**storage_config)
client = backend(**init_params)
self._init_params[band_name][storage_backend] = init_params
Expand Down
28 changes: 25 additions & 3 deletions mars/services/storage/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,31 @@ async def start(self):
backends = storage_configs.get("backends")
options = storage_configs.get("default_config", dict())
transfer_block_size = options.get("transfer_block_size", None)
backend_config = {
backend: storage_configs.get(backend, dict()) for backend in backends
}
backend_config = {}
for backend in backends:
storage_config = storage_configs.get(backend, dict())
backend_config[backend] = storage_config
if backend == "ray":
# Specify supervisor as ray owner will be costly when mars do shuffle which there will be m*n objects
# need to specify supervisor as owner, so enable it only for auto scale to avoid data lost when scale
# in. This limit can be removed when ray support ownership transfer.
if (
self._config.get("scheduling", {})
.get("autoscale", {})
.get("enabled", False)
):
try:
from ...cluster.api import ClusterAPI

cluster_api = await ClusterAPI.create(self._address)
supervisor_address = (await cluster_api.get_supervisors())[0]
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
owner = supervisor_address
except mo.ActorNotExist:
owner = self._address
else:
owner = self._address
storage_config["owner"] = owner

await mo.create_actor(
StorageManagerActor,
Expand Down

0 comments on commit ed300c5

Please sign in to comment.