Server pinging during item listing to prevent deprovisioning #900

Open · wants to merge 7 commits into main
97 changes: 97 additions & 0 deletions .github/workflows/integration-test-sync-deprovision.yml
@@ -0,0 +1,97 @@
name: integration-test-sync-deprovision
on: [push]
concurrency: transfer-test
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
SKYPLANE_USAGE_STATS_ENABLED: 0
jobs:
integration:
runs-on: ubuntu-latest
strategy:
max-parallel: 1
matrix:
pairs:
# AWS to AWS
- aws:us-east-1 aws:us-east-1 --multipart
- aws:us-east-2 aws:us-west-2 --multipart
# GCP to GCP
- gcp:us-central1-a gcp:us-central1-a --multipart
- gcp:us-west1-a gcp:us-east1-a --multipart
# Azure to Azure
- azure:westus azure:westus
- azure:eastus azure:westus
# cross cloud tests
- aws:us-west-1 gcp:us-west2-a --multipart
- gcp:us-west2-a aws:us-west-1 --multipart
- aws:us-west-1 azure:westus
- azure:westus aws:us-west-1
- gcp:us-west2-a azure:westus
- azure:westus gcp:us-west2-a
timeout-minutes: 40
env:
STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }}
steps:
- uses: actions/checkout@v1
- name: Install poetry
run: pipx install poetry
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "poetry"
- name: Set Poetry config
run: |
poetry config virtualenvs.in-project false
poetry config virtualenvs.path ~/.virtualenvs
- name: Install Dependencies
run: poetry install -E aws -E azure -E gcp
if: steps.cache.outputs.cache-hit != 'true'
- id: 'auth'
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
- name: Log into Azure
uses: azure/login@v1
with:
creds: '{"clientId":"${{ secrets.AZURE_CLIENT_ID }}","clientSecret":"${{ secrets.AZURE_CLIENT_SECRET }}","subscriptionId":"${{ secrets.AZURE_SUBSCRIPTION_ID }}","tenantId":"${{ secrets.AZURE_TENANT_ID }}"}'
- name: Skyplane init
run: |
poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
poetry run skyplane config set native_cmd_enabled false
cat ~/.skyplane/config
poetry run skyplane init -y --disable-config-cloudflare
poetry run skyplane config set usage_stats false
      - name: 10000000 small files test
run: poetry run python tests/integration/sync.py ${{ matrix.pairs }} --n-files 10000000 --file-size-kb 1 --autoshutdown 1
- name: Cleanup GCP service account
if: always()
run: gcloud iam service-accounts delete ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
deprovision:
runs-on: ubuntu-latest
if: ${{ always() }}
needs: [integration]
env:
STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }}
steps:
- uses: actions/checkout@v1
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install skyplane from pypi
run: pip install skyplane[aws,azure,gcp]
- id: 'auth'
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
- name: Skyplane init
run: |
skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
skyplane init -y --disable-config-azure
skyplane config set usage_stats false
- name: Deprovision
run: skyplane deprovision
- name: Cleanup GCP service account
if: always()
run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
2 changes: 2 additions & 0 deletions skyplane/api/config.py
@@ -63,6 +63,8 @@ def make_auth_provider(self) -> compute.IBMCloudAuthentication:

@dataclass(frozen=True)
class TransferConfig:
"""Configuration for a specific transfer"""

autoterminate_minutes: int = 15
requester_pays: bool = False

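For context, a minimal sketch (not part of this diff) of what `autoterminate_minutes` controls: provisioned gateways shut themselves down after this idle window, so a very long object listing can outlive them unless they are pinged, which is what the `transfer_job.py` change below adds. The construction assumes the remaining `TransferConfig` fields all have defaults.

# Illustrative only: autoterminate_minutes caps how long idle gateways stay up.
# Assumes all other TransferConfig fields carry defaults.
from skyplane.api.config import TransferConfig

cfg = TransferConfig(autoterminate_minutes=15)  # gateways self-terminate after ~15 idle minutes
print(cfg.autoterminate_minutes)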
38 changes: 14 additions & 24 deletions skyplane/api/transfer_job.py
@@ -170,25 +170,6 @@ def _run_multipart_chunk_thread(
mime_type = None
            raise NotImplementedError("Multipart not implemented for non-object store interfaces")

# def to_chunk_requests(self, gen_in: Generator[Chunk, None, None]) -> Generator[ChunkRequest, None, None]:
# """Converts a generator of chunks to a generator of chunk requests.

# :param gen_in: generator that generates chunk requests
# :type gen_in: Generator
# """
# src_region = self.src_iface.region_tag()
# src_bucket = self.src_iface.bucket()
# for chunk in gen_in:
# yield ChunkRequest(
# chunk=chunk,
# src_region=src_region,
# #dst_region=dest_region,
# src_object_store_bucket=src_bucket,
# #dst_object_store_bucket=dest_bucket,
# src_type="object_store",
# #dst_type="object_store",
# )

@staticmethod
def map_object_key_prefix(source_prefix: str, source_key: str, dest_prefix: str, recursive: bool = False):
"""
@@ -244,15 +225,18 @@ def transfer_pair_generator(
self,
src_prefix: str,
dst_prefixes: List[str],
dataplane: "Dataplane",
recursive: bool,
prefilter_fn: Optional[Callable[[ObjectStoreObject], bool]] = None, # TODO: change to StorageObject
) -> Generator[TransferPair, None, None]:
"""Query source region and return list of objects to transfer.

:param src_prefix: source bucket folder prefix
:type src_prefix: string
:param dst_prefix: destination bucket folder prefix
:type dst_prefix: string
:param dst_prefixes: destination bucket folder prefixes
        :type dst_prefixes: List[string]
:param dataplane: dataplane for the transfer
:type dataplane: skyplane.api.Dataplane
:param recursive: if true, will copy objects at folder prefix recursively
:type recursive: bool
:param prefilter_fn: filters out objects whose prefixes do not match the filter function (default: None)
@@ -273,6 +257,8 @@
logger.fs.debug(f"Querying objects in {self.src_iface.path()}")
n_objs = 0
for obj in self.src_iface.list_objects(src_prefix):
if dataplane.bound_nodes:
do_parallel(lambda i: i.run_command("echo 1"), dataplane.bound_nodes.values(), n=8)
if prefilter_fn is None or prefilter_fn(obj):
# collect list of destination objects
dest_objs = {}
@@ -586,6 +572,7 @@ def http_pool(self):
def gen_transfer_pairs(
self,
chunker: Optional[Chunker] = None,
dataplane: Optional["Dataplane"] = None,
transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()),
) -> Generator[TransferPair, None, None]:
"""Generate transfer pairs for the transfer job.
@@ -595,7 +582,7 @@
"""
if chunker is None: # used for external access to transfer pair list
chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config) # TODO: should read in existing transfer config
yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn)
yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, dataplane, self.recursive, self._pre_filter_fn)

def dispatch(
self,
@@ -613,7 +600,7 @@
:type dispatch_batch_size: int
"""
chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config)
transfer_pair_generator = self.gen_transfer_pairs(chunker) # returns TransferPair objects
transfer_pair_generator = self.gen_transfer_pairs(chunker, dataplane) # returns TransferPair objects
gen_transfer_list = chunker.tail_generator(transfer_pair_generator, self.transfer_list)
chunks = chunker.chunk(gen_transfer_list)
batches = chunker.batch_generator(
@@ -779,6 +766,7 @@ def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: b
def gen_transfer_pairs(
self,
chunker: Optional[Chunker] = None,
dataplane: Optional["Dataplane"] = None,
transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()),
) -> Generator[TransferPair, None, None]:
"""Generate transfer pairs for the transfer job.
@@ -788,7 +776,9 @@
"""
if chunker is None: # used for external access to transfer pair list
chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config)
transfer_pair_gen = chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn)
transfer_pair_gen = chunker.transfer_pair_generator(
self.src_prefix, self.dst_prefixes, dataplane, self.recursive, self._pre_filter_fn
)

# only single destination supported
assert len(self.dst_ifaces) == 1, "Only single destination supported for sync job"
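The core of this change threads the dataplane through `gen_transfer_pairs` into `transfer_pair_generator`, which pings every bound gateway (`run_command("echo 1")` via `do_parallel`) on each listed object so a long listing does not outlive the gateways' auto-shutdown timers. A self-contained sketch of the same keep-alive pattern follows; `keepalive_iter`, `ping`, and `servers` are illustrative names, not Skyplane APIs.

# Hedged sketch of the keep-alive idea: yield listing results unchanged while
# periodically pinging all provisioned servers so their idle timers reset.
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def keepalive_iter(items: Iterable[T], servers: Sequence, ping: Callable, every: int = 1, workers: int = 8) -> Iterator[T]:
    # Yield items unchanged; ping every server once per `every` items.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for i, item in enumerate(items):
            if servers and i % every == 0:
                list(pool.map(ping, servers))  # cheap no-op per server, e.g. an "echo 1"
            yield item

Under these assumptions, a listing loop could be wrapped as `for obj in keepalive_iter(src_iface.list_objects(prefix), nodes, lambda n: n.run_command("echo 1")): ...`, which mirrors the per-object ping added in `transfer_pair_generator` above.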
109 changes: 109 additions & 0 deletions tests/integration/sync.py
@@ -0,0 +1,109 @@
import argparse
import os
import tempfile
import uuid
from skyplane.utils.definitions import KB
from skyplane.obj_store.object_store_interface import ObjectStoreInterface
from skyplane.cli.cli import sync
from skyplane.config_paths import cloud_config
from skyplane.utils import logger


def setup_buckets(src_region, dest_region, n_files=1, file_size_kb=1):
src_provider, src_zone = src_region.split(":")
dest_provider, dest_zone = dest_region.split(":")
if src_provider == "azure":
src_bucket_name = f"integration{src_zone}/{str(uuid.uuid4()).replace('-', '')}"
else:
src_bucket_name = f"integration{src_zone}-{str(uuid.uuid4())[:8]}"
if dest_provider == "azure":
dest_bucket_name = f"integration{dest_zone}/{str(uuid.uuid4()).replace('-', '')}"
else:
dest_bucket_name = f"skyplane-integration-{dest_zone}-{str(uuid.uuid4())[:8]}"
logger.debug(f"creating buckets {src_bucket_name} and {dest_bucket_name}")
src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
src_interface.create_bucket(src_zone)
dest_interface.create_bucket(dest_zone)

src_prefix = f"src_{uuid.uuid4()}"
dest_prefix = f"dest_{uuid.uuid4()}"
with tempfile.NamedTemporaryFile() as tmp:
fpath = tmp.name
with open(fpath, "wb+") as f:
f.write(os.urandom(int(file_size_kb * KB)))
for i in range(n_files):
src_interface.upload_object(fpath, f"{src_prefix}/{i}", mime_type="text/plain")

return src_bucket_name, dest_bucket_name, src_prefix, dest_prefix


def run(src_region, dest_region, n_files=1, file_size_kb=1, multipart=False, autoshutdown_minutes=15):
logger.info(
f"Running skyplane sync integration test with config "
+ f"src_region={src_region}, "
+ f"dest_region={dest_region}, "
+ f"n_files={n_files}, "
+ f"file_size_kb={file_size_kb}, "
+ f"multipart={multipart}"
+ f"autoshowdown_minutes={autoshowdown_minutes}"
)
    cloud_config.set_flag("autoshutdown_minutes", autoshutdown_minutes)
src_bucket_name, dest_bucket_name, src_prefix, dest_prefix = setup_buckets(
src_region, dest_region, n_files=n_files, file_size_kb=file_size_kb
)

def map_path(region, bucket, prefix):
provider, _ = region.split(":")
if provider == "aws":
return f"s3://{bucket}/{prefix}"
elif provider == "azure":
storage_account, container = bucket.split("/")
return f"https://{storage_account}.blob.core.windows.net/{container}/{prefix}"
elif provider == "gcp":
return f"gs://{bucket}/{prefix}"
else:
raise Exception(f"Unknown provider {provider}")

return_code = sync(
map_path(src_region, src_bucket_name, src_prefix),
map_path(dest_region, dest_bucket_name, dest_prefix),
debug=False,
multipart=multipart,
confirm=True,
max_instances=1,
max_connections=1,
solver="direct",
solver_required_throughput_gbits=1,
)

# clean up path
src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
src_interface.delete_objects([f"{src_prefix}/{i}" for i in range(n_files)])
dest_interface.delete_objects([f"{dest_prefix}/{i}" for i in range(n_files)])
src_interface.delete_bucket()
dest_interface.delete_bucket()

return return_code


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("src", help="source region")
parser.add_argument("dest", help="destination region")
parser.add_argument("--n-files", type=int, default=1)
parser.add_argument("--file-size-kb", type=int, default=1)
parser.add_argument("--multipart", action="store_true")
parser.add_argument("--autoshutdown", type=int, default=15)
args = parser.parse_args()

return_code = run(
args.src,
args.dest,
n_files=args.n_files,
file_size_kb=args.file_size_kb,
multipart=args.multipart,
        autoshutdown_minutes=args.autoshutdown,
)
exit(return_code)
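The CI workflow above invokes this script directly; if the tests directory is importable as a package (an assumption, not something the workflow relies on), the harness can also be driven programmatically, using the same provider:zone region strings as the workflow matrix. Note this creates and deletes real cloud buckets, so the snippet is illustrative only.

# Hypothetical programmatic invocation mirroring the CI step above.
from tests.integration.sync import run

rc = run("aws:us-east-1", "aws:us-west-2", n_files=100, file_size_kb=1, multipart=True, autoshutdown_minutes=1)
exit(rc)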