diff --git a/src/VecSim/algorithms/hnsw/hnsw.h b/src/VecSim/algorithms/hnsw/hnsw.h index 096dcc985..8358434f5 100644 --- a/src/VecSim/algorithms/hnsw/hnsw.h +++ b/src/VecSim/algorithms/hnsw/hnsw.h @@ -50,6 +50,7 @@ using graphNodeType = pair; // represented as: (element_id, leve ////////////////////////////////////// Auxiliary HNSW structs ////////////////////////////////////// +using elem_mutex_t = std::shared_mutex; // Vectors flags (for marking a specific vector) typedef enum { DELETE_MARK = 0x1, // element is logically deleted, but still exists in the graph @@ -74,8 +75,8 @@ struct ElementMetaData { ElementMetaData(labelType label = SIZE_MAX) noexcept : label(label), flags(IN_PROCESS) {} }; -#pragma pack() // restore default packing +#pragma pack() // restore default packing struct LevelData { vecsim_stl::vector *incomingEdges; linkListSize numLinks; @@ -94,7 +95,7 @@ struct LevelData { struct ElementGraphData { size_t toplevel; - std::mutex neighborsGuard; + elem_mutex_t neighborsGuard; LevelData *others; LevelData level0; @@ -293,10 +294,14 @@ class HNSWIndex : public VecSimIndexAbstract, inline auto safeGetEntryPointState() const; inline void lockIndexDataGuard() const; inline void unlockIndexDataGuard() const; - inline void lockNodeLinks(idType node_id) const; - inline void unlockNodeLinks(idType node_id) const; - inline void lockNodeLinks(ElementGraphData *node_data) const; - inline void unlockNodeLinks(ElementGraphData *node_data) const; + inline void writeLockNodeLinks(idType node_id) const; + inline void writeUnlockNodeLinks(idType node_id) const; + inline void writeLockNodeLinks(ElementGraphData *node_data) const; + inline void writeUnlockNodeLinks(ElementGraphData *node_data) const; + inline void readLockNodeLinks(idType node_id) const; + inline void readUnlockNodeLinks(idType node_id) const; + inline void readLockNodeLinks(ElementGraphData *node_data) const; + inline void readUnlockNodeLinks(ElementGraphData *node_data) const; inline VisitedNodesHandler *getVisitedList() const; inline void returnVisitedList(VisitedNodesHandler *visited_nodes_handler) const; VecSimIndexInfo info() const override; @@ -502,24 +507,46 @@ void HNSWIndex::unlockIndexDataGuard() const { indexDataGuard.unlock(); } +/////////////// WRITE LOCKS ///////////////// template -void HNSWIndex::lockNodeLinks(ElementGraphData *node_data) const { +void HNSWIndex::writeLockNodeLinks(ElementGraphData *node_data) const { node_data->neighborsGuard.lock(); } template -void HNSWIndex::unlockNodeLinks(ElementGraphData *node_data) const { +void HNSWIndex::writeUnlockNodeLinks(ElementGraphData *node_data) const { node_data->neighborsGuard.unlock(); } template -void HNSWIndex::lockNodeLinks(idType node_id) const { - lockNodeLinks(getGraphDataByInternalId(node_id)); +void HNSWIndex::writeLockNodeLinks(idType node_id) const { + writeLockNodeLinks(getGraphDataByInternalId(node_id)); +} + +template +void HNSWIndex::writeUnlockNodeLinks(idType node_id) const { + writeUnlockNodeLinks(getGraphDataByInternalId(node_id)); +} + +/////////////// READ LOCKS ///////////////// +template +void HNSWIndex::readLockNodeLinks(ElementGraphData *node_data) const { + node_data->neighborsGuard.lock_shared(); +} + +template +void HNSWIndex::readUnlockNodeLinks(ElementGraphData *node_data) const { + node_data->neighborsGuard.unlock_shared(); +} + +template +void HNSWIndex::readLockNodeLinks(idType node_id) const { + readLockNodeLinks(getGraphDataByInternalId(node_id)); } template -void HNSWIndex::unlockNodeLinks(idType node_id) const { - unlockNodeLinks(getGraphDataByInternalId(node_id)); +void HNSWIndex::readUnlockNodeLinks(idType node_id) const { + readUnlockNodeLinks(getGraphDataByInternalId(node_id)); } /** @@ -579,7 +606,7 @@ void HNSWIndex::processCandidate( candidatesMaxHeap &candidate_set, DistType &lowerBound) const { ElementGraphData *cur_element = getGraphDataByInternalId(curNodeId); - lockNodeLinks(cur_element); + readLockNodeLinks(cur_element); LevelData &node_level = getLevelData(cur_element, layer); if (node_level.numLinks > 0) { @@ -653,7 +680,7 @@ void HNSWIndex::processCandidate( } } } - unlockNodeLinks(cur_element); + readUnlockNodeLinks(cur_element); } template @@ -664,7 +691,7 @@ void HNSWIndex::processCandidate_RangeSearch( candidatesMaxHeap &candidate_set, DistType dyn_range, DistType radius) const { auto *cur_element = getGraphDataByInternalId(curNodeId); - lockNodeLinks(cur_element); + readLockNodeLinks(cur_element); LevelData &node_level = getLevelData(cur_element, layer); if (node_level.numLinks > 0) { @@ -719,7 +746,7 @@ void HNSWIndex::processCandidate_RangeSearch( } } } - unlockNodeLinks(cur_element); + readUnlockNodeLinks(cur_element); } template @@ -865,8 +892,8 @@ void HNSWIndex::revisitNeighborConnections( // Acquire all relevant locks for making the updates for the selected neighbor - all its removed // neighbors, along with the neighbors itself and the cur node. // but first, we release the node and neighbors lock to avoid deadlocks. - unlockNodeLinks(new_node_id); - unlockNodeLinks(selected_neighbor); + writeUnlockNodeLinks(new_node_id); + writeUnlockNodeLinks(selected_neighbor); nodes_to_update.push_back(selected_neighbor); nodes_to_update.push_back(new_node_id); @@ -874,7 +901,7 @@ void HNSWIndex::revisitNeighborConnections( std::sort(nodes_to_update.begin(), nodes_to_update.end()); size_t nodes_to_update_count = nodes_to_update.size(); for (size_t i = 0; i < nodes_to_update_count; i++) { - lockNodeLinks(nodes_to_update[i]); + writeLockNodeLinks(nodes_to_update[i]); } size_t neighbour_neighbours_idx = 0; bool update_cur_node_required = true; @@ -923,7 +950,7 @@ void HNSWIndex::revisitNeighborConnections( // Done updating the neighbor's neighbors. neighbor_level.numLinks = neighbour_neighbours_idx; for (size_t i = 0; i < nodes_to_update_count; i++) { - unlockNodeLinks(nodes_to_update[i]); + writeUnlockNodeLinks(nodes_to_update[i]); } } @@ -959,11 +986,11 @@ idType HNSWIndex::mutuallyConnectNewElement( idType selected_neighbor = neighbor_data.second; // neighbor's id auto *neighbor_graph_data = getGraphDataByInternalId(selected_neighbor); if (new_node_id < selected_neighbor) { - lockNodeLinks(new_node_level); - lockNodeLinks(neighbor_graph_data); + writeLockNodeLinks(new_node_level); + writeLockNodeLinks(neighbor_graph_data); } else { - lockNodeLinks(neighbor_graph_data); - lockNodeLinks(new_node_level); + writeLockNodeLinks(neighbor_graph_data); + writeLockNodeLinks(new_node_level); } // validations... @@ -975,15 +1002,15 @@ idType HNSWIndex::mutuallyConnectNewElement( if (new_node_level_data.numLinks == max_M_cur) { // The new node cannot add more neighbors this->log("Couldn't add all chosen neighbors upon inserting a new node"); - unlockNodeLinks(new_node_level); - unlockNodeLinks(neighbor_graph_data); + writeUnlockNodeLinks(new_node_level); + writeUnlockNodeLinks(neighbor_graph_data); break; } // If one of the two nodes has already deleted - skip the operation. if (isMarkedDeleted(new_node_id) || isMarkedDeleted(selected_neighbor)) { - unlockNodeLinks(new_node_level); - unlockNodeLinks(neighbor_graph_data); + writeUnlockNodeLinks(new_node_level); + writeUnlockNodeLinks(neighbor_graph_data); continue; } @@ -994,8 +1021,8 @@ idType HNSWIndex::mutuallyConnectNewElement( if (neighbor_level_data.numLinks < max_M_cur) { new_node_level_data.links[new_node_level_data.numLinks++] = selected_neighbor; neighbor_level_data.links[neighbor_level_data.numLinks++] = new_node_id; - unlockNodeLinks(new_node_level); - unlockNodeLinks(neighbor_graph_data); + writeUnlockNodeLinks(new_node_level); + writeUnlockNodeLinks(neighbor_graph_data); continue; } @@ -1105,7 +1132,7 @@ void HNSWIndex::replaceEntryPoint() { volatile idType candidate_in_process = INVALID_ID; // Go over the entry point's neighbors at the top level. - lockNodeLinks(old_entry_point); + readLockNodeLinks(old_entry_point); LevelData &old_ep_level = getLevelData(old_entry_point, maxLevel); // Tries to set the (arbitrary) first neighbor as the entry point which is not deleted, // if exists. @@ -1113,7 +1140,7 @@ void HNSWIndex::replaceEntryPoint() { if (!isMarkedDeleted(old_ep_level.links[i])) { if (!isInProcess(old_ep_level.links[i])) { entrypointNode = old_ep_level.links[i]; - unlockNodeLinks(old_entry_point); + readUnlockNodeLinks(old_entry_point); return; } else { // Store this candidate which is currently being inserted into the graph in @@ -1122,7 +1149,7 @@ void HNSWIndex::replaceEntryPoint() { } } } - unlockNodeLinks(old_entry_point); + readUnlockNodeLinks(old_entry_point); // If there is no neighbors in the current level, check for any vector at // this level to be the new entry point. @@ -1273,7 +1300,7 @@ void HNSWIndex::greedySearchLevel(const void *vector_data, s changed = false; auto *element = getGraphDataByInternalId(bestCand); - lockNodeLinks(element); + readLockNodeLinks(element); LevelData &node_level_data = getLevelData(element, level); for (int i = 0; i < node_level_data.numLinks; i++) { @@ -1295,7 +1322,7 @@ void HNSWIndex::greedySearchLevel(const void *vector_data, s } } } - unlockNodeLinks(element); + readUnlockNodeLinks(element); } while (changed); if (!running_query) { bestCand = bestNonDeletedCand; @@ -1311,18 +1338,18 @@ HNSWIndex::safeCollectAllNodeIncomingNeighbors(idType node_i for (size_t level = 0; level <= element->toplevel; level++) { // Save the node neighbor's in the current level while holding its neighbors lock. std::vector neighbors_copy; - lockNodeLinks(element); + readLockNodeLinks(element); auto &node_level_data = getLevelData(element, level); // Store the deleted element's neighbours. neighbors_copy.assign(node_level_data.links, node_level_data.links + node_level_data.numLinks); - unlockNodeLinks(element); + readUnlockNodeLinks(element); // Go over the neighbours and collect tho ones that also points back to the removed node. for (auto neighbour_id : neighbors_copy) { // Hold the neighbor's lock while we are going over its neighbors. auto *neighbor = getGraphDataByInternalId(neighbour_id); - lockNodeLinks(neighbor); + readLockNodeLinks(neighbor); LevelData &neighbour_level_data = getLevelData(neighbor, level); for (size_t j = 0; j < neighbour_level_data.numLinks; j++) { @@ -1332,16 +1359,16 @@ HNSWIndex::safeCollectAllNodeIncomingNeighbors(idType node_i break; } } - unlockNodeLinks(neighbor); + readUnlockNodeLinks(neighbor); } // Next, collect the rest of incoming edges (the ones that are not bidirectional) in the // current level to repair them. - lockNodeLinks(element); + readLockNodeLinks(element); for (auto incoming_edge : *node_level_data.incomingEdges) { incoming_neighbors.emplace_back(incoming_edge, (ushort)level); } - unlockNodeLinks(element); + readUnlockNodeLinks(element); } return incoming_neighbors; } @@ -1402,7 +1429,7 @@ void HNSWIndex::mutuallyUpdateForRepairedNode( std::sort(nodes_to_update.begin(), nodes_to_update.end()); size_t nodes_to_update_count = nodes_to_update.size(); for (size_t i = 0; i < nodes_to_update_count; i++) { - lockNodeLinks(nodes_to_update[i]); + writeLockNodeLinks(nodes_to_update[i]); } LevelData &node_level = getLevelData(node_id, level); @@ -1477,7 +1504,7 @@ void HNSWIndex::mutuallyUpdateForRepairedNode( // Done updating the node's neighbors. node_level.numLinks = node_neighbors_idx; for (size_t i = 0; i < nodes_to_update_count; i++) { - unlockNodeLinks(nodes_to_update[i]); + writeUnlockNodeLinks(nodes_to_update[i]); } } @@ -1499,7 +1526,7 @@ void HNSWIndex::repairNodeConnections(idType node_id, size_t // after the repair as well. const void *node_data = getDataByInternalId(node_id); auto *element = getGraphDataByInternalId(node_id); - lockNodeLinks(element); + readLockNodeLinks(element); LevelData &node_level_data = getLevelData(element, level); for (size_t j = 0; j < node_level_data.numLinks; j++) { node_orig_neighbours_set[node_level_data.links[j]] = true; @@ -1513,7 +1540,7 @@ void HNSWIndex::repairNodeConnections(idType node_id, size_t this->distFunc(node_data, getDataByInternalId(node_level_data.links[j]), this->dim), node_level_data.links[j]); } - unlockNodeLinks(element); + readUnlockNodeLinks(element); // If there are not deleted neighbors at that point the repair job has already been made by // another parallel job, and there is no need to repair the node anymore. @@ -1534,7 +1561,7 @@ void HNSWIndex::repairNodeConnections(idType node_id, size_t neighbors_to_remove.push_back(deleted_neighbor_id); auto *neighbor = getGraphDataByInternalId(deleted_neighbor_id); - lockNodeLinks(neighbor); + readLockNodeLinks(neighbor); LevelData &neighbor_level_data = getLevelData(neighbor, level); for (size_t j = 0; j < neighbor_level_data.numLinks; j++) { @@ -1551,7 +1578,7 @@ void HNSWIndex::repairNodeConnections(idType node_id, size_t this->dim), neighbor_level_data.links[j]); } - unlockNodeLinks(neighbor); + readUnlockNodeLinks(neighbor); } // Copy the original candidates, and run the heuristics. Afterwards, neighbors_candidates will diff --git a/src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h b/src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h index fa97dfbff..10ff396d8 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h +++ b/src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h @@ -118,7 +118,7 @@ VecSimQueryResult_Code HNSW_BatchIterator::scanGraphInternal // Take the current node out of the candidates queue and go over his neighbours. candidates.pop(); auto *node_graph_data = this->index->getGraphDataByInternalId(curr_node_id); - this->index->lockNodeLinks(node_graph_data); + this->index->readLockNodeLinks(node_graph_data); LevelData &node_level_data = this->index->getLevelData(node_graph_data, 0); if (node_level_data.numLinks > 0) { @@ -159,7 +159,7 @@ VecSimQueryResult_Code HNSW_BatchIterator::scanGraphInternal candidates.emplace(candidate_dist, candidate_id); } } - this->index->unlockNodeLinks(curr_node_id); + this->index->readUnlockNodeLinks(curr_node_id); } return VecSim_QueryResult_OK; }