# Puzzle 1: Map
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def map_spec(a):
    return a + 10

def map_test(cuda):
    def call(out, a) -> None:
        local_i = cuda.threadIdx.x
        out[local_i] = a[local_i] + 10

    return call

SIZE = 4
out = np.zeros((SIZE,))
a = np.arange(SIZE)
problem = CudaProblem(
    "Map", map_test, [a], out, threadsperblock=Coord(SIZE, 1), spec=map_spec
)
problem.show()
problem.check()

# Puzzle: Dot Product
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def dot_spec(a, b):
    return a @ b

TPB = 8

def dot_test(cuda):
    def call(out, a, b, size) -> None:
        shared = cuda.shared.array(TPB, numba.float32)
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        local_i = cuda.threadIdx.x
        # Each thread stages one elementwise product in shared memory.
        if i < size:
            shared[local_i] = a[i] * b[i]
        cuda.syncthreads()
        # Thread 0 serially accumulates the products (single-block version).
        if local_i == 0:
            s = 0.0
            for j in range(size):
                s += shared[j]
            out[0] = s

    return call

SIZE = 8
out = np.zeros(1)
a = np.arange(SIZE)
b = np.arange(SIZE)
problem = CudaProblem(
    "Dot",
    dot_test,
    [a, b],
    out,
    [SIZE],
    threadsperblock=Coord(SIZE, 1),
    blockspergrid=Coord(1, 1),
    spec=dot_spec,
)
problem.show()
problem.check()
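
# Not in the original gist: a sketch of the same dot product with a log-step
# tree reduction instead of the serial loop in thread 0, still assuming a
# single block of TPB threads. Each pass folds in the partner `skip` slots
# to the right, so shared[0] holds the total after log2(TPB) passes.
def dot_tree_test(cuda):
    def call(out, a, b, size) -> None:
        shared = cuda.shared.array(TPB, numba.float32)
        local_i = cuda.threadIdx.x
        if local_i < size:
            shared[local_i] = a[local_i] * b[local_i]
        else:
            shared[local_i] = 0
        cuda.syncthreads()
        skip = 1
        while skip < TPB:
            # Read phase: fetch the partner before anyone overwrites it.
            if local_i + skip < TPB:
                v = shared[local_i + skip]
            else:
                v = 0
            cuda.syncthreads()
            # Write phase: fold the partner into this slot.
            shared[local_i] += v
            cuda.syncthreads()
            skip *= 2
        if local_i == 0:
            out[0] = shared[0]

    return call

problem = CudaProblem(
    "Dot (Tree)",
    dot_tree_test,
    [a, b],
    np.zeros(1),
    [SIZE],
    threadsperblock=Coord(SIZE, 1),
    blockspergrid=Coord(1, 1),
    spec=dot_spec,
)
problem.check()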

# Puzzle: 1D Convolution
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def conv_spec(a, b):
    out = np.zeros(*a.shape)
    b_len = b.shape[0]
    for i in range(a.shape[0]):
        out[i] = sum([a[i + j] * b[j] for j in range(b_len) if i + j < a.shape[0]])
    return out

MAX_CONV = 4
TPB = 8
TPB_MAX_CONV = TPB + MAX_CONV

def conv_test(cuda):
    def call(out, a, b, a_size, b_size) -> None:
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        local_i = cuda.threadIdx.x
        shared_a = cuda.shared.array(TPB_MAX_CONV, numba.float32)
        shared_b = cuda.shared.array(TPB_MAX_CONV, numba.float32)
        # Stage the input and the kernel in shared memory (single-block version).
        if i < a_size:
            shared_a[i] = a[i]
        if i < b_size:
            shared_b[i] = b[i]
        cuda.syncthreads()
        # out[i] = sum_j a[i + j] * b[j], zero-padded past the end of a.
        s = 0.0
        for j in range(b_size):
            if i + j < a_size:
                s += shared_a[i + j] * shared_b[j]
        if i < a_size:
            out[i] = s

    return call

# Test 1
SIZE = 6
CONV = 3
out = np.zeros(SIZE)
a = np.arange(SIZE)
b = np.arange(CONV)
problem = CudaProblem(
    "1D Conv (Simple)",
    conv_test,
    [a, b],
    out,
    [SIZE, CONV],
    Coord(1, 1),
    Coord(TPB, 1),
    spec=conv_spec,
)
problem.show()
problem.check()
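
# Not in the original gist: a quick host-side sanity check. The spec is a
# cross-correlation zero-padded on the right, so its first SIZE - CONV + 1
# entries should agree with np.correlate in "valid" mode.
assert np.allclose(
    conv_spec(a, b)[: SIZE - CONV + 1], np.correlate(a, b, mode="valid")
)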

# Puzzle: Sum (block-wise reduction)
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

TPB = 8

def sum_spec(a):
    out = np.zeros((a.shape[0] + TPB - 1) // TPB)
    for j, i in enumerate(range(0, a.shape[-1], TPB)):
        out[j] = a[i : i + TPB].sum()
    return out

def sum_test(cuda):
    def call(out, a, size: int) -> None:
        cache = cuda.shared.array(TPB, numba.float32)
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        local_i = cuda.threadIdx.x
        # Load this block's slice; zero-fill out-of-range slots so they are
        # safe to add during the reduction.
        if i < size:
            cache[local_i] = a[i]
        else:
            cache[local_i] = 0
        cuda.syncthreads()
        # Log-step reduction: each pass folds in the value `skip` slots to
        # the right, so cache[0] ends up holding the block total.
        skip = 1
        while skip < TPB:
            s = cache[local_i]
            if local_i + skip < TPB:
                v = cache[local_i + skip]
            else:
                v = 0
            cuda.syncthreads()
            cache[local_i] = s + v
            cuda.syncthreads()
            skip *= 2
        if local_i == 0:
            out[cuda.blockIdx.x] = cache[0]

    return call

# Test 1
SIZE = 8
out = np.zeros(1)
inp = np.arange(SIZE)
problem = CudaProblem(
    "Sum (Simple)",
    sum_test,
    [inp],
    out,
    [SIZE],
    Coord(1, 1),
    Coord(TPB, 1),
    spec=sum_spec,
)
problem.show()
problem.check()

# Test 2
print('Test 2')
SIZE = 15
out = np.zeros(2)
inp = np.arange(SIZE)
problem = CudaProblem(
    "Sum (Full)",
    sum_test,
    [inp],
    out,
    [SIZE],
    Coord(2, 1),
    Coord(TPB, 1),
    spec=sum_spec,
)
problem.show()
problem.check()
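
# Not in the original gist: a host-side NumPy model of the log-step
# reduction, to show why slot 0 holds the total after log2(TPB) passes.
cache = np.arange(8, dtype=np.float32)
skip = 1
while skip < len(cache):
    # Every slot adds the value `skip` positions to its right (zero-padded).
    shifted = np.concatenate([cache[skip:], np.zeros(skip, dtype=np.float32)])
    cache = cache + shifted
    skip *= 2
assert cache[0] == np.arange(8).sum()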

# Puzzle: Axis Sum
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

TPB = 8

def sum_spec(a):
    out = np.zeros((a.shape[0], (a.shape[1] + TPB - 1) // TPB))
    for j, i in enumerate(range(0, a.shape[-1], TPB)):
        out[..., j] = a[..., i : i + TPB].sum(-1)
    return out

def axis_sum_test(cuda):
    def call(out, a, size: int) -> None:
        cache = cuda.shared.array(TPB, numba.float32)
        local_i = cuda.threadIdx.x
        # One block per row: blockIdx.y selects the batch element.
        batch = cuda.blockIdx.y
        # Load this row's slice, zero-filling out-of-range slots.
        if local_i < size:
            s = a[batch, local_i]
        else:
            s = 0
        cache[local_i] = s
        cuda.syncthreads()
        # Log-step reduction along the row.
        step = 1
        while step <= TPB // 2:
            s = cache[local_i]
            index = local_i + step
            if index < TPB:
                s += cache[index]
            step *= 2
            cuda.syncthreads()
            cache[local_i] = s
            cuda.syncthreads()
        if local_i == 0:
            out[batch, 0] = s

    return call

BATCH = 4
SIZE = 6
out = np.zeros((BATCH, 1))
inp = np.arange(BATCH * SIZE).reshape((BATCH, SIZE))
problem = CudaProblem(
    "Axis Sum",
    axis_sum_test,
    [inp],
    out,
    [SIZE],
    Coord(1, BATCH),
    Coord(TPB, 1),
    spec=sum_spec,
)
problem.show()
problem.check()
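
# Not in the original gist: with SIZE <= TPB each row collapses to a single
# output column, so the spec should reduce to a plain row sum.
assert np.allclose(sum_spec(inp), inp.sum(-1, keepdims=True))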

# Puzzle: Matmul (single block, size <= TPB)
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def matmul_spec(a, b):
    return a @ b

TPB = 3

def mm_oneblock_test(cuda):
    def call(out, a, b, size: int) -> None:
        a_shared = cuda.shared.array((TPB, TPB), numba.float32)
        b_shared = cuda.shared.array((TPB, TPB), numba.float32)
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        j = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
        local_i = cuda.threadIdx.x
        local_j = cuda.threadIdx.y
        # Stage both operands in shared memory, zero-padding out-of-range cells.
        if i < size and j < size:
            a_shared[local_i, local_j] = a[i, j]
            b_shared[local_i, local_j] = b[i, j]
        else:
            a_shared[local_i, local_j] = 0
            b_shared[local_i, local_j] = 0
        cuda.syncthreads()
        # Row of a times column of b; size <= TPB, so one tile suffices.
        s = 0.0
        for k in range(size):
            s += a_shared[local_i, k] * b_shared[k, local_j]
        if i < size and j < size:
            out[i, j] = s

    return call

# Test 1
SIZE = 2
out = np.zeros((SIZE, SIZE))
inp1 = np.arange(SIZE * SIZE).reshape((SIZE, SIZE))
inp2 = np.arange(SIZE * SIZE).reshape((SIZE, SIZE)).T
problem = CudaProblem(
    "Matmul (Simple)",
    mm_oneblock_test,
    [inp1, inp2],
    out,
    [SIZE],
    Coord(1, 1),
    Coord(TPB, TPB),
    spec=matmul_spec,
)
problem.show(sparse=True)
problem.check()
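
# Not in the original gist: for SIZE > TPB the kernel would march over
# TPB-wide tiles of the shared dimension and accumulate partial products.
# A host-side NumPy model of that tiling:
def tiled_matmul(a, b, tpb):
    n = a.shape[0]
    acc = np.zeros((n, n))
    for k0 in range(0, n, tpb):
        # a[:, k0:k0+tpb] and b[k0:k0+tpb, :] play the roles of
        # a_shared and b_shared for one tile.
        acc += a[:, k0 : k0 + tpb] @ b[k0 : k0 + tpb, :]
    return acc

assert np.allclose(tiled_matmul(inp1, inp2, TPB), inp1 @ inp2)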

# Puzzle: Zip
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

def zip_spec(a, b):
    return a + b

def zip_test(cuda):
    def call(out, a, b) -> None:
        local_i = cuda.threadIdx.x
        out[local_i] = a[local_i] + b[local_i]

    return call

SIZE = 4
out = np.zeros((SIZE,))
a = np.arange(SIZE)
b = np.arange(SIZE)
problem = CudaProblem(
    "Zip", zip_test, [a, b], out, threadsperblock=Coord(SIZE, 1), spec=zip_spec
)
problem.show()
problem.check()

# Puzzle: Guard
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

def map_spec(a):
    return a + 10

def map_guard_test(cuda):
    def call(out, a, size) -> None:
        local_i = cuda.threadIdx.x
        if local_i < size:
            out[local_i] = a[local_i] + 10

    return call

SIZE = 4
out = np.zeros((SIZE,))
a = np.arange(SIZE)
problem = CudaProblem(
    "Guard",
    map_guard_test,
    [a],
    out,
    [SIZE],
    threadsperblock=Coord(8, 1),
    spec=map_spec,
)
problem.show()
problem.check()

# Puzzle: Blocks
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def map_spec(a):
    return a + 10

def map_block_test(cuda):
    def call(out, a, size) -> None:
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        if i < size:
            out[i] = a[i] + 10

    return call

SIZE = 9
out = np.zeros((SIZE,))
a = np.arange(SIZE)
problem = CudaProblem(
    "Blocks",
    map_block_test,
    [a],
    out,
    [SIZE],
    threadsperblock=Coord(4, 1),
    blockspergrid=Coord(3, 1),
    spec=map_spec,
)
problem.show()
problem.check()

# Puzzle: Blocks 2D
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def map_spec(a):
    return a + 10

def map_block2D_test(cuda):
    def call(out, a, size) -> None:
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        j = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
        if i < size and j < size:
            out[i, j] = a[i, j] + 10

    return call

SIZE = 5
out = np.zeros((SIZE, SIZE))
a = np.ones((SIZE, SIZE))
problem = CudaProblem(
    "Blocks 2D",
    map_block2D_test,
    [a],
    out,
    [SIZE],
    threadsperblock=Coord(3, 3),
    blockspergrid=Coord(2, 2),
    spec=map_spec,
)
problem.show()
problem.check()

# Puzzle: Shared
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

TPB = 4

def shared_test(cuda):
    def call(out, a, size) -> None:
        shared = cuda.shared.array(TPB, numba.float32)
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        local_i = cuda.threadIdx.x
        # Stage this block's slice in shared memory, then read it back.
        # The guard is on the global index; syncthreads stays outside the
        # branch so every thread reaches the barrier.
        if i < size:
            shared[local_i] = a[i]
        cuda.syncthreads()
        if i < size:
            out[i] = shared[local_i] + 10

    return call

def map_spec(a):
    return a + 10

SIZE = 8
out = np.zeros(SIZE)
a = np.ones(SIZE)
problem = CudaProblem(
    "Shared",
    shared_test,
    [a],
    out,
    [SIZE],
    threadsperblock=Coord(TPB, 1),
    blockspergrid=Coord(2, 1),
    spec=map_spec,
)
problem.show()
problem.check()

# Puzzle: Pooling
import numba
import numpy as np
import warnings
from lib import CudaProblem, Coord

warnings.filterwarnings(
    action="ignore", category=numba.NumbaPerformanceWarning, module="numba"
)

def pool_spec(a):
    out = np.zeros(*a.shape)
    for i in range(a.shape[0]):
        out[i] = a[max(i - 2, 0) : i + 1].sum()
    return out

TPB = 8

def pool_test(cuda):
    def call(out, a, size) -> None:
        shared = cuda.shared.array(TPB, numba.float32)
        i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
        local_i = cuda.threadIdx.x
        shared[local_i] = a[i]
        cuda.syncthreads()
        # Gather the two preceding values; the locals get fresh names so
        # they don't shadow the input array `a`.
        prev1 = 0
        prev2 = 0
        if i >= 1:
            prev1 = shared[local_i - 1]
        if i >= 2:
            prev2 = shared[local_i - 2]
        out[i] = shared[local_i] + prev1 + prev2

    return call

SIZE = 8
out = np.zeros(SIZE)
a = np.arange(SIZE)
problem = CudaProblem(
    "Pooling",
    pool_test,
    [a],
    out,
    [SIZE],
    threadsperblock=Coord(TPB, 1),
    blockspergrid=Coord(1, 1),
    spec=pool_spec,
)
problem.show()
problem.check()
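
# Not in the original gist: the pool is a width-3 sliding sum clamped at the
# left edge, i.e. the leading entries of a convolution with a ones kernel.
assert np.allclose(pool_spec(a), np.convolve(a, np.ones(3))[:SIZE])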