from dataclasses import dataclass
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import os
import sys
class LayerNorm(nn.Module):
    """ LayerNorm but with an optional bias. PyTorch doesn't support simply bias=False """
    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
    def forward(self, input):
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)
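# note (added, not in the original gist): this class exists so the bias can be dropped;
# recent PyTorch versions do accept nn.LayerNorm(ndim, bias=False), and the Block below
# uses plain nn.LayerNorm with its defaults, so this custom module is currently unused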
# build this 4th
class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
        # output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd)
        self.c_proj.NANOGPT_SCALE_INIT = 1
        # regularization
        self.n_head = config.n_head
        self.n_embd = config.n_embd
    def forward(self, x):
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
        # calculate query, key, values for all heads in batch and move head forward to be the batch dim
        # nh is "number of heads", hs is "head size", and C (number of channels) = nh * hs
        # e.g. in GPT-2 (124M), n_head=12, hs=64, so nh*hs=C=768 channels in the Transformer
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        y = F.scaled_dot_product_attention(q, k, v, is_causal=True) # flash attention
        y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side
        # output projection
        y = self.c_proj(y)
        return y
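# --- added sketch (not in the original gist): quick shape check -----------------
# a minimal, hypothetical smoke test for the block above; the config is a throwaway
# namespace carrying only the fields CausalSelfAttention reads (n_embd, n_head),
# and it runs in a few milliseconds on CPU
from types import SimpleNamespace
_test_attn = CausalSelfAttention(SimpleNamespace(n_embd=768, n_head=12))
assert _test_attn(torch.randn(2, 8, 768)).shape == (2, 8, 768) # (B, T, C) preserved end to end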
#------------------------------
# 3rd build this
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd)
        self.gelu = nn.GELU(approximate='tanh')
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd)
        self.c_proj.NANOGPT_SCALE_INIT = 1
    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        return x
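# note (added, not in the original gist): the tanh-approximate GELU matches the
# historical GPT-2 implementation, and the 4x expansion (n_embd -> 4*n_embd -> n_embd)
# is the standard transformer MLP width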
#--------------------------------
# 2nd build this
class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = nn.LayerNorm(config.n_embd)
        self.mlp = MLP(config)
    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x
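# note (added, not in the original gist): this is the pre-norm arrangement used by
# GPT-2: layernorm sits *inside* each residual branch, so the residual additions form
# a clean gradient path from the output straight back to the embeddings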
#--------------------------------
# first build this
@dataclass
class GPTConfig:
    block_size: int = 1024
    vocab_size: int = 50257
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    bias: bool = True
    dropout: float = 0.0
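# --- added example (not in the original gist) -----------------------------------
# the defaults above reproduce the 124M GPT-2; for quick CPU debugging one could
# shrink every dimension, e.g. a hypothetical toy config:
#   debug_config = GPTConfig(block_size=64, n_layer=2, n_head=2, n_embd=64)
# note that `bias` and `dropout` are carried in the config but not read by the modules
# above (Block uses nn.LayerNorm directly and no dropout is applied anywhere)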
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),
            wpe = nn.Embedding(config.block_size, config.n_embd),
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f = nn.LayerNorm(config.n_embd),
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # weight sharing scheme
        self.transformer.wte.weight = self.lm_head.weight
        # init params
        self.apply(self._init_weights)
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANOGPT_SCALE_INIT'):
                std *= (2 * self.config.n_layer) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
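    # note (added, not in the original gist): std=0.02 matches the GPT-2 initialization;
    # projections tagged with NANOGPT_SCALE_INIT are scaled by 1/sqrt(2*n_layer) because
    # every block adds two residual contributions (attn and mlp), e.g. with n_layer=12
    # the std becomes 0.02 * (2*12)**-0.5 ~= 0.0041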
    def forward(self, idx, targets=None):
        # idx is of shape (B, T)
        B, T = idx.size()
        assert T <= self.config.block_size, f"Cannot forward sequence of length {T}, block size is only {self.config.block_size}"
        # forward the token and position embeddings
        pos = torch.arange(0, T, dtype=torch.long, device=idx.device) # shape (T)
        pos_emb = self.transformer.wpe(pos) # position embeddings of shape (T, n_embd)
        tok_emb = self.transformer.wte(idx) # token embeddings of shape (B, T, n_embd)
        x = tok_emb + pos_emb
        # forward the blocks of the transformer
        for block in self.transformer.h:
            x = block(x)
        # forward the final layernorm and the classifier
        x = self.transformer.ln_f(x)
        logits = self.lm_head(x) # (B, T, vocab_size)
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss
    @classmethod
    def from_pretrained(cls, model_type):
        """Loads pretrained GPT-2 model weights from huggingface"""
        assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
        from transformers import GPT2LMHeadModel
        print("loading weights from pretrained gpt: %s" % model_type)
        # n_layer, n_head and n_embd are determined from model_type
        config_args = {
            'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),  # 124M params
            'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
            'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
            'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
        }[model_type]
        config_args['vocab_size'] = 50257 # always 50257 for GPT model checkpoints
        config_args['block_size'] = 1024 # always 1024 for GPT model checkpoints
        # create a from-scratch initialized minGPT model
        config = GPTConfig(**config_args)
        model = GPT(config)
        sd = model.state_dict()
        sd_keys = sd.keys()
        sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')] # discard this mask / buffer, not a param
        # init a huggingface/transformers model
        model_hf = GPT2LMHeadModel.from_pretrained(model_type)
        sd_hf = model_hf.state_dict()
        # copy while ensuring all of the parameters are aligned and match in names and shapes
        sd_keys_hf = sd_hf.keys()
        sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.masked_bias')] # ignore these, just a buffer
        sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.bias')] # same, just the mask (buffer)
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
        # basically the openai checkpoints use a "Conv1D" module, but we only want to use a vanilla Linear
        # this means that we have to transpose these weights when we import them
        assert len(sd_keys_hf) == len(sd_keys), f"mismatched keys: {len(sd_keys_hf)} != {len(sd_keys)}"
        for k in sd_keys_hf:
            if any(k.endswith(w) for w in transposed):
                # special treatment for the Conv1D weights we need to transpose
                assert sd_hf[k].shape[::-1] == sd[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k].t())
            else:
                # vanilla copy over the other parameters
                assert sd_hf[k].shape == sd[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k])
        return model
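# --- added usage sketch (not in the original gist) -------------------------------
# from_pretrained pulls the HuggingFace GPT-2 weights into this module layout,
# e.g. (requires the `transformers` package and a network/disk cache):
#   model = GPT.from_pretrained('gpt2')
#   logits, loss = model(idx)            # loss is None when no targets are given
#   logits, loss = model(idx, targets)   # cross-entropy loss over the vocab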
import tiktoken
class DataLoaderLite:
    def __init__(self, B, T):
        self.B = B
        self.T = T
        # at init load tokens from disk and store them in memory
        with open('shakespeare.txt', 'r', encoding='utf-8') as f:
            text = f.read()
        enc = tiktoken.get_encoding('gpt2')
        tokens = enc.encode(text)
        self.tokens = torch.tensor(tokens)
        print(f"loaded {len(self.tokens)} tokens")
        print(f"1 epoch = {len(self.tokens) // (B * T)} batches")
        # state
        self.current_position = 0
    def next_batch(self):
        B, T = self.B, self.T
        buf = self.tokens[self.current_position : self.current_position+B*T+1]
        x = (buf[:-1]).view(B, T) # inputs
        y = (buf[1:]).view(B, T) # targets
        # advance the position in the tensor
        self.current_position += B * T
        # if loading the next batch would be out of bounds, reset
        if self.current_position + (B * T + 1) > len(self.tokens):
            self.current_position = 0
        return x, y
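# --- added sketch (not in the original gist): what next_batch returns ------------
# a toy illustration of the one-token shift between inputs and targets, using a
# made-up buffer of consecutive integers instead of the real shakespeare tokens
_toy = torch.arange(7)                               # pretend these are 7 token ids: 0..6
_x, _y = _toy[:-1].view(2, 3), _toy[1:].view(2, 3)   # same slicing as next_batch with B=2, T=3
assert (_y == _x + 1).all()                          # each target is the token that follows its input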
#--------------------------------
# attempt to autodetect the device
import time
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
    print(f"CUDA available: {torch.cuda.device_count()} device(s)")
    print(f"CUDA device name: {torch.cuda.get_device_name(0)}")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = "mps"
print(f"using device: {device}")
torch.manual_seed(1337)
if torch.cuda.is_available():
    torch.cuda.manual_seed(1337)
# check that the shakespeare.txt training file exists
if not os.path.exists('shakespeare.txt'):
    print("Error: shakespeare.txt file not found. Please make sure it's in the current directory.")
    sys.exit(1)
# increased batch size and sequence length for V100
train_loader = DataLoaderLite(B=16, T=1024) # increased from B=4, T=512 for V100
torch.set_float32_matmul_precision('high')
# get logits
model = GPT(GPTConfig())
model.to(device)
model = torch.compile(model)
# optimize!
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
# increased number of iterations for better training
for i in range(1000):
    t0 = time.time()
    x, y = train_loader.next_batch()
    x, y = x.to(device), y.to(device)
    optimizer.zero_grad()
    # note: bfloat16 autocast needs hardware bf16 support (Ampere or newer NVIDIA GPUs);
    # on an older GPU such as a V100 consider float16 autocast plus a GradScaler instead
    with torch.autocast(device_type=device, dtype=torch.bfloat16):
        logits, loss = model(x, y)
    loss.backward()
    optimizer.step()
    if device == "cuda":
        torch.cuda.synchronize() # wait for the GPU to finish work before timing
    t1 = time.time()
    dt = (t1 - t0) * 1000 # time difference in milliseconds
    tokens_per_sec = (train_loader.B * train_loader.T) / (t1 - t0)
    print(f"step {i}, loss: {loss.item()}, dt: {dt:.2f}ms, tok/sec: {tokens_per_sec:.2f}")
    # save a checkpoint every 100 iterations
    if i % 100 == 0:
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
            'step': i,
        }, f'checkpoint_step_{i}.pt')
sys.exit(0) # note: the sampling code below never runs unless this exit is removed
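# --- added note (not in the original gist): reloading a checkpoint ---------------
# a minimal sketch; because the model was wrapped by torch.compile, the saved
# state_dict keys carry an '_orig_mod.' prefix, so load into the compiled wrapper
# (or strip the prefix) rather than into a fresh, uncompiled GPT:
#   ckpt = torch.load('checkpoint_step_900.pt', map_location=device)
#   model.load_state_dict(ckpt['model_state_dict'])
#   optimizer.load_state_dict(ckpt['optimizer_state_dict'])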
# prefix tokens
model.eval()
num_return_sequences = 5
max_length = 30
enc = tiktoken.get_encoding('gpt2') # the enc above only lives inside DataLoaderLite, so build a tokenizer here
tokens = enc.encode("Hello, I'm a language model,")
tokens = torch.tensor(tokens, dtype=torch.long) # (8,)
tokens = tokens.unsqueeze(0).repeat(num_return_sequences, 1) # (5, 8)
x = tokens.to(device)
# generate! right now x is (B, T) where B = 5, T = 8
# set the seed to 42
torch.manual_seed(42)
torch.cuda.manual_seed(42)
while x.size(1) < max_length:
    # forward the model to get the logits
    with torch.no_grad():
        logits, _ = model(x) # the model returns (logits, loss); loss is None without targets
        # take the logits at the last position
        logits = logits[:, -1, :] # (B, vocab_size)
        # get the probabilities
        probs = F.softmax(logits, dim=-1)
        # do top-k sampling of 50 (huggingface pipeline default)
        # topk_probs here becomes (5, 50), topk_indices is (5, 50)
        topk_probs, topk_indices = torch.topk(probs, 50, dim=-1)
        # select a token from the top-k probabilities
        # note: multinomial does not demand the input to sum to 1
        ix = torch.multinomial(topk_probs, 1) # (B, 1)
        # gather the corresponding indices
        xcol = torch.gather(topk_indices, -1, ix) # (B, 1)
        # append to the sequence
        x = torch.cat((x, xcol), dim=1)
# print the generated text
for i in range(num_return_sequences):
    tokens = x[i, :max_length].tolist()
    decoded = enc.decode(tokens)
    print(">", decoded)