SNMG ANN #1993
base: branch-24.04
Conversation
The PR is ready for a first review. In its current state, it implements the build, extend and search ANN methods (IVF-Flat and IVF-PQ only for now) in index duplication and sharding modes. For now, the index duplication mode only works by copying the index dataset over and building the index on each GPU separately. I am now looking to improve the API so that it would allow building the index on one GPU and copying it to the others. Serialization to disk would work, but does not seem ideal. Transferring the index attributes through NCCL does not seem very safe either. What would you recommend?
for (int rank = 0; rank < num_ranks_; rank++) {
  RAFT_CUDA_TRY(cudaSetDevice(dev_ids_[rank]));
  auto& ann_if = ann_interfaces_.emplace_back();
  ann_if.build(dev_resources_[rank], index_params, index_dataset);
Will every GPU copy the host dataset into device memory, so that the total number of copies is num_ranks_?
Another related question: will GPU 1 not start until GPU 0's build finishes? If that's the case, the total runtime of the for loop seems to be single-GPU build time * num_ranks_.
Yes exactly, in the index duplication mode the dataset is copied in full to each GPU for training. An alternative method is to train a model locally, serialize it, and distribute it with one of the distribute_flat, distribute_pq or distribute_cagra functions.
> Another related question: will GPU 1 not start until GPU 0's build finishes? If that's the case, the total runtime of the for loop seems to be single-GPU build time * num_ranks_.
The build, extend and search functions take in a handle parameter containing the CUDA stream on which the kernels should be launched. These operations are supposed to be asynchronous, allowing fast switching between GPUs. However, this has not yet been tested. An actual benchmark would be necessary to confirm that things scale as expected.
> An alternative method is to train a model locally, serialize it and distribute it with either one of the
This is definitely what we want here. We're going to have to wait for the index to build anyways, but in replicated mode we should only have to build it once and then broadcast it to the other GPUs.
We have a problem here. Building a GPU index is not only a GPU operation. It can involve significant CPU work (e.g. CAGRA graph optimization, NN Descent data pre/post-processing, host-side sub-sampling for IVF methods).
Furthermore, there are cases where our algorithms block the CPU thread while waiting for GPU kernels to finish (e.g. waiting for return values that determine memory allocation sizes).
We cannot launch build on a single CPU thread and expect it to run in parallel just because the GPU ops are asynchronous. Most are, but the few that I cite above will essentially serialize the whole process.
At least we would need different worker threads for each GPU stream. But I would recommend one process per GPU.
We should also keep in mind that build is multi-threaded: it spawns OpenMP threads to help shuffle data in host memory (a single thread is not enough to saturate memory bandwidth). We should document that this can be controlled with the OMP_NUM_THREADS variable.
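A minimal sketch of the "one worker thread per GPU" pattern suggested here, with a placeholder task standing in for the per-rank cudaSetDevice + build call (build_on_rank and build_all are hypothetical names, for illustration only):

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for the per-rank work: in the real code this would
// call cudaSetDevice(dev_ids_[rank]) and then run the (partly CPU-bound)
// index build for that rank.
void build_on_rank(int rank, std::vector<int>& built) { built[rank] = 1; }

// Launch one worker thread per rank so CPU-bound parts of each build can
// overlap, then join all workers before returning.
std::vector<int> build_all(int num_ranks) {
  std::vector<int> built(num_ranks, 0);
  std::vector<std::thread> workers;
  for (int rank = 0; rank < num_ranks; rank++) {
    workers.emplace_back(build_on_rank, rank, std::ref(built));
  }
  for (auto& w : workers) { w.join(); }
  return built;
}
```

Each thread writes only its own slot, so no synchronization beyond the final join is needed; the one-process-per-GPU variant would replace the threads with separate processes coordinated over NCCL.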
RAFT_NCCL_TRY(ncclCommInitAll(nccl_comms_.data(), num_ranks_, dev_ids_.data()));
for (int rank = 0; rank < num_ranks_; rank++) {
  RAFT_CUDA_TRY(cudaSetDevice(dev_ids_[rank]));
  raft::comms::build_comms_nccl_only(&dev_resources_[rank], nccl_comms_[rank], num_ranks_, rank);
The NCCL initialization seems to be "one process, multiple GPUs".
Is it possible to adapt it to "one process (or thread) per GPU"? We may have to use something like std::thread. But the benefit is that the APIs of this PR become reusable from Dask/Spark: both currently comply with one process per GPU when initializing NCCL.
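A rough sketch of what a one-thread-per-GPU initialization could look like, using a mock communicator struct (mock_comm and init_clique are illustrative only; the real version would call ncclCommInitRank inside each thread with a shared ncclUniqueId, rather than one ncclCommInitAll call from a single thread):

```cpp
#include <thread>
#include <vector>

// Mock communicator; stands in for a per-rank NCCL comm handle.
struct mock_comm {
  int rank = -1;
  int size = 0;
};

// Each worker initializes its own rank's communicator, mirroring the
// one-thread-per-GPU pattern (cudaSetDevice + ncclCommInitRank would go
// inside the lambda in real code).
std::vector<mock_comm> init_clique(int num_ranks) {
  std::vector<mock_comm> comms(num_ranks);
  std::vector<std::thread> workers;
  for (int rank = 0; rank < num_ranks; rank++) {
    workers.emplace_back([&comms, num_ranks, rank] { comms[rank] = {rank, num_ranks}; });
  }
  for (auto& w : workers) { w.join(); }
  return comms;
}
```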
The single-process solution was better suited to implementing this much-requested RAFT feature for now. But I agree that in the end we should definitely look into making things runnable on Dask/Spark. This would probably involve the use of multiple processes/threads and a much broader use of NCCL.
cc @cjnolet
Note that the RAFT developer guide also suggests one process per GPU: https://github.com/rapidsai/raft/blob/branch-24.06/docs/source/developer_guide.md#multi-gpu
Thank you Victor, I have learned a lot from the code! I like the idea of combining three algorithms into one unified interface. A few questions to make myself more familiar with the PR and its design choices. It would be wonderful if Spark RAPIDS ML could leverage the APIs in this PR.
Hey Victor. This is a pretty sizeable PR so my suggestions will come in a few different passes through the changes. I took an initial look. Overall I think it's headed in the right direction. A lot of my suggestions so far are mechanical things. I'll take a closer look at the impl next.
@@ -472,6 +472,19 @@
      {"nprobe": 2000}
    ]
  },
  {
    "name": "raft_ann_mg.nlist16384",
    "algo": "raft_ann_mg",
I think raft_ivf_flat_mg and raft_ivf_pq_mg might make more sense here.
Thanks Victor for the PR! The code is well structured and clean, but I want to point out a few issues that we need to discuss (see below). I think these can be easily fixed conceptually by adhering to our one-process-per-GPU principle.
cpp/test/neighbors/ann_mg.cuh
Outdated
distances_ann.data(), ps.num_queries, ps.k);
/*
TODO : fix CAGRA serialization issue
What is the issue?
public:
  void build(raft::resources const& handle,
             const ann::index_params* index_params,
             raft::host_matrix_view<const T, IdxT, row_major> h_index_dataset)
Can we just have an mdspan as input with the accessor as a template type? Here is one example. Alternatively, in IVF-Flat we have overloads with host and device mdspans, although I would advocate against having many overloads with different mdspan variants.
We want to provide flexibility in the input dataset location. The user might have a custom mdspan with a specific allocator that is accessible both on host and device (such a thing is used in the CAGRA wrapper for raft-ann bench).
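To illustrate the accessor-as-template-parameter idea with a toy, self-contained view type (this is not raft's actual mdspan API; host_accessor, matrix_view and checksum are hypothetical names used only for the sketch):

```cpp
#include <cstddef>

// Toy accessor policy: encodes how elements are reached. A device or managed
// accessor could provide a different access path without changing the view.
template <typename T>
struct host_accessor {
  T& access(T* p, std::size_t i) const { return p[i]; }
};

// Minimal mdspan-like 2-D view templated on the accessor, so one function
// template can accept host, device, or managed-memory inputs.
template <typename T, typename Accessor = host_accessor<T>>
struct matrix_view {
  T* data;
  std::size_t rows, cols;
  Accessor acc;
  T& operator()(std::size_t r, std::size_t c) const { return acc.access(data, r * cols + c); }
};

// A single overload serves every accessor type, instead of one overload per
// host/device mdspan variant.
template <typename T, typename Accessor>
T checksum(matrix_view<T, Accessor> v) {
  T s{};
  for (std::size_t r = 0; r < v.rows; r++)
    for (std::size_t c = 0; c < v.cols; c++) s += v(r, c);
  return s;
}
```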
{
  IdxT n_rows = h_index_dataset.extent(0);
  IdxT n_dims = h_index_dataset.extent(1);
  auto d_index_dataset = raft::make_device_matrix<T, IdxT, row_major>(handle, n_rows, n_dims);
Some algorithms work on datasets larger than device memory. It would be better not to assume that we fit into GPU memory.
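A sketch of the batching arithmetic this would require: split the transfer into fixed-size row batches instead of one device allocation for the whole dataset (make_batches is a hypothetical helper; each (offset, size) pair would drive one host-to-device copy plus an extend call):

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Compute (row offset, row count) pairs covering n_rows in batches of at
// most batch_rows rows, so only one batch needs to be resident on the GPU
// at a time.
std::vector<std::pair<int64_t, int64_t>> make_batches(int64_t n_rows, int64_t batch_rows) {
  std::vector<std::pair<int64_t, int64_t>> batches;
  for (int64_t offset = 0; offset < n_rows; offset += batch_rows) {
    batches.emplace_back(offset, std::min(batch_rows, n_rows - offset));
  }
  return batches;
}
```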
{
  IdxT n_rows = h_new_vectors.extent(0);
  IdxT n_dims = h_new_vectors.extent(1);
  auto d_new_vectors = raft::make_device_matrix<T, IdxT, row_major>(handle, n_rows, n_dims);
Same as above, do not assume that dataset fits into GPU memory.
init_device_resources();
init_nccl_clique();
Why do we init the communicator inside this algorithm? To me it would be more natural if we allowed the user to create a communicator and pass it through a device_resources handle (our developer guide also recommends that mode of operation). We could still provide a helper function to initialize the resources, to make life easier for users.
Tagging @cjnolet for input.
n_rows_per_shard = std::min(n_rows_per_shard, n_rows - offset);
const T* partition_ptr = index_dataset.data_handle() + (offset * n_cols);
auto partition = raft::make_host_matrix_view<const T, IdxT, row_major>(
  partition_ptr, n_rows_per_shard, n_cols);
Is it really the job of ann_mg_index to shard the data? An alternative is to provide this sharding code as a helper function that the user has to explicitly call, providing already-sharded input to ann_mg_index::build.
On a single node, it makes sense to add sharding inside build. The whole dataset can be loaded (or mmap-ed) by the host, and we can shard the data here.
On multiple nodes, the data has to be loaded by each node. Due to its large size, each node would load only its portion of the data. If we allow ann_mg_index::build to take that sharded data, then we can already support multi-node.
(Yes, I know that multi-node is out of scope of the current PR, but if we plan to add that in the future, then we should probably agree already now on a build API that would easily enable it.)
Tagging @cjnolet to share his thoughts.
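The shard arithmetic in the snippet above could live in a small user-callable helper like this (shard_range is a hypothetical name, shown only to illustrate the proposed explicit-sharding API):

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>

// Rows [offset, offset + size) of an n_rows dataset belong to shard `rank`
// out of num_ranks; the last shard may be smaller, matching the
// n_rows_per_shard = std::min(n_rows_per_shard, n_rows - offset) logic above.
std::pair<int64_t, int64_t> shard_range(int64_t n_rows, int num_ranks, int rank) {
  int64_t rows_per_shard = (n_rows + num_ranks - 1) / num_ranks;  // ceiling division
  int64_t offset = rank * rows_per_shard;
  int64_t size = std::min(rows_per_shard, std::max<int64_t>(0, n_rows - offset));
  return {offset, size};
}
```

With such a helper, a multi-node caller could compute its own (offset, size) locally and pass only that slice to build, which is what makes the explicit-sharding API multi-node friendly.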
auto d_trans = raft::make_device_vector<IdxT, IdxT>(root_handle_, num_ranks_);
raft::copy(d_trans.data_handle(), h_trans.data(), num_ranks_, resource::get_cuda_stream(root_handle_));
auto translations = std::make_optional<raft::device_vector_view<IdxT, IdxT>>(d_trans.view());
raft::neighbors::brute_force::knn_merge_parts<float, IdxT>(root_handle_,
Out of scope for the current PR, but we might consider as a follow-up: IVF-PQ and CAGRA-Q only return approximate distances. While merging parts based on the approximate distances, we might be throwing out good neighbors due to inaccurate distance values. If we plan to do refinement, then we can treat the in_neighbors as candidates for refinement and run refinement directly instead of calling knn_merge_parts.
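For reference, the merge step reduces to a k-smallest selection over the concatenated per-shard candidate lists. A toy host-side version (merge_topk is illustrative, not the knn_merge_parts implementation) makes the risk visible: any candidate ranked past k by the approximate distances is discarded before refinement could rescue it:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Merge per-shard top-k (distance, neighbor id) lists for one query by
// keeping the k globally smallest distances. With approximate distances
// (IVF-PQ, CAGRA-Q), a true nearest neighbor ranked past k here is lost,
// which is the motivation for refining candidates instead.
std::vector<int64_t> merge_topk(
    const std::vector<std::vector<std::pair<float, int64_t>>>& parts, std::size_t k) {
  std::vector<std::pair<float, int64_t>> all;
  for (const auto& part : parts) { all.insert(all.end(), part.begin(), part.end()); }
  std::sort(all.begin(), all.end());  // ascending by distance
  if (all.size() > k) { all.resize(k); }
  std::vector<int64_t> ids;
  for (const auto& cand : all) { ids.push_back(cand.second); }
  return ids;
}
```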
The goal of this PR is to implement a distributed (single-node, multiple-GPU) implementation of ANN indexes. It will allow users to build, extend and search an index on multiple GPUs. Before building the index, the user has to choose between two modes :