
SNMG ANN #1993

Open: wants to merge 19 commits into branch-24.04
Conversation

@viclafargue (Contributor) commented Nov 14, 2023

The goal of this PR is to provide a distributed (single-node, multi-GPU) implementation of ANN indexes. It allows building, extending, and searching an index on multiple GPUs.

Before building the index, the user has to choose between two modes (see the sketch after this list):

  • Sharding mode: the index dataset is split and each GPU trains its own index on its share of the dataset. This is intended to increase both the search throughput and the maximum index size.
  • Index duplication mode: the index is built once on one GPU and then copied to the others. Alternatively, the index dataset is sent to each GPU and built there. This is intended to increase the search throughput.
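A minimal usage sketch of the two modes from the caller's side. The ann_mg namespace, the dist_mode enum, and the exact build/search signatures below are illustrative assumptions, not the PR's actual API:

// Hypothetical usage sketch; names are assumptions for illustration only.
std::vector<int> device_ids = {0, 1, 2, 3};

// Sharding mode: every GPU indexes only its slice of the dataset.
auto sharded = ann_mg::build(device_ids, ann_mg::dist_mode::SHARDED,
                             ivf_flat_params, host_dataset_view);

// Index duplication mode: every GPU holds a full copy of the index.
auto replicated = ann_mg::build(device_ids, ann_mg::dist_mode::REPLICATED,
                                ivf_flat_params, host_dataset_view);

// Search fans out to all GPUs; sharded results are merged into a single
// top-k, while queries against a replicated index can be spread across copies.
ann_mg::search(sharded, search_params, host_queries_view,
               host_neighbors_view, host_distances_view);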


copy-pr-bot bot commented Nov 14, 2023

This pull request requires additional validation before any workflows can run on NVIDIA's runners.


@cjnolet added the improvement, non-breaking, and Vector Search labels Nov 14, 2023
@viclafargue (Contributor, Author) commented Nov 27, 2023

The PR is ready for a first review. In its current state, it implements the build, extend, and search ANN methods (IVF-Flat and IVF-PQ only for now) in index duplication and sharding modes. For now, index duplication mode only works by copying the index dataset to each GPU and building the index there separately. I am now looking to improve the API so that it allows building the index on one GPU and copying it to the others. Serialization to disk would work, but does not seem ideal, and transferring the index attributes through NCCL does not seem very safe. What would you recommend?

@viclafargue viclafargue marked this pull request as ready for review November 27, 2023 17:45
@viclafargue viclafargue requested review from a team as code owners November 27, 2023 17:45
@viclafargue viclafargue requested a review from a team as a code owner January 10, 2024 14:42
for (int rank = 0; rank < num_ranks_; rank++) {
RAFT_CUDA_TRY(cudaSetDevice(dev_ids_[rank]));
auto& ann_if = ann_interfaces_.emplace_back();
ann_if.build(dev_resources_[rank], index_params, index_dataset);
Will every GPU copy the host dataset into device memory, so that the total number of copies is num_ranks_?


Another related question: will GPU 1 wait to start until GPU 0's build finishes? If so, the total runtime of the for loop seems to be the single-GPU build time * num_ranks_.

viclafargue (Contributor, Author):

Yes, exactly: in index duplication mode the dataset is copied in full to each GPU for training. An alternative is to train the model locally, serialize it, and distribute it with one of the distribute_flat, distribute_pq, or distribute_cagra functions.

Another related question: will GPU 1 wait to start until GPU 0's build finishes? If so, the total runtime of the for loop seems to be the single-GPU build time * num_ranks_.

The build, extend, and search functions take a handle parameter containing the CUDA stream on which the kernels should be launched. These operations are supposed to be asynchronous, allowing fast switching between GPUs. However, this has not yet been tested; an actual benchmark would be necessary to confirm that things scale as expected.
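A minimal sketch of the pattern described above, assuming one raft::device_resources (and therefore one CUDA stream) per rank; launch_on_rank is a hypothetical stand-in for the per-rank build/extend/search call:

#include <raft/core/device_resources.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/util/cudart_utils.hpp>

#include <vector>

template <typename LaunchFn>
void run_on_all_ranks(std::vector<int> const& dev_ids,
                      std::vector<raft::device_resources>& dev_resources,
                      LaunchFn&& launch_on_rank)
{
  for (std::size_t rank = 0; rank < dev_ids.size(); rank++) {
    RAFT_CUDA_TRY(cudaSetDevice(dev_ids[rank]));
    launch_on_rank(rank, dev_resources[rank]);  // enqueued on that rank's stream
  }
  for (std::size_t rank = 0; rank < dev_ids.size(); rank++) {
    RAFT_CUDA_TRY(cudaSetDevice(dev_ids[rank]));
    raft::resource::sync_stream(dev_resources[rank]);  // wait for every GPU
  }
}

As the review below points out, this only overlaps work across GPUs if the per-rank call itself does not block the CPU thread.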

Member:

An alternative method is to train a model locally, serialize it and distribute it with either one of the distribute_flat, distribute_pq or distribute_cagra functions.

This is definitely what we want here. We're going to have to wait for the index to build anyway, but in replicated mode we should only have to build it once and then broadcast it to the other GPUs.

@tfeher (Contributor) commented Apr 18, 2024:

We have a problem here. Building a GPU index is not only a GPU operation. It can involve significant CPU work (e.g. CAGRA graph optimization, NN Descent data pre/post-processing, host-side sub-sampling for IVF methods).

Furthermore, there are cases where our algorithms block the CPU thread while waiting for GPU kernels to finish (e.g. waiting for return values that determine a memory allocation size).

We cannot launch build on a single CPU thread and expect it to run in parallel just because the GPU ops are asynchronous. Most are, but the few I cite above will essentially serialize the whole process.

At the very least we would need a different worker thread for each GPU stream, but I would recommend one process per GPU.

We should also keep in mind that build is multi-threaded: it spawns OpenMP threads to help shuffle data in host memory (a single thread is not enough to saturate memory bandwidth). We should document that this can be controlled with the OMP_NUM_THREADS variable.
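A minimal sketch of the "one worker thread per GPU stream" fallback mentioned above (the one-process-per-GPU option is discussed further below); build_on_rank is a hypothetical stand-in for the per-rank ann_if.build(...) call:

#include <cuda_runtime.h>

#include <thread>
#include <vector>

template <typename BuildFn>
void build_with_worker_threads(std::vector<int> const& dev_ids, BuildFn&& build_on_rank)
{
  std::vector<std::thread> workers;
  workers.reserve(dev_ids.size());
  for (std::size_t rank = 0; rank < dev_ids.size(); rank++) {
    workers.emplace_back([rank, &dev_ids, &build_on_rank]() {
      // Each worker owns its device context, so CPU-side build work
      // (graph optimization, sub-sampling, blocking waits) overlaps across GPUs.
      cudaSetDevice(dev_ids[rank]);
      build_on_rank(rank);
    });
  }
  for (auto& w : workers) { w.join(); }
}

Each worker would additionally spawn its own OpenMP threads for the host-side shuffling mentioned above, so OMP_NUM_THREADS may need to be tuned per worker.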

RAFT_NCCL_TRY(ncclCommInitAll(nccl_comms_.data(), num_ranks_, dev_ids_.data()));
for (int rank = 0; rank < num_ranks_; rank++) {
RAFT_CUDA_TRY(cudaSetDevice(dev_ids_[rank]));
raft::comms::build_comms_nccl_only(&dev_resources_[rank], nccl_comms_[rank], num_ranks_, rank);

The NCCL initialization here seems to be "one process, multiple GPUs".

Is it possible to adapt it to "one process (or thread) per GPU"? That may require something like std::thread, but the benefit is that the APIs in this PR would become reusable from Dask/Spark. Both Dask and Spark currently follow one process per GPU when initializing NCCL.

viclafargue (Contributor, Author):

The single-process solution was better suited to implementing this much-requested RAFT feature for now. But I agree that in the end we should definitely look into making things runnable on Dask/Spark. This would probably involve multiple processes/threads and a much broader use of NCCL.
cc @cjnolet

Contributor:

Note that the RAFT developer guide also suggests one process per GPU: https://github.com/rapidsai/raft/blob/branch-24.06/docs/source/developer_guide.md#multi-gpu
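For reference, a hedged sketch of a one-process-per-GPU bootstrap: each process owns a single GPU and joins the clique with ncclCommInitRank instead of one process calling ncclCommInitAll. How the ncclUniqueId is shared between processes (MPI, sockets, a Dask/Spark scheduler, ...) is left to the launcher and is an assumption here; error checking is omitted:

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_resources.hpp>

#include <cuda_runtime.h>
#include <nccl.h>

void init_rank(raft::device_resources& handle,
               ncclUniqueId const& clique_id,  // created once (e.g. by rank 0) via ncclGetUniqueId
               int rank,
               int num_ranks,
               int device_id)
{
  cudaSetDevice(device_id);
  ncclComm_t comm;
  ncclCommInitRank(&comm, num_ranks, clique_id, rank);
  // Attach the communicator to this rank's handle, as the PR's single-process
  // variant already does with build_comms_nccl_only.
  raft::comms::build_comms_nccl_only(&handle, comm, num_ranks, rank);
}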

@lijinf2 commented Jan 31, 2024

Thank you, Victor. I have learned a lot from the code! I like the idea of combining the three algorithms into one unified interface. A few questions to make myself more familiar with the PR and its design choices. It would be wonderful if Spark Rapids ML could leverage the APIs in this PR.

@viclafargue viclafargue requested a review from a team as a code owner March 26, 2024 17:06

@github-actions github-actions bot added the ci label Mar 26, 2024
@viclafargue viclafargue changed the base branch from branch-23.12 to branch-24.04 March 26, 2024 17:06
@cjnolet (Member) left a comment:

Hey Victor. This is a pretty sizeable PR, so my suggestions will come in a few different passes through the changes. I took an initial look, and overall I think it's headed in the right direction. A lot of my suggestions so far are mechanical things; I'll take a closer look at the implementation next.

.gitignore (outdated, resolved)
cpp/CMakeLists.txt (outdated, resolved)
cpp/bench/ann/src/raft/raft_ann_mg_wrapper.h (outdated, resolved)
cpp/include/raft/neighbors/ann_mg.cuh (outdated, resolved)
cpp/include/raft/neighbors/ann_mg.cuh (outdated, resolved)
cpp/include/raft/neighbors/ann_mg.cuh (outdated, resolved)
cpp/include/raft/neighbors/detail/ann_mg.cuh (outdated, resolved)

@@ -472,6 +472,19 @@
{"nprobe": 2000}
]
},
{
"name": "raft_ann_mg.nlist16384",
"algo": "raft_ann_mg",
Member:

I think raft_ivf_flat_mg and raft_ivf_pq_mg might make more sense here.

@tfeher (Contributor) left a comment:

Thanks Victor for the PR! The code is well structured and clean, but I want to point out a few issues that we need to discuss (see below). I think these can conceptually be fixed fairly easily by adhering to our one-process-per-GPU principle.

distances_ann.data(), ps.num_queries, ps.k);

/*
TODO : fix CAGRA serialization issue
Contributor:

What is the issue?

cpp/include/raft/neighbors/detail/ann_mg.cuh (outdated, resolved)
public:
void build(raft::resources const& handle,
const ann::index_params* index_params,
raft::host_matrix_view<const T, IdxT, row_major> h_index_dataset)
Contributor:

Can we just have an mdspan as input with the accessor as a template type? Here is one example. Alternatively, in IVF-Flat we have overloads with host and device mdspans, although I would advocate against having many overloads with different mdspan variants.

We want to provide flexibility regarding the input dataset's location. The user might have a custom mdspan with a specific allocator that is accessible from both host and device (such a thing is used in the CAGRA wrapper for raft-ann-bench).
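A hedged sketch of such a signature, following the accessor-templated pattern used by cagra::build; the exact template parameters here are illustrative:

#include <raft/core/mdspan.hpp>
#include <raft/core/resources.hpp>

template <typename T, typename IdxT, typename Accessor>
void build(raft::resources const& handle,
           const ann::index_params* index_params,
           raft::mdspan<const T, raft::matrix_extent<IdxT>, raft::row_major, Accessor> index_dataset);

A host_matrix_view, a device_matrix_view, or a custom host/device-accessible mdspan would then all bind to the same overload.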

{
IdxT n_rows = h_index_dataset.extent(0);
IdxT n_dims = h_index_dataset.extent(1);
auto d_index_dataset = raft::make_device_matrix<T, IdxT, row_major>(handle, n_rows, n_dims);
Contributor:

Some algorithms work on datasets larger than device memory. It would be better not to assume that the dataset fits into GPU memory.

{
IdxT n_rows = h_new_vectors.extent(0);
IdxT n_dims = h_new_vectors.extent(1);
auto d_new_vectors = raft::make_device_matrix<T, IdxT, row_major>(handle, n_rows, n_dims);
Contributor:

Same as above: do not assume that the dataset fits into GPU memory.
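If a device copy is kept at all, a hedged sketch of staging it in row batches rather than in one allocation sized to the full dataset (whether to batch here or to pass the host view straight to the underlying algorithm is a design decision for the PR; the batch consumer is elided):

#include <raft/core/device_mdarray.hpp>
#include <raft/core/host_mdspan.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/util/cudart_utils.hpp>

#include <algorithm>

template <typename T, typename IdxT>
void copy_in_batches(raft::resources const& handle,
                     raft::host_matrix_view<const T, IdxT, raft::row_major> h_dataset,
                     IdxT batch_rows)
{
  IdxT n_rows = h_dataset.extent(0);
  IdxT n_dims = h_dataset.extent(1);
  auto d_batch = raft::make_device_matrix<T, IdxT, raft::row_major>(handle, batch_rows, n_dims);
  for (IdxT offset = 0; offset < n_rows; offset += batch_rows) {
    IdxT rows = std::min(batch_rows, n_rows - offset);
    raft::copy(d_batch.data_handle(),
               h_dataset.data_handle() + offset * n_dims,
               rows * n_dims,
               raft::resource::get_cuda_stream(handle));
    // ... hand the first `rows` rows of d_batch to build/extend here ...
  }
}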

Comment on lines 259 to 260
init_device_resources();
init_nccl_clique();
Contributor:

Why do we init the communicator inside this algorithm? To me it would be more natural to allow the user to create a communicator and pass it through a device_resources handle (our developer guide also recommends that mode of operation). We could still provide a helper function to initialize the resources, to make life easier for users.

Contributor:

Tagging @cjnolet for input.
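A hedged sketch of that alternative, roughly a free-standing version of the PR's internal init_nccl_clique(): the caller builds one communicator-carrying handle per GPU up front and the index only consumes them (error handling and ncclCommDestroy at shutdown are omitted; not the PR's actual API):

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_resources.hpp>

#include <cuda_runtime.h>
#include <nccl.h>

#include <utility>
#include <vector>

struct nccl_clique {
  std::vector<int> dev_ids;
  std::vector<ncclComm_t> comms;
  std::vector<raft::device_resources> handles;
};

inline nccl_clique make_nccl_clique(std::vector<int> dev_ids)
{
  nccl_clique c;
  c.dev_ids = std::move(dev_ids);
  int n     = static_cast<int>(c.dev_ids.size());
  c.comms.resize(n);
  c.handles.reserve(n);
  ncclCommInitAll(c.comms.data(), n, c.dev_ids.data());
  for (int rank = 0; rank < n; rank++) {
    cudaSetDevice(c.dev_ids[rank]);
    c.handles.emplace_back();  // handle (and its stream) created on this device
    raft::comms::build_comms_nccl_only(&c.handles[rank], c.comms[rank], n, rank);
  }
  return c;
}

The ann_mg build/extend/search calls would then take the pre-initialized handles (or the whole clique) as an argument instead of initializing NCCL internally.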



Comment on lines 351 to 354
n_rows_per_shard = std::min(n_rows_per_shard, n_rows - offset);
const T* partition_ptr = index_dataset.data_handle() + (offset * n_cols);
auto partition = raft::make_host_matrix_view<const T, IdxT, row_major>(
partition_ptr, n_rows_per_shard, n_cols);
@tfeher (Contributor) commented Apr 18, 2024:

Is it really the job of ann_mg_index to shard the data? An alternative is to provide this sharding code as a helper function that the user explicitly calls, so that already-sharded input is provided to ann_mg_index::build.

On a single node, it makes sense to shard inside build: the whole dataset can be loaded (or mmap-ed) by the host, and we can shard the data there.

On multiple nodes, the data has to be loaded by each node. Due to its large size, each node would load only its portion of the data. If we allow ann_mg_index::build to take that sharded data, then we can already support multi-node.

(Yes, I know that multi-node is out of scope for the current PR, but if we plan to add it in the future, then we should probably agree now on a build API that would easily enable it.)

Tagging @cjnolet to share his thoughts.
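A hedged sketch of such a helper, assuming an even row split with the last shard taking the remainder; the names are illustrative, not a proposed final API:

#include <raft/core/host_mdspan.hpp>

#include <algorithm>
#include <vector>

template <typename T, typename IdxT>
std::vector<raft::host_matrix_view<const T, IdxT, raft::row_major>> shard_dataset(
  raft::host_matrix_view<const T, IdxT, raft::row_major> dataset, IdxT num_shards)
{
  IdxT n_rows         = dataset.extent(0);
  IdxT n_cols         = dataset.extent(1);
  IdxT rows_per_shard = (n_rows + num_shards - 1) / num_shards;
  std::vector<raft::host_matrix_view<const T, IdxT, raft::row_major>> shards;
  for (IdxT rank = 0; rank < num_shards; rank++) {
    IdxT offset = rank * rows_per_shard;
    if (offset >= n_rows) { break; }
    IdxT rows = std::min(rows_per_shard, n_rows - offset);
    shards.push_back(raft::make_host_matrix_view<const T, IdxT, raft::row_major>(
      dataset.data_handle() + offset * n_cols, rows, n_cols));
  }
  return shards;
}

ann_mg_index::build could then accept the per-rank views directly, which is also what a multi-node caller would pass after loading only its own portion of the data.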

auto d_trans = raft::make_device_vector<IdxT, IdxT>(root_handle_, num_ranks_);
raft::copy(d_trans.data_handle(), h_trans.data(), num_ranks_, resource::get_cuda_stream(root_handle_));
auto translations = std::make_optional<raft::device_vector_view<IdxT, IdxT>>(d_trans.view());
raft::neighbors::brute_force::knn_merge_parts<float, IdxT>(root_handle_,
Contributor:

Out of scope for the current PR, but we might consider this as a follow-up: IVF-PQ and CAGRA-Q only return approximate distances. While merging parts based on the approximate distances, we might be throwing out good neighbors due to inaccurate distance values. If we plan to do refinement, then we can treat the in_neighbors as candidates for refinement and run refinement directly instead of calling knn_merge_parts.
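A hedged sketch of that follow-up using raft::neighbors::refine to recompute exact distances for the concatenated per-shard candidates. The exact refine() overload and argument order should be double-checked against the RAFT headers; all views are assumed to already live on the root rank's GPU:

#include <raft/core/device_mdspan.hpp>
#include <raft/distance/distance_types.hpp>
#include <raft/neighbors/refine.cuh>

// candidates holds the per-shard neighbors concatenated to [n_queries, n_shards * k];
// out_neighbors / out_distances are [n_queries, k].
template <typename T, typename IdxT>
void merge_by_refinement(raft::resources const& root_handle,
                         raft::device_matrix_view<const T, IdxT, raft::row_major> dataset,
                         raft::device_matrix_view<const T, IdxT, raft::row_major> queries,
                         raft::device_matrix_view<const IdxT, IdxT, raft::row_major> candidates,
                         raft::device_matrix_view<IdxT, IdxT, raft::row_major> out_neighbors,
                         raft::device_matrix_view<float, IdxT, raft::row_major> out_distances,
                         raft::distance::DistanceType metric)
{
  // Exact distances are recomputed against `dataset`, so approximate IVF-PQ /
  // CAGRA-Q distances no longer decide which candidates survive the merge.
  raft::neighbors::refine(root_handle, dataset, queries, candidates,
                          out_neighbors, out_distances, metric);
}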

@github-actions github-actions bot removed the ci label May 3, 2024
Labels: CMake, cpp, improvement, non-breaking, python, Vector Search
Projects
Status: In Progress
Development
Successfully merging this pull request may close these issues: none yet

4 participants