@sriramch
Created April 25, 2019 16:57
xgboost external memory training patch
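For context, here is a minimal usage sketch (not part of the patch) of how the external-memory gpu_hist path targeted by this change might be exercised through the XGBoost C API. The file name train.libsvm and the #dtrain.cache suffix are illustrative; the cache suffix is what tells XGBoost to stream the data from disk in SparsePage batches instead of loading it all into memory, which is the code path this patch extends to the GPU hist updater.

// Minimal sketch, assuming the patch is applied and a LibSVM file
// train.libsvm exists on disk. The "#dtrain.cache" suffix enables the
// external-memory (on-disk page cache) DMatrix.
#include <xgboost/c_api.h>
#include <cstdio>

#define SAFE(call)                                       \
  do {                                                   \
    if ((call) != 0) {                                   \
      std::fprintf(stderr, "%s\n", XGBGetLastError());   \
      return 1;                                          \
    }                                                    \
  } while (0)

int main() {
  DMatrixHandle dtrain;
  // Rows are paged in from the dtrain.cache.* files rather than kept in RAM.
  SAFE(XGDMatrixCreateFromFile("train.libsvm#dtrain.cache", 0, &dtrain));

  BoosterHandle booster;
  SAFE(XGBoosterCreate(&dtrain, 1, &booster));
  SAFE(XGBoosterSetParam(booster, "tree_method", "gpu_hist"));
  SAFE(XGBoosterSetParam(booster, "max_depth", "6"));

  // Train for a few rounds with the GPU hist updater.
  for (int iter = 0; iter < 10; ++iter) {
    SAFE(XGBoosterUpdateOneIter(booster, iter, dtrain));
  }

  SAFE(XGBoosterFree(booster));
  SAFE(XGDMatrixFree(dtrain));
  return 0;
}
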
diff --git a/src/common/hist_util.cu b/src/common/hist_util.cu
index b4cc797..5aab67d 100644
--- a/src/common/hist_util.cu
+++ b/src/common/hist_util.cu
@@ -349,7 +349,7 @@ struct GPUSketcher {
};
void Sketch(const SparsePage& batch, const MetaInfo& info,
- HistCutMatrix* hmat, int gpu_batch_nrows) {
+ std::vector<WXQSketch> &sketches, int gpu_batch_nrows) {
// create device shards
shards_.resize(dist_.Devices().Size());
dh::ExecuteIndexShards(&shards_, [&](int i, std::unique_ptr<DeviceShard>& shard) {
@@ -370,17 +370,13 @@ struct GPUSketcher {
// merge the sketches from all shards
// TODO(canonizer): do it in a tree-like reduction
int num_cols = info.num_col_;
- std::vector<WXQSketch> sketches(num_cols);
WXQSketch::SummaryContainer summary;
for (int icol = 0; icol < num_cols; ++icol) {
- sketches[icol].Init(batch.Size(), 1.0 / (8 * param_.max_bin));
for (auto &shard : shards_) {
shard->GetSummary(&summary, icol);
sketches[icol].PushSummary(summary);
}
}
-
- hmat->Init(&sketches, param_.max_bin);
}
GPUSketcher(tree::TrainParam param, size_t n_rows) : param_(std::move(param)) {
@@ -395,9 +391,10 @@ struct GPUSketcher {
void DeviceSketch
(const SparsePage& batch, const MetaInfo& info,
- const tree::TrainParam& param, HistCutMatrix* hmat, int gpu_batch_nrows) {
+ const tree::TrainParam& param, std::vector<WXQSketch> &sketches,
+ int gpu_batch_nrows) {
GPUSketcher sketcher(param, info.num_row_);
- sketcher.Sketch(batch, info, hmat, gpu_batch_nrows);
+ sketcher.Sketch(batch, info, sketches, gpu_batch_nrows);
}
} // namespace common
diff --git a/src/common/hist_util.h b/src/common/hist_util.h
index 3aa2f23..47a2f5f 100644
--- a/src/common/hist_util.h
+++ b/src/common/hist_util.h
@@ -50,7 +50,8 @@ struct HistCutMatrix {
/*! \brief Builds the cut matrix on the GPU */
void DeviceSketch
(const SparsePage& batch, const MetaInfo& info,
- const tree::TrainParam& param, HistCutMatrix* hmat, int gpu_batch_nrows);
+ const tree::TrainParam& param, std::vector<HistCutMatrix::WXQSketch> &sketches,
+ int gpu_batch_nrows);
/*!
* \brief A single row in global histogram index.
diff --git a/src/tree/updater_gpu_hist.cu b/src/tree/updater_gpu_hist.cu
index e93ca9a..18f0908 100644
--- a/src/tree/updater_gpu_hist.cu
+++ b/src/tree/updater_gpu_hist.cu
@@ -164,8 +164,10 @@ struct ELLPackMatrix {
}
void Init(common::Span<uint32_t> feature_segments,
common::Span<bst_float> min_fvalue,
- common::Span<bst_float> gidx_fvalue_map, size_t row_stride,
- common::CompressedIterator<uint32_t> gidx_iter, bool is_dense,
+ common::Span<bst_float> gidx_fvalue_map,
+ size_t row_stride,
+ common::CompressedIterator<uint32_t> gidx_iter,
+ bool is_dense,
int null_gidx_value) {
this->feature_segments = feature_segments;
this->min_fvalue = min_fvalue;
@@ -656,6 +658,7 @@ struct DeviceShard {
bst_uint row_begin_idx;
bst_uint row_end_idx;
bst_uint n_rows;
+ bst_uint rows_processed;
TrainParam param;
bool prediction_cache_initialised;
@@ -674,36 +677,17 @@ struct DeviceShard {
row_begin_idx(row_begin),
row_end_idx(row_end),
n_rows(row_end - row_begin),
+ rows_processed(0),
n_bins(0),
param(std::move(_param)),
prediction_cache_initialised(false) {}
- /* Init row_ptrs and row_stride */
- size_t InitRowPtrs(const SparsePage& row_batch) {
- const auto& offset_vec = row_batch.offset.HostVector();
- row_ptrs.resize(n_rows + 1);
- thrust::copy(offset_vec.data() + row_begin_idx,
- offset_vec.data() + row_end_idx + 1,
- row_ptrs.begin());
- auto row_iter = row_ptrs.begin();
- // find the maximum row size for converting to ELLPack
- auto get_size = [=] __device__(size_t row) {
- return row_iter[row + 1] - row_iter[row];
- }; // NOLINT
-
- auto counting = thrust::make_counting_iterator(size_t(0));
- using TransformT = thrust::transform_iterator<decltype(get_size),
- decltype(counting), size_t>;
- TransformT row_size_iter = TransformT(counting, get_size);
- size_t row_stride = thrust::reduce(row_size_iter, row_size_iter + n_rows, 0,
- thrust::maximum<size_t>());
- return row_stride;
- }
-
void InitCompressedData(
- const common::HistCutMatrix& hmat, const SparsePage& row_batch, bool is_dense);
+ const common::HistCutMatrix& hmat, size_t row_stride, bool is_dense);
- void CreateHistIndices(const SparsePage& row_batch, size_t row_stride, int null_gidx_value);
+ void CreateHistIndices(
+ const SparsePage& row_batch, size_t shard_idx,
+ const std::vector<size_t> &shard_allocations, int null_gidx_value);
~DeviceShard() {
dh::safe_cuda(cudaSetDevice(device_id));
@@ -1045,11 +1029,13 @@ struct GlobalMemHistBuilder : public GPUHistBuilderBase<GradientSumT> {
template <typename GradientSumT>
inline void DeviceShard<GradientSumT>::InitCompressedData(
- const common::HistCutMatrix& hmat, const SparsePage& row_batch, bool is_dense) {
- size_t row_stride = this->InitRowPtrs(row_batch);
+ const common::HistCutMatrix& hmat, size_t row_stride, bool is_dense) {
n_bins = hmat.row_ptr.back();
int null_gidx_value = hmat.row_ptr.back();
+ CHECK(!(param.max_leaves == 0 && param.max_depth == 0))
+ << "Max leaves and max depth cannot both be unconstrained for "
+ "gpu_hist.";
int max_nodes =
param.max_leaves > 0 ? param.max_leaves * 2 : MaxNodesDepth(param.max_depth);
@@ -1072,24 +1058,17 @@ inline void DeviceShard<GradientSumT>::InitCompressedData(
node_sum_gradients.resize(max_nodes);
ridx_segments.resize(max_nodes);
-
// allocate compressed bin data
int num_symbols = n_bins + 1;
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes =
common::CompressedBufferWriter::CalculateBufferSize(row_stride * n_rows,
num_symbols);
-
- CHECK(!(param.max_leaves == 0 && param.max_depth == 0))
- << "Max leaves and max depth cannot both be unconstrained for "
- "gpu_hist.";
ba.Allocate(device_id, &gidx_buffer, compressed_size_bytes);
thrust::fill(
thrust::device_pointer_cast(gidx_buffer.data()),
thrust::device_pointer_cast(gidx_buffer.data() + gidx_buffer.size()), 0);
- this->CreateHistIndices(row_batch, row_stride, null_gidx_value);
-
ellpack_matrix.Init(
feature_segments, min_fvalue,
gidx_fvalue_map, row_stride,
@@ -1113,23 +1092,41 @@ inline void DeviceShard<GradientSumT>::InitCompressedData(
template <typename GradientSumT>
inline void DeviceShard<GradientSumT>::CreateHistIndices(
- const SparsePage& row_batch, size_t row_stride, int null_gidx_value) {
+ const SparsePage& row_batch,
+ size_t shard_idx,
+ const std::vector<size_t> &shard_allocations,
+ int null_gidx_value) {
+ // Have any rows been allocated to this shard in this batch?
+ size_t num_elems = shard_allocations[shard_idx];
+ if (!num_elems) return;
+
+ size_t row_stride = this->ellpack_matrix.row_stride;
+
+ // Skip the rows of this batch that are handled by the preceding shards
+ size_t begin_idx(0);
+ for (size_t i = 0; i < shard_idx; ++i) begin_idx += shard_allocations[i];
+
+ const auto& offset_vec = row_batch.offset.HostVector();
+ row_ptrs.resize(num_elems+1);
+ thrust::copy(offset_vec.data() + begin_idx,
+ offset_vec.data() + begin_idx + num_elems + 1,
+ row_ptrs.begin());
+
int num_symbols = n_bins + 1;
// bin and compress entries in batches of rows
size_t gpu_batch_nrows =
std::min
(dh::TotalMemory(device_id) / (16 * row_stride * sizeof(Entry)),
- static_cast<size_t>(n_rows));
+ static_cast<size_t>(num_elems));
const std::vector<Entry>& data_vec = row_batch.data.HostVector();
thrust::device_vector<Entry> entries_d(gpu_batch_nrows * row_stride);
- size_t gpu_nbatches = dh::DivRoundUp(n_rows, gpu_batch_nrows);
-
+ size_t gpu_nbatches = dh::DivRoundUp(num_elems, gpu_batch_nrows);
for (size_t gpu_batch = 0; gpu_batch < gpu_nbatches; ++gpu_batch) {
size_t batch_row_begin = gpu_batch * gpu_batch_nrows;
size_t batch_row_end = (gpu_batch + 1) * gpu_batch_nrows;
- if (batch_row_end > n_rows) {
- batch_row_end = n_rows;
+ if (batch_row_end > num_elems) {
+ batch_row_end = num_elems;
}
size_t batch_nrows = batch_row_end - batch_row_begin;
// number of entries in this batch.
@@ -1140,19 +1137,25 @@ inline void DeviceShard<GradientSumT>::CreateHistIndices(
(entries_d.data().get(), data_vec.data() + row_ptrs[batch_row_begin],
n_entries * sizeof(Entry), cudaMemcpyDefault));
const dim3 block3(32, 8, 1); // 256 threads
- const dim3 grid3(dh::DivRoundUp(n_rows, block3.x),
+ const dim3 grid3(dh::DivRoundUp(num_elems, block3.x),
dh::DivRoundUp(row_stride, block3.y), 1);
CompressBinEllpackKernel<<<grid3, block3>>>
(common::CompressedBufferWriter(num_symbols),
gidx_buffer.data(),
row_ptrs.data().get() + batch_row_begin,
entries_d.data().get(),
- gidx_fvalue_map.data(), feature_segments.data(),
- batch_row_begin, batch_nrows,
+ gidx_fvalue_map.data(),
+ feature_segments.data(),
+ rows_processed + batch_row_begin,
+ batch_nrows,
row_ptrs[batch_row_begin],
- row_stride, null_gidx_value);
+ row_stride,
+ null_gidx_value);
}
+ // This will be the offset into the gidx_buffer for the next batch
+ rows_processed += num_elems;
+
// free the memory that is no longer needed
row_ptrs.resize(0);
row_ptrs.shrink_to_fit();
@@ -1216,8 +1219,6 @@ class GPUHistMakerSpecialised{
reducer_.Init(device_list_);
- auto batch_iter = dmat->GetRowBatches().begin();
- const SparsePage& batch = *batch_iter;
// Create device shards
shards_.resize(n_devices);
dh::ExecuteIndexShards(
@@ -1231,23 +1232,96 @@ class GPUHistMakerSpecialised{
start + size, param_));
});
- // Find the cuts.
+ // Initialize the sketches that accumulate quantile summaries across all batches
+ std::vector<common::HistCutMatrix::WXQSketch> sketches(info_->num_col_);
+#pragma omp parallel for schedule(static)
+ for (int icol = 0; icol < info_->num_col_; ++icol) {
+ sketches[icol].Init(info_->num_row_, 1.0 / (8 * param_.max_bin));
+ }
+
+ size_t row_stride(0); // Max row_stride across the entire dataset
+ size_t processed_size(0);
+
+ // Rows allocated to each GPU in a batch
+ std::vector<std::vector<size_t>> shard_allocations;
monitor_.StartCuda("Quantiles");
- common::DeviceSketch(batch, *info_, param_, &hmat_, hist_maker_param_.gpu_batch_nrows);
- n_bins_ = hmat_.row_ptr.back();
+ for (const auto &batch : dmat->GetRowBatches()) {
+ struct RowReset {
+ public:
+ RowReset(MetaInfo &minfo, const SparsePage &r_batch)
+ : minfo_(minfo) {
+ orig_nrows_ = minfo_.num_row_;
+ minfo.num_row_ = r_batch.Size();
+ }
+
+ ~RowReset() {
+ minfo_.num_row_ = orig_nrows_;
+ }
+ private:
+ MetaInfo &minfo_;
+ size_t orig_nrows_;
+ } rowgd(*info_, batch);
+
+ // Find the cuts for each batch
+ common::DeviceSketch(batch, *info_, param_, sketches, hist_maker_param_.gpu_batch_nrows);
+
+ const auto &offset_vec = batch.offset.HostVector();
+ for (size_t i = 1; i < offset_vec.size(); ++i) {
+ row_stride = std::max(row_stride, offset_vec[i] - offset_vec[i-1]);
+ }
+
+ std::vector<size_t> batch_allocation(shards_.size());
+ // Distribute the rows in this batch to the different shards
+ for (size_t i = 0; i < shards_.size(); ++i) {
+ // Does any part of this batch fall within this shard's row range?
+ ssize_t num_elems = processed_size + batch.Size() - shards_[i]->row_begin_idx;
+ ssize_t rem_elems = shards_[i]->row_end_idx - processed_size;
+ if (num_elems <= 0 || rem_elems <= 0) batch_allocation[i] = 0;
+ // How many elements do I process from this batch?
+ else {
+ num_elems = std::min(
+ std::min(
+ std::min(num_elems, rem_elems),
+ static_cast<ssize_t>(shards_[i]->n_rows)),
+ static_cast<ssize_t>(batch.Size()));
+ batch_allocation[i] = num_elems;
+ }
+ }
+ shard_allocations.push_back(batch_allocation);
+
+ processed_size += batch.Size();
+ }
monitor_.StopCuda("Quantiles");
+
+ // Initialize hmat_ for the entire data set
+ hmat_.Init(&sketches, param_.max_bin);
+
+ n_bins_ = hmat_.row_ptr.back();
+
auto is_dense = info_->num_nonzero_ == info_->num_row_ * info_->num_col_;
- monitor_.StartCuda("BinningCompression");
+ // Init global data for each shard
+ monitor_.StartCuda("InitCompressedData");
dh::ExecuteIndexShards(
&shards_,
- [&](int idx, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
- dh::safe_cuda(cudaSetDevice(shard->device_id));
- shard->InitCompressedData(hmat_, batch, is_dense);
+ [&](int i, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
+ dh::safe_cuda(cudaSetDevice(dist_.Devices().DeviceId(i)));
+ shard->InitCompressedData(hmat_, row_stride, is_dense);
});
+ monitor_.StopCuda("InitCompressedData");
+
+ monitor_.StartCuda("BinningCompression");
+ size_t batch_idx(0);
+ for (const auto &batch : dmat->GetRowBatches()) {
+ dh::ExecuteIndexShards(
+ &shards_,
+ [&](int idx, std::unique_ptr<DeviceShard<GradientSumT>>& shard) {
+ dh::safe_cuda(cudaSetDevice(shard->device_id));
+ shard->CreateHistIndices(batch, idx, shard_allocations[batch_idx], hmat_.row_ptr.back());
+ });
+ batch_idx++;
+ }
monitor_.StopCuda("BinningCompression");
- ++batch_iter;
- CHECK(batch_iter.AtEnd()) << "External memory not supported";
p_last_fmat_ = dmat;
initialised_ = true;
diff --git a/tests/cpp/common/test_gpu_hist_util.cu b/tests/cpp/common/test_gpu_hist_util.cu
index 7c4fbd7..6713d6f 100644
--- a/tests/cpp/common/test_gpu_hist_util.cu
+++ b/tests/cpp/common/test_gpu_hist_util.cu
@@ -39,7 +39,13 @@ void TestDeviceSketch(const GPUSet& devices) {
// find the cuts on the GPU
const SparsePage& batch = *(*dmat)->GetRowBatches().begin();
HistCutMatrix hmat_gpu;
- DeviceSketch(batch, (*dmat)->Info(), p, &hmat_gpu, gpu_batch_nrows);
+ std::vector<common::HistCutMatrix::WXQSketch> sketches((*dmat)->Info().num_col_);
+ for (int icol = 0; icol < (*dmat)->Info().num_col_; ++icol) {
+ sketches[icol].Init((*dmat)->Info().num_row_, 1.0 / (8 * p.max_bin));
+ }
+
+ DeviceSketch(batch, (*dmat)->Info(), p, sketches, gpu_batch_nrows);
+ hmat_gpu.Init(&sketches, p.max_bin);
// compare the cuts
double eps = 1e-2;
diff --git a/tests/cpp/tree/test_gpu_hist.cu b/tests/cpp/tree/test_gpu_hist.cu
index 37cce8b..5804786 100644
--- a/tests/cpp/tree/test_gpu_hist.cu
+++ b/tests/cpp/tree/test_gpu_hist.cu
@@ -77,7 +77,13 @@ void BuildGidx(DeviceShard<GradientSumT>* shard, int n_rows, int n_cols,
auto is_dense = (*dmat)->Info().num_nonzero_ ==
(*dmat)->Info().num_row_ * (*dmat)->Info().num_col_;
- shard->InitCompressedData(cmat, batch, is_dense);
+ size_t row_stride = 0;
+ const auto &offset_vec = batch.offset.HostVector();
+ for (size_t i = 1; i < offset_vec.size(); ++i) {
+ row_stride = std::max(row_stride, offset_vec[i] - offset_vec[i-1]);
+ }
+ shard->InitCompressedData(cmat, row_stride, is_dense);
+ shard->CreateHistIndices(batch, 0, std::vector<size_t>(1, batch.Size()), cmat.row_ptr.back());
delete dmat;
}
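
The key bookkeeping the patch adds to GPUHistMakerSpecialised is the per-batch split of streamed rows across device shards (the shard_allocations vector above). The standalone sketch below reproduces that interval-overlap arithmetic outside the XGBoost source so it can be checked in isolation; the names Shard and AllocateBatch are illustrative, not identifiers from the tree.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

struct Shard {
  std::size_t row_begin_idx;  // first global row owned by this shard
  std::size_t row_end_idx;    // one past the last global row owned by this shard
};

// Count, for each shard, the rows of the batch
// [processed_size, processed_size + batch_size) that overlap the shard's
// [row_begin_idx, row_end_idx) range -- the same arithmetic the patch uses
// to fill shard_allocations.
std::vector<std::size_t> AllocateBatch(const std::vector<Shard>& shards,
                                       std::size_t processed_size,
                                       std::size_t batch_size) {
  using diff_t = std::ptrdiff_t;
  std::vector<std::size_t> allocation(shards.size(), 0);
  for (std::size_t i = 0; i < shards.size(); ++i) {
    diff_t num_elems = static_cast<diff_t>(processed_size + batch_size) -
                       static_cast<diff_t>(shards[i].row_begin_idx);
    diff_t rem_elems = static_cast<diff_t>(shards[i].row_end_idx) -
                       static_cast<diff_t>(processed_size);
    if (num_elems <= 0 || rem_elems <= 0) continue;  // no overlap with this shard
    diff_t shard_rows =
        static_cast<diff_t>(shards[i].row_end_idx - shards[i].row_begin_idx);
    allocation[i] = static_cast<std::size_t>(
        std::min({num_elems, rem_elems, shard_rows,
                  static_cast<diff_t>(batch_size)}));
  }
  return allocation;
}

int main() {
  // Two shards splitting 100 rows, streamed in batches of 30/30/30/10 rows.
  std::vector<Shard> shards = {{0, 50}, {50, 100}};
  std::size_t processed = 0;
  for (std::size_t batch_size : {30u, 30u, 30u, 10u}) {
    std::vector<std::size_t> alloc = AllocateBatch(shards, processed, batch_size);
    std::printf("batch starting at row %zu: shard0=%zu shard1=%zu\n",
                processed, alloc[0], alloc[1]);
    processed += batch_size;
  }
  return 0;
}

With two shards of 50 rows each, the four batches are allocated 30/0, 20/10, 0/30 and 0/10 rows respectively, so each shard ends up with exactly its 50 rows.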