From b29136fdab1696e69bef6b05abdc2fb3ec0b05c8 Mon Sep 17 00:00:00 2001 From: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> Date: Mon, 24 Apr 2023 18:17:28 +0300 Subject: [PATCH 01/12] Tiered HNSW - batch iterator [MOD-4323] (#350) * small modification to bf batch iterator * remove promise of perfect score in HNSW multi batch * implement batch iterator for tiered and some needed helpers * fix for merge results * make the iterator a nested class, fix and modify logic * added first unit test * some fixes and more tests * another test * first overlapping vector tests * fix a bug on reallocation * added an edge cases test * added comments * added `BY_SCORE_THEN_ID` order and sorter * make BF batch iterator use it in select-base search * modification to the BI to handle resize while alive, and use BY_SCORE_THEN_ID * added dynamic parallel test * move iterator from generic vec_sim_tiered to hnsw_tiered * leak fixes * fix clang build * minor test refactor * review fixes * decrease index size * move some array logic to arr_cpp.h * after rebase fixes * review fixes --- src/VecSim/algorithms/hnsw/hnsw_tiered.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h index 79d550b68..34e9adbeb 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h @@ -181,11 +181,7 @@ class TieredHNSWIndex : public VecSimTieredIndex { VecSimBatchIterator *newBatchIterator(const void *queryBlob, VecSimQueryParams *queryParams) const override { - size_t blobSize = this->backendIndex->getDim() * sizeof(DataType); - void *queryBlobCopy = this->allocator->allocate(blobSize); - memcpy(queryBlobCopy, queryBlob, blobSize); - return new (this->allocator) - TieredHNSW_BatchIterator(queryBlobCopy, this, queryParams, this->allocator); + return this->backendIndex->newBatchIterator(queryBlob, queryParams); } inline void setLastSearchMode(VecSearchMode mode) override { return this->backendIndex->setLastSearchMode(mode); From 8f8c21ded078e84e9d5971fa37010ba2f0fbe866 Mon Sep 17 00:00:00 2001 From: alon Date: Sun, 7 May 2023 12:33:32 +0300 Subject: [PATCH 02/12] Revert "Merge branch 'main' into feature_HNSW_tiered_index" This reverts commit fa7307e0288d225ccb5f8f6c85e60607e3ae3734, reversing changes made to b9a12a61be88dd01e93d589b4eb14df1bb5f7962. 
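Patch 01 above introduces a `BY_SCORE_THEN_ID` result order and a tiered batch iterator that merges results coming from the flat (frontend) buffer and the HNSW (backend) index. To make that merge step concrete, here is a minimal, self-contained C++ sketch of merging two per-tier result batches under that order. This is not the VecSim API: `Result`, `byScoreThenId`, and `mergeBatches` are illustrative names, and the real iterator has more to handle; duplicate labels across tiers are approximated here by keeping the first (best-scored) occurrence of each id.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <iterator>
#include <unordered_set>
#include <vector>

struct Result {
    size_t id;
    double score; // lower is better, as with a distance metric
};

// BY_SCORE_THEN_ID ordering: primary key is the score; ties are broken
// deterministically by id, so repeated batches return a stable order.
static bool byScoreThenId(const Result &a, const Result &b) {
    if (a.score != b.score)
        return a.score < b.score;
    return a.id < b.id;
}

// Merge two per-tier result lists (each already sorted by the same order),
// drop duplicate ids, and truncate to n results. A tiered batch iterator has
// to deduplicate because a label may live in both tiers while an update is
// in flight.
std::vector<Result> mergeBatches(const std::vector<Result> &flat,
                                 const std::vector<Result> &hnsw, size_t n) {
    std::vector<Result> merged;
    merged.reserve(flat.size() + hnsw.size());
    std::merge(flat.begin(), flat.end(), hnsw.begin(), hnsw.end(),
               std::back_inserter(merged), byScoreThenId);
    std::unordered_set<size_t> seen;
    std::vector<Result> out;
    for (const auto &r : merged) {
        if (seen.insert(r.id).second) { // keep the first occurrence of each id
            out.push_back(r);
            if (out.size() == n)
                break;
        }
    }
    return out;
}

int main() {
    std::vector<Result> flat = {{7, 0.10}, {3, 0.25}, {9, 0.25}};
    std::vector<Result> hnsw = {{3, 0.25}, {5, 0.30}};
    for (const auto &r : mergeBatches(flat, hnsw, 3))
        std::printf("id=%zu score=%.2f\n", r.id, r.score);
    return 0;
}
```

Breaking score ties by id is what makes repeated batches deterministic when many vectors share the same distance, which is why the score-then-id order is used for the select-based search as well.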
--- .../algorithms/brute_force/brute_force.h | 2 -- tests/unit/test_allocator.cpp | 4 ++-- tests/unit/test_bruteforce.cpp | 20 ++++++------------- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/VecSim/algorithms/brute_force/brute_force.h b/src/VecSim/algorithms/brute_force/brute_force.h index d87b2b0e1..408df8ab1 100644 --- a/src/VecSim/algorithms/brute_force/brute_force.h +++ b/src/VecSim/algorithms/brute_force/brute_force.h @@ -161,7 +161,6 @@ void BruteForceIndex::appendVector(const void *vector_data, size_t last_block_vectors_count = id % this->blockSize; this->idToLabelMapping.resize( idToLabelMapping_size + this->blockSize - last_block_vectors_count, 0); - this->idToLabelMapping.shrink_to_fit(); } // add label to idToLabelMapping @@ -216,7 +215,6 @@ void BruteForceIndex::removeVector(idType id_to_delete) { if (this->count + this->blockSize <= idToLabel_size) { size_t vector_to_align_count = idToLabel_size % this->blockSize; this->idToLabelMapping.resize(idToLabel_size - this->blockSize - vector_to_align_count); - this->idToLabelMapping.shrink_to_fit(); } } } diff --git a/tests/unit/test_allocator.cpp b/tests/unit/test_allocator.cpp index 57b3b1319..fda1ae4f8 100644 --- a/tests/unit/test_allocator.cpp +++ b/tests/unit/test_allocator.cpp @@ -164,8 +164,8 @@ TYPED_TEST(IndexAllocatorTest, test_bf_index_block_size_1) { // collection allocate additional structures for their internal implementation. ASSERT_EQ(allocator->getAllocationSize(), expectedAllocationSize + deleteCommandAllocationDelta); - ASSERT_GE(expectedAllocationSize + expectedAllocationDelta, allocator->getAllocationSize()); - ASSERT_GE(expectedAllocationDelta, deleteCommandAllocationDelta); + ASSERT_LE(expectedAllocationSize + expectedAllocationDelta, allocator->getAllocationSize()); + ASSERT_LE(expectedAllocationDelta, deleteCommandAllocationDelta); info = bfIndex->info(); ASSERT_EQ(allocator->getAllocationSize(), info.commonInfo.memory); diff --git a/tests/unit/test_bruteforce.cpp b/tests/unit/test_bruteforce.cpp index 1b1b7801a..f7a7016f2 100644 --- a/tests/unit/test_bruteforce.cpp +++ b/tests/unit/test_bruteforce.cpp @@ -108,7 +108,7 @@ TYPED_TEST(BruteForceTest, brute_force_vector_update_test) { TYPED_TEST(BruteForceTest, resize_and_align_index) { size_t dim = 4; - size_t n = 14; + size_t n = 15; size_t blockSize = 10; BFParams params = { @@ -134,26 +134,24 @@ TYPED_TEST(BruteForceTest, resize_and_align_index) { // Add another vector, since index size equals to the capacity, this should cause resizing // (to fit a multiplication of block_size). - GenerateAndAddVector(index, dim, n); + GenerateAndAddVector(index, dim, n + 1); ASSERT_EQ(VecSimIndex_IndexSize(index), n + 1); // Check new capacity size, should be blockSize * 2. ASSERT_EQ(bf_index->idToLabelMapping.size(), 2 * blockSize); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 2 * blockSize); - // Now size = n + 1 (= 15), capacity = 2 * bs (= 20). Test capacity overflow again - // to check that it stays aligned with block size. + // Now size = n + 1 = 16, capacity = 2* bs = 20. Test capacity overflow again + // to check that it stays aligned with blocksize. size_t add_vectors_count = 8; for (size_t i = 0; i < add_vectors_count; i++) { GenerateAndAddVector(index, dim, n + 2 + i, i); } - // Size should be n + 1 + 8 (= 25). + // Size should be n + 1 + 8 = 24. ASSERT_EQ(VecSimIndex_IndexSize(index), n + 1 + add_vectors_count); // Check new capacity size, should be blockSize * 3. 
ASSERT_EQ(bf_index->idToLabelMapping.size(), 3 * blockSize); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 3 * blockSize); VecSimIndex_Free(index); } @@ -172,7 +170,7 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { BruteForceIndex *bf_index = this->CastToBF(index); ASSERT_EQ(VecSimIndex_IndexSize(index), 0); - // Add up to block size + 1 = 3 + 1 = 4 + // add up to blocksize + 1 = 3 + 1 = 4 for (size_t i = 0; i < bs + 1; i++) { GenerateAndAddVector(index, dim, i, i); } @@ -192,7 +190,6 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { // 10 - 3 - 10 % 3 (1) = 6 idToLabelMapping_size = bf_index->idToLabelMapping.size(); ASSERT_EQ(idToLabelMapping_size, n - bs - n % bs); - ASSERT_EQ(idToLabelMapping_size, bf_index->idToLabelMapping.capacity()); // Delete all the vectors to decrease idToLabelMapping size by another bs. size_t i = 0; @@ -201,24 +198,19 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { ++i; } ASSERT_EQ(bf_index->idToLabelMapping.size(), bs); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), bs); - // Add and delete a vector to achieve: // size % block_size == 0 && size + bs <= idToLabelMapping_size(3). // idToLabelMapping_size should be resized to zero. GenerateAndAddVector(index, dim, 0); VecSimIndex_DeleteVector(index, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), 0); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 0); // Do it again. This time after adding a vector idToLabelMapping_size is increased by bs. // Upon deletion it will be resized to zero again. GenerateAndAddVector(index, dim, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), bs); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), bs); VecSimIndex_DeleteVector(index, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), 0); - ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 0); VecSimIndex_Free(index); } From f315f70275a0cf574c3493343b15fcdd25636acb Mon Sep 17 00:00:00 2001 From: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> Date: Sun, 7 May 2023 13:49:07 +0300 Subject: [PATCH 03/12] Implement `rangeQuery` for VecSimTieredIndex - [MOD-5164] (#360) * implemented `rangeQuery` for VecSimTieredIndex, ... 
including needed utility functions

* renaming `merge_results.h` and moving `filter_results` to it

* fix build

* first test and some fixes

* improved test and added a parallel test

* fix a bug where we safely get the old entry point (via `safeGetEntryPoint`) but then
  fetch the new max level when trying to search using the old entry point

* fix tests

* Update comments

* review fixes

* after rebase fixes

* added a general comment on the tiered index's guarantees
---
 src/VecSim/vec_sim_tiered_index.h | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h
index d6023a9f2..4f56530a9 100644
--- a/src/VecSim/vec_sim_tiered_index.h
+++ b/src/VecSim/vec_sim_tiered_index.h
@@ -85,6 +85,25 @@ class VecSimTieredIndex : public VecSimIndexInterface {
                    : this->frontendIndex->preferAdHocSearch(subsetSize, k, initial_check);
     }
 
+    virtual inline int64_t getAllocationSize() const override {
+        return this->allocator->getAllocationSize() + this->backendIndex->getAllocationSize() +
+               this->frontendIndex->getAllocationSize();
+    }
+
+    virtual VecSimIndexInfo info() const override;
+    virtual VecSimInfoIterator *infoIterator() const override;
+
+    VecSimQueryResult_List rangeQuery(const void *queryBlob, double radius,
+                                      VecSimQueryParams *queryParams,
+                                      VecSimQueryResult_Order order) override;
+
+    bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) override {
+        // For now, decide according to the bigger index.
+        return this->backendIndex->indexSize() > this->frontendIndex->indexSize()
+                   ? this->backendIndex->preferAdHocSearch(subsetSize, k, initial_check)
+                   : this->frontendIndex->preferAdHocSearch(subsetSize, k, initial_check);
+    }
+
     // Return the current state of the global write mode (async/in-place).
     static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; }
 
From 9abd6b40be43d73a0d3354d68c99e01ee49c2ec3 Mon Sep 17 00:00:00 2001
From: alonre24
Date: Sun, 7 May 2023 18:56:29 +0300
Subject: [PATCH 04/12] let the capacity of idToLabel in the flat index be tied
 to the size. This requires slightly changing the logic of updating job ids
 after removing a vector from the flat buffer and swapping ids.
(#367) --- .../algorithms/brute_force/brute_force.h | 2 ++ tests/unit/test_allocator.cpp | 4 ++-- tests/unit/test_bruteforce.cpp | 20 +++++++++++++------ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/VecSim/algorithms/brute_force/brute_force.h b/src/VecSim/algorithms/brute_force/brute_force.h index 408df8ab1..d87b2b0e1 100644 --- a/src/VecSim/algorithms/brute_force/brute_force.h +++ b/src/VecSim/algorithms/brute_force/brute_force.h @@ -161,6 +161,7 @@ void BruteForceIndex::appendVector(const void *vector_data, size_t last_block_vectors_count = id % this->blockSize; this->idToLabelMapping.resize( idToLabelMapping_size + this->blockSize - last_block_vectors_count, 0); + this->idToLabelMapping.shrink_to_fit(); } // add label to idToLabelMapping @@ -215,6 +216,7 @@ void BruteForceIndex::removeVector(idType id_to_delete) { if (this->count + this->blockSize <= idToLabel_size) { size_t vector_to_align_count = idToLabel_size % this->blockSize; this->idToLabelMapping.resize(idToLabel_size - this->blockSize - vector_to_align_count); + this->idToLabelMapping.shrink_to_fit(); } } } diff --git a/tests/unit/test_allocator.cpp b/tests/unit/test_allocator.cpp index fda1ae4f8..57b3b1319 100644 --- a/tests/unit/test_allocator.cpp +++ b/tests/unit/test_allocator.cpp @@ -164,8 +164,8 @@ TYPED_TEST(IndexAllocatorTest, test_bf_index_block_size_1) { // collection allocate additional structures for their internal implementation. ASSERT_EQ(allocator->getAllocationSize(), expectedAllocationSize + deleteCommandAllocationDelta); - ASSERT_LE(expectedAllocationSize + expectedAllocationDelta, allocator->getAllocationSize()); - ASSERT_LE(expectedAllocationDelta, deleteCommandAllocationDelta); + ASSERT_GE(expectedAllocationSize + expectedAllocationDelta, allocator->getAllocationSize()); + ASSERT_GE(expectedAllocationDelta, deleteCommandAllocationDelta); info = bfIndex->info(); ASSERT_EQ(allocator->getAllocationSize(), info.commonInfo.memory); diff --git a/tests/unit/test_bruteforce.cpp b/tests/unit/test_bruteforce.cpp index f7a7016f2..1b1b7801a 100644 --- a/tests/unit/test_bruteforce.cpp +++ b/tests/unit/test_bruteforce.cpp @@ -108,7 +108,7 @@ TYPED_TEST(BruteForceTest, brute_force_vector_update_test) { TYPED_TEST(BruteForceTest, resize_and_align_index) { size_t dim = 4; - size_t n = 15; + size_t n = 14; size_t blockSize = 10; BFParams params = { @@ -134,24 +134,26 @@ TYPED_TEST(BruteForceTest, resize_and_align_index) { // Add another vector, since index size equals to the capacity, this should cause resizing // (to fit a multiplication of block_size). - GenerateAndAddVector(index, dim, n + 1); + GenerateAndAddVector(index, dim, n); ASSERT_EQ(VecSimIndex_IndexSize(index), n + 1); // Check new capacity size, should be blockSize * 2. ASSERT_EQ(bf_index->idToLabelMapping.size(), 2 * blockSize); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 2 * blockSize); - // Now size = n + 1 = 16, capacity = 2* bs = 20. Test capacity overflow again - // to check that it stays aligned with blocksize. + // Now size = n + 1 (= 15), capacity = 2 * bs (= 20). Test capacity overflow again + // to check that it stays aligned with block size. size_t add_vectors_count = 8; for (size_t i = 0; i < add_vectors_count; i++) { GenerateAndAddVector(index, dim, n + 2 + i, i); } - // Size should be n + 1 + 8 = 24. + // Size should be n + 1 + 8 (= 25). ASSERT_EQ(VecSimIndex_IndexSize(index), n + 1 + add_vectors_count); // Check new capacity size, should be blockSize * 3. 
ASSERT_EQ(bf_index->idToLabelMapping.size(), 3 * blockSize); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 3 * blockSize); VecSimIndex_Free(index); } @@ -170,7 +172,7 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { BruteForceIndex *bf_index = this->CastToBF(index); ASSERT_EQ(VecSimIndex_IndexSize(index), 0); - // add up to blocksize + 1 = 3 + 1 = 4 + // Add up to block size + 1 = 3 + 1 = 4 for (size_t i = 0; i < bs + 1; i++) { GenerateAndAddVector(index, dim, i, i); } @@ -190,6 +192,7 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { // 10 - 3 - 10 % 3 (1) = 6 idToLabelMapping_size = bf_index->idToLabelMapping.size(); ASSERT_EQ(idToLabelMapping_size, n - bs - n % bs); + ASSERT_EQ(idToLabelMapping_size, bf_index->idToLabelMapping.capacity()); // Delete all the vectors to decrease idToLabelMapping size by another bs. size_t i = 0; @@ -198,19 +201,24 @@ TYPED_TEST(BruteForceTest, resize_and_align_index_largeInitialCapacity) { ++i; } ASSERT_EQ(bf_index->idToLabelMapping.size(), bs); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), bs); + // Add and delete a vector to achieve: // size % block_size == 0 && size + bs <= idToLabelMapping_size(3). // idToLabelMapping_size should be resized to zero. GenerateAndAddVector(index, dim, 0); VecSimIndex_DeleteVector(index, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), 0); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 0); // Do it again. This time after adding a vector idToLabelMapping_size is increased by bs. // Upon deletion it will be resized to zero again. GenerateAndAddVector(index, dim, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), bs); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), bs); VecSimIndex_DeleteVector(index, 0); ASSERT_EQ(bf_index->idToLabelMapping.size(), 0); + ASSERT_EQ(bf_index->idToLabelMapping.capacity(), 0); VecSimIndex_Free(index); } From bfead9b476422b8e6d25d750db74a8eac6d434d2 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 7 May 2023 05:20:11 +0000 Subject: [PATCH 05/12] flow tests of tiered with dbpedia to run bm (including flat buffer limit) --- src/python_bindings/bindings.cpp | 117 +++++++++ src/python_bindings/tiered_index_mock.h | 152 +++++++++++ tests/flow/common.py | 18 ++ tests/flow/test_bm_hnsw_tiered_dataset.py | 291 ++++++++++++++++++++++ 4 files changed, 578 insertions(+) create mode 100644 src/python_bindings/tiered_index_mock.h create mode 100644 tests/flow/test_bm_hnsw_tiered_dataset.py diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index c2ab25d80..dd4b841f1 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -16,8 +16,10 @@ #include #include #include +#include "tiered_index_mock.h" namespace py = pybind11; +using namespace tiered_index_mock; // Helper function that iterates query results and wrap them in python numpy object - // a tuple of two 2D arrays: (labels, distances) @@ -174,6 +176,13 @@ class PyVecSimIndex { size_t indexSize() { return VecSimIndex_IndexSize(index.get()); } + size_t indexMemory() { return this->index->getAllocationSize(); } + + double getGetDistanceFrom(size_t id, const py::object &input) { + py::array query(input); + return this->index->getDistanceFrom(id, (const char *)query.data(0)); + } + PyBatchIterator createBatchIterator(const py::object &input, VecSimQueryParams *query_params) { py::array query(input); return PyBatchIterator( @@ -360,6 +369,94 @@ class PyHNSWLibIndex : public PyVecSimIndex { } }; +class PyTIEREDIndex : public 
PyVecSimIndex {
+
+protected:
+    JobQueue jobQueue;          // External queue that holds the jobs.
+    IndexExtCtx jobQueueCtx;    // External context to be sent to the submit callback.
+    SubmitCB submitCb;          // A callback that submits an array of jobs into a given jobQueue.
+    size_t memoryCtx;           // External context that stores the index memory consumption.
+    UpdateMemoryCB UpdateMemCb; // A callback that updates the memoryCtx
+                                // with a given memory (number).
+    size_t flatBufferLimit;     // Maximum size allowed for the flat buffer. If the flat buffer
+                                // is full, use in-place insertion.
+    bool run_thread;
+    std::bitset<MAX_POOL_SIZE> executions_status;
+
+    TieredIndexParams TieredIndexParams_Init() {
+        TieredIndexParams ret = {
+            .jobQueue = &this->jobQueue,
+            .jobQueueCtx = &this->jobQueueCtx,
+            .submitCb = this->submitCb,
+            .memoryCtx = &this->memoryCtx,
+            .UpdateMemCb = this->UpdateMemCb,
+            .flatBufferLimit = this->flatBufferLimit,
+        };
+
+        return ret;
+    }
+
+public:
+    explicit PyTIEREDIndex(size_t BufferLimit = 20000000)
+        : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback), flatBufferLimit(BufferLimit),
+          run_thread(true) {
+
+        for (size_t i = 0; i < THREAD_POOL_SIZE; i++) {
+            ThreadParams params(run_thread, executions_status, i, jobQueue);
+            thread_pool.emplace_back(thread_main_loop, params);
+        }
+    }
+
+    virtual ~PyTIEREDIndex() = 0;
+
+    void WaitForIndex(size_t waiting_duration = 10) {
+        bool keep_waiting = true;
+        while (keep_waiting) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration));
+            std::unique_lock<std::mutex> lock(queue_guard);
+            if (jobQueue.empty()) {
+                while (true) {
+                    if (executions_status.count() == 0) {
+                        keep_waiting = false;
+                        break;
+                    }
+                    std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration));
+                }
+            }
+        }
+    }
+
+    static size_t GetThreadsNum() { return THREAD_POOL_SIZE; }
+
+    size_t getBufferLimit() {return flatBufferLimit; }
+};
+
+PyTIEREDIndex::~PyTIEREDIndex() { thread_pool_terminate(jobQueue, run_thread); }
+
+class PyTIERED_HNSWIndex : public PyTIEREDIndex {
+public:
+    explicit PyTIERED_HNSWIndex(const HNSWParams &hnsw_params,
+                                const TieredHNSWParams &tiered_hnsw_params) {
+
+        // Create primaryIndexParams and specific params for the HNSW tiered index.
+        VecSimParams primary_index_params = {.algo = VecSimAlgo_HNSWLIB, .hnswParams = hnsw_params};
+
+        // Create TieredIndexParams.
+        TieredIndexParams tiered_params = TieredIndexParams_Init();
+
+        tiered_params.primaryIndexParams = &primary_index_params;
+        tiered_params.specificParams.tieredHnswParams = tiered_hnsw_params;
+
+        // Create VecSimParams for the TieredIndexParams.
+        VecSimParams params = {.algo = VecSimAlgo_TIERED, .tieredParams = tiered_params};
+
+        this->index = std::shared_ptr<VecSimIndex>(VecSimIndex_New(&params), VecSimIndex_Free);
+        // Set the created tiered index in the index external context.
+        this->jobQueueCtx.index_strong_ref = this->index;
+    }
+    size_t HNSWLabelCount() { return this->index->info().hnswInfo.indexLabelCount; }
+};
+
 class PyBFIndex : public PyVecSimIndex {
 public:
     explicit PyBFIndex(const BFParams &bf_params) {
@@ -413,6 +510,10 @@ PYBIND11_MODULE(VecSim, m) {
         .def_readwrite("initialCapacity", &BFParams::initialCapacity)
         .def_readwrite("blockSize", &BFParams::blockSize);
 
+    py::class_<TieredHNSWParams>(m, "TieredHNSWParams")
+        .def(py::init())
+        .def_readwrite("swapJobThreshold", &TieredHNSWParams::swapJobThreshold);
+
     py::class_<VecSimParams>(m, "VecSimParams")
         .def(py::init())
         .def_readwrite("algo", &VecSimParams::algo)
@@ -439,8 +540,11 @@ PYBIND11_MODULE(VecSim, m) {
         .def("range_query", &PyVecSimIndex::range, py::arg("vector"), py::arg("radius"),
             py::arg("query_param") = nullptr)
         .def("index_size", &PyVecSimIndex::indexSize)
+        .def("index_memory", &PyVecSimIndex::indexMemory)
         .def("create_batch_iterator", &PyVecSimIndex::createBatchIterator, py::arg("query_blob"),
             py::arg("query_param") = nullptr)
+        .def("get_distance_from", &PyVecSimIndex::getGetDistanceFrom, py::arg("label"),
+            py::arg("blob"))
         .def("get_vector", &PyVecSimIndex::getVector);
 
     py::class_<PyHNSWLibIndex, PyVecSimIndex>(m, "HNSWIndex")
@@ -460,6 +564,19 @@ PYBIND11_MODULE(VecSim, m) {
         .def("range_parallel", &PyHNSWLibIndex::searchRangeParallel, py::arg("queries"),
             py::arg("radius"), py::arg("query_param") = nullptr, py::arg("num_threads") = -1);
 
+    py::class_<PyTIEREDIndex, PyVecSimIndex>(m, "TIEREDIndex")
+        .def("wait_for_index", &PyTIERED_HNSWIndex::WaitForIndex, py::arg("waiting_duration") = 10)
+        .def("get_buffer_limit", &PyTIERED_HNSWIndex::getBufferLimit)
+        .def_static("get_threads_num", &PyTIEREDIndex::GetThreadsNum);
+
+    py::class_<PyTIERED_HNSWIndex, PyTIEREDIndex>(m, "TIERED_HNSWIndex")
+        .def(
+            py::init([](const HNSWParams &hnsw_params, const TieredHNSWParams &tiered_hnsw_params) {
+                return new PyTIERED_HNSWIndex(hnsw_params, tiered_hnsw_params);
+            }),
+            py::arg("hnsw_params"), py::arg("tiered_hnsw_params"))
+        .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount);
+
     py::class_<PyBFIndex, PyVecSimIndex>(m, "BFIndex")
         .def(py::init([](const BFParams &params) { return new PyBFIndex(params); }),
             py::arg("params"));
diff --git a/src/python_bindings/tiered_index_mock.h b/src/python_bindings/tiered_index_mock.h
new file mode 100644
index 000000000..e6bdd695f
--- /dev/null
+++ b/src/python_bindings/tiered_index_mock.h
@@ -0,0 +1,152 @@
+ /*
+ *Copyright Redis Ltd. 2021 - present
+ *Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
+ *the Server Side Public License v1 (SSPLv1).
+ */
+
+#pragma once
+
+#include <atomic>
+#include <bitset>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+#include "VecSim/vec_sim.h"
+#include "VecSim/algorithms/hnsw/hnsw_tiered.h"
+#include "pybind11/pybind11.h"
+
+namespace tiered_index_mock {
+
+typedef struct RefManagedJob {
+    AsyncJob *job;
+    std::weak_ptr<VecSimIndex> index_weak_ref;
+} RefManagedJob;
+
+struct SearchJobMock : public AsyncJob {
+    void *query; // The query vector. ownership is passed to the job in the constructor.
+    size_t k;    // The number of results to return.
+    size_t n;    // The number of vectors in the index (might be useful for the mock)
+    size_t dim;  // The dimension of the vectors in the index (might be useful for the mock)
+    std::atomic_int &successful_searches; // A reference to a shared counter that counts the number
+                                          // of successful searches.
+    SearchJobMock(std::shared_ptr<VecSimAllocator> allocator, JobCallback searchCB,
+                  VecSimIndex *index_, void *query_, size_t k_, size_t n_, size_t dim_,
+                  std::atomic_int &successful_searches_)
+        : AsyncJob(allocator, HNSW_SEARCH_JOB, searchCB, index_), query(query_), k(k_), n(n_),
+          dim(dim_), successful_searches(successful_searches_) {}
+    ~SearchJobMock() { this->allocator->free_allocation(query); }
+};
+
+using JobQueue = std::queue<RefManagedJob>;
+int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx);
+int update_mem_callback(void *mem_ctx, size_t mem);
+
+typedef struct IndexExtCtx {
+    std::shared_ptr<VecSimIndex> index_strong_ref;
+    ~IndexExtCtx() { std::cout << "ctx dtor" << std::endl; }
+} IndexExtCtx;
+
+static const size_t MAX_POOL_SIZE = 16;
+static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, std::thread::hardware_concurrency());
+extern std::vector<std::thread> thread_pool;
+extern std::mutex queue_guard;
+extern std::condition_variable queue_cond;
+
+void thread_pool_terminate(JobQueue &jobQ, bool &run_thread);
+
+class ThreadParams {
+public:
+    bool &run_thread;
+    std::bitset<MAX_POOL_SIZE> &executions_status;
+    const unsigned int thread_index;
+    JobQueue &jobQ;
+    ThreadParams(bool &run_thread, std::bitset<MAX_POOL_SIZE> &executions_status,
+                 const unsigned int thread_index, JobQueue &jobQ)
+        : run_thread(run_thread), executions_status(executions_status), thread_index(thread_index),
+          jobQ(jobQ) {}
+
+    ThreadParams(const ThreadParams &other) = default;
+};
+
+void inline MarkExecuteInProcess(std::bitset<MAX_POOL_SIZE> &executions_status,
+                                 size_t thread_index) {
+    executions_status.set(thread_index);
+}
+
+void inline MarkExecuteDone(std::bitset<MAX_POOL_SIZE> &executions_status, size_t thread_index) {
+    executions_status.reset(thread_index);
+}
+void thread_main_loop(ThreadParams params) {
+    while (params.run_thread) {
+        std::unique_lock<std::mutex> lock(queue_guard);
+        // Wake up and acquire the lock (atomically) ONLY if the job queue is not empty at that
+        // point, or if the thread should not run anymore (and quit in that case).
+        queue_cond.wait(lock, [&params]() { return !(params.jobQ.empty()) || !params.run_thread; });
+        if (!params.run_thread)
+            return;
+        auto managed_job = params.jobQ.front();
+        MarkExecuteInProcess(params.executions_status, params.thread_index);
+        params.jobQ.pop();
+
+        lock.unlock();
+        // Upgrade the index weak reference to a strong ref while we run the job over the index.
+        if (auto temp_ref = managed_job.index_weak_ref.lock()) {
+            managed_job.job->Execute(managed_job.job);
+            MarkExecuteDone(params.executions_status, params.thread_index);
+        }
+    }
+}
+
+/*
+ * Mock callbacks for testing async tiered index. We use a simple std::queue to simulate the job
+ * queue.
+ */
+
+std::mutex queue_guard;
+std::condition_variable queue_cond;
+std::vector<std::thread> thread_pool;
+
+int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx) {
+    {
+        std::unique_lock<std::mutex> lock(queue_guard);
+        for (size_t i = 0; i < len; i++) {
+            // Wrap the job with a struct that contains a weak reference to the related index.
+            auto owned_job = RefManagedJob{
+                .job = jobs[i],
+                .index_weak_ref = reinterpret_cast<IndexExtCtx *>(index_ctx)->index_strong_ref};
+            static_cast<JobQueue *>(job_queue)->push(owned_job);
+        }
+    }
+    if (len == 1) {
+        queue_cond.notify_one();
+    } else {
+        queue_cond.notify_all();
+    }
+    return VecSim_OK;
+}
+
+int update_mem_callback(void *mem_ctx, size_t mem) {
+    *(size_t *)mem_ctx = mem;
+    return VecSim_OK;
+}
+
+// thread_main_loop above is the main loop for the background worker threads that execute jobs
+// from the job queue. run_thread is used as a signal to the threads, indicating whether they
+// should keep running or stop and terminate.
+
+void thread_pool_terminate(JobQueue &jobQ, bool &run_thread) {
+    // Check every 10 ms if the queue is empty, and if so, terminate the worker threads' loop.
+    while (true) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        std::unique_lock<std::mutex> lock(queue_guard);
+        if (jobQ.empty()) {
+            run_thread = false;
+            queue_cond.notify_all();
+            break;
+        }
+    }
+    for (size_t i = 0; i < THREAD_POOL_SIZE; i++) {
+        thread_pool[i].join();
+    }
+    thread_pool.clear();
+}
+} // namespace tiered_index_mock
diff --git a/tests/flow/common.py b/tests/flow/common.py
index e5cad51f7..d755d96c1 100644
--- a/tests/flow/common.py
+++ b/tests/flow/common.py
@@ -7,7 +7,23 @@ from scipy import spatial
 from numpy.testing import assert_allclose
 import time
+import math
 
+def create_hnsw_params(dim, num_elements, metric, data_type, ef_construction=200, m=16, ef_runtime=10, epsilon=0.01,
+                       is_multi=False):
+    hnsw_params = HNSWParams()
+
+    hnsw_params.dim = dim
+    hnsw_params.metric = metric
+    hnsw_params.type = data_type
+    hnsw_params.M = m
+    hnsw_params.efConstruction = ef_construction
+    hnsw_params.initialCapacity = num_elements
+    hnsw_params.efRuntime = ef_runtime
+    hnsw_params.epsilon = epsilon
+    hnsw_params.multi = is_multi
+
+    return hnsw_params
 # Helper function for creating an index, uses the default HNSW parameters if not specified.
 def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200, m=16, ef_runtime=10, epsilon=0.01,
                       is_multi=False):
@@ -24,3 +40,5 @@ def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200,
     hnsw_params.multi = is_multi
 
     return HNSWIndex(hnsw_params)
+
+ 
\ No newline at end of file
diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py
new file mode 100644
index 000000000..4b821c796
--- /dev/null
+++ b/tests/flow/test_bm_hnsw_tiered_dataset.py
@@ -0,0 +1,291 @@
+# Copyright Redis Ltd. 2021 - present
+# Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
+# the Server Side Public License v1 (SSPLv1).
+import concurrent
+import math
+import multiprocessing
+import os
+import time
+from common import *
+import hnswlib
+import h5py
+from urllib.request import urlretrieve
+import pickle
+from enum import Enum
+
+class CreationMode(Enum):
+    ONLY_PARAMS = 1
+    CREATE_TIERED_INDEX = 2
+
+def download(src, dst):
+    if not os.path.exists(dst):
+        print('downloading %s -> %s...' % (src, dst))
+        urlretrieve(src, dst)
+
+# Download dataset from s3, save the file locally
+def get_data_set(dataset_name):
+    hdf5_filename = os.path.join('%s.hdf5' % dataset_name)
+    url = 'https://s3.amazonaws.com/benchmarks.redislabs/vecsim/dbpedia/dbpedia-768.hdf5'
+    download(url, hdf5_filename)
+    return h5py.File(hdf5_filename, 'r')
+
+def load_data(dataset_name):
+    data = 0
+
+    np_data_file_path = os.path.join('np_train_%s.npy' % dataset_name)
+
+    try:
+        data = np.load(np_data_file_path, allow_pickle = True)
+        print(f"yay! loaded ")
+    except:
+        dataset = get_data_set(dataset_name)
+        data = np.array(dataset['train'])
+        np.save(np_data_file_path, data)
+        print(f"yay! generated")
+
+    return data
+
+def load_queries(dataset_name):
+    queries = 0
+    np_test_file_path = os.path.join('np_test_%s.npy' % dataset_name)
+
+    try:
+        queries = np.load(np_test_file_path, allow_pickle = True)
+        print(f"yay!
loaded ") + except: + hdf5_filename = os.path.join('%s.hdf5' % dataset_name) + dataset = h5py.File(hdf5_filename, 'r') + queries = np.array(dataset['test']) + np.save(np_test_file_path, queries) + print(f"yay! generated ") + + return queries + +# swap_job_threshold = 0 means use the default swap_job_threshold defined in hnsw_tiered.h +def create_tiered_hnsw_params(swap_job_threshold = 0): + tiered_hnsw_params = TieredHNSWParams() + tiered_hnsw_params.swapJobThreshold = swap_job_threshold + return tiered_hnsw_params + +class DBPediaIndexCtx: + def __init__(self, data_size = 0, initialCap = 0, M = 32, ef_c = 512, ef_r = 10, metric = VecSimMetric_Cosine, is_multi = False, data_type = VecSimType_FLOAT32, swap_job_threshold = 0, mode=CreationMode.ONLY_PARAMS): + self.M = M + self.efConstruction = ef_c + self.efRuntime = ef_r + + data = load_data("dbpedia-768") + self.num_elements = data_size if data_size != 0 else data.shape[0] + self.initialCap = initialCap if initialCap != 0 else 2 * self.num_elements + + self.data = data[:self.num_elements] + self.dim = len(self.data[0]) + self.metric = metric + self.type = data_type + self.is_multi = is_multi + + self.hnsw_params = create_hnsw_params(dim=self.dim, + num_elements=self.initialCap, + metric=self.metric, + data_type=self.type, + ef_construction=ef_c, + m=M, + ef_runtime=ef_r, + is_multi=self.is_multi) + self.tiered_hnsw_params = create_tiered_hnsw_params(swap_job_threshold) + + assert isinstance(mode, CreationMode) + if mode == CreationMode.CREATE_TIERED_INDEX: + self.tiered_index = TIERED_HNSWIndex(self.hnsw_params, self.tiered_hnsw_params) + + def create_tiered(self): + return TIERED_HNSWIndex(self.hnsw_params, self.tiered_hnsw_params) + + def create_hnsw(self): + return HNSWIndex(self.hnsw_params) + + def set_num_vectors_per_label(self, num_per_label = 1): + self.num_per_label = num_per_label + + def init_and_populate_flat_index(self): + bfparams = BFParams() + bfparams.initialCapacity = self.num_elements + bfparams.dim =self.dim + bfparams.type =self.type + bfparams.metric =self.metric + bfparams.multi = self.is_multi + self.flat_index = BFIndex(bfparams) + + for i, vector in enumerate(self.data): + for _ in range(self.num_per_label): + self.flat_index.add_vector(vector, i) + + return self.flat_index + + def init_and_populate_hnsw_index(self): + hnsw_index = HNSWIndex(self.hnsw_params) + + for i, vector in enumerate(self.data): + hnsw_index.add_vector(vector, i) + self.hnsw_index = hnsw_index + return hnsw_index + + def generate_random_vectors(self, num_vectors): + vectors = 0 + np_file_path = os.path.join(f'np_{num_vectors}vec_dim{self.dim}.npy') + + try: + vectors = np.load(np_file_path, allow_pickle = True) + print(f"yay! loaded ") + except: + rng = np.random.default_rng(seed=47) + vectors = np.float32(rng.random((num_vectors, self.dim))) + np.save(np_file_path, vectors) + print(f"yay! 
generated ") + + return vectors + + def insert_in_batch(self, index, data, data_first_idx, batch_size, first_label): + duration = 0 + data_last_idx = data_first_idx + batch_size + for i, vector in enumerate(data[data_first_idx:data_last_idx]): + label = i + first_label + start_add = time.time() + index.add_vector(vector, label) + duration += time.time() - start_add + end = time.time() + return (duration, end) + + +def create_dbpedia(): + indices_ctx = DBPediaIndexCtx() + + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + data = indices_ctx.data + num_elements = indices_ctx.num_elements + + def create_tiered(): + index = indices_ctx.create_tiered() + + print(f"Insert {num_elements} vectors to tiered index") + print(f"flat buffer limit = {index.get_buffer_limit()}") + start = time.time() + for i, vector in enumerate(data): + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + + assert index.hnsw_label_count() == num_elements + + # Measure insertion to tiered index + + print(f"Insert {num_elements} vectors to tiered index took {dur} s") + + # Measure total memory of the tiered index + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + print(f"Start tiered hnsw creation") + create_tiered() + +def create_dbpedia_graph(): + indices_ctx = DBPediaIndexCtx(data_size = 100000) + + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + dbpeida_data = indices_ctx.data + num_elements = indices_ctx.num_elements + + batches_num_per_ds = 10 + batch_size = int(num_elements / batches_num_per_ds) + + #generate 1M random vectors + # random_vectors = indices_ctx.generate_random_vectors(num_vectors=pow(10,6)) + def create_tiered(): + index = indices_ctx.create_tiered() + flat_buffer_limit = index.get_buffer_limit() + print(f"flat buffer limit = {flat_buffer_limit}") + assert flat_buffer_limit > batch_size + + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to tiered index") + first_label = batch * batch_size + + #insert in batches of batch size + bf_time, start_wait = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= first_label, batch_size=batch_size, first_label = first_label) + print(f''' insert to bf took {bf_time}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + + # measure time until wait for index for each batch + index.wait_for_index() + dur = time.time() - start_wait + assert index.hnsw_label_count() == (batch + 1) * batch_size + total_time = bf_time + dur + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {total_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + + #Next insert the random vactors + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} random vectors to tiered index") + data_first_idx = batch * batch_size + first_label = num_elements + data_first_idx + + #insert in batches of batch size + bf_time, start_wait = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= data_first_idx, + batch_size=batch_size, + first_label = first_label) + print(f''' insert to bf took {bf_time}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + + # measure time until wait for index for each batch + 
index.wait_for_index() + dur = time.time() - start_wait + assert index.hnsw_label_count() == num_elements + (batch + 1 ) * batch_size + total_time = bf_time + dur + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {total_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + print(f"Start tiered hnsw creation") + create_tiered() + def create_hnsw(): + index = indices_ctx.create_hnsw() + + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to sync hnsw index") + first_label = batch * batch_size + + #insert in batches of batch size + batch_time, _ = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= first_label, batch_size=batch_size, first_label = first_label) + + assert index.index_size() == (batch + 1) * batch_size + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {batch_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to sync hnsw index") + data_first_idx = batch * batch_size + first_label = num_elements + data_first_idx + + #insert in batches of batch size + batch_time, _ = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= data_first_idx, batch_size=batch_size, first_label = first_label) + + assert index.index_size() == num_elements + (batch + 1) * batch_size + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {batch_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + # print(f"dbpedia vectors = {dbpeida_data[0:4].shape[0]}") + # print(f"vectors = {vectors[0]}") + print(f"Start hnsw creation") + + create_hnsw() + +def test_main(): + print("Test creation") + # create_dbpedia() + create_dbpedia_graph() + From c80f0264e05b8f38090c886fbcf291ec1c8993dc Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 8 May 2023 10:51:40 +0000 Subject: [PATCH 06/12] added knn log ctx --- src/VecSim/vec_sim_index.h | 11 +++ src/VecSim/vec_sim_interface.cpp | 8 ++ src/VecSim/vec_sim_interface.h | 4 + src/VecSim/vec_sim_tiered_index.h | 10 ++- src/python_bindings/bindings.cpp | 52 +++++++++++-- src/python_bindings/tiered_index_mock.h | 17 +--- tests/flow/common.py | 9 +++ tests/flow/test_bm_hnsw_tiered_dataset.py | 95 ++++++++++++++++++++--- 8 files changed, 170 insertions(+), 36 deletions(-) diff --git a/src/VecSim/vec_sim_index.h b/src/VecSim/vec_sim_index.h index f7aa3accb..920addf94 100644 --- a/src/VecSim/vec_sim_index.h +++ b/src/VecSim/vec_sim_index.h @@ -133,6 +133,15 @@ struct VecSimIndexAbstract : public VecSimIndexInterface { delete[] buf; } } +#ifdef BUILD_TESTS + // Set new log context to be sent to the log callback. + // Returns the previous logctx. 
+ inline void *setLogCtx(void *new_logCtx) { + void *prev_logCtx = this->logCallbackCtx; + this->logCallbackCtx = new_logCtx; + return prev_logCtx; + } +#endif void addCommonInfoToIterator(VecSimInfoIterator *infoIterator, const CommonInfo &info) const { infoIterator->addInfoField(VecSim_InfoField{ @@ -219,4 +228,6 @@ struct VecSimIndexAbstract : public VecSimIndexInterface { return this->newBatchIterator(query_to_send, queryParams); } + + }; diff --git a/src/VecSim/vec_sim_interface.cpp b/src/VecSim/vec_sim_interface.cpp index b1953b023..1b953cc50 100644 --- a/src/VecSim/vec_sim_interface.cpp +++ b/src/VecSim/vec_sim_interface.cpp @@ -14,3 +14,11 @@ void Vecsim_Log(void *ctx, const char *message) { std::cout << message << std::e timeoutCallbackFunction VecSimIndexInterface::timeoutCallback = [](void *ctx) { return 0; }; logCallbackFunction VecSimIndexInterface::logCallback = Vecsim_Log; VecSimWriteMode VecSimIndexInterface::asyncWriteMode = VecSim_WriteAsync; + +#ifdef BUILD_TESTS +static inline void Vecsim_Log_DO_NOTHING(void *ctx, const char *message) {} + +void VecSimIndexInterface::resetLogCallbackFunction() { + VecSimIndexInterface::logCallback = Vecsim_Log_DO_NOTHING; +} +#endif diff --git a/src/VecSim/vec_sim_interface.h b/src/VecSim/vec_sim_interface.h index c2f1e28d3..659ceb1bb 100644 --- a/src/VecSim/vec_sim_interface.h +++ b/src/VecSim/vec_sim_interface.h @@ -226,6 +226,10 @@ struct VecSimIndexInterface : public VecsimBaseObject { VecSimIndexInterface::logCallback = callback; } +#ifdef BUILD_TESTS + static void resetLogCallbackFunction(); +#endif + /** * @brief Allow 3rd party to set the write mode for tiered index - async insert/delete using * background jobs, or insert/delete inplace. diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index 4f56530a9..a05c53731 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -106,7 +106,9 @@ class VecSimTieredIndex : public VecSimIndexInterface { // Return the current state of the global write mode (async/in-place). static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; } - +#ifdef BUILD_TESTS + inline VecSimIndexAbstract *getFlatbufferIndex() { return this->frontendIndex; } +#endif private: virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override { // Will be used only if a processing stage is needed @@ -141,6 +143,8 @@ class VecSimTieredIndex : public VecSimIndexInterface { return this->newBatchIterator(query_to_send, queryParams); } + + }; template @@ -148,7 +152,9 @@ VecSimQueryResult_List VecSimTieredIndex::topKQuery(const void *queryBlob, size_t k, VecSimQueryParams *queryParams) { this->flatIndexGuard.lock_shared(); - +#ifdef BUILD_TESTS + this->getFlatbufferIndex()->log(""); +#endif // If the flat buffer is empty, we can simply query the main index. if (this->frontendIndex->indexSize() == 0) { // Release the flat lock and acquire the main lock. 
diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index dd4b841f1..8a21801e2 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -369,7 +369,19 @@ class PyHNSWLibIndex : public PyVecSimIndex { } }; +template +struct KNNLogCtx { + VecSimIndexAbstract *flat_index; + size_t curr_flat_size; + KNNLogCtx() : flat_index(nullptr), curr_flat_size(0) {} +}; + class PyTIEREDIndex : public PyVecSimIndex { +private: + VecSimIndexAbstract *getFlatBuffer() { + return reinterpret_cast *>(this->index.get()) + ->getFlatbufferIndex(); + } protected: JobQueue jobQueue; // External queue that holds the jobs. @@ -383,6 +395,8 @@ class PyTIEREDIndex : public PyVecSimIndex { bool run_thread; std::bitset executions_status; + KNNLogCtx knnLogCtx; + TieredIndexParams TieredIndexParams_Init() { TieredIndexParams ret = { .jobQueue = &this->jobQueue, @@ -397,14 +411,16 @@ class PyTIEREDIndex : public PyVecSimIndex { } public: - explicit PyTIEREDIndex(size_t BufferLimit = 20000000) - : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback), flatBufferLimit(BufferLimit), - run_thread(true) { + explicit PyTIEREDIndex(size_t BufferLimit = 1000) + : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback), + flatBufferLimit(BufferLimit), run_thread(true) { for (size_t i = 0; i < THREAD_POOL_SIZE; i++) { ThreadParams params(run_thread, executions_status, i, jobQueue); thread_pool.emplace_back(thread_main_loop, params); } + + ResetLogCB(); } virtual ~PyTIEREDIndex() = 0; @@ -426,13 +442,34 @@ class PyTIEREDIndex : public PyVecSimIndex { } } + static void log_flat_buffer_size(void *ctx, const char *msg) { + auto *knnLogCtx = reinterpret_cast *>(ctx); + knnLogCtx->curr_flat_size = knnLogCtx->flat_index->indexLabelCount(); + } + void SetKNNLogCtx() { + knnLogCtx.flat_index = getFlatBuffer(); + knnLogCtx.curr_flat_size = 0; + knnLogCtx.flat_index->setLogCtx(&knnLogCtx); + this->index->setLogCallbackFunction(log_flat_buffer_size); + } + size_t getFlatIndexSize(const char *mode = "None") { + if (!strcmp(mode, "insert_and_knn")) { + return knnLogCtx.curr_flat_size; + } + return getFlatBuffer()->indexLabelCount(); + } + + void ResetLogCB() { this->index->resetLogCallbackFunction(); } static size_t GetThreadsNum() { return THREAD_POOL_SIZE; } - size_t getBufferLimit() {return flatBufferLimit; } + size_t getBufferLimit() { return flatBufferLimit; } }; -PyTIEREDIndex::~PyTIEREDIndex() { thread_pool_terminate(jobQueue, run_thread); } +PyTIEREDIndex::~PyTIEREDIndex() { + thread_pool_terminate(jobQueue, run_thread); + ResetLogCB(); +} class PyTIERED_HNSWIndex : public PyTIEREDIndex { public: explicit PyTIERED_HNSWIndex(const HNSWParams &hnsw_params, @@ -566,8 +603,11 @@ PYBIND11_MODULE(VecSim, m) { py::class_(m, "TIEREDIndex") .def("wait_for_index", &PyTIERED_HNSWIndex::WaitForIndex, py::arg("waiting_duration") = 10) + .def("get_curr_bf_size", &PyTIERED_HNSWIndex::getFlatIndexSize, py::arg("mode") = "None") .def("get_buffer_limit", &PyTIERED_HNSWIndex::getBufferLimit) - .def_static("get_threads_num", &PyTIEREDIndex::GetThreadsNum); + .def_static("get_threads_num", &PyTIEREDIndex::GetThreadsNum) + .def("reset_log", &PyTIERED_HNSWIndex::ResetLogCB) + .def("start_knn_log", &PyTIERED_HNSWIndex::SetKNNLogCtx); py::class_(m, "TIERED_HNSWIndex") .def( diff --git a/src/python_bindings/tiered_index_mock.h b/src/python_bindings/tiered_index_mock.h index e6bdd695f..594bc7896 100644 --- a/src/python_bindings/tiered_index_mock.h +++ 
b/src/python_bindings/tiered_index_mock.h
@@ -1,4 +1,4 @@
- /*
+/*
 *Copyright Redis Ltd. 2021 - present
 *Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
 *the Server Side Public License v1 (SSPLv1).
@@ -21,21 +21,6 @@ typedef struct RefManagedJob {
     std::weak_ptr<VecSimIndex> index_weak_ref;
 } RefManagedJob;
 
-struct SearchJobMock : public AsyncJob {
-    void *query; // The query vector. ownership is passed to the job in the constructor.
-    size_t k;    // The number of results to return.
-    size_t n;    // The number of vectors in the index (might be useful for the mock)
-    size_t dim;  // The dimension of the vectors in the index (might be useful for the mock)
-    std::atomic_int &successful_searches; // A reference to a shared counter that counts the number
-                                          // of successful searches.
-    SearchJobMock(std::shared_ptr<VecSimAllocator> allocator, JobCallback searchCB,
-                  VecSimIndex *index_, void *query_, size_t k_, size_t n_, size_t dim_,
-                  std::atomic_int &successful_searches_)
-        : AsyncJob(allocator, HNSW_SEARCH_JOB, searchCB, index_), query(query_), k(k_), n(n_),
-          dim(dim_), successful_searches(successful_searches_) {}
-    ~SearchJobMock() { this->allocator->free_allocation(query); }
-};
-
 using JobQueue = std::queue<RefManagedJob>;
 int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx);
 int update_mem_callback(void *mem_ctx, size_t mem);
diff --git a/tests/flow/common.py b/tests/flow/common.py
index d755d96c1..3d95fc387 100644
--- a/tests/flow/common.py
+++ b/tests/flow/common.py
@@ -41,4 +41,13 @@ def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200,
 
     return HNSWIndex(hnsw_params)
 
+def bytes_to_mega(bytes, ndigits = 3):
+    return round(bytes/pow(10,6), ndigits)
+
+def round_(f_value, ndigits = 2):
+    return round(f_value, ndigits)
+
+
+def round_ms(f_value, ndigits = 2):
+    return round(f_value * 1000, ndigits)
\ No newline at end of file
diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py
index 4b821c796..ce96ab918 100644
--- a/tests/flow/test_bm_hnsw_tiered_dataset.py
+++ b/tests/flow/test_bm_hnsw_tiered_dataset.py
@@ -75,18 +75,19 @@ def __init__(self, data_size = 0, initialCap = 0, M = 32, ef_c = 512, ef_r = 10,
 
         data = load_data("dbpedia-768")
         self.num_elements = data_size if data_size != 0 else data.shape[0]
-        self.initialCap = initialCap if initialCap != 0 else 2 * self.num_elements
+        #self.initialCap = initialCap if initialCap != 0 else 2 * self.num_elements
+        self.initialCap = initialCap if initialCap != 0 else self.num_elements
 
         self.data = data[:self.num_elements]
         self.dim = len(self.data[0])
         self.metric = metric
-        self.type = data_type
+        self.data_type = data_type
         self.is_multi = is_multi
 
         self.hnsw_params = create_hnsw_params(dim=self.dim,
                                               num_elements=self.initialCap,
                                               metric=self.metric,
-                                              data_type=self.type,
+                                              data_type=self.data_type,
                                               ef_construction=ef_c,
                                               m=M,
                                               ef_runtime=ef_r,
@@ -102,22 +103,18 @@ def create_tiered(self):
     def create_hnsw(self):
         return HNSWIndex(self.hnsw_params)
-
-    def set_num_vectors_per_label(self, num_per_label = 1):
-        self.num_per_label = num_per_label
 
     def init_and_populate_flat_index(self):
         bfparams = BFParams()
         bfparams.initialCapacity = self.num_elements
         bfparams.dim =self.dim
-        bfparams.type =self.type
+        bfparams.type =self.data_type
         bfparams.metric =self.metric
         bfparams.multi = self.is_multi
         self.flat_index = BFIndex(bfparams)
 
         for i, vector in enumerate(self.data):
-            for _ in range(self.num_per_label):
-                self.flat_index.add_vector(vector, i)
+
self.flat_index.add_vector(vector, i) return self.flat_index @@ -129,6 +126,16 @@ def init_and_populate_hnsw_index(self): self.hnsw_index = hnsw_index return hnsw_index + def populate_index(self, index): + start = time.time() + duration = 0 + for label, vector in enumerate(self.data): + start_add = time.time() + index.add_vector(vector, label) + duration += time.time() - start_add + end = time.time() + return (start, duration, end) + def generate_random_vectors(self, num_vectors): vectors = 0 np_file_path = os.path.join(f'np_{num_vectors}vec_dim{self.dim}.npy') @@ -154,7 +161,12 @@ def insert_in_batch(self, index, data, data_first_idx, batch_size, first_label): duration += time.time() - start_add end = time.time() return (duration, end) + + def generate_queries(self, num_queries): + self.rng = np.random.default_rng(seed=47) + queries = self.rng.random((num_queries, self.dim)) + return np.float32(queries) if self.data_type == VecSimType_FLOAT32 else queries def create_dbpedia(): indices_ctx = DBPediaIndexCtx() @@ -192,7 +204,7 @@ def create_tiered(): create_tiered() def create_dbpedia_graph(): - indices_ctx = DBPediaIndexCtx(data_size = 100000) + indices_ctx = DBPediaIndexCtx() threads_num = TIEREDIndex.get_threads_num() print(f"thread num = {threads_num}") @@ -283,9 +295,68 @@ def create_hnsw(): print(f"Start hnsw creation") create_hnsw() + +def search_insert(is_multi: bool, num_per_label = 1): + indices_ctx = DBPediaIndexCtx(data_size=1000, mode=CreationMode.CREATE_TIERED_INDEX, is_multi=is_multi) + index = indices_ctx.tiered_index + + num_elements = indices_ctx.num_elements + + query_data = indices_ctx.generate_queries(num_queries=1) + + # Add vectors to the flat index. + bf_index = indices_ctx.init_and_populate_flat_index() + + # Start background insertion to the tiered index. + index_start, _, _ = indices_ctx.populate_index(index) + + correct = 0 + k = 10 + searches_number = 0 + + # config knn log + index.start_knn_log() + + # run knn query every 1 s. + total_tiered_search_time = 0 + prev_bf_size = num_elements + while index.hnsw_label_count() < num_elements: + # For each run get the current hnsw size and the query time. + bf_curr_size = index.get_curr_bf_size(mode = 'insert_and_knn') + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + total_tiered_search_time += query_dur + + print(f"query time = {round_ms(query_dur)} ms") + + # BF size should decrease. + print(f"bf size = {bf_curr_size}") + assert bf_curr_size < prev_bf_size + + # Run the query also in the bf index to get the ground truth results. + bf_labels, _ = bf_index.knn_query(query_data, k) + correct += len(np.intersect1d(tiered_labels[0], bf_labels[0])) + time.sleep(1) + searches_number += 1 + prev_bf_size = bf_curr_size + + index.reset_log() + + # HNSW labels count updates before the job is done, so we need to wait for the queue to be empty. + index.wait_for_index(1) + index_dur = time.time() - index_start + print(f"indexing during search in tiered took {round_(index_dur)} s") + + # Measure recall. 
+ recall = float(correct)/(k*searches_number) + print("Average recall is:", round_(recall, 3)) + print("tiered query per seconds: ", round_(searches_number/total_tiered_search_time)) def test_main(): print("Test creation") - # create_dbpedia() - create_dbpedia_graph() + create_dbpedia() + # create_dbpedia_graph() + print(f"\nStart insert & search test") + # search_insert(is_multi=False) From 8adef94aa225b7169f34f13eae3a3c0ce74c6ea7 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 8 May 2023 14:53:11 +0000 Subject: [PATCH 07/12] rebase fixes --- src/VecSim/vec_sim_tiered_index.h | 19 ------------------- src/python_bindings/bindings.cpp | 2 +- tests/flow/test_bm_hnsw_tiered_dataset.py | 4 ++-- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index a05c53731..74bedd449 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -85,25 +85,6 @@ class VecSimTieredIndex : public VecSimIndexInterface { : this->frontendIndex->preferAdHocSearch(subsetSize, k, initial_check); } - virtual inline int64_t getAllocationSize() const override { - return this->allocator->getAllocationSize() + this->backendIndex->getAllocationSize() + - this->frontendIndex->getAllocationSize(); - } - - virtual VecSimIndexInfo info() const override; - virtual VecSimInfoIterator *infoIterator() const override; - - VecSimQueryResult_List rangeQuery(const void *queryBlob, double radius, - VecSimQueryParams *queryParams, - VecSimQueryResult_Order order) override; - - bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) override { - // For now, decide according to the bigger index. - return this->backendIndex->indexSize() > this->frontendIndex->indexSize() - ? this->backendIndex->preferAdHocSearch(subsetSize, k, initial_check) - : this->frontendIndex->preferAdHocSearch(subsetSize, k, initial_check); - } - // Return the current state of the global write mode (async/in-place). static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; } #ifdef BUILD_TESTS diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index 8a21801e2..aa97fde10 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -491,7 +491,7 @@ class PyTIERED_HNSWIndex : public PyTIEREDIndex { // Set the created tiered index in the index external context. 
this->jobQueueCtx.index_strong_ref = this->index; } - size_t HNSWLabelCount() { return this->index->info().hnswInfo.indexLabelCount; } + size_t HNSWLabelCount() { return this->index->info().commonInfo.indexLabelCount; } }; class PyBFIndex : public PyVecSimIndex { diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py index ce96ab918..8f299b3ad 100644 --- a/tests/flow/test_bm_hnsw_tiered_dataset.py +++ b/tests/flow/test_bm_hnsw_tiered_dataset.py @@ -355,8 +355,8 @@ def search_insert(is_multi: bool, num_per_label = 1): def test_main(): print("Test creation") - create_dbpedia() - # create_dbpedia_graph() +#create_dbpedia() + create_dbpedia_graph() print(f"\nStart insert & search test") # search_insert(is_multi=False) From 689d7a04b7e361c8049379e684647dd2381988c6 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Tue, 9 May 2023 08:46:16 +0000 Subject: [PATCH 08/12] fixes --- src/VecSim/algorithms/hnsw/hnsw_tiered.h | 6 +- src/python_bindings/bindings.cpp | 4 +- tests/flow/test_bm_hnsw_tiered_dataset.py | 120 +++++++++++++--------- 3 files changed, 81 insertions(+), 49 deletions(-) diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h index 34e9adbeb..79d550b68 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h @@ -181,7 +181,11 @@ class TieredHNSWIndex : public VecSimTieredIndex { VecSimBatchIterator *newBatchIterator(const void *queryBlob, VecSimQueryParams *queryParams) const override { - return this->backendIndex->newBatchIterator(queryBlob, queryParams); + size_t blobSize = this->backendIndex->getDim() * sizeof(DataType); + void *queryBlobCopy = this->allocator->allocate(blobSize); + memcpy(queryBlobCopy, queryBlob, blobSize); + return new (this->allocator) + TieredHNSW_BatchIterator(queryBlobCopy, this, queryParams, this->allocator); } inline void setLastSearchMode(VecSearchMode mode) override { return this->backendIndex->setLastSearchMode(mode); diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index aa97fde10..e505f68b3 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -411,7 +411,7 @@ class PyTIEREDIndex : public PyVecSimIndex { } public: - explicit PyTIEREDIndex(size_t BufferLimit = 1000) + explicit PyTIEREDIndex(size_t BufferLimit = 3000000) : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback), flatBufferLimit(BufferLimit), run_thread(true) { @@ -491,7 +491,7 @@ class PyTIERED_HNSWIndex : public PyTIEREDIndex { // Set the created tiered index in the index external context. 
this->jobQueueCtx.index_strong_ref = this->index; } - size_t HNSWLabelCount() { return this->index->info().commonInfo.indexLabelCount; } + size_t HNSWLabelCount() { return this->index->info().tieredInfo.backendCommonInfo.indexLabelCount; } }; class PyBFIndex : public PyVecSimIndex { diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py index 8f299b3ad..0021f69b9 100644 --- a/tests/flow/test_bm_hnsw_tiered_dataset.py +++ b/tests/flow/test_bm_hnsw_tiered_dataset.py @@ -296,67 +296,95 @@ def create_hnsw(): create_hnsw() -def search_insert(is_multi: bool, num_per_label = 1): - indices_ctx = DBPediaIndexCtx(data_size=1000, mode=CreationMode.CREATE_TIERED_INDEX, is_multi=is_multi) - index = indices_ctx.tiered_index - num_elements = indices_ctx.num_elements - query_data = indices_ctx.generate_queries(num_queries=1) +def insert_and_update(): + indices_ctx = DBPediaIndexCtx(mode=CreationMode.CREATE_TIERED_INDEX) + index = indices_ctx.tiered_index + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + data = indices_ctx.data + num_elements = indices_ctx.num_elements - # Add vectors to the flat index. - bf_index = indices_ctx.init_and_populate_flat_index() + print(f"flat buffer limit = {index.get_buffer_limit()}") + start = time.time() + for i, vector in enumerate(data): + index.add_vector(vector, i) + bf_dur = time.time() - start - # Start background insertion to the tiered index. - index_start, _, _ = indices_ctx.populate_index(index) + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + # Measure insertion to tiered index - correct = 0 - k = 10 - searches_number = 0 + print(f"Insert {num_elements} vectors to tiered index took {dur} s") - # config knn log - index.start_knn_log() + # Measure total memory of the tiered index + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") - # run knn query every 1 s. - total_tiered_search_time = 0 - prev_bf_size = num_elements - while index.hnsw_label_count() < num_elements: - # For each run get the current hnsw size and the query time. - bf_curr_size = index.get_curr_bf_size(mode = 'insert_and_knn') - query_start = time.time() - tiered_labels, _ = index.knn_query(query_data, k) - query_dur = time.time() - query_start - total_tiered_search_time += query_dur + assert index.get_curr_bf_size() == 0 + + def search_insert(is_multi: bool, num_per_label = 1): - print(f"query time = {round_ms(query_dur)} ms") + query_data = indices_ctx.generate_queries(num_queries=1) - # BF size should decrease. - print(f"bf size = {bf_curr_size}") - assert bf_curr_size < prev_bf_size + # Add vectors to the flat index. + bf_index = indices_ctx.init_and_populate_flat_index() + print(f"start overrideing") - # Run the query also in the bf index to get the ground truth results. - bf_labels, _ = bf_index.knn_query(query_data, k) - correct += len(np.intersect1d(tiered_labels[0], bf_labels[0])) - time.sleep(1) - searches_number += 1 - prev_bf_size = bf_curr_size - - index.reset_log() - - # HNSW labels count updates before the job is done, so we need to wait for the queue to be empty. - index.wait_for_index(1) - index_dur = time.time() - index_start - print(f"indexing during search in tiered took {round_(index_dur)} s") + # Start background insertion to the tiered index. 
+ index_start, bf_dur, _ = indices_ctx.populate_index(index) + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + print(f"insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + + correct = 0 + k = 10 + searches_number = 0 + + # config knn log + index.start_knn_log() + + # run knn query every 1 s. + total_tiered_search_time = 0 + bf_curr_size = num_elements + while bf_curr_size != 0: + # For each run get the current hnsw size and the query time. + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + total_tiered_search_time += query_dur + bf_curr_size = index.get_curr_bf_size(mode = 'insert_and_knn') + + print(f"query time = {round_ms(query_dur)} ms") + print(f"bf size = {bf_curr_size}") + + # Run the query also in the bf index to get the ground truth results. + bf_labels, _ = bf_index.knn_query(query_data, k) + correct += len(np.intersect1d(tiered_labels[0], bf_labels[0])) + time.sleep(1) + searches_number += 1 + + index.reset_log() + + # HNSW labels count updates before the job is done, so we need to wait for the queue to be empty. + index.wait_for_index(1) + index_dur = time.time() - index_start + print(f"indexing during search in tiered took {round_(index_dur)} s") + + # Measure recall. + recall = float(correct)/(k*searches_number) + print("Average recall is:", round_(recall, 3)) + print("tiered query per seconds: ", round_(searches_number/total_tiered_search_time)) + search_insert(is_multi=False) - # Measure recall. - recall = float(correct)/(k*searches_number) - print("Average recall is:", round_(recall, 3)) - print("tiered query per seconds: ", round_(searches_number/total_tiered_search_time)) def test_main(): print("Test creation") #create_dbpedia() - create_dbpedia_graph() + # create_dbpedia_graph() print(f"\nStart insert & search test") # search_insert(is_multi=False) + insert_and_update() From 6d6758414cd8fda1abac3ff264f3901afbf5b213 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Thu, 11 May 2023 12:22:41 +0000 Subject: [PATCH 09/12] expose execute swap jobs if build tests is defined and added a function to python bindings --- src/VecSim/algorithms/hnsw/hnsw_tiered.h | 9 +- src/VecSim/vec_sim_index.h | 2 - src/VecSim/vec_sim_tiered_index.h | 2 - src/python_bindings/bindings.cpp | 12 +- src/python_bindings/tiered_index_mock.h | 6 +- tests/flow/test_bm_hnsw_tiered_dataset.py | 156 +++++++++++++++++++--- 6 files changed, 161 insertions(+), 26 deletions(-) diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h index 79d550b68..5f1b95689 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h @@ -85,9 +85,13 @@ class TieredHNSWIndex : public VecSimTieredIndex { // To be executed synchronously upon deleting a vector, doesn't require a wrapper. Main HNSW // lock is assumed to be held exclusive here. void executeSwapJob(HNSWSwapJob *job, vecsim_stl::vector &idsToRemove); - +#ifdef BUILD_TESTS +public: +#endif void executeReadySwapJobs(); - +#ifdef BUILD_TESTS +private: +#endif // Wrappers static functions to be sent as callbacks upon creating the jobs (since members // functions cannot serve as callback, this serve as the "gateway" to the appropriate index). 
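The `#ifdef BUILD_TESTS` pair in the hunk above temporarily widens a member's visibility so that unit tests (and, later in this series, the Python bindings) can trigger swap-job execution directly. Since a class body is private by default, the guarded `public:`/`private:` labels only take effect when the code is compiled with `-DBUILD_TESTS`. A minimal sketch of the trick, with simplified names:

```cpp
class TieredIndex {
    // ... regular private members ...
#ifdef BUILD_TESTS
public:
#endif
    // Private in production builds; public in test builds, so a test harness
    // can drive the cleanup deterministically instead of waiting for jobs.
    void executeReadySwapJobs() {}
#ifdef BUILD_TESTS
private:
#endif
    int internalState = 0;
};
```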
static void executeInsertJobWrapper(AsyncJob *job); @@ -668,6 +672,7 @@ int TieredHNSWIndex::addVector(const void *blob, labelType l } // Apply ready swap jobs if number of deleted vectors reached the threshold (under exclusive // lock of the main index guard). + this->executeReadySwapJobs(); // Insert job to the queue and signal the workers' updater. diff --git a/src/VecSim/vec_sim_index.h b/src/VecSim/vec_sim_index.h index 920addf94..709859843 100644 --- a/src/VecSim/vec_sim_index.h +++ b/src/VecSim/vec_sim_index.h @@ -228,6 +228,4 @@ struct VecSimIndexAbstract : public VecSimIndexInterface { return this->newBatchIterator(query_to_send, queryParams); } - - }; diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index 74bedd449..8467fd209 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -124,8 +124,6 @@ class VecSimTieredIndex : public VecSimIndexInterface { return this->newBatchIterator(query_to_send, queryParams); } - - }; template diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index e505f68b3..c9dcc3d8a 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -491,7 +491,14 @@ class PyTIERED_HNSWIndex : public PyTIEREDIndex { // Set the created tiered index in the index external context. this->jobQueueCtx.index_strong_ref = this->index; } - size_t HNSWLabelCount() { return this->index->info().tieredInfo.backendCommonInfo.indexLabelCount; } + size_t HNSWLabelCount() { + return this->index->info().tieredInfo.backendCommonInfo.indexLabelCount; + } + + void executeReadySwapJobs() { + reinterpret_cast *>(this->index.get()) + ->executeReadySwapJobs(); + } }; class PyBFIndex : public PyVecSimIndex { @@ -615,7 +622,8 @@ PYBIND11_MODULE(VecSim, m) { return new PyTIERED_HNSWIndex(hnsw_params, tiered_hnsw_params); }), py::arg("hnsw_params"), py::arg("tiered_hnsw_params")) - .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount); + .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount) + .def("execute_swap_jobs", &PyTIERED_HNSWIndex::executeReadySwapJobs); py::class_(m, "BFIndex") .def(py::init([](const BFParams ¶ms) { return new PyBFIndex(params); }), diff --git a/src/python_bindings/tiered_index_mock.h b/src/python_bindings/tiered_index_mock.h index 594bc7896..113f0c9a2 100644 --- a/src/python_bindings/tiered_index_mock.h +++ b/src/python_bindings/tiered_index_mock.h @@ -31,7 +31,11 @@ typedef struct IndexExtCtx { } IndexExtCtx; static const size_t MAX_POOL_SIZE = 16; -static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, std::thread::hardware_concurrency()); +static const size_t hardware_cpu = std::thread::hardware_concurrency(); + +// hardware_cpu = 8; + +static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, hardware_cpu); extern std::vector thread_pool; extern std::mutex queue_guard; extern std::condition_variable queue_cond; diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py index 0021f69b9..a2de18066 100644 --- a/tests/flow/test_bm_hnsw_tiered_dataset.py +++ b/tests/flow/test_bm_hnsw_tiered_dataset.py @@ -7,12 +7,13 @@ import os import time from common import * -import hnswlib import h5py from urllib.request import urlretrieve import pickle from enum import Enum +from random import choice + class CreationMode(Enum): ONLY_PARAMS = 1 CREATE_TIERED_INDEX = 2 @@ -133,6 +134,8 @@ def populate_index(self, index): start_add = time.time() index.add_vector(vector, label) duration += time.time() - 
start_add
+        if label % 100000 == 0:
+            print(f"time passes= {duration}")
         end = time.time()
         return (start, duration, end)
@@ -168,14 +171,26 @@ def generate_queries(self, num_queries):
         queries = self.rng.random((num_queries, self.dim))
         return np.float32(queries) if self.data_type == VecSimType_FLOAT32 else queries
 
+    def generate_query_from_ds(self):
+        return choice(self.data)
+
+
 def create_dbpedia():
-    indices_ctx = DBPediaIndexCtx()
+    indices_ctx = DBPediaIndexCtx(data_size=1000000)
     threads_num = TIEREDIndex.get_threads_num()
     print(f"thread num = {threads_num}")
 
     data = indices_ctx.data
     num_elements = indices_ctx.num_elements
-
+    def create_parallel():
+        index = indices_ctx.create_hnsw()
+        print(f"Insert {num_elements} vectors to parallel index")
+        start = time.time()
+        index.add_vector_parallel(data, np.array(range(num_elements)), threads_num)
+        dur = time.time() - start
+        print(f"Insert {num_elements} vectors to parallel index took {dur} s")
+
+    create_parallel()
 
     def create_tiered():
         index = indices_ctx.create_tiered()
@@ -214,8 +229,7 @@ def create_dbpedia_graph():
     batches_num_per_ds = 10
     batch_size = int(num_elements / batches_num_per_ds)
 
-    #generate 1M random vectors
-    # random_vectors = indices_ctx.generate_random_vectors(num_vectors=pow(10,6))
+
    def create_tiered():
         index = indices_ctx.create_tiered()
         flat_buffer_limit = index.get_buffer_limit()
@@ -297,6 +311,79 @@ def create_hnsw():
 
     create_hnsw()
 
+def insert_delete_reinsert():
+    indices_ctx = DBPediaIndexCtx(data_size=1000000, mode=CreationMode.CREATE_TIERED_INDEX)
+    index = indices_ctx.tiered_index
+    threads_num = TIEREDIndex.get_threads_num()
+    print(f"thread num = {threads_num}")
+    data = indices_ctx.data
+    num_elements = indices_ctx.num_elements
+
+    # Compute the ground truth results with a brute-force index.
+    k = 10
+    query_data = indices_ctx.generate_query_from_ds()
+
+    bf_index = indices_ctx.init_and_populate_flat_index()
+    bf_labels, _ = bf_index.knn_query(query_data, k)
+
+    print(f"flat buffer limit = {index.get_buffer_limit()}")
+    start = time.time()
+    for i, vector in enumerate(data):
+        index.add_vector(vector, i)
+    bf_dur = time.time() - start
+
+    print(f'''insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()},
+          wait for index\n''')
+    index.wait_for_index()
+    dur = time.time() - start
+
+    query_start = time.time()
+    tiered_labels, _ = index.knn_query(query_data, k)
+    query_dur = time.time() - query_start
+
+    print(f"query time = {round_ms(query_dur)} ms")
+
+    curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0]))
+    curr_recall = float(curr_correct)/k
+    print(f"curr recall after first insertion = {curr_recall}")
+
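The recall bookkeeping in these flow tests deserves a precise statement: each query is scored as the size of the intersection between the tiered top-k and the brute-force top-k, divided by k, and the average over all runs is reported at the end. A C++ rendering of the same computation (the function name is ours; it mirrors the `np.intersect1d` line above):

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// recall@k = |approximate top-k intersect exact top-k| / k
double recallAtK(std::vector<long> approx, std::vector<long> exact, size_t k) {
    std::sort(approx.begin(), approx.end());
    std::sort(exact.begin(), exact.end());
    std::vector<long> common;
    std::set_intersection(approx.begin(), approx.end(),
                          exact.begin(), exact.end(),
                          std::back_inserter(common));
    return static_cast<double>(common.size()) / static_cast<double>(k);
}
```

+    # Delete half of the index.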
+ for i in range(0, num_elements, 2): + index.delete_vector(i) + assert index.hnsw_label_count() == (num_elements / 2) + index.wait_for_index() + + #reinsert the deleted vectors + start = time.time() + for i in range(0, num_elements, 2): + vector = data[i] + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + assert index.hnsw_label_count() == (num_elements) + print(f''' reinsert to the hnsw took insert to bf took {dur}, current hnsw size is {index.hnsw_label_count()}")''') + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + + + + print(f"total memory of bf index = {bf_index.index_memory()/pow(10,9)} GB") + + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + def insert_and_update(): indices_ctx = DBPediaIndexCtx(mode=CreationMode.CREATE_TIERED_INDEX) @@ -327,20 +414,24 @@ def insert_and_update(): def search_insert(is_multi: bool, num_per_label = 1): - query_data = indices_ctx.generate_queries(num_queries=1) + #choose random vector from the data base and perform query on it + query_data = indices_ctx.generate_query_from_ds() + k = 10 - # Add vectors to the flat index. + # Calculate ground truth results bf_index = indices_ctx.init_and_populate_flat_index() - print(f"start overrideing") + bf_labels, _ = bf_index.knn_query(query_data, k) + # Start background insertion to the tiered index. + print(f"start overriding") index_start, bf_dur, _ = indices_ctx.populate_index(index) - print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") - print(f"insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + print(f"insert to bf took {bf_dur}, bf size if {index.get_curr_bf_size()} \n \ + current hnsw size is {index.hnsw_label_count()}") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") correct = 0 - k = 10 searches_number = 0 # config knn log @@ -360,10 +451,13 @@ def search_insert(is_multi: bool, num_per_label = 1): print(f"query time = {round_ms(query_dur)} ms") print(f"bf size = {bf_curr_size}") - # Run the query also in the bf index to get the ground truth results. - bf_labels, _ = bf_index.knn_query(query_data, k) - correct += len(np.intersect1d(tiered_labels[0], bf_labels[0])) - time.sleep(1) + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + correct += curr_correct + + time.sleep(5) searches_number += 1 index.reset_log() @@ -371,20 +465,48 @@ def search_insert(is_multi: bool, num_per_label = 1): # HNSW labels count updates before the job is done, so we need to wait for the queue to be empty. index.wait_for_index(1) index_dur = time.time() - index_start - print(f"indexing during search in tiered took {round_(index_dur)} s") + assert index.get_curr_bf_size() == 0 + assert index.hnsw_label_count() == num_elements + + print(f"indexing during search in tiered took {round_(index_dur)} s, all repair jobs are done") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") # Measure recall. 
recall = float(correct)/(k*searches_number) print("Average recall is:", round_(recall, 3)) print("tiered query per seconds: ", round_(searches_number/total_tiered_search_time)) + + #execute swap jobs and execute query + swap_start = time.time() + + index.execute_swap_jobs() + + swap_dur = time.time() - swap_start + print(f"swap jobs took = {round_ms(swap_dur)} ms") + + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"last query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + + + search_insert(is_multi=False) def test_main(): print("Test creation") -#create_dbpedia() + # create_dbpedia() # create_dbpedia_graph() print(f"\nStart insert & search test") # search_insert(is_multi=False) insert_and_update() + #or sanity + #insert_delete_reinsert() + From b5e64c2c5e7b5a2817094088ee6e1ba426d98f95 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 14 May 2023 09:56:42 +0000 Subject: [PATCH 10/12] removed memorycb added marked deleted pybind max cpu = 8 taking alon's fixes for th on the pending swap jobs --- src/VecSim/algorithms/hnsw/hnsw_tiered.h | 39 ++++++++++++--- src/python_bindings/bindings.cpp | 14 +++--- src/python_bindings/tiered_index_mock.h | 17 ++----- tests/flow/test_bm_hnsw_tiered_dataset.py | 61 +++++++++++++---------- 4 files changed, 79 insertions(+), 52 deletions(-) diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h index ad86ca7d8..7fa17c1b4 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h @@ -29,9 +29,10 @@ struct HNSWSwapJob : public VecsimBaseObject { HNSWSwapJob(std::shared_ptr allocator, idType deletedId) : VecsimBaseObject(allocator), deleted_id(deletedId), pending_repair_jobs_counter(0) {} void setRepairJobsNum(long num_repair_jobs) { pending_repair_jobs_counter = num_repair_jobs; } - void atomicDecreasePendingJobsNum() { - pending_repair_jobs_counter--; + int atomicDecreasePendingJobsNum() { + int ret = --pending_repair_jobs_counter; assert(pending_repair_jobs_counter >= 0); + return ret; } }; @@ -74,6 +75,7 @@ class TieredHNSWIndex : public VecSimTieredIndex { // vectors reached this limit, we apply swap jobs *only for vectors that has no more pending // repair jobs*, and are ready to be removed from the graph. size_t pendingSwapJobsThreshold; + size_t readySwapJobs; // Protect the both idToRepairJobs lookup and the pending_repair_jobs_counter for the // associated swap jobs. 
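Two details of the `HNSWSwapJob` hunk above are easy to miss: `atomicDecreasePendingJobsNum()` now returns the post-decrement value, so a caller observes the zero-crossing exactly once even under concurrency, and the new `readySwapJobs` member counts jobs whose repairs have all completed, letting the swap-execution path bail out cheaply until a whole batch is ready. A compact sketch of that accounting, with names and types simplified (this is not the library's actual class):

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

struct SwapJob {
    std::atomic<long> pendingRepairs{0};

    // Returns the post-decrement value: exactly one caller sees 0, even when
    // repair jobs complete concurrently on several worker threads.
    long repairDone() {
        long left = --pendingRepairs;
        assert(left >= 0);
        return left;
    }
};

struct SwapBookkeeping {
    size_t readySwapJobs = 0; // jobs with no pending repairs left
    size_t threshold = 1024;  // arbitrary batch size, for the sketch only

    void onRepairDone(SwapJob &job) {
        if (job.repairDone() == 0)
            ++readySwapJobs; // counted once, at the zero-crossing
    }
    // Only worth taking the index's exclusive lock once a batch is ready.
    bool worthSweeping() const { return readySwapJobs >= threshold; }
};
```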
@@ -194,6 +196,10 @@ class TieredHNSWIndex : public VecSimTieredIndex { inline void setLastSearchMode(VecSearchMode mode) override { return this->backendIndex->setLastSearchMode(mode); } + +#ifdef BUILD_TESTS + void getDataByLabel(labelType label, std::vector> &vectors_output) const; +#endif }; /** @@ -228,7 +234,9 @@ void TieredHNSWIndex::executeSwapJob(HNSWSwapJob *job, for (auto &job_it : idToRepairJobs.at(job->deleted_id)) { job_it->node_id = INVALID_JOB_ID; for (auto &swap_job_it : job_it->associatedSwapJobs) { - swap_job_it->atomicDecreasePendingJobsNum(); + if (swap_job_it->atomicDecreasePendingJobsNum() == 0) { + readySwapJobs++; + } } } idToRepairJobs.erase(job->deleted_id); @@ -264,7 +272,7 @@ template void TieredHNSWIndex::executeReadySwapJobs() { // If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every // job for which all of its pending repair jobs were executed (otherwise finish and return). - if (idToSwapJob.size() < this->pendingSwapJobsThreshold) { + if (readySwapJobs < this->pendingSwapJobsThreshold) { return; } // Execute swap jobs - acquire hnsw write lock. @@ -283,6 +291,7 @@ void TieredHNSWIndex::executeReadySwapJobs() { for (idType id : idsToRemove) { idToSwapJob.erase(id); } + readySwapJobs-= idsToRemove.size(); this->mainIndexGuard.unlock(); } @@ -335,6 +344,9 @@ int TieredHNSWIndex::deleteLabelFromHNSW(labelType label) { } } swap_job->setRepairJobsNum(incoming_edges.size()); + if (incoming_edges.size() == 0) { + readySwapJobs++; + } this->idToRepairJobsGuard.unlock(); this->submitJobs(repair_jobs); @@ -510,7 +522,9 @@ void TieredHNSWIndex::executeRepairJob(HNSWRepairJob *job) { repair_jobs.pop_back(); } for (auto &it : job->associatedSwapJobs) { - it->atomicDecreasePendingJobsNum(); + if (it->atomicDecreasePendingJobsNum() == 0) { + readySwapJobs++; + } } this->idToRepairJobsGuard.unlock(); @@ -528,7 +542,7 @@ TieredHNSWIndex::TieredHNSWIndex(HNSWIndex allocator) : VecSimTieredIndex(hnsw_index, bf_index, tiered_index_params, allocator), labelToInsertJobs(this->allocator), idToRepairJobs(this->allocator), - idToSwapJob(this->allocator) { + idToSwapJob(this->allocator), readySwapJobs(0) { // If the param for swapJobThreshold is 0 use the default value, if it exceeds the maximum // allowed, use the maximum value. this->pendingSwapJobsThreshold = @@ -650,6 +664,10 @@ int TieredHNSWIndex::addVector(const void *blob, labelType l // If we removed the previous vector from both HNSW and flat in the overwrite process, // we still return 0 (not -1). ret = MAX(ret - this->deleteLabelFromHNSW(label), 0); + if(ret != 1 && ret != 0) { + std::cout<< " ret == " << ret << std:: endl; + + } } // Apply ready swap jobs if number of deleted vectors reached the threshold (under exclusive // lock of the main index guard). @@ -995,3 +1013,12 @@ void TieredHNSWIndex::TieredHNSW_BatchIterator::filter_irrel // Update number of results (pop the tail) array_pop_back_n(rl.results, end - cur_end); } + + +#ifdef BUILD_TESTS +template +void TieredHNSWIndex::getDataByLabel( + labelType label, std::vector> &vectors_output) const { + this->getHNSWIndex()->getDataByLabel(label, vectors_output); +} +#endif diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index c9dcc3d8a..d399ea1d6 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -387,9 +387,6 @@ class PyTIEREDIndex : public PyVecSimIndex { JobQueue jobQueue; // External queue that holds the jobs. 
IndexExtCtx jobQueueCtx; // External context to be sent to the submit callback. SubmitCB submitCb; // A callback that submits an array of jobs into a given jobQueue. - size_t memoryCtx; // External context that stores the index memory consumption. - UpdateMemoryCB UpdateMemCb; // A callback that updates the memoryCtx - // with a given memory (number). size_t flatBufferLimit; // Maximum size allowed for the flat buffer. If flat buffer is full, use // in-place insertion. bool run_thread; @@ -402,8 +399,6 @@ class PyTIEREDIndex : public PyVecSimIndex { .jobQueue = &this->jobQueue, .jobQueueCtx = &this->jobQueueCtx, .submitCb = this->submitCb, - .memoryCtx = &this->memoryCtx, - .UpdateMemCb = this->UpdateMemCb, .flatBufferLimit = this->flatBufferLimit, }; @@ -411,8 +406,8 @@ class PyTIEREDIndex : public PyVecSimIndex { } public: - explicit PyTIEREDIndex(size_t BufferLimit = 3000000) - : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback), + explicit PyTIEREDIndex(size_t BufferLimit = 1000) + : submitCb(submit_callback), flatBufferLimit(BufferLimit), run_thread(true) { for (size_t i = 0; i < THREAD_POOL_SIZE; i++) { @@ -495,6 +490,10 @@ class PyTIERED_HNSWIndex : public PyTIEREDIndex { return this->index->info().tieredInfo.backendCommonInfo.indexLabelCount; } + size_t HNSWMarkedDeleted() { + return this->index->info().tieredInfo.backendInfo.hnswInfo.numberOfMarkedDeletedNodes; + } + void executeReadySwapJobs() { reinterpret_cast *>(this->index.get()) ->executeReadySwapJobs(); @@ -623,6 +622,7 @@ PYBIND11_MODULE(VecSim, m) { }), py::arg("hnsw_params"), py::arg("tiered_hnsw_params")) .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount) + .def("hnsw_marked_deleted", &PyTIERED_HNSWIndex::HNSWMarkedDeleted) .def("execute_swap_jobs", &PyTIERED_HNSWIndex::executeReadySwapJobs); py::class_(m, "BFIndex") diff --git a/src/python_bindings/tiered_index_mock.h b/src/python_bindings/tiered_index_mock.h index 113f0c9a2..eb426878f 100644 --- a/src/python_bindings/tiered_index_mock.h +++ b/src/python_bindings/tiered_index_mock.h @@ -22,19 +22,16 @@ typedef struct RefManagedJob { } RefManagedJob; using JobQueue = std::queue; -int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx); -int update_mem_callback(void *mem_ctx, size_t mem); +int submit_callback(void *job_queue, void *index_ctx, AsyncJob **jobs, JobCallback *CBs, + JobCallback *freeCBs, size_t jobs_len); typedef struct IndexExtCtx { std::shared_ptr index_strong_ref; ~IndexExtCtx() { std::cout << "ctx dtor" << std::endl; } } IndexExtCtx; -static const size_t MAX_POOL_SIZE = 16; +static const size_t MAX_POOL_SIZE = 8; static const size_t hardware_cpu = std::thread::hardware_concurrency(); - -// hardware_cpu = 8; - static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, hardware_cpu); extern std::vector thread_pool; extern std::mutex queue_guard; @@ -94,7 +91,8 @@ std::mutex queue_guard; std::condition_variable queue_cond; std::vector thread_pool; -int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx) { +int submit_callback(void *job_queue, void *index_ctx, AsyncJob **jobs, JobCallback *CBs, + JobCallback *freeCBs, size_t len) { { std::unique_lock lock(queue_guard); for (size_t i = 0; i < len; i++) { @@ -113,11 +111,6 @@ int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ct return VecSim_OK; } -int update_mem_callback(void *mem_ctx, size_t mem) { - *(size_t *)mem_ctx = mem; - return VecSim_OK; -} - // Main loop for background worker threads 
that execute the jobs form the job queue. // run_thread uses as a signal to the thread that indicates whether it should keep running or // stop and terminate the thread. diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py index a2de18066..314faf3d5 100644 --- a/tests/flow/test_bm_hnsw_tiered_dataset.py +++ b/tests/flow/test_bm_hnsw_tiered_dataset.py @@ -386,7 +386,7 @@ def insert_delete_reinsert(): def insert_and_update(): - indices_ctx = DBPediaIndexCtx(mode=CreationMode.CREATE_TIERED_INDEX) + indices_ctx = DBPediaIndexCtx(data_size = 300000, mode=CreationMode.CREATE_TIERED_INDEX) index = indices_ctx.tiered_index threads_num = TIEREDIndex.get_threads_num() print(f"thread num = {threads_num}") @@ -399,7 +399,7 @@ def insert_and_update(): index.add_vector(vector, i) bf_dur = time.time() - start - print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + print(f''' insert {num_elements} vecs to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") wait for index\n''') index.wait_for_index() dur = time.time() - start @@ -416,45 +416,57 @@ def search_insert(is_multi: bool, num_per_label = 1): #choose random vector from the data base and perform query on it query_data = indices_ctx.generate_query_from_ds() + # query_data = indices_ctx.generate_queries(num_queries=1) k = 10 - # Calculate ground truth results bf_index = indices_ctx.init_and_populate_flat_index() bf_labels, _ = bf_index.knn_query(query_data, k) + assert bf_index.index_size() == num_elements + def query(): + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + + return query_dur, curr_correct + # config knn log + index.start_knn_log() + + + # query before any changes + print("query before overriding") + _, _ = query() + assert index.get_curr_bf_size(mode = 'insert_and_knn') == 0 # Start background insertion to the tiered index. print(f"start overriding") index_start, bf_dur, _ = indices_ctx.populate_index(index) - - print(f"insert to bf took {bf_dur}, bf size if {index.get_curr_bf_size()} \n \ - current hnsw size is {index.hnsw_label_count()}") + print(f"bf size is:" ) + bf_size = index.hnsw_label_count() + print(f"{bf_size}") + print(f"current hnsw size is {index.hnsw_label_count()}") + print(f"insert to bf took {bf_dur}") print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") correct = 0 searches_number = 0 - # config knn log - index.start_knn_log() - # run knn query every 1 s. total_tiered_search_time = 0 bf_curr_size = num_elements while bf_curr_size != 0: + query_dur, curr_correct = query() # For each run get the current hnsw size and the query time. 
- query_start = time.time() - tiered_labels, _ = index.knn_query(query_data, k) - query_dur = time.time() - query_start total_tiered_search_time += query_dur bf_curr_size = index.get_curr_bf_size(mode = 'insert_and_knn') - print(f"query time = {round_ms(query_dur)} ms") print(f"bf size = {bf_curr_size}") - - - curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) - curr_recall = float(curr_correct)/k - print(f"curr recall = {curr_recall}") correct += curr_correct time.sleep(5) @@ -484,17 +496,12 @@ def search_insert(is_multi: bool, num_per_label = 1): swap_dur = time.time() - swap_start print(f"swap jobs took = {round_ms(swap_dur)} ms") - query_start = time.time() - tiered_labels, _ = index.knn_query(query_data, k) - query_dur = time.time() - query_start - - print(f"last query time = {round_ms(query_dur)} ms") - - curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) - curr_recall = float(curr_correct)/k - print(f"curr recall = {curr_recall}") + assert index.hnsw_marked_deleted() == 0 + print("query after swap execution") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + query_dur, curr_correct = query() search_insert(is_multi=False) From 5ed28c58421125010460e4b6cd750c98a9b97983 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 14 May 2023 10:00:56 +0000 Subject: [PATCH 11/12] added const to getFlatbufferIndex --- src/VecSim/vec_sim_tiered_index.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index 2699c960f..fdd369256 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -100,7 +100,7 @@ class VecSimTieredIndex : public VecSimIndexInterface { // Return the current state of the global write mode (async/in-place). 
 static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; }
 #ifdef BUILD_TESTS
-    inline VecSimIndexAbstract<DataType, DistType> *getFlatbufferIndex() { return this->frontendIndex; }
+    inline VecSimIndexAbstract<DataType, DistType> *getFlatbufferIndex() const { return this->frontendIndex; }
 #endif
 private:
     virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {

From 7039291267821a18c9b8a0a3febbbd387c2b88fb Mon Sep 17 00:00:00 2001
From: meiravgri
Date: Tue, 16 May 2023 12:36:55 +0000
Subject: [PATCH 12/12] Add poetry build to the Makefile; add memory prints to
 HNSW resize

---
 Makefile                                  |   3 ++-
 src/VecSim/algorithms/hnsw/hnsw.h         |  23 ++++++++++++++++++
 src/VecSim/algorithms/hnsw/hnsw_tiered.h  |  29 +++++++++++++++--------
 src/python_bindings/bindings.cpp          |   8 ++++---
 tests/flow/test_bm_hnsw_tiered_dataset.py |   7 ++++--
 5 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/Makefile b/Makefile
index 55c39d647..7bc37e69f 100644
--- a/Makefile
+++ b/Makefile
@@ -186,8 +186,9 @@ valgrind:
 #----------------------------------------------------------------------------------------------
 
 flow_test:
+	$(SHOW)poetry build -vv
 	$(SHOW)poetry install
-	$(SHOW)poetry run pytest tests/flow -v -s
+	$(SHOW)poetry run pytest tests/flow -vv -s
 
 .PHONY: flow_test
 
diff --git a/src/VecSim/algorithms/hnsw/hnsw.h b/src/VecSim/algorithms/hnsw/hnsw.h
index a0e4e204a..e3f866d06 100644
--- a/src/VecSim/algorithms/hnsw/hnsw.h
+++ b/src/VecSim/algorithms/hnsw/hnsw.h
@@ -1307,19 +1307,33 @@ HNSWIndex<DataType, DistType>::safeCollectAllNodeIncomingNeighbors(idType node_i
 
 template <typename DataType, typename DistType>
 void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements) {
+    std::cout << "resize element_levels_" << std::endl;
+
     element_levels_.resize(new_max_elements);
+    std::cout << "shrink element_levels_" << std::endl;
+
     element_levels_.shrink_to_fit();
+    std::cout << "resize LabelLookup" << std::endl;
+
     resizeLabelLookup(new_max_elements);
+    std::cout << "resize visited_nodes_handler_pool" << std::endl;
+
     visited_nodes_handler_pool.resize(new_max_elements);
+    std::cout << "resize element_neighbors_locks_" << std::endl;
+
     vecsim_stl::vector<std::mutex>(new_max_elements, this->allocator)
         .swap(element_neighbors_locks_);
     // Reallocate base layer
+    std::cout << "resize data_level0_memory from " << (*(((size_t *)data_level0_memory_) - 1))
+              << " to " << (new_max_elements * size_data_per_element_) << std::endl;
     char *data_level0_memory_new = (char *)this->allocator->reallocate(
         data_level0_memory_, new_max_elements * size_data_per_element_);
     if (data_level0_memory_new == nullptr)
         throw std::runtime_error("Not enough memory: resizeIndex failed to allocate base layer");
     data_level0_memory_ = data_level0_memory_new;
 
+    std::cout << "resize linkLists_ from " << (*(((size_t *)linkLists_) - 1))
+              << " to " << (sizeof(void *) * new_max_elements) << std::endl;
     char **linkLists_new =
         (char **)this->allocator->reallocate(linkLists_, sizeof(void *) * new_max_elements);
@@ -1328,6 +1342,8 @@ void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements)
     linkLists_ = linkLists_new;
 
     max_elements_ = new_max_elements;
+    std::cout << "after resize hnsw allocation size = " << this->getAllocationSize() << std::endl;
+
 }
 
 template <typename DataType, typename DistType>
@@ -1640,8 +1656,11 @@ HNSWIndex<DataType, DistType>::~HNSWIndex() {
  */
 template <typename DataType, typename DistType>
 void HNSWIndex<DataType, DistType>::increaseCapacity() {
+    std::cout << "increase capacity current hnsw allocation size = " << this->getAllocationSize() << std::endl;
     size_t vectors_to_add = this->blockSize - max_elements_ % this->blockSize;
     resizeIndexInternal(max_elements_ + vectors_to_add);
+    std::cout << "after resize hnsw allocation size = " << this->getAllocationSize() << std::endl;
 }
 
 template <typename DataType, typename DistType>
@@ -1707,6 +1726,10 @@ void HNSWIndex<DataType, DistType>::removeAndSwap(idType internalId) {
 
         size_t extra_space_to_free = max_elements_ % this->blockSize;
 
        // Remove one block from the capacity.
+        std::cout << "resize down from removeAndSwap internal id = " << internalId << std::endl;
 
diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h
--- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h
+++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h
@@ ... @@ template <typename DataType, typename DistType>
-void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
+void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobsIMP() {
+    std::cout << "execute readyswapjobs, we have " << this->getHNSWIndex()->getNumMarkedDeleted()
+              << " marked deleted vectors" << std::endl;
     // Execute swap jobs - acquire hnsw write lock.
     this->mainIndexGuard.lock();
@@ -293,6 +292,17 @@ void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
     }
     readySwapJobs-= idsToRemove.size();
     this->mainIndexGuard.unlock();
+
+}
+template <typename DataType, typename DistType>
+void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
+    // If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every
+    // job for which all of its pending repair jobs were executed (otherwise finish and return).
+    if (readySwapJobs < this->pendingSwapJobsThreshold) {
+        return;
+    }
+
+    executeReadySwapJobsIMP();
 }
 
 template <typename DataType, typename DistType>
@@ -394,7 +404,10 @@ void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
         // Check if resizing is still required (another thread might have done it in the meantime
         // while we release the shared lock).
         if (hnsw_index->indexCapacity() == hnsw_index->indexSize()) {
+            std::cout << "current tiered total allocation size = " << this->getAllocationSize() << std::endl;
             hnsw_index->increaseCapacity();
+            std::cout << "after hnsw resize tiered total allocation size = " << this->getAllocationSize() << std::endl;
 
@@ -665,10 +678,6 @@ int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType l
         // If we removed the previous vector from both HNSW and flat in the overwrite process,
         // we still return 0 (not -1).
         ret = MAX(ret - this->deleteLabelFromHNSW(label), 0);
-        if(ret != 1 && ret != 0) {
-            std::cout << " ret == " << ret << std::endl;
-
-        }
     }
     // Apply ready swap jobs if number of deleted vectors reached the threshold (under exclusive
     // lock of the main index guard).
diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp
index d399ea1d6..ef8b5319c 100644
--- a/src/python_bindings/bindings.cpp
+++ b/src/python_bindings/bindings.cpp
@@ -438,8 +438,10 @@ class PyTIEREDIndex : public PyVecSimIndex<DataType> {
     }
 
     static void log_flat_buffer_size(void *ctx, const char *msg) {
-        auto *knnLogCtx = reinterpret_cast<KNNLogCtx<DataType> *>(ctx);
-        knnLogCtx->curr_flat_size = knnLogCtx->flat_index->indexLabelCount();
+        if(ctx) {
+            auto *knnLogCtx = reinterpret_cast<KNNLogCtx<DataType> *>(ctx);
+            knnLogCtx->curr_flat_size = knnLogCtx->flat_index->indexLabelCount();
+        }
     }
     void SetKNNLogCtx() {
         knnLogCtx.flat_index = getFlatBuffer();
@@ -496,7 +498,7 @@ class PyTIERED_HNSWIndex : public PyTIEREDIndex<float> {
 
     void executeReadySwapJobs() {
         reinterpret_cast<TieredHNSWIndex<float, float> *>(this->index.get())
-            ->executeReadySwapJobs();
+            ->executeReadySwapJobsIMP();
     }
 };
 
diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py
index 314faf3d5..9cc4b441e 100644
--- a/tests/flow/test_bm_hnsw_tiered_dataset.py
+++ b/tests/flow/test_bm_hnsw_tiered_dataset.py
@@ -26,6 +26,7 @@ def download(src, dst):
 # Download dataset from s3, save the file locally
 def get_data_set(dataset_name):
     hdf5_filename = os.path.join('%s.hdf5' % dataset_name)
+    print(f"download {hdf5_filename}")
     url = 'https://s3.amazonaws.com/benchmarks.redislabs/vecsim/dbpedia/dbpedia-768.hdf5'
     download(url, hdf5_filename)
     return h5py.File(hdf5_filename, 'r')
@@ -36,9 +37,11 @@ def load_data(dataset_name):
     np_data_file_path = os.path.join('np_train_%s.npy' % dataset_name)
 
     try:
+        print(f"try to load {np_data_file_path}")
         data = np.load(np_data_file_path, allow_pickle = True)
         print(f"yay!
loaded ") except: + print(f"failed to load {np_data_file_path}") dataset = get_data_set(dataset_name) data = np.array(dataset['train']) np.save(np_data_file_path, data) @@ -134,7 +137,7 @@ def populate_index(self, index): start_add = time.time() index.add_vector(vector, label) duration += time.time() - start_add - if label % 100000 == 0: + if label % 1000 == 0: print(f"time passes= {duration}") end = time.time() return (start, duration, end) @@ -386,7 +389,7 @@ def insert_delete_reinsert(): def insert_and_update(): - indices_ctx = DBPediaIndexCtx(data_size = 300000, mode=CreationMode.CREATE_TIERED_INDEX) + indices_ctx = DBPediaIndexCtx(mode=CreationMode.CREATE_TIERED_INDEX) index = indices_ctx.tiered_index threads_num = TIEREDIndex.get_threads_num() print(f"thread num = {threads_num}")