Created
November 19, 2025 13:13
-
-
Save andrewparkermorgan/e705fa4ccaa7c6f77efae2de93899a1a to your computer and use it in GitHub Desktop.
make site frequency spectrum from a VCF file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| """ | |
| vcf_to_sfs.py | |
| Compute n-dimensional site frequency spectrum (SFS) from a VCF file. | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import logging | |
| import math | |
| import numpy as np | |
| from collections import defaultdict, OrderedDict | |
| from itertools import combinations, product | |
| from cyvcf2 import VCF | |
| from pybedtools import BedTool | |
| parser = argparse.ArgumentParser(description = "Compute n-dimensional SFS from a VCF file.") | |
| parser.add_argument( "-i","--infile", type = str, | |
| default = "-", | |
| help = "path to VCF file [default: stdin]" ) | |
| parser.add_argument( "-L","--length", type = int, | |
| default = None, | |
| help = "nominal length of region analyzed [default: guess from VCF]" ) | |
| parser.add_argument( "-p","--pop", type = argparse.FileType("r"), | |
| required = True, | |
| help = "file containing population labels; one sample per line, like 'sample_id pop_id'" ) | |
| parser.add_argument( "-n","--nonmissing", type = int, | |
| default = 0, | |
| help = "minimum number of nonmissing calls in each population [default: %(default)d]" ) | |
| parser.add_argument( "-P","--ploidy", type = int, | |
| default = 0, | |
| help = "enforce default ploidy, ignoring what is provided in --pop [default: %(default)d]" ) | |
| parser.add_argument( "--folded", action = "store_true", | |
| default = False, | |
| help = "treat spectrum as folded: alleles are coded as major-minor [default: unfolded]" ) | |
| parser.add_argument( "--ref-is-ancestral", action = "store_true", | |
| default = False, | |
| help = "assume REF allele is ancestral state [default: false]" ) | |
| parser.add_argument( "--alt-is-ancestral", action = "store_true", | |
| default = False, | |
| help = "assume first ALT allele is ancestral state [default: false]" ) | |
| parser.add_argument( "-w","--windows", type = argparse.FileType("rU"), nargs = "?", | |
| help = "bed file of intervals in which to compute SFS [default: whole VCF in one swallow]" ) | |
| args = parser.parse_args() | |
| ## start logging trace | |
| logging.basicConfig(level = logging.INFO) | |
| logger = logging.getLogger() | |
| ## read population labels and ploidy | |
| pops = defaultdict(list) | |
| ploidy = defaultdict(int) | |
| for line in args.pop: | |
| if line.strip().startswith("#"): | |
| continue | |
| pieces = line.strip().split() | |
| if len(pieces) < 2: | |
| continue | |
| sm = pieces.pop(0) | |
| p = pieces.pop(0) | |
| nchr = 2 | |
| if args.ploidy > 0: | |
| nchr = args.ploidy | |
| elif len(pieces): | |
| nchr = int(pieces.pop(0)) | |
| pops[p].append(sm) | |
| ploidy[sm] = nchr | |
| ## report populations smallest to biggest | |
| pop_labels = sorted(pops.keys(), key = lambda k: len(pops[k])) | |
| ssz = OrderedDict({ p: sum(ploidy[sm] for sm in pops[p]) for i,p in enumerate(pop_labels) }) | |
| if args.folded: | |
| # TODO: fix for folded spectra | |
| ns = [ int(math.ceil(args.ploidy/2))*(ssz[p])+1 for p in pop_labels ] | |
| else: | |
| ns = [ ssz[p]+1 for p in pop_labels ] | |
| logger.info("Sample sizes: {}".format(ssz)) | |
| ## connect to vcf file | |
| vcf = VCF(args.infile, gts012 = True) | |
| sample_idx = OrderedDict({ pop: [ vcf.samples.index(sm) for sm in pops[pop] ] for pop in pop_labels }) | |
| sample_ploidy = OrderedDict({ pop: np.array([ ploidy[sm] for sm in pops[pop] ], dtype = int) for pop in pop_labels }) | |
| if args.ref_is_ancestral and args.alt_is_ancestral: | |
| logger.info("Both --ref-is-ancestral and --alt-is-ancestral were specified; assuming REF is the ancestral allele.") | |
| ## get iterator over genomic windows (or the 'trivial window' of the whole VCF file) | |
| if args.windows: | |
| windows = BedTool(args.windows) | |
| window_iter = windows | |
| else: | |
| window_iter = [None] | |
| ## initialize output | |
| print("#pops={}".format(",".join(pop_labels))) | |
| print("#dims={}".format(",".join(str(d-1) for d in ns))) | |
| print("#folded={}".format(args.folded)) | |
| print("#ploidy={}".format(args.ploidy)) | |
| ## loop on windows | |
| logger.info("Tabulating SFS ...") | |
| for window in window_iter: | |
| ## initialize SFS | |
| sfs = np.zeros(ns, dtype = np.uint64) | |
| if window is not None: | |
| region = "{}:{}-{}".format(window.chrom, window.start+1, window.end) | |
| else: | |
| region = None | |
| logger.info("\t... region: {}".format(region)) | |
| ## loop on sites | |
| for site in vcf(region = region): | |
| # biallelic sites only | |
| if len(site.ALT) > 1: | |
| continue | |
| # skip if no ancestral allele and we want folded spectrum | |
| if args.ref_is_ancestral: | |
| anc = site.REF | |
| elif args.alt_is_ancestral: | |
| anc = site.ALT[0] | |
| else: | |
| try: | |
| anc = site.INFO.get("AA") | |
| #logger.info("{}:{} -- {}".format(site.CHROM, site.POS, anc)) | |
| except: | |
| continue | |
| # count alleles in each population | |
| idx = [] | |
| for ii,pop in enumerate(pop_labels): | |
| geno = site.gt_types[ sample_idx[pop] ].astype(float) | |
| geno[ geno == 3 ] = np.nan | |
| #print(geno, file = sys.stderr) | |
| geno[ np.logical_and(sample_ploidy[pop] < 2, geno == 1) ] = np.nan | |
| if not site.REF == anc: | |
| geno = sample_ploidy[pop] - geno | |
| geno = (geno*(sample_ploidy[pop]/2)).round() | |
| nder = int(np.nansum(geno)) | |
| nonmiss = np.sum(~np.isnan(geno)) | |
| nchrom = ns[ii]-1 | |
| denom = np.sum( sample_ploidy[pop][ ~np.isnan(geno) ] ) | |
| if denom: | |
| frac = nder/denom | |
| if args.folded: | |
| bin = int(min(frac, 1-frac)*nchrom) | |
| else: | |
| bin = int(frac*nchrom) | |
| else: | |
| bin = 0 | |
| #print(nonmiss, nder, denom, bin, "\n", file = sys.stderr) | |
| idx.append(bin) | |
| sfs[ tuple(idx) ] += 1 | |
| if args.length: | |
| ncalled = np.sum(sfs) | |
| nremain = args.length - ncalled | |
| sfs[ tuple([0]*len(pop_labels)) ] = nremain | |
| ## write sfs, row-wise like angsd | |
| #print("#L={}".format(bpspan)) | |
| #print("#nsites={}".format(kept)) | |
| print(*sfs.flatten()) | |
| logger.info("Done.\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment