Last active
March 12, 2026 00:43
-
-
Save mplekh/3afbdfb9f063cf531cfd3d00685cfdc0 to your computer and use it in GitHub Desktop.
Karpathy's microgpt modified to use Wengert Tape architecture (Flat Array of Values instead of Graph of Objects)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
import random

# Fix the PRNG seed so weight init, data shuffling and sampling are reproducible.
random.seed(42)

# -----------------------------------------------------------------------------
# Tape-based Autograd Engine
# -----------------------------------------------------------------------------
class Tape:
    """
    A simple reverse-mode autodiff engine (Wengert list / "tape").

    Every scalar in the computation is a node identified by its integer
    index on the tape.  Four parallel lists hold, per node: its value,
    its accumulated gradient, the indices of its (up to two) parent
    nodes, and the local derivatives w.r.t. those parents.
    """

    def __init__(self):
        self.data = []         # node values
        self.grad = []         # d(root)/d(node), filled in by backward()
        self.children = []     # (parent1, parent2) indices; None if absent
        self.local_grads = []  # local derivatives w.r.t. the parents

    def push(self, val, children, local_grads):
        """Records a new operation on the tape; returns the new node's index."""
        idx = len(self.data)
        self.data.append(val)
        self.grad.append(0.0)
        self.children.append(children)
        self.local_grads.append(local_grads)
        return idx

    def val(self, x):
        """Creates a leaf node (constant or parameter).

        A leaf is just a pushed node with no parents; delegating to push()
        avoids duplicating the four-list append logic.
        """
        return self.push(float(x), (None, None), (0.0, 0.0))

    def truncate(self, size):
        """Rewinds the tape to `size` nodes, keeping persistent weights.

        Weights are created first, so truncating back to the weight
        watermark discards only a step's activations.
        """
        self.data = self.data[:size]
        self.grad = self.grad[:size]
        self.children = self.children[:size]
        self.local_grads = self.local_grads[:size]

    def zero_grad(self):
        """Resets all gradients to zero."""
        self.grad = [0.0] * len(self.grad)

    def backward(self, root_idx):
        """Backpropagates from `root_idx` via the chain rule.

        The tape is recorded in topological order (parents precede
        children), so one reverse sweep accumulates all gradients.
        """
        self.grad[root_idx] = 1.0
        # Iterate backwards through the tape (reverse topological order)
        for i in range(root_idx, -1, -1):
            g = self.grad[i]
            if g == 0:
                continue  # node does not influence the root; skip the fan-out
            (c1, c2), (l1, l2) = self.children[i], self.local_grads[i]
            if c1 is not None:
                self.grad[c1] += l1 * g
            if c2 is not None:
                self.grad[c2] += l2 * g
# --- Tape Operations (Primitives) ---
def add(t, a, b):
    """Sum of two nodes; both local derivatives are 1."""
    total = t.data[a] + t.data[b]
    return t.push(total, (a, b), (1.0, 1.0))
def mul(t, a, b):
    """Product of two nodes; d/da = b and d/db = a."""
    va, vb = t.data[a], t.data[b]
    return t.push(va * vb, (a, b), (vb, va))
def mul_const(t, a, c):
    """Node times a plain Python constant c; d/da = c."""
    return t.push(c * t.data[a], (a, None), (c, 0.0))
def pow_const(t, a, p):
    """Raises a node to a constant power p; d/da = p * a**(p-1).

    The val == 0 case is guarded so a negative p does not divide by zero
    in the local derivative.  Fix over the original: for p == 1 the
    derivative at 0 is 1 (d/dx x = 1 everywhere), not 0.
    """
    val = t.data[a]
    if val != 0:
        local = p * val ** (p - 1)
    elif p == 1:
        local = 1.0  # identity function: slope is 1 even at 0
    else:
        local = 0.0  # limit/subgradient convention at the singularity
    return t.push(val ** p, (a, None), (local, 0.0))
def div(t, a, b):
    """Quotient a / b, expressed as a * b**-1 so only existing primitives are used."""
    inv_b = pow_const(t, b, -1)
    return mul(t, a, inv_b)
def log(t, a):
    """Natural logarithm, clamped below at 1e-10 so log and 1/x stay finite."""
    clamped = max(1e-10, t.data[a])
    return t.push(math.log(clamped), (a, None), (1.0 / clamped, 0.0))
def relu(t, a):
    """ReLU: max(0, a); local derivative is the unit step.

    Fix over the original: uses 0.0 rather than int 0 so the tape's data
    list stays uniformly float (Tape.val() already casts with float(x)).
    """
    v = t.data[a]
    return t.push(max(0.0, v), (a, None), (1.0 if v > 0 else 0.0, 0.0))
# -----------------------------------------------------------------------------
# Transformer Components
# -----------------------------------------------------------------------------
def linear(t, x, weight_mat):
    """Matrix-vector product Y = XW: one accumulated dot product per weight row."""
    result = []
    for weights in weight_mat:
        dot = t.val(0.0)
        for xi, wi in zip(x, weights):
            dot = add(t, dot, mul(t, xi, wi))
        result.append(dot)
    return result
def rmsnorm(t, x):
    """RMS layer norm: x / sqrt(mean(x**2) + 1e-5), with no learned gain."""
    sq_sum = t.val(0.0)
    for xi in x:
        sq_sum = add(t, sq_sum, mul(t, xi, xi))
    mean_sq = div(t, sq_sum, t.val(len(x)))
    # Add epsilon before the inverse square root for numerical stability.
    inv_std = pow_const(t, add(t, mean_sq, t.val(1e-5)), -0.5)
    return [mul(t, xi, inv_std) for xi in x]
def softmax(t, logits):
    """Computes softmax probabilities over a list of logit node indices.

    The max logit is subtracted (as a constant, local grad 1.0) for
    numerical stability.  Fix over the original: math.exp was evaluated
    twice per logit (once for the value, once for the local derivative);
    exp is its own derivative, so compute it once and reuse it.
    """
    max_val = max(t.data[l] for l in logits)
    exps = []
    for l in logits:
        # Shift for numerical stability
        shifted = t.push(t.data[l] - max_val, (l, None), (1.0, 0.0))
        e = math.exp(t.data[shifted])
        exps.append(t.push(e, (shifted, None), (e, 0.0)))
    total = t.val(0.0)
    for e_idx in exps:
        total = add(t, total, e_idx)
    inv_total = pow_const(t, total, -1.0)
    return [mul(t, e_idx, inv_total) for e_idx in exps]
# -----------------------------------------------------------------------------
# GPT Model Function
# -----------------------------------------------------------------------------
def gpt(t, token_id, pos_id, keys_cache, values_cache, params, cfg):
    """One decoder step: embeds token_id at position pos_id, runs every
    transformer layer against the growing KV cache, and returns the
    vocab-sized list of logit node indices on tape t.

    keys_cache / values_cache are per-layer lists of per-position
    projections; this call appends the current position's k and v.
    """
    wte, wpe, lm_head = params[:3]
    n_head, head_dim = cfg['n_head'], cfg['head_dim']
    # 1. Input Embeddings (Token + Position)
    embedding = [add(t, wte[token_id][i], wpe[pos_id][i]) for i in range(len(wte[0]))]
    x = rmsnorm(t, embedding)
    # 2. Transformer Layers
    for li in range(cfg['n_layer']):
        # Extract weights for this layer (6 matrices per layer after the 3 shared ones)
        wq, wk, wv, wo, f1, f2 = params[3 + 6*li : 3 + 6*(li+1)]
        # Attention Block
        x_norm = rmsnorm(t, x)
        q = linear(t, x_norm, wq)
        k = linear(t, x_norm, wk)
        v = linear(t, x_norm, wv)
        keys_cache[li].append(k)
        values_cache[li].append(v)
        x_attn = []
        scale = t.val(head_dim**-0.5)
        for h in range(n_head):
            hs = h * head_dim  # offset of this head's slice within the embedding
            q_h = q[hs : hs+head_dim]
            # Scaled Dot-Product Attention over all cached positions (causal by construction)
            attn_logits = []
            for past_k in keys_cache[li]:
                dot = t.val(0.0)
                for j in range(head_dim):
                    dot = add(t, dot, mul(t, q_h[j], past_k[hs+j]))
                attn_logits.append(mul(t, dot, scale))
            probs = softmax(t, attn_logits)
            # Weighted sum of values
            for j in range(head_dim):
                sum_v = t.val(0.0)
                for tidx, past_v in enumerate(values_cache[li]):
                    sum_v = add(t, sum_v, mul(t, probs[tidx], past_v[hs+j]))
                x_attn.append(sum_v)
        # Output projection + Residual connection
        attn_out = linear(t, x_attn, wo)
        x = [add(t, x[i], attn_out[i]) for i in range(len(x))]
        # MLP Block (norm -> expand -> ReLU -> project) + Residual connection
        x_norm = rmsnorm(t, x)
        mlp_hidden = [relu(t, i) for i in linear(t, x_norm, f1)]
        mlp_out = linear(t, mlp_hidden, f2)
        x = [add(t, x[i], mlp_out[i]) for i in range(len(x))]
    # 3. Language Model Head
    return linear(t, x, lm_head)
# -----------------------------------------------------------------------------
# Initialization & Training Loop
# -----------------------------------------------------------------------------
# Hyperparameters
input_file = 'input.txt'
num_steps = 1000
n_layer, n_embd, block_size, n_head = 1, 16, 16, 4
head_dim = n_embd // n_head
cfg = {'n_head': n_head, 'head_dim': head_dim, 'n_layer': n_layer}
# Data Loading (Assuming input.txt exists)
try:
    with open(input_file, 'r') as f:
        docs = [l.strip().capitalize() for l in f if l.strip()]
except FileNotFoundError:
    # Fallback: train on the words of this message so the demo still runs.
    docs = [input_file] + "error reading input file not found".split()
    num_steps = 80
random.shuffle(docs)
uchars = sorted(list(set(''.join(docs))))  # character-level vocabulary
BOS, vocab_size = len(uchars), len(uchars) + 1  # extra id doubles as BOS/EOS marker
print(f"{num_steps=}")
print(f"num docs: {len(docs)}")
print(f"vocab size: {vocab_size}")
tape = Tape()
def init_mat(r, c):
    """Returns an r x c matrix of small Gaussian leaf nodes on the global tape."""
    return [[tape.val(random.gauss(0, 0.08)) for _ in range(c)] for _ in range(r)]
# Initialize Persistent Parameters
params = [
    init_mat(vocab_size, n_embd),  # wte
    init_mat(block_size, n_embd),  # wpe
    init_mat(vocab_size, n_embd),  # lm_head
]
for _ in range(n_layer):
    params.extend([init_mat(n_embd, n_embd) for _ in range(4)])  # q, k, v, o
    params.extend([init_mat(4 * n_embd, n_embd), init_mat(n_embd, 4 * n_embd)])  # f1, f2
param_indices = [idx for mat in params for row in mat for idx in row]
# Weight watermark: everything on the tape up to here is a parameter, so
# truncating back to this index discards only a step's activations.
weights_end_idx = len(tape.data)
print(f"num params: {weights_end_idx}")
# Optimizer State (Adam)
m = [0.0] * len(param_indices)  # first-moment (mean) estimates
v = [0.0] * len(param_indices)  # second-moment (uncentered variance) estimates
for step in range(num_steps):
    doc = docs[step % len(docs)]
    tokens = [BOS] + [uchars.index(c) for c in doc] + [BOS]
    keys, values, losses = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)], []
    # Forward Pass: one gpt() call per position, teacher-forced next-token prediction
    for pos in range(min(len(tokens)-1, block_size)):
        logits = gpt(tape, tokens[pos], pos, keys, values, params, cfg)
        probs = softmax(tape, logits)
        # Negative log-likelihood of the true next token
        loss = mul_const(tape, log(tape, probs[tokens[pos+1]]), -1.0)
        losses.append(loss)
    # Average Loss calculation
    sum_loss = tape.val(0.0)
    for l in losses: sum_loss = add(tape, sum_loss, l)
    total_loss_idx = div(tape, sum_loss, tape.val(len(losses)))
    # Backward Pass
    tape.backward(total_loss_idx)
    # Adam Update
    b1, b2, eps = 0.85, 0.99, 1e-8
    lr = 0.01 * (1 - step / num_steps)  # linear learning-rate decay to 0
    for i, p_idx in enumerate(param_indices):
        g = tape.grad[p_idx]
        m[i] = b1 * m[i] + (1 - b1) * g
        v[i] = b2 * v[i] + (1 - b2) * (g**2)
        m_hat = m[i] / (1 - b1**(step + 1))  # bias correction for early steps
        v_hat = v[i] / (1 - b2**(step + 1))
        tape.data[p_idx] -= lr * m_hat / (math.sqrt(v_hat) + eps)
    print(f"Step {step+1:4} | Loss: {tape.data[total_loss_idx]:.4f}", end='\r')
    # Drop this step's activations from the tape; keep the (updated) weights.
    tape.truncate(weights_end_idx)
    tape.zero_grad()
# --- Inference ---
print("\n--- Generated Samples ---")
for s_idx in range(20):
    keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
    token_id, sample = BOS, []
    tape.truncate(weights_end_idx)  # fresh activation region for each sample
    for pos_id in range(block_size):
        logits_idxs = gpt(tape, token_id, pos_id, keys, values, params, cfg)
        # Simple sampling with temperature (0.5); plain-float softmax is
        # enough here since no gradients are needed at inference time.
        logits = [tape.data[idx] / 0.5 for idx in logits_idxs]
        exp_l = [math.exp(l) for l in logits]
        sum_l = sum(exp_l)
        probs = [el / sum_l for el in exp_l]
        next_token = random.choices(range(vocab_size), weights=probs)[0]
        if next_token == BOS: break  # BOS doubles as the end-of-sequence marker
        token_id = next_token
        sample.append(uchars[token_id])
    print(f"{s_idx+1}: {''.join(sample)}")
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Revision 4: supports multiple layers and is fully equivalent to the original https://gist.github.com/karpathy/8627fe009c40f57531cb18360106ce95

but runs 4x-5x faster.
(For a side-by-side comparison with the original, remove ".capitalize()" from the data-loading line.)