Skip to content

Commit 356ea6e

Browse files
authored
Fix memory leak in thread-local VM & plugin caches (#357)
* Fix memory leak in thread-local VM & plugin caches Since VM and plugin thread-local cache keys include volatile parts (namely VM configuration, code and plugin configuration), their reconfiguration/update (e.g. with Envoy ADS protocol) might lead to memory leak by leaving those thread-local map stale keys behind. The current cleanup method is insufficient, as it accounts only for cache hit case. Signed-off-by: Maxim Philippov <[email protected]> * Use key queues for a bounded stale entries cleanup in local caches Signed-off-by: Maxim Philippov <[email protected]> * Linter fixes Signed-off-by: Maxim Philippov <[email protected]> * Fix code formatting Signed-off-by: Maxim Philippov <[email protected]> * Addressed code style comments Signed-off-by: Maxim Philippov <[email protected]> * Made getThreadLocalWasm more similar to other funcs Signed-off-by: Maxim Philippov <[email protected]> * Erase stale cache entries whenever we can Signed-off-by: Maxim Philippov <[email protected]> * Fix casing in comment Signed-off-by: Maxim Philippov <[email protected]> --------- Signed-off-by: Maxim Philippov <[email protected]>
1 parent 25daf5e commit 356ea6e

File tree

1 file changed

+59
-14
lines changed

1 file changed

+59
-14
lines changed

src/wasm.cc

+59-14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <limits>
2424
#include <memory>
2525
#include <mutex>
26+
#include <queue>
2627
#include <string>
2728
#include <unordered_map>
2829
#include <utility>
@@ -36,13 +37,56 @@ namespace proxy_wasm {
3637

3738
namespace {
3839

39-
// Map from Wasm Key to the local Wasm instance.
40+
// Map from Wasm key to the thread-local Wasm instance.
4041
thread_local std::unordered_map<std::string, std::weak_ptr<WasmHandleBase>> local_wasms;
42+
// Wasm key queue to track stale entries in `local_wasms`.
43+
thread_local std::queue<std::string> local_wasms_keys;
44+
45+
// Map from plugin key to the thread-local plugin instance.
4146
thread_local std::unordered_map<std::string, std::weak_ptr<PluginHandleBase>> local_plugins;
47+
// Plugin key queue to track stale entries in `local_plugins`.
48+
thread_local std::queue<std::string> local_plugins_keys;
49+
50+
// Check no more than `MAX_LOCAL_CACHE_GC_CHUNK_SIZE` cache entries at a time during stale entries
51+
// cleanup.
52+
const size_t MAX_LOCAL_CACHE_GC_CHUNK_SIZE = 64;
53+
4254
// Map from Wasm Key to the base Wasm instance, using a pointer to avoid the initialization fiasco.
4355
std::mutex base_wasms_mutex;
4456
std::unordered_map<std::string, std::weak_ptr<WasmHandleBase>> *base_wasms = nullptr;
4557

58+
void cacheLocalWasm(const std::string &key, const std::shared_ptr<WasmHandleBase> &wasm_handle) {
59+
local_wasms[key] = wasm_handle;
60+
local_wasms_keys.emplace(key);
61+
}
62+
63+
void cacheLocalPlugin(const std::string &key,
64+
const std::shared_ptr<PluginHandleBase> &plugin_handle) {
65+
local_plugins[key] = plugin_handle;
66+
local_plugins_keys.emplace(key);
67+
}
68+
69+
template <class T>
70+
void removeStaleLocalCacheEntries(std::unordered_map<std::string, std::weak_ptr<T>> &cache,
71+
std::queue<std::string> &keys) {
72+
auto num_keys_to_check = std::min(MAX_LOCAL_CACHE_GC_CHUNK_SIZE, keys.size());
73+
for (size_t i = 0; i < num_keys_to_check; i++) {
74+
std::string key(keys.front());
75+
keys.pop();
76+
77+
const auto it = cache.find(key);
78+
if (it == cache.end()) {
79+
continue;
80+
}
81+
82+
if (it->second.expired()) {
83+
cache.erase(it);
84+
} else {
85+
keys.push(std::move(key));
86+
}
87+
}
88+
}
89+
4690
} // namespace
4791

4892
std::string makeVmKey(std::string_view vm_id, std::string_view vm_configuration,
@@ -525,14 +569,15 @@ std::shared_ptr<WasmHandleBase> createWasm(const std::string &vm_key, const std:
525569

526570
std::shared_ptr<WasmHandleBase> getThreadLocalWasm(std::string_view vm_key) {
527571
auto it = local_wasms.find(std::string(vm_key));
528-
if (it == local_wasms.end()) {
529-
return nullptr;
530-
}
531-
auto wasm = it->second.lock();
532-
if (!wasm) {
533-
local_wasms.erase(std::string(vm_key));
572+
if (it != local_wasms.end()) {
573+
auto wasm = it->second.lock();
574+
if (wasm) {
575+
return wasm;
576+
}
577+
local_wasms.erase(it);
534578
}
535-
return wasm;
579+
removeStaleLocalCacheEntries(local_wasms, local_wasms_keys);
580+
return nullptr;
536581
}
537582

538583
static std::shared_ptr<WasmHandleBase>
@@ -546,9 +591,9 @@ getOrCreateThreadLocalWasm(const std::shared_ptr<WasmHandleBase> &base_handle,
546591
if (wasm_handle) {
547592
return wasm_handle;
548593
}
549-
// Remove stale entry.
550-
local_wasms.erase(vm_key);
594+
local_wasms.erase(it);
551595
}
596+
removeStaleLocalCacheEntries(local_wasms, local_wasms_keys);
552597
// Create and initialize new thread-local WasmVM.
553598
auto wasm_handle = clone_factory(base_handle);
554599
if (!wasm_handle) {
@@ -560,7 +605,7 @@ getOrCreateThreadLocalWasm(const std::shared_ptr<WasmHandleBase> &base_handle,
560605
base_handle->wasm()->fail(FailState::UnableToInitializeCode, "Failed to initialize Wasm code");
561606
return nullptr;
562607
}
563-
local_wasms[vm_key] = wasm_handle;
608+
cacheLocalWasm(vm_key, wasm_handle);
564609
wasm_handle->wasm()->wasm_vm()->addFailCallback([vm_key](proxy_wasm::FailState fail_state) {
565610
if (fail_state == proxy_wasm::FailState::RuntimeError) {
566611
// If VM failed, erase the entry so that:
@@ -583,9 +628,9 @@ std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
583628
if (plugin_handle) {
584629
return plugin_handle;
585630
}
586-
// Remove stale entry.
587-
local_plugins.erase(key);
631+
local_plugins.erase(it);
588632
}
633+
removeStaleLocalCacheEntries(local_plugins, local_plugins_keys);
589634
// Get thread-local WasmVM.
590635
auto wasm_handle = getOrCreateThreadLocalWasm(base_handle, clone_factory);
591636
if (!wasm_handle) {
@@ -603,7 +648,7 @@ std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
603648
return nullptr;
604649
}
605650
auto plugin_handle = plugin_factory(wasm_handle, plugin);
606-
local_plugins[key] = plugin_handle;
651+
cacheLocalPlugin(key, plugin_handle);
607652
wasm_handle->wasm()->wasm_vm()->addFailCallback([key](proxy_wasm::FailState fail_state) {
608653
if (fail_state == proxy_wasm::FailState::RuntimeError) {
609654
// If VM failed, erase the entry so that:

0 commit comments

Comments
 (0)