Created April 25, 2019 16:57
xgboost external memory training patch
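The patch below removes the single-batch ("External memory not supported") restriction in the gpu_hist updater. DeviceSketch now accumulates into caller-owned WXQSketch objects instead of building a HistCutMatrix from one in-memory batch, the cut matrix is initialized once after every SparsePage batch has been sketched, and CreateHistIndices is split out of InitCompressedData so that each device shard can compress its slice of each incoming batch separately, tracking its write offset in rows_processed. A standalone sketch of the row-to-shard allocation arithmetic follows the diff.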
diff --git a/src/common/hist_util.cu b/src/common/hist_util.cu
index b4cc797..5aab67d 100644
--- a/src/common/hist_util.cu
+++ b/src/common/hist_util.cu
@@ -349,7 +349,7 @@ struct GPUSketcher {
};
void Sketch(const SparsePage& batch, const MetaInfo& info,
- HistCutMatrix* hmat, int gpu_batch_nrows) {
+ std::vector<WXQSketch> &sketches, int gpu_batch_nrows) {
// create device shards
shards_.resize(dist_.Devices().Size());
dh::ExecuteIndexShards(&shards_, [&](int i, std::unique_ptr<DeviceShard>& shard) {
@@ -370,17 +370,13 @@ struct GPUSketcher {
// merge the sketches from all shards
// TODO(canonizer): do it in a tree-like reduction
int num_cols = info.num_col_;
- std::vector<WXQSketch> sketches(num_cols);
WXQSketch::SummaryContainer summary;
for (int icol = 0; icol < num_cols; ++icol) {
- sketches[icol].Init(batch.Size(), 1.0 / (8 * param_.max_bin));
for (auto &shard : shards_) {
shard->GetSummary(&summary, icol);
sketches[icol].PushSummary(summary);
}
}
-
- hmat->Init(&sketches, param_.max_bin);
}
GPUSketcher(tree::TrainParam param, size_t n_rows) : param_(std::move(param)) {
@@ -395,9 +391,10 @@ struct GPUSketcher {
void DeviceSketch
(const SparsePage& batch, const MetaInfo& info,
- const tree::TrainParam& param, HistCutMatrix* hmat, int gpu_batch_nrows) {
+ const tree::TrainParam& param, std::vector<WXQSketch> &sketches,
+ int gpu_batch_nrows) {
GPUSketcher sketcher(param, info.num_row_);
- sketcher.Sketch(batch, info, hmat, gpu_batch_nrows);
+ sketcher.Sketch(batch, info, sketches, gpu_batch_nrows);
}
} // namespace common
diff --git a/src/common/hist_util.h b/src/common/hist_util.h
index 3aa2f23..47a2f5f 100644
--- a/src/common/hist_util.h
+++ b/src/common/hist_util.h
@@ -50,7 +50,8 @@ struct HistCutMatrix {
/*! \brief Builds the cut matrix on the GPU */
void DeviceSketch
(const SparsePage& batch, const MetaInfo& info,
- const tree::TrainParam& param, HistCutMatrix* hmat, int gpu_batch_nrows);
+ const tree::TrainParam& param, std::vector<HistCutMatrix::WXQSketch> &sketches,
+ int gpu_batch_nrows);
/*!
* \brief A single row in global histogram index.
diff --git a/src/tree/updater_gpu_hist.cu b/src/tree/updater_gpu_hist.cu
index e93ca9a..18f0908 100644
--- a/src/tree/updater_gpu_hist.cu
+++ b/src/tree/updater_gpu_hist.cu
@@ -164,8 +164,10 @@ struct ELLPackMatrix {
}
void Init(common::Span<uint32_t> feature_segments,
common::Span<bst_float> min_fvalue,
- common::Span<bst_float> gidx_fvalue_map, size_t row_stride,
- common::CompressedIterator<uint32_t> gidx_iter, bool is_dense,
+ common::Span<bst_float> gidx_fvalue_map,
+ size_t row_stride,
+ common::CompressedIterator<uint32_t> gidx_iter,
+ bool is_dense,
int null_gidx_value) {
this->feature_segments = feature_segments;
this->min_fvalue = min_fvalue;
@@ -656,6 +658,7 @@ struct DeviceShard {
bst_uint row_begin_idx;
bst_uint row_end_idx;
bst_uint n_rows;
+ bst_uint rows_processed;
TrainParam param;
bool prediction_cache_initialised;
@@ -674,36 +677,17 @@ struct DeviceShard {
row_begin_idx(row_begin),
row_end_idx(row_end),
n_rows(row_end - row_begin),
+ rows_processed(0),
n_bins(0),
param(std::move(_param)),
prediction_cache_initialised(false) {}
- /* Init row_ptrs and row_stride */
- size_t InitRowPtrs(const SparsePage& row_batch) {
- const auto& offset_vec = row_batch.offset.HostVector();
- row_ptrs.resize(n_rows + 1);
- thrust::copy(offset_vec.data() + row_begin_idx,
- offset_vec.data() + row_end_idx + 1,
- row_ptrs.begin());
- auto row_iter = row_ptrs.begin();
- // find the maximum row size for converting to ELLPack
- auto get_size = [=] __device__(size_t row) {
- return row_iter[row + 1] - row_iter[row];
- }; // NOLINT
-
- auto counting = thrust::make_counting_iterator(size_t(0));
- using TransformT = thrust::transform_iterator<decltype(get_size),
- decltype(counting), size_t>;
- TransformT row_size_iter = TransformT(counting, get_size);
- size_t row_stride = thrust::reduce(row_size_iter, row_size_iter + n_rows, 0,
- thrust::maximum<size_t>());
- return row_stride;
- }
-
void InitCompressedData(
- const common::HistCutMatrix& hmat, const SparsePage& row_batch, bool is_dense);
+ const common::HistCutMatrix& hmat, size_t row_stride, bool is_dense);
- void CreateHistIndices(const SparsePage& row_batch, size_t row_stride, int null_gidx_value);
+ void CreateHistIndices(
+ const SparsePage& row_batch, size_t shard_idx,
+ const std::vector<size_t> &shard_allocations, int null_gidx_value);
~DeviceShard() {
dh::safe_cuda(cudaSetDevice(device_id));
@@ -1045,11 +1029,13 @@ struct GlobalMemHistBuilder : public GPUHistBuilderBase<GradientSumT> {
template <typename GradientSumT>
inline void DeviceShard<GradientSumT>::InitCompressedData(
- const common::HistCutMatrix& hmat, const SparsePage& row_batch, bool is_dense) {
- size_t row_stride = this->InitRowPtrs(row_batch);
+ const common::HistCutMatrix& hmat, size_t row_stride, bool is_dense) {
n_bins = hmat.row_ptr.back();
int null_gidx_value = hmat.row_ptr.back();
+ CHECK(!(param.max_leaves == 0 && param.max_depth == 0))
+ << "Max leaves and max depth cannot both be unconstrained for "
+ "gpu_hist.";
int max_nodes =
param.max_leaves > 0 ? param.max_leaves * 2 : MaxNodesDepth(param.max_depth);
@@ -1072,24 +1058,17 @@ inline void DeviceShard<GradientSumT>::InitCompressedData(
node_sum_gradients.resize(max_nodes);
ridx_segments.resize(max_nodes);
-
// allocate compressed bin data
int num_symbols = n_bins + 1;
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes =
common::CompressedBufferWriter::CalculateBufferSize(row_stride * n_rows,
num_symbols);
-
- CHECK(!(param.max_leaves == 0 && param.max_depth == 0))
- << "Max leaves and max depth cannot both be unconstrained for "
- "gpu_hist.";
ba.Allocate(device_id, &gidx_buffer, compressed_size_bytes);
thrust::fill(
thrust::device_pointer_cast(gidx_buffer.data()),
thrust::device_pointer_cast(gidx_buffer.data() + gidx_buffer.size()), 0);
- this->CreateHistIndices(row_batch, row_stride, null_gidx_value);
-
ellpack_matrix.Init(
feature_segments, min_fvalue,
gidx_fvalue_map, row_stride,
@@ -1113,23 +1092,41 @@ inline void DeviceShard<GradientSumT>::InitCompressedData(
template <typename GradientSumT>
inline void DeviceShard<GradientSumT>::CreateHistIndices(
- const SparsePage& row_batch, size_t row_stride, int null_gidx_value) {
+ const SparsePage& row_batch,
+ size_t shard_idx,
+ const std::vector<size_t> &shard_allocations,
+ int null_gidx_value) {
+ // Has any been allocated for me in this batch?
+ size_t num_elems = shard_allocations[shard_idx];
+ if (!num_elems) return;
+
+ size_t row_stride = this->ellpack_matrix.row_stride;
+
+ // Take into account the rows that could be processed by other GPUs
+ size_t begin_idx(0);
+ for (size_t i = 0; i < shard_idx; ++i) begin_idx += shard_allocations[i];
+
+ const auto& offset_vec = row_batch.offset.HostVector();
+ row_ptrs.resize(num_elems+1);
+ thrust::copy(offset_vec.data() + begin_idx,
+ offset_vec.data() + begin_idx + num_elems + 1,
+ row_ptrs.begin());
+
int num_symbols = n_bins + 1;
// bin and compress entries in batches of rows
size_t gpu_batch_nrows =
std::min
(dh::TotalMemory(device_id) / (16 * row_stride * sizeof(Entry)),
- static_cast<size_t>(n_rows));
+ static_cast<size_t>(num_elems));
const std::vector<Entry>& data_vec = row_batch.data.HostVector();
thrust::device_vector<Entry> entries_d(gpu_batch_nrows * row_stride);
- size_t gpu_nbatches = dh::DivRoundUp(n_rows, gpu_batch_nrows);
-
+ size_t gpu_nbatches = dh::DivRoundUp(num_elems, gpu_batch_nrows);
for (size_t gpu_batch = 0; gpu_batch < gpu_nbatches; ++gpu_batch) {
size_t batch_row_begin = gpu_batch * gpu_batch_nrows;
size_t batch_row_end = (gpu_batch + 1) * gpu_batch_nrows;
- if (batch_row_end > n_rows) {
- batch_row_end = n_rows;
+ if (batch_row_end > num_elems) {
+ batch_row_end = num_elems;
}
size_t batch_nrows = batch_row_end - batch_row_begin;
// number of entries in this batch.
@@ -1140,19 +1137,25 @@ inline void DeviceShard<GradientSumT>::CreateHistIndices(
(entries_d.data().get(), data_vec.data() + row_ptrs[batch_row_begin],
n_entries * sizeof(Entry), cudaMemcpyDefault));
const dim3 block3(32, 8, 1); // 256 threads
- const dim3 grid3(dh::DivRoundUp(n_rows, block3.x),
+ const dim3 grid3(dh::DivRoundUp(num_elems, block3.x),
dh::DivRoundUp(row_stride, block3.y), 1);
CompressBinEllpackKernel<<<grid3, block3>>>
(common::CompressedBufferWriter(num_symbols),
gidx_buffer.data(),
row_ptrs.data().get() + batch_row_begin,
entries_d.data().get(),
- gidx_fvalue_map.data(), feature_segments.data(),
- batch_row_begin, batch_nrows,
+ gidx_fvalue_map.data(),
+ feature_segments.data(),
+ rows_processed + batch_row_begin,
+ batch_nrows,
row_ptrs[batch_row_begin],
- row_stride, null_gidx_value);
+ row_stride,
+ null_gidx_value);
}
+ // This will be the offset into the gidx_buffer for the next batch
+ rows_processed += num_elems;
+
// free the memory that is no longer needed
row_ptrs.resize(0);
row_ptrs.shrink_to_fit();
@@ -1216,8 +1219,6 @@ class GPUHistMakerSpecialised{
reducer_.Init(device_list_);
- auto batch_iter = dmat->GetRowBatches().begin();
- const SparsePage& batch = *batch_iter;
// Create device shards
shards_.resize(n_devices);
dh::ExecuteIndexShards(
@@ -1231,23 +1232,96 @@ class GPUHistMakerSpecialised{
start + size, param_));
});
- // Find the cuts.
+ // Initialize Sketches across all batches
+ std::vector<common::HistCutMatrix::WXQSketch> sketches(info_->num_col_);
+#pragma omp parallel for schedule(static)
+ for (int icol = 0; icol < info_->num_col_; ++icol) {
+ sketches[icol].Init(info_->num_row_, 1.0 / (8 * param_.max_bin));
+ }
+
+ size_t row_stride(0); // Max row_stride across the entire dataset
+ size_t processed_size(0);
+
+ // Rows allocated to each GPU in a batch
+ std::vector<std::vector<size_t>> shard_allocations;
monitor_.StartCuda("Quantiles");
- common::DeviceSketch(batch, *info_, param_, &hmat_, hist_maker_param_.gpu_batch_nrows);
- n_bins_ = hmat_.row_ptr.back();
+ for (const auto &batch : dmat->GetRowBatches()) {
+ struct RowReset {
+ public:
+ RowReset(MetaInfo &minfo, const SparsePage &r_batch)
+ : minfo_(minfo) {
+ orig_nrows_ = minfo_.num_row_;
+ minfo.num_row_ = r_batch.Size();
+ }
+
+ ~RowReset() {
+ minfo_.num_row_ = orig_nrows_;
+ }
+ private:
+ MetaInfo &minfo_;
+ size_t orig_nrows_;
+ }rowgd(*info_, batch);
+
+ // Find the cuts for each batch
+ common::DeviceSketch(batch, *info_, param_, sketches, hist_maker_param_.gpu_batch_nrows);
+
+ const auto &offset_vec = batch.offset.HostVector();
+ for (size_t i = 1; i < offset_vec.size(); ++i) {
+ row_stride = std::max(row_stride, offset_vec[i] - offset_vec[i-1]);
+ }
+
+ std::vector<size_t> batch_allocation(shards_.size());
+ // Distribute the rows in this batch to the different shards
+ for (size_t i = 0; i < shards_.size(); ++i) {
+ // Does this batch pertain to me?
+ ssize_t num_elems = processed_size + batch.Size() - shards_[i]->row_begin_idx;
+ ssize_t rem_elems = shards_[i]->row_end_idx - processed_size;
+ if (num_elems <= 0 || rem_elems <= 0) batch_allocation[i] = 0;
+ // How many elements do I process from this batch?
+ else {
+ num_elems = std::min(
+ std::min(
+ std::min(num_elems, rem_elems),
+ static_cast<ssize_t>(shards_[i]->n_rows)),
+ static_cast<ssize_t>(batch.Size()));
+ batch_allocation[i] = num_elems;
+ }
+ }
+ shard_allocations.push_back(batch_allocation);
+
+ processed_size += batch.Size();
+ }
monitor_.StopCuda("Quantiles");
+
+ // Initialize hmat_ for the entire data set
+ hmat_.Init(&sketches, param_.max_bin);
+
+ n_bins_ = hmat_.row_ptr.back();
+
auto is_dense = info_->num_nonzero_ == info_->num_row_ * info_->num_col_;
- monitor_.StartCuda("BinningCompression");
+ // Init global data for each shard
+ monitor_.StartCuda("InitCompressedData");
dh::ExecuteIndexShards(
&shards_,
- [&](int idx, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
- dh::safe_cuda(cudaSetDevice(shard->device_id));
- shard->InitCompressedData(hmat_, batch, is_dense);
+ [&](int i, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
+ dh::safe_cuda(cudaSetDevice(dist_.Devices().DeviceId(i)));
+ shard->InitCompressedData(hmat_, row_stride, is_dense);
});
+ monitor_.StopCuda("InitCompressedData");
+
+ monitor_.StartCuda("BinningCompression");
+ size_t batch_idx(0);
+ for (const auto &batch : dmat->GetRowBatches()) {
+ dh::ExecuteIndexShards(
+ &shards_,
+ [&](int idx, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
+ dh::safe_cuda(cudaSetDevice(shard->device_id));
+ shard->CreateHistIndices(batch, idx, shard_allocations[batch_idx], hmat_.row_ptr.back());
+ });
+ batch_idx++;
+ }
monitor_.StopCuda("BinningCompression");
- ++batch_iter;
- CHECK(batch_iter.AtEnd()) << "External memory not supported";
p_last_fmat_ = dmat;
initialised_ = true;
diff --git a/tests/cpp/common/test_gpu_hist_util.cu b/tests/cpp/common/test_gpu_hist_util.cu
index 7c4fbd7..6713d6f 100644
--- a/tests/cpp/common/test_gpu_hist_util.cu
+++ b/tests/cpp/common/test_gpu_hist_util.cu
@@ -39,7 +39,13 @@ void TestDeviceSketch(const GPUSet& devices) {
// find the cuts on the GPU
const SparsePage& batch = *(*dmat)->GetRowBatches().begin();
HistCutMatrix hmat_gpu;
- DeviceSketch(batch, (*dmat)->Info(), p, &hmat_gpu, gpu_batch_nrows);
+ std::vector<common::HistCutMatrix::WXQSketch> sketches((*dmat)->Info().num_col_);
+ for (int icol = 0; icol < (*dmat)->Info().num_col_; ++icol) {
+ sketches[icol].Init((*dmat)->Info().num_row_, 1.0 / (8 * p.max_bin));
+ }
+
+ DeviceSketch(batch, (*dmat)->Info(), p, sketches, gpu_batch_nrows);
+ hmat_gpu.Init(&sketches, p.max_bin);
// compare the cuts
double eps = 1e-2;
diff --git a/tests/cpp/tree/test_gpu_hist.cu b/tests/cpp/tree/test_gpu_hist.cu
index 37cce8b..5804786 100644
--- a/tests/cpp/tree/test_gpu_hist.cu
+++ b/tests/cpp/tree/test_gpu_hist.cu
@@ -77,7 +77,13 @@ void BuildGidx(DeviceShard<GradientSumT>* shard, int n_rows, int n_cols,
auto is_dense = (*dmat)->Info().num_nonzero_ ==
(*dmat)->Info().num_row_ * (*dmat)->Info().num_col_;
- shard->InitCompressedData(cmat, batch, is_dense);
+ size_t row_stride = 0;
+ const auto &offset_vec = batch.offset.HostVector();
+ for (size_t i = 1; i < offset_vec.size(); ++i) {
+ row_stride = std::max(row_stride, offset_vec[i] - offset_vec[i-1]);
+ }
+ shard->InitCompressedData(cmat, row_stride, is_dense);
+ shard->CreateHistIndices(batch, 0, std::vector<size_t>(1, batch.Size()), cmat.row_ptr.back());
delete dmat;
}
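For reference, here is a minimal, self-contained illustration (plain C++, not xgboost code; the shard boundaries and batch sizes below are invented) of the allocation arithmetic used in the batch loop of the patch: each shard owns a fixed, contiguous global row range, and for every incoming batch it is assigned min(rows of the batch at or past its start, rows of its range not yet consumed, its total rows, the batch size) rows, or zero when the batch lies entirely outside its range.

// Standalone illustration of the per-batch row allocation across shards.
// Not xgboost code: the shard ranges and batch sizes here are invented.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Each shard owns the half-open global row range [begin, end).
  struct Shard { std::size_t begin, end; };
  const std::vector<Shard> shards = {{0, 600}, {600, 1000}};

  // External-memory batches arrive sequentially with these row counts.
  const std::vector<std::size_t> batch_sizes = {250, 250, 250, 250};

  std::size_t processed = 0;  // global rows seen in earlier batches
  for (std::size_t b = 0; b < batch_sizes.size(); ++b) {
    const std::int64_t batch_size = static_cast<std::int64_t>(batch_sizes[b]);
    std::vector<std::size_t> alloc(shards.size(), 0);
    for (std::size_t i = 0; i < shards.size(); ++i) {
      // Rows of this batch that lie at or past the shard's start ...
      const std::int64_t from_start =
          static_cast<std::int64_t>(processed) + batch_size -
          static_cast<std::int64_t>(shards[i].begin);
      // ... and rows of the shard's range not yet covered by earlier batches.
      const std::int64_t remaining =
          static_cast<std::int64_t>(shards[i].end) -
          static_cast<std::int64_t>(processed);
      if (from_start <= 0 || remaining <= 0) continue;  // batch outside my range
      const std::int64_t shard_rows =
          static_cast<std::int64_t>(shards[i].end - shards[i].begin);
      alloc[i] = static_cast<std::size_t>(
          std::min({from_start, remaining, shard_rows, batch_size}));
    }
    std::cout << "batch " << b << " allocation:";
    for (std::size_t a : alloc) std::cout << ' ' << a;
    std::cout << '\n';
    processed += batch_sizes[b];
  }
  return 0;
}

The patch applies this idea in two passes over dmat->GetRowBatches(): the first pass accumulates the quantile sketches, the maximum row_stride, and these per-batch allocations; the second pass lets each shard call CreateHistIndices on its allocated slice of each batch, with rows_processed carrying the ELLPack write offset from one batch to the next.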