Skip to content

Commit 8080658

Browse files
committed
backup_task: remove a component once it is uploaded
Previously, during backup, SSTable components were preserved in the snapshot directory even after being uploaded. This led to redundant uploads in case of failed backups or restarts, wasting time and resources (S3 API calls). This change: - adds an optional query parameter named "move_files" to the "/storage_service/backup" API. If it is set to "true", SSTable components are removed once they are backed up to object storage. - conditionally removes SSTable components from the snapshot directory once they are successfully uploaded to the target location. This prevents re-uploading the same files and reduces disk usage. This change only "Refs" scylladb#20655 because we can further optimize the backup process. Consider: - Sending HEAD requests to S3 to check for existing files before uploading. - Implementing support for resuming partially uploaded files. Fixes scylladb#21799 Refs scylladb#20655 Signed-off-by: Kefu Chai <[email protected]>
1 parent 32d2237 commit 8080658

File tree

8 files changed

+79
-9
lines changed

8 files changed

+79
-9
lines changed

api/api-doc/storage_service.json

+8
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,14 @@
813813
"allowMultiple":false,
814814
"type":"string",
815815
"paramType":"query"
816+
},
817+
{
818+
"name":"move_files",
819+
"description":"Move component files instead of copying them",
820+
"required":false,
821+
"allowMultiple":false,
822+
"type":"boolean",
823+
"paramType":"query"
816824
}
817825
]
818826
}

api/storage_service.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -1799,13 +1799,14 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
17991799
auto bucket = req->get_query_param("bucket");
18001800
auto prefix = req->get_query_param("prefix");
18011801
auto snapshot_name = req->get_query_param("snapshot");
1802+
auto move_files = req_param<bool>(*req, "move_files", false);
18021803
if (snapshot_name.empty()) {
18031804
// TODO: If missing, snapshot should be taken by scylla, then removed
18041805
throw httpd::bad_param_exception("The snapshot name must be specified");
18051806
}
18061807

18071808
auto& ctl = snap_ctl.local();
1808-
auto task_id = co_await ctl.start_backup(std::move(endpoint), std::move(bucket), std::move(prefix), std::move(keyspace), std::move(table), std::move(snapshot_name));
1809+
auto task_id = co_await ctl.start_backup(std::move(endpoint), std::move(bucket), std::move(prefix), std::move(keyspace), std::move(table), std::move(snapshot_name), move_files);
18091810
co_return json::json_return_type(fmt::to_string(task_id));
18101811
});
18111812

db/snapshot-ctl.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ future<int64_t> snapshot_ctl::true_snapshots_size() {
137137
}));
138138
}
139139

140-
future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name) {
140+
future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name, bool move_files) {
141141
if (this_shard_id() != 0) {
142142
co_return co_await container().invoke_on(0, [&](auto& local) {
143-
return local.start_backup(endpoint, bucket, prefix, keyspace, table, snapshot_name);
143+
return local.start_backup(endpoint, bucket, prefix, keyspace, table, snapshot_name, move_files);
144144
});
145145
}
146146

@@ -175,7 +175,7 @@ future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring buck
175175
sstables::snapshots_dir /
176176
std::string_view(snapshot_name));
177177
auto task = co_await _task_manager_module->make_and_start_task<::db::snapshot::backup_task_impl>(
178-
{}, *this, std::move(cln), std::move(bucket), std::move(prefix), keyspace, dir);
178+
{}, *this, std::move(cln), std::move(bucket), std::move(prefix), keyspace, dir, move_files);
179179
co_return task->id();
180180
}
181181

db/snapshot-ctl.hh

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public:
105105
*/
106106
future<> clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name);
107107

108-
future<tasks::task_id> start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name);
108+
future<tasks::task_id> start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name, bool move_files);
109109

110110
future<std::unordered_map<sstring, db_snapshot_details>> get_snapshot_details();
111111

db/snapshot/backup_task.cc

+20-2
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ backup_task_impl::backup_task_impl(tasks::task_manager::module_ptr module,
3030
sstring bucket,
3131
sstring prefix,
3232
sstring ks,
33-
std::filesystem::path snapshot_dir) noexcept
33+
std::filesystem::path snapshot_dir,
34+
bool move_files) noexcept
3435
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
3536
, _snap_ctl(ctl)
3637
, _client(std::move(client))
3738
, _bucket(std::move(bucket))
3839
, _prefix(std::move(prefix))
39-
, _snapshot_dir(std::move(snapshot_dir)) {
40+
, _snapshot_dir(std::move(snapshot_dir))
41+
, _remove_on_uploaded(move_files) {
4042
_status.progress_units = "bytes ('total' may grow along the way)";
4143
}
4244

@@ -79,6 +81,22 @@ future<> backup_task_impl::upload_component(sstring name) {
7981
snap_log.error("Error uploading {}: {}", component_name.native(), std::current_exception());
8082
throw;
8183
}
84+
85+
if (!_remove_on_uploaded) {
86+
co_return;
87+
}
88+
89+
// Delete the uploaded component to:
90+
// 1. Free up disk space immediately
91+
// 2. Avoid costly S3 existence checks on future backup attempts
92+
try {
93+
co_await remove_file(component_name.native());
94+
} catch (...) {
95+
// If deletion of an uploaded file fails, the backup process will continue.
96+
// While this doesn't halt the backup, it may indicate filesystem permissions
97+
// issues or system constraints that should be investigated.
98+
snap_log.warn("Failed to remove {}: {}", component_name, std::current_exception());
99+
}
82100
}
83101

84102
future<> backup_task_impl::do_backup() {

db/snapshot/backup_task.hh

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class backup_task_impl : public tasks::task_manager::task::impl {
2424
sstring _bucket;
2525
sstring _prefix;
2626
std::filesystem::path _snapshot_dir;
27+
bool _remove_on_uploaded;
2728
s3::upload_progress _progress = {};
2829

2930
future<> do_backup();
@@ -39,7 +40,8 @@ public:
3940
sstring bucket,
4041
sstring prefix,
4142
sstring ks,
42-
std::filesystem::path snapshot_dir) noexcept;
43+
std::filesystem::path snapshot_dir,
44+
bool move_files) noexcept;
4345

4446
virtual std::string type() const override;
4547
virtual tasks::is_internal is_internal() const noexcept override;

test/object_store/test_backup.py

+34
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,40 @@ async def test_simple_backup(manager: ManagerClient, s3_server):
8989
assert len(res) == 1 and res[0][1].group(1) == 'strm'
9090

9191

92+
@pytest.mark.asyncio
93+
@pytest.mark.parametrize("move_files", [False, True])
94+
async def test_backup_move(manager: ManagerClient, s3_server, move_files):
95+
'''check that backing up a snapshot by _moving_ sstable to object storage'''
96+
97+
cfg = {'enable_user_defined_functions': False,
98+
'object_storage_config_file': str(s3_server.config_file),
99+
'experimental_features': ['keyspace-storage-options'],
100+
'task_ttl_in_seconds': 300
101+
}
102+
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
103+
server = await manager.server_add(config=cfg, cmdline=cmd)
104+
ks, cf = await prepare_snapshot_for_backup(manager, server)
105+
106+
workdir = await manager.server_get_workdir(server.server_id)
107+
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
108+
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
109+
assert len(files) > 0
110+
111+
print('Backup snapshot')
112+
prefix = f'{cf}/backup'
113+
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix,
114+
move_files=move_files)
115+
print(f'Started task {tid}')
116+
status = await manager.api.get_task_status(server.ip_addr, tid)
117+
print(f'Status: {status}, waiting to finish')
118+
status = await manager.api.wait_task(server.ip_addr, tid)
119+
assert (status is not None) and (status['state'] == 'done')
120+
assert (status['progress_total'] > 0) and (status['progress_completed'] == status['progress_total'])
121+
122+
# all components in the "backup" snapshot should have been moved into bucket if move_files
123+
# The conditional must be parenthesized: unparenthesized, the statement parses as
# `(len(...) == 0) if move_files else len(files)`, which is always truthy when
# move_files is False, so the "files are kept" case would never be checked.
assert len(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup')) == (0 if move_files else len(files))
124+
125+
92126
@pytest.mark.asyncio
93127
async def test_backup_to_non_existent_bucket(manager: ManagerClient, s3_server):
94128
'''backup should fail if the destination bucket does not exist'''

test/pylib/rest_client.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,21 @@ async def flush_all_keyspaces(self, node_ip: str) -> None:
318318
"""Flush all keyspaces"""
319319
await self.client.post(f"/storage_service/flush", host=node_ip)
320320

321-
async def backup(self, node_ip: str, ks: str, table: str, tag: str, dest: str, bucket: str, prefix: str) -> str:
321+
async def backup(self, node_ip: str, ks: str, table: str, tag: str, dest: str, bucket: str, prefix: str, **kwargs) -> str:
322322
"""Backup keyspace's snapshot"""
323323
params = {"keyspace": ks,
324324
"table": table,
325325
"endpoint": dest,
326326
"bucket": bucket,
327327
"prefix": prefix,
328328
"snapshot": tag}
329+
# add optional args. for instance, "move_files".
330+
for key, value in kwargs.items():
331+
if isinstance(value, bool):
332+
params[key] = 'true' if value else 'false'
333+
else:
334+
assert any(isinstance(value, t) for t in (str, int, float))
335+
params[key] = value
329336
return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params)
330337

331338
async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str], scope: str = None) -> str:

0 commit comments

Comments
 (0)