Created
July 20, 2016 17:32
-
-
Save JonathanRaiman/7e86d03243589fc8bc39577eb1e87161 to your computer and use it in GitHub Desktop.
Explicit and implicit BLAS gemm parallelism.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| Comparing explicit BLAS parallelism using a thread pool | |
| with vendor-implemented parallelism. | |
| Program prints the runtime averaged over 100 runs of a matrix | |
| multiply between two float matrices. | |
| To run: | |
| ./gemm_parallel [<int> USE_EXPLICIT_PARALELLISM 0/1] [<int> LEADING_DIMENSION] | |
| Compile with: | |
| On Apple devices using Accelerate | |
| c++ gemm_parallel.cpp -o gemm_parallel -std=c++11 -lblas -I /System/Library/Frameworks/Accelerate.framework/Versions/Current/Frameworks/vecLib.framework/Versions/Current/Headers/ | |
| Other platforms: | |
| c++ gemm_parallel.cpp -o gemm_parallel -std=c++11 -lblas -I /location/of/cblas.h | |
| */ | |
| #include <cblas.h> | |
| #include <initializer_list> | |
| #include <vector> | |
| #include <stdlib.h> | |
| #include <stdexcept> | |
| #include <memory> | |
| #include <iostream> | |
| #include <iomanip> | |
| #include <mutex> | |
| #include <chrono> | |
| #include <string> | |
| #include <unordered_map> | |
| #include <stdexcept> | |
| #include <atomic> | |
| inline int division_ceil(int a, int b) { | |
| return (a + b - 1) / b; | |
| } | |
| class spinlock { | |
| std::atomic_flag lock_; | |
| public: | |
| spinlock() { | |
| lock_.clear(); | |
| } | |
| inline void lock() { | |
| while (lock_.test_and_set(std::memory_order_acquire)); | |
| } | |
| inline void unlock() { | |
| lock_.clear(std::memory_order_release); | |
| } | |
| inline bool try_lock() { | |
| return !lock_.test_and_set(std::memory_order_acquire); | |
| } | |
| } __attribute__((aligned(64))); | |
| namespace utils { | |
| class Timer { | |
| typedef std::chrono::system_clock clock_t; | |
| public: | |
| static std::unordered_map<std::string, std::atomic<int>> timers; | |
| private: | |
| static std::mutex timers_mutex; | |
| std::string name; | |
| bool stopped; | |
| bool started; | |
| std::chrono::time_point<clock_t> start_time; | |
| public: | |
| // creates timer and starts measuring time. | |
| Timer(std::string name, bool autostart=true); | |
| // destroys timer and stops counting if the timer was not previously stopped. | |
| ~Timer(); | |
| // explicitly start the timer | |
| void start(); | |
| // explicitly stop the timer | |
| void stop(); | |
| static void report(); | |
| }; | |
| std::unordered_map<std::string, std::atomic<int>> Timer::timers; | |
| std::mutex Timer::timers_mutex; | |
| Timer::Timer(std::string name, bool autostart) : name(name), | |
| stopped(false), | |
| started(false) { | |
| if (timers.find(name) == timers.end()) { | |
| std::lock_guard<decltype(timers_mutex)> guard(timers_mutex); | |
| if (timers.find(name) == timers.end()) | |
| timers[name] = 0; | |
| } | |
| if (autostart) | |
| start(); | |
| } | |
| void Timer::start() { | |
| start_time = clock_t::now(); | |
| started = true; | |
| } | |
| void Timer::stop() { | |
| timers[name] += std::chrono::duration_cast< std::chrono::milliseconds > | |
| (clock_t::now() - start_time).count(); | |
| stopped = true; | |
| } | |
| Timer::~Timer() { | |
| if(!stopped) | |
| stop(); | |
| } | |
| void Timer::report() { | |
| std::lock_guard<decltype(timers_mutex)> guard(timers_mutex); | |
| for (auto& kv : timers) { | |
| std::cout << "\"" << kv.first << "\" => " | |
| << std::fixed << std::setw(5) << std::setprecision(4) << std::setfill(' ') | |
| << (double) kv.second / 1000 << "s" << std::endl; | |
| } | |
| timers.clear(); | |
| } | |
| } | |
| /*** THREAD POOL ***/ | |
| #include <cassert> | |
| #include <chrono> | |
| #include <condition_variable> | |
| #include <deque> | |
| #include <functional> | |
| #include <iostream> | |
| #include <mutex> | |
| #include <thread> | |
| #include <vector> | |
| class ThreadPool { | |
| public: | |
| typedef spinlock locktype; | |
| private: | |
| typedef std::chrono::duration<double> Duration; | |
| static __thread bool in_thread_pool; | |
| // c++ assigns random id to each thread. This is not a thread_id | |
| // it's a number inside this thread pool. | |
| static __thread int thread_number; | |
| static locktype printing_lock; | |
| bool should_terminate; | |
| locktype queue_mutex; | |
| int active_count; | |
| std::deque<std::function<void()> > work; | |
| std::vector<std::thread> pool; | |
| Duration between_queue_checks; | |
| void thread_body(int _thread_id); | |
| public: | |
| // Creates a thread pool composed of num_threads threads. | |
| // threads are started immediately and exit only once ThreadPool | |
| // goes out of scope. Threads periodically check for new work | |
| // and the frequency of those checks is at minimum between_queue_checks | |
| // (it can be higher due to thread scheduling). | |
| ThreadPool(int num_threads, Duration between_queue_checks=std::chrono::milliseconds(1)); | |
| // Run a function on a thread in pool. | |
| void run(std::function<void()> f); | |
| bool wait_until_idle(); | |
| // Retruns true if all the work is done. | |
| bool idle() const; | |
| // Return number of active busy workers. | |
| int active_workers(); | |
| // Can be called from within a thread to get a thread number. | |
| // the number is unique for each thread in the thread pool and | |
| // is between 0 and num_threads-1. If called outside thread_pool returns -1. | |
| static int get_thread_number(); | |
| static void print_safely(std::string message); | |
| size_t size() const; | |
| ~ThreadPool(); | |
| }; | |
| #include <string> | |
| using std::chrono::milliseconds; | |
| using std::function; | |
| using std::vector; | |
| using std::thread; | |
| using Duration = std::chrono::duration<double>; | |
| __thread bool ThreadPool::in_thread_pool = false; | |
| __thread int ThreadPool::thread_number = -1; | |
| ThreadPool::locktype ThreadPool::printing_lock; | |
| ThreadPool::ThreadPool(int num_threads, Duration between_queue_checks) : | |
| between_queue_checks(between_queue_checks), | |
| should_terminate(false), | |
| active_count(0) { | |
| // Thread pool inception is not supported at this time. | |
| assert(!in_thread_pool); | |
| ThreadPool::between_queue_checks = between_queue_checks; | |
| for (int thread_number = 0; thread_number < num_threads; ++thread_number) { | |
| pool.emplace_back(&ThreadPool::thread_body, this, thread_number); | |
| } | |
| } | |
| size_t ThreadPool::size() const { | |
| return pool.size(); | |
| } | |
| void ThreadPool::thread_body(int _thread_id) { | |
| in_thread_pool = true; | |
| thread_number = _thread_id; | |
| bool am_i_active = false; | |
| while (true) { | |
| function<void()> f; | |
| { | |
| std::lock_guard<locktype> lock(queue_mutex); | |
| bool was_i_active = am_i_active; | |
| if (should_terminate && work.empty()) | |
| break; | |
| if (!work.empty()) { | |
| am_i_active = true; | |
| f = work.front(); | |
| work.pop_front(); | |
| } else { | |
| am_i_active = false; | |
| } | |
| if (am_i_active != was_i_active) { | |
| active_count += am_i_active ? 1 : -1; | |
| } | |
| } | |
| // Function defines implicit conversion to bool | |
| // which is true only if call target was set. | |
| if (static_cast<bool>(f)) { | |
| f(); | |
| } else { | |
| std::this_thread::sleep_for(between_queue_checks); | |
| } | |
| std::this_thread::yield(); | |
| } | |
| } | |
| int ThreadPool::active_workers() { | |
| std::lock_guard<decltype(queue_mutex)> lock(queue_mutex); | |
| return active_count; | |
| } | |
| bool ThreadPool::idle() const { | |
| return active_count == 0 && work.empty(); | |
| } | |
| void ThreadPool::run(function<void()> f) { | |
| int retries = 3; | |
| while (retries--) { | |
| try { | |
| std::unique_lock<decltype(queue_mutex)> lock(queue_mutex); | |
| work.push_back(f); | |
| return; | |
| } catch (...) {} | |
| } | |
| throw std::runtime_error( | |
| "exceeded retries when trying to run operation on thread pool." | |
| ); | |
| } | |
| ThreadPool::~ThreadPool() { | |
| // Terminates thread pool making sure that all the work | |
| // is completed. | |
| should_terminate = true; | |
| for (auto& t : pool) | |
| t.join(); | |
| } | |
| int ThreadPool::get_thread_number() { | |
| return thread_number; | |
| } | |
| void ThreadPool::print_safely(std::string message) { | |
| std::lock_guard<decltype(printing_lock)> guard(printing_lock); | |
| std::cout << "[thread " << get_thread_number() << "] " | |
| << message << std::endl; | |
| } | |
| /**** END THREAD POOL ***/ | |
| #define MAX_ARRAY_DIMS 9 | |
| #define INDENT_INCREMENT 2 | |
| struct ArrayData { | |
| void * ptr_; | |
| ArrayData(int size) : ptr_(malloc(size)) {} | |
| ~ArrayData() { | |
| free(ptr_); | |
| } | |
| }; | |
| struct Arange { | |
| double start_; | |
| double step_; | |
| Arange(double start, double step) : start_(start), step_(step) {} | |
| Arange() : start_(0.0), step_(1.0) {} | |
| template<typename Container> | |
| void operator()(Container& arr) const { | |
| auto arr_1d = arr.ravel(); | |
| for (int i = 0; i < arr_1d.shape_[0]; i++) { | |
| arr_1d.data()[i] = start_ + i * step_; | |
| } | |
| } | |
| }; | |
| int hypercube_volume(const int* shape, const int& ndim) { | |
| int volume = 1; | |
| for (int i = 0; i < ndim; i++) { | |
| volume *= shape[i]; | |
| } | |
| return volume; | |
| } | |
| template<typename T> | |
| struct Array { | |
| int shape_[MAX_ARRAY_DIMS]; | |
| int strides_[MAX_ARRAY_DIMS]; | |
| int ndim_; | |
| int offset_; | |
| std::shared_ptr<ArrayData> data_; | |
| typedef T value_type; | |
| Array(std::vector<int> dims, int offset, std::shared_ptr<ArrayData> data) : ndim_(dims.size()), offset_(offset), data_(data) { | |
| int volume = 1; | |
| if (dims.size() > MAX_ARRAY_DIMS) { | |
| throw std::runtime_error("too many dimensions"); | |
| } | |
| for (int i = 0; i < dims.size();i++) { | |
| volume *= dims[i]; | |
| shape_[i] = dims[i]; | |
| } | |
| for (int i = 0; i < ndim_; i++) { | |
| volume /= shape_[i]; | |
| strides_[i] = volume; | |
| } | |
| } | |
| Array(std::vector<int> dims) : ndim_(dims.size()), offset_(0) { | |
| int volume = 1; | |
| if (dims.size() > MAX_ARRAY_DIMS) { | |
| throw std::runtime_error("too many dimensions"); | |
| } | |
| for (int i = 0; i < dims.size();i++) { | |
| volume *= dims[i]; | |
| shape_[i] = dims[i]; | |
| } | |
| data_ = std::make_shared<ArrayData>(sizeof(T) * volume); | |
| for (int i = 0; i < ndim_; i++) { | |
| volume /= shape_[i]; | |
| strides_[i] = volume; | |
| } | |
| } | |
| Array(std::shared_ptr<ArrayData> data, int offset, int ndim, const int* shape, const int* strides) | |
| : ndim_(ndim), offset_(offset), data_(data) { | |
| for (int i = 0; i < ndim_; i++) { | |
| strides_[i] = strides[i]; | |
| shape_[i] = shape[i]; | |
| } | |
| } | |
| bool is_transpose() const { | |
| return ndim_ == 2 && strides_[0] == 1; | |
| } | |
| Array<T> subtensor(int idx) const { | |
| Array<T> out( | |
| data_, | |
| offset_ + idx * strides_[0], | |
| ndim_ - 1, | |
| shape_ + 1, | |
| strides_ + 1 | |
| ); | |
| return out; | |
| } | |
| inline T& operator[](const int& index) { | |
| return data()[index]; | |
| } | |
| inline const T& operator[](const int& index) const { | |
| return data()[index]; | |
| } | |
| Array<T>& operator=(const Arange& a) { | |
| a(*this); | |
| return *this; | |
| } | |
| Array<T> reshape(std::vector<int> new_shape) const { | |
| int vol = numel(); | |
| int new_vol = hypercube_volume(new_shape.data(), new_shape.size()); | |
| if (vol != new_vol) { | |
| throw std::runtime_error("reshape must have same number of elements."); | |
| } | |
| return Array<T>( | |
| new_shape, | |
| offset_, | |
| data_ | |
| ); | |
| } | |
| int numel() const { | |
| return hypercube_volume(shape_, ndim_); | |
| } | |
| Array<T> ravel() const { | |
| return Array<T>( | |
| {numel()}, | |
| offset_, | |
| data_ | |
| ); | |
| } | |
| void print() const { | |
| return print(0, false); | |
| } | |
| int ndim() const { | |
| return ndim_; | |
| } | |
| void print(int indent, bool print_comma) const { | |
| if (this->ndim() == 1) { | |
| std::cout << std::string(indent, ' ') << "["; | |
| for (int i = 0; i < this->shape_[0]; i++) { | |
| std::cout << (*this)[i]; | |
| if (i != this->shape_[0] - 1) { | |
| std::cout << " "; | |
| } | |
| } | |
| if (print_comma) { | |
| std::cout << "],\n"; | |
| } else { | |
| std::cout << "]\n"; | |
| } | |
| } else if (this->ndim() > 1) { | |
| std::cout << std::string(indent, ' ') << "[\n"; | |
| for (int i = 0; i < this->shape_[0]; i++) { | |
| auto child = subtensor(i); | |
| child.print(indent + INDENT_INCREMENT, i != this->shape_[0] - 1); | |
| } | |
| if (print_comma) { | |
| std::cout << std::string(indent, ' ') << "],\n"; | |
| } else { | |
| std::cout << std::string(indent, ' ') << "]\n"; | |
| } | |
| } else { | |
| std::cout << std::string(indent, ' ') << "()\n" << std::endl; | |
| } | |
| } | |
| inline const T* data() const { | |
| return (T*)data_->ptr_ + offset_; | |
| } | |
| inline T* data() { | |
| return (T*)data_->ptr_ + offset_; | |
| } | |
| }; | |
| template<typename T1, typename T2> | |
| bool equals(const Array<T1>& left, const Array<T2>& right, double atol=1e-6) { | |
| return false; | |
| } | |
| template<typename T> | |
| bool equals(const Array<T>& left, const Array<T>& right, double atol=1e-6) { | |
| if (left.numel() != right.numel()) { | |
| return false; | |
| } | |
| if (left.ndim_ != right.ndim_) { | |
| return false; | |
| } | |
| for (int i = 0; i < left.ndim_; i++) { | |
| if (left.shape_[i] != right.shape_[i]) { | |
| return false; | |
| } | |
| } | |
| auto raveled_left = left.ravel(); | |
| auto raveled_right = right.ravel(); | |
| for (int i = 0; i < left.numel(); i++) { | |
| if (std::abs(raveled_left.data()[i] - raveled_right.data()[i]) >= atol) { | |
| return false; | |
| } | |
| } | |
| return true; | |
| } | |
| template<typename T> | |
| struct GemmEngine { | |
| }; | |
| struct GemmEngineMulticore { | |
| static std::unique_ptr<ThreadPool> pool_; | |
| template<typename T> | |
| void operator()(bool transa, bool transb, | |
| int m, int n, int k, T alpha, | |
| const T *A, int lda, const T *B, int ldb, | |
| T beta, T *C, int ldc) { | |
| int num_chunks = pool_->size(); | |
| int chunk_size = division_ceil(n, num_chunks); | |
| std::atomic<int> count(0); | |
| for (int i = 0; i < num_chunks; ++i) { | |
| int range = std::min((i + 1) * chunk_size, n) - i * chunk_size; | |
| pool_->run([=, &count](){ | |
| if (range > 0) { | |
| GemmEngine<T>()( | |
| transa, | |
| transb, | |
| m, | |
| range, | |
| k, | |
| alpha, | |
| A, | |
| lda, | |
| B + i * chunk_size * ldb, | |
| ldb, | |
| beta, | |
| C + i * chunk_size * ldc, | |
| ldc | |
| ); | |
| } | |
| ++count; | |
| }); | |
| } | |
| while (count < num_chunks) {} | |
| } | |
| }; | |
| template<> | |
| struct GemmEngine<float> { | |
| void operator()(bool transa, bool transb, | |
| int m, int n, int k, float alpha, | |
| const float *A, int lda, const float *B, int ldb, | |
| float beta, float *C, int ldc) { | |
| cblas_sgemm( | |
| CblasColMajor, | |
| transa ? CblasTrans : CblasNoTrans, | |
| transb ? CblasTrans : CblasNoTrans, | |
| m, | |
| n, | |
| k, | |
| alpha, | |
| A, | |
| lda, | |
| B, | |
| ldb, | |
| beta, | |
| C, | |
| ldc | |
| ); | |
| } | |
| }; | |
| template<> | |
| struct GemmEngine<double> { | |
| void operator()(bool transa, bool transb, | |
| int m, int n, int k, double alpha, | |
| const double *A, int lda, const double *B, int ldb, | |
| double beta, double *C, int ldc) { | |
| cblas_dgemm( | |
| CblasColMajor, | |
| transa ? CblasTrans : CblasNoTrans, | |
| transb ? CblasTrans : CblasNoTrans, | |
| m, | |
| n, | |
| k, | |
| alpha, | |
| A, | |
| lda, | |
| B, | |
| ldb, | |
| beta, | |
| C, | |
| ldc | |
| ); | |
| } | |
| }; | |
| template<typename LeftT, typename RightT> | |
| Array<typename LeftT::value_type> dot(const LeftT& left, const RightT& right, bool parallel=false) { | |
| static_assert(std::is_same<typename LeftT::value_type, typename RightT::value_type>::value, "dot product on different dtypes not allowed."); | |
| if (left.ndim_ != 2 || right.ndim_ != 2) { | |
| throw std::runtime_error("inputs must both be 2D"); | |
| } | |
| if (left.shape_[1] != right.shape_[0]) { | |
| throw std::runtime_error("incompatible blas dims"); | |
| } | |
| Array<float> out({left.shape_[0], right.shape_[1]}); | |
| bool lhs_trans = left.is_transpose(); | |
| int lhs_stride = lhs_trans ? left.strides_[1] : left.strides_[0]; | |
| bool rhs_trans = right.is_transpose(); | |
| int rhs_stride = rhs_trans ? right.strides_[1] : right.strides_[0]; | |
| if (parallel) { | |
| GemmEngineMulticore()( | |
| rhs_trans, | |
| lhs_trans, | |
| right.shape_[1], | |
| left.shape_[0], | |
| right.shape_[0], | |
| typename LeftT::value_type(1.0), | |
| right.data(), | |
| rhs_stride, | |
| left.data(), | |
| lhs_stride, | |
| typename LeftT::value_type(0.0), | |
| out.data(), | |
| out.strides_[0] | |
| ); | |
| } else { | |
| GemmEngine<typename LeftT::value_type>()( | |
| rhs_trans, | |
| lhs_trans, | |
| right.shape_[1], | |
| left.shape_[0], | |
| right.shape_[0], | |
| 1.0, | |
| right.data(), | |
| rhs_stride, | |
| left.data(), | |
| lhs_stride, | |
| 0.0, | |
| out.data(), | |
| out.strides_[0] | |
| ); | |
| } | |
| return out; | |
| } | |
| std::unique_ptr<ThreadPool> GemmEngineMulticore::pool_(new ThreadPool(1)); | |
| int main(int argc, char** argv) { | |
| int num_threads = std::thread::hardware_concurrency(); | |
| GemmEngineMulticore::pool_ = std::unique_ptr<ThreadPool>(new ThreadPool(num_threads, std::chrono::nanoseconds(5))); | |
| bool parallel_execution = argc > 1 ? std::atoi(argv[1]) : false; | |
| std::cout << "parallel_execution = " << (parallel_execution ? "true" : "false") << std::endl; | |
| int leading_dimension = argc > 2 ? std::atoi(argv[2]) : 2000; | |
| std::cout << "leading_dimension = " << leading_dimension << std::endl; | |
| Array<float> a({leading_dimension, 200}); | |
| a = Arange(); | |
| // a.print(); | |
| Array<float> b({200, 200}); | |
| b = Arange(); | |
| // b.print(); | |
| while (true) { | |
| { | |
| utils::Timer t1("dot"); | |
| for (int i = 0; i < 100; i++) { | |
| auto c = dot(a, b, true); | |
| } | |
| } | |
| std::cout << ((double) utils::Timer::timers["dot"] / (100000.0)) << "s" << std::endl; | |
| utils::Timer::timers.clear(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment