Skip to content

Instantly share code, notes, and snippets.

@JonathanRaiman
Created July 20, 2016 17:32
Show Gist options
  • Select an option

  • Save JonathanRaiman/7e86d03243589fc8bc39577eb1e87161 to your computer and use it in GitHub Desktop.

Select an option

Save JonathanRaiman/7e86d03243589fc8bc39577eb1e87161 to your computer and use it in GitHub Desktop.
Explicit and implicit BLAS gemm parallelism.
/*
Comparing explicit BLAS parallelism using a thread pool
with vendor-implemented parallelism.
Program prints the runtime averaged over 100 runs of a matrix
multiply between two float matrices.
To run:
./gemm_parallel [<int> USE_EXPLICIT_PARALELLISM 0/1] [<int> LEADING_DIMENSION]
Compile with:
On Apple devices using Accelerate
c++ gemm_parallel.cpp -o gemm_parallel -std=c++11 -lblas -I /System/Library/Frameworks/Accelerate.framework/Versions/Current/Frameworks/vecLib.framework/Versions/Current/Headers/
Other platforms:
c++ gemm_parallel.cpp -o gemm_parallel -std=c++11 -lblas -I /location/of/cblas.h
*/
#include <cblas.h>
#include <initializer_list>
#include <vector>
#include <stdlib.h>
#include <stdexcept>
#include <memory>
#include <iostream>
#include <iomanip>
#include <mutex>
#include <chrono>
#include <string>
#include <unordered_map>
#include <stdexcept>
#include <atomic>
inline int division_ceil(int a, int b) {
return (a + b - 1) / b;
}
class spinlock {
std::atomic_flag lock_;
public:
spinlock() {
lock_.clear();
}
inline void lock() {
while (lock_.test_and_set(std::memory_order_acquire));
}
inline void unlock() {
lock_.clear(std::memory_order_release);
}
inline bool try_lock() {
return !lock_.test_and_set(std::memory_order_acquire);
}
} __attribute__((aligned(64)));
namespace utils {
class Timer {
typedef std::chrono::system_clock clock_t;
public:
static std::unordered_map<std::string, std::atomic<int>> timers;
private:
static std::mutex timers_mutex;
std::string name;
bool stopped;
bool started;
std::chrono::time_point<clock_t> start_time;
public:
// creates timer and starts measuring time.
Timer(std::string name, bool autostart=true);
// destroys timer and stops counting if the timer was not previously stopped.
~Timer();
// explicitly start the timer
void start();
// explicitly stop the timer
void stop();
static void report();
};
std::unordered_map<std::string, std::atomic<int>> Timer::timers;
std::mutex Timer::timers_mutex;
Timer::Timer(std::string name, bool autostart) : name(name),
stopped(false),
started(false) {
if (timers.find(name) == timers.end()) {
std::lock_guard<decltype(timers_mutex)> guard(timers_mutex);
if (timers.find(name) == timers.end())
timers[name] = 0;
}
if (autostart)
start();
}
void Timer::start() {
start_time = clock_t::now();
started = true;
}
void Timer::stop() {
timers[name] += std::chrono::duration_cast< std::chrono::milliseconds >
(clock_t::now() - start_time).count();
stopped = true;
}
Timer::~Timer() {
if(!stopped)
stop();
}
void Timer::report() {
std::lock_guard<decltype(timers_mutex)> guard(timers_mutex);
for (auto& kv : timers) {
std::cout << "\"" << kv.first << "\" => "
<< std::fixed << std::setw(5) << std::setprecision(4) << std::setfill(' ')
<< (double) kv.second / 1000 << "s" << std::endl;
}
timers.clear();
}
}
/*** THREAD POOL ***/
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
class ThreadPool {
public:
typedef spinlock locktype;
private:
typedef std::chrono::duration<double> Duration;
static __thread bool in_thread_pool;
// c++ assigns random id to each thread. This is not a thread_id
// it's a number inside this thread pool.
static __thread int thread_number;
static locktype printing_lock;
bool should_terminate;
locktype queue_mutex;
int active_count;
std::deque<std::function<void()> > work;
std::vector<std::thread> pool;
Duration between_queue_checks;
void thread_body(int _thread_id);
public:
// Creates a thread pool composed of num_threads threads.
// threads are started immediately and exit only once ThreadPool
// goes out of scope. Threads periodically check for new work
// and the frequency of those checks is at minimum between_queue_checks
// (it can be higher due to thread scheduling).
ThreadPool(int num_threads, Duration between_queue_checks=std::chrono::milliseconds(1));
// Run a function on a thread in pool.
void run(std::function<void()> f);
bool wait_until_idle();
// Retruns true if all the work is done.
bool idle() const;
// Return number of active busy workers.
int active_workers();
// Can be called from within a thread to get a thread number.
// the number is unique for each thread in the thread pool and
// is between 0 and num_threads-1. If called outside thread_pool returns -1.
static int get_thread_number();
static void print_safely(std::string message);
size_t size() const;
~ThreadPool();
};
#include <string>
using std::chrono::milliseconds;
using std::function;
using std::vector;
using std::thread;
using Duration = std::chrono::duration<double>;
__thread bool ThreadPool::in_thread_pool = false;
__thread int ThreadPool::thread_number = -1;
ThreadPool::locktype ThreadPool::printing_lock;
ThreadPool::ThreadPool(int num_threads, Duration between_queue_checks) :
between_queue_checks(between_queue_checks),
should_terminate(false),
active_count(0) {
// Thread pool inception is not supported at this time.
assert(!in_thread_pool);
ThreadPool::between_queue_checks = between_queue_checks;
for (int thread_number = 0; thread_number < num_threads; ++thread_number) {
pool.emplace_back(&ThreadPool::thread_body, this, thread_number);
}
}
size_t ThreadPool::size() const {
return pool.size();
}
void ThreadPool::thread_body(int _thread_id) {
in_thread_pool = true;
thread_number = _thread_id;
bool am_i_active = false;
while (true) {
function<void()> f;
{
std::lock_guard<locktype> lock(queue_mutex);
bool was_i_active = am_i_active;
if (should_terminate && work.empty())
break;
if (!work.empty()) {
am_i_active = true;
f = work.front();
work.pop_front();
} else {
am_i_active = false;
}
if (am_i_active != was_i_active) {
active_count += am_i_active ? 1 : -1;
}
}
// Function defines implicit conversion to bool
// which is true only if call target was set.
if (static_cast<bool>(f)) {
f();
} else {
std::this_thread::sleep_for(between_queue_checks);
}
std::this_thread::yield();
}
}
int ThreadPool::active_workers() {
std::lock_guard<decltype(queue_mutex)> lock(queue_mutex);
return active_count;
}
bool ThreadPool::idle() const {
return active_count == 0 && work.empty();
}
void ThreadPool::run(function<void()> f) {
int retries = 3;
while (retries--) {
try {
std::unique_lock<decltype(queue_mutex)> lock(queue_mutex);
work.push_back(f);
return;
} catch (...) {}
}
throw std::runtime_error(
"exceeded retries when trying to run operation on thread pool."
);
}
ThreadPool::~ThreadPool() {
// Terminates thread pool making sure that all the work
// is completed.
should_terminate = true;
for (auto& t : pool)
t.join();
}
int ThreadPool::get_thread_number() {
return thread_number;
}
void ThreadPool::print_safely(std::string message) {
std::lock_guard<decltype(printing_lock)> guard(printing_lock);
std::cout << "[thread " << get_thread_number() << "] "
<< message << std::endl;
}
/**** END THREAD POOL ***/
#define MAX_ARRAY_DIMS 9
#define INDENT_INCREMENT 2
struct ArrayData {
void * ptr_;
ArrayData(int size) : ptr_(malloc(size)) {}
~ArrayData() {
free(ptr_);
}
};
struct Arange {
double start_;
double step_;
Arange(double start, double step) : start_(start), step_(step) {}
Arange() : start_(0.0), step_(1.0) {}
template<typename Container>
void operator()(Container& arr) const {
auto arr_1d = arr.ravel();
for (int i = 0; i < arr_1d.shape_[0]; i++) {
arr_1d.data()[i] = start_ + i * step_;
}
}
};
int hypercube_volume(const int* shape, const int& ndim) {
int volume = 1;
for (int i = 0; i < ndim; i++) {
volume *= shape[i];
}
return volume;
}
template<typename T>
struct Array {
int shape_[MAX_ARRAY_DIMS];
int strides_[MAX_ARRAY_DIMS];
int ndim_;
int offset_;
std::shared_ptr<ArrayData> data_;
typedef T value_type;
Array(std::vector<int> dims, int offset, std::shared_ptr<ArrayData> data) : ndim_(dims.size()), offset_(offset), data_(data) {
int volume = 1;
if (dims.size() > MAX_ARRAY_DIMS) {
throw std::runtime_error("too many dimensions");
}
for (int i = 0; i < dims.size();i++) {
volume *= dims[i];
shape_[i] = dims[i];
}
for (int i = 0; i < ndim_; i++) {
volume /= shape_[i];
strides_[i] = volume;
}
}
Array(std::vector<int> dims) : ndim_(dims.size()), offset_(0) {
int volume = 1;
if (dims.size() > MAX_ARRAY_DIMS) {
throw std::runtime_error("too many dimensions");
}
for (int i = 0; i < dims.size();i++) {
volume *= dims[i];
shape_[i] = dims[i];
}
data_ = std::make_shared<ArrayData>(sizeof(T) * volume);
for (int i = 0; i < ndim_; i++) {
volume /= shape_[i];
strides_[i] = volume;
}
}
Array(std::shared_ptr<ArrayData> data, int offset, int ndim, const int* shape, const int* strides)
: ndim_(ndim), offset_(offset), data_(data) {
for (int i = 0; i < ndim_; i++) {
strides_[i] = strides[i];
shape_[i] = shape[i];
}
}
bool is_transpose() const {
return ndim_ == 2 && strides_[0] == 1;
}
Array<T> subtensor(int idx) const {
Array<T> out(
data_,
offset_ + idx * strides_[0],
ndim_ - 1,
shape_ + 1,
strides_ + 1
);
return out;
}
inline T& operator[](const int& index) {
return data()[index];
}
inline const T& operator[](const int& index) const {
return data()[index];
}
Array<T>& operator=(const Arange& a) {
a(*this);
return *this;
}
Array<T> reshape(std::vector<int> new_shape) const {
int vol = numel();
int new_vol = hypercube_volume(new_shape.data(), new_shape.size());
if (vol != new_vol) {
throw std::runtime_error("reshape must have same number of elements.");
}
return Array<T>(
new_shape,
offset_,
data_
);
}
int numel() const {
return hypercube_volume(shape_, ndim_);
}
Array<T> ravel() const {
return Array<T>(
{numel()},
offset_,
data_
);
}
void print() const {
return print(0, false);
}
int ndim() const {
return ndim_;
}
void print(int indent, bool print_comma) const {
if (this->ndim() == 1) {
std::cout << std::string(indent, ' ') << "[";
for (int i = 0; i < this->shape_[0]; i++) {
std::cout << (*this)[i];
if (i != this->shape_[0] - 1) {
std::cout << " ";
}
}
if (print_comma) {
std::cout << "],\n";
} else {
std::cout << "]\n";
}
} else if (this->ndim() > 1) {
std::cout << std::string(indent, ' ') << "[\n";
for (int i = 0; i < this->shape_[0]; i++) {
auto child = subtensor(i);
child.print(indent + INDENT_INCREMENT, i != this->shape_[0] - 1);
}
if (print_comma) {
std::cout << std::string(indent, ' ') << "],\n";
} else {
std::cout << std::string(indent, ' ') << "]\n";
}
} else {
std::cout << std::string(indent, ' ') << "()\n" << std::endl;
}
}
inline const T* data() const {
return (T*)data_->ptr_ + offset_;
}
inline T* data() {
return (T*)data_->ptr_ + offset_;
}
};
template<typename T1, typename T2>
bool equals(const Array<T1>& left, const Array<T2>& right, double atol=1e-6) {
return false;
}
template<typename T>
bool equals(const Array<T>& left, const Array<T>& right, double atol=1e-6) {
if (left.numel() != right.numel()) {
return false;
}
if (left.ndim_ != right.ndim_) {
return false;
}
for (int i = 0; i < left.ndim_; i++) {
if (left.shape_[i] != right.shape_[i]) {
return false;
}
}
auto raveled_left = left.ravel();
auto raveled_right = right.ravel();
for (int i = 0; i < left.numel(); i++) {
if (std::abs(raveled_left.data()[i] - raveled_right.data()[i]) >= atol) {
return false;
}
}
return true;
}
template<typename T>
struct GemmEngine {
};
struct GemmEngineMulticore {
static std::unique_ptr<ThreadPool> pool_;
template<typename T>
void operator()(bool transa, bool transb,
int m, int n, int k, T alpha,
const T *A, int lda, const T *B, int ldb,
T beta, T *C, int ldc) {
int num_chunks = pool_->size();
int chunk_size = division_ceil(n, num_chunks);
std::atomic<int> count(0);
for (int i = 0; i < num_chunks; ++i) {
int range = std::min((i + 1) * chunk_size, n) - i * chunk_size;
pool_->run([=, &count](){
if (range > 0) {
GemmEngine<T>()(
transa,
transb,
m,
range,
k,
alpha,
A,
lda,
B + i * chunk_size * ldb,
ldb,
beta,
C + i * chunk_size * ldc,
ldc
);
}
++count;
});
}
while (count < num_chunks) {}
}
};
template<>
struct GemmEngine<float> {
void operator()(bool transa, bool transb,
int m, int n, int k, float alpha,
const float *A, int lda, const float *B, int ldb,
float beta, float *C, int ldc) {
cblas_sgemm(
CblasColMajor,
transa ? CblasTrans : CblasNoTrans,
transb ? CblasTrans : CblasNoTrans,
m,
n,
k,
alpha,
A,
lda,
B,
ldb,
beta,
C,
ldc
);
}
};
template<>
struct GemmEngine<double> {
void operator()(bool transa, bool transb,
int m, int n, int k, double alpha,
const double *A, int lda, const double *B, int ldb,
double beta, double *C, int ldc) {
cblas_dgemm(
CblasColMajor,
transa ? CblasTrans : CblasNoTrans,
transb ? CblasTrans : CblasNoTrans,
m,
n,
k,
alpha,
A,
lda,
B,
ldb,
beta,
C,
ldc
);
}
};
template<typename LeftT, typename RightT>
Array<typename LeftT::value_type> dot(const LeftT& left, const RightT& right, bool parallel=false) {
static_assert(std::is_same<typename LeftT::value_type, typename RightT::value_type>::value, "dot product on different dtypes not allowed.");
if (left.ndim_ != 2 || right.ndim_ != 2) {
throw std::runtime_error("inputs must both be 2D");
}
if (left.shape_[1] != right.shape_[0]) {
throw std::runtime_error("incompatible blas dims");
}
Array<float> out({left.shape_[0], right.shape_[1]});
bool lhs_trans = left.is_transpose();
int lhs_stride = lhs_trans ? left.strides_[1] : left.strides_[0];
bool rhs_trans = right.is_transpose();
int rhs_stride = rhs_trans ? right.strides_[1] : right.strides_[0];
if (parallel) {
GemmEngineMulticore()(
rhs_trans,
lhs_trans,
right.shape_[1],
left.shape_[0],
right.shape_[0],
typename LeftT::value_type(1.0),
right.data(),
rhs_stride,
left.data(),
lhs_stride,
typename LeftT::value_type(0.0),
out.data(),
out.strides_[0]
);
} else {
GemmEngine<typename LeftT::value_type>()(
rhs_trans,
lhs_trans,
right.shape_[1],
left.shape_[0],
right.shape_[0],
1.0,
right.data(),
rhs_stride,
left.data(),
lhs_stride,
0.0,
out.data(),
out.strides_[0]
);
}
return out;
}
std::unique_ptr<ThreadPool> GemmEngineMulticore::pool_(new ThreadPool(1));
int main(int argc, char** argv) {
int num_threads = std::thread::hardware_concurrency();
GemmEngineMulticore::pool_ = std::unique_ptr<ThreadPool>(new ThreadPool(num_threads, std::chrono::nanoseconds(5)));
bool parallel_execution = argc > 1 ? std::atoi(argv[1]) : false;
std::cout << "parallel_execution = " << (parallel_execution ? "true" : "false") << std::endl;
int leading_dimension = argc > 2 ? std::atoi(argv[2]) : 2000;
std::cout << "leading_dimension = " << leading_dimension << std::endl;
Array<float> a({leading_dimension, 200});
a = Arange();
// a.print();
Array<float> b({200, 200});
b = Arange();
// b.print();
while (true) {
{
utils::Timer t1("dot");
for (int i = 0; i < 100; i++) {
auto c = dot(a, b, true);
}
}
std::cout << ((double) utils::Timer::timers["dot"] / (100000.0)) << "s" << std::endl;
utils::Timer::timers.clear();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment