Skip to content

Instantly share code, notes, and snippets.

@andrewparkermorgan
Created November 19, 2025 13:13
Show Gist options
  • Select an option

  • Save andrewparkermorgan/e705fa4ccaa7c6f77efae2de93899a1a to your computer and use it in GitHub Desktop.

Select an option

Save andrewparkermorgan/e705fa4ccaa7c6f77efae2de93899a1a to your computer and use it in GitHub Desktop.
make site frequency spectrum from a VCF file
#! /usr/bin/env python
"""
vcf_to_sfs.py
Compute n-dimensional site frequency spectrum (SFS) from a VCF file.
"""
import os
import sys
import argparse
import logging
import math
import numpy as np
from collections import defaultdict, OrderedDict
from itertools import combinations, product
from cyvcf2 import VCF
from pybedtools import BedTool
parser = argparse.ArgumentParser(description = "Compute n-dimensional SFS from a VCF file.")
parser.add_argument( "-i","--infile", type = str,
default = "-",
help = "path to VCF file [default: stdin]" )
parser.add_argument( "-L","--length", type = int,
default = None,
help = "nominal length of region analyzed [default: guess from VCF]" )
parser.add_argument( "-p","--pop", type = argparse.FileType("r"),
required = True,
help = "file containing population labels; one sample per line, like 'sample_id pop_id'" )
parser.add_argument( "-n","--nonmissing", type = int,
default = 0,
help = "minimum number of nonmissing calls in each population [default: %(default)d]" )
parser.add_argument( "-P","--ploidy", type = int,
default = 0,
help = "enforce default ploidy, ignoring what is provided in --pop [default: %(default)d]" )
parser.add_argument( "--folded", action = "store_true",
default = False,
help = "treat spectrum as folded: alleles are coded as major-minor [default: unfolded]" )
parser.add_argument( "--ref-is-ancestral", action = "store_true",
default = False,
help = "assume REF allele is ancestral state [default: false]" )
parser.add_argument( "--alt-is-ancestral", action = "store_true",
default = False,
help = "assume first ALT allele is ancestral state [default: false]" )
parser.add_argument( "-w","--windows", type = argparse.FileType("rU"), nargs = "?",
help = "bed file of intervals in which to compute SFS [default: whole VCF in one swallow]" )
args = parser.parse_args()
## start logging trace
logging.basicConfig(level = logging.INFO)
logger = logging.getLogger()
## read population labels and ploidy
pops = defaultdict(list)
ploidy = defaultdict(int)
for line in args.pop:
if line.strip().startswith("#"):
continue
pieces = line.strip().split()
if len(pieces) < 2:
continue
sm = pieces.pop(0)
p = pieces.pop(0)
nchr = 2
if args.ploidy > 0:
nchr = args.ploidy
elif len(pieces):
nchr = int(pieces.pop(0))
pops[p].append(sm)
ploidy[sm] = nchr
## report populations smallest to biggest
pop_labels = sorted(pops.keys(), key = lambda k: len(pops[k]))
ssz = OrderedDict({ p: sum(ploidy[sm] for sm in pops[p]) for i,p in enumerate(pop_labels) })
if args.folded:
# TODO: fix for folded spectra
ns = [ int(math.ceil(args.ploidy/2))*(ssz[p])+1 for p in pop_labels ]
else:
ns = [ ssz[p]+1 for p in pop_labels ]
logger.info("Sample sizes: {}".format(ssz))
## connect to vcf file
vcf = VCF(args.infile, gts012 = True)
sample_idx = OrderedDict({ pop: [ vcf.samples.index(sm) for sm in pops[pop] ] for pop in pop_labels })
sample_ploidy = OrderedDict({ pop: np.array([ ploidy[sm] for sm in pops[pop] ], dtype = int) for pop in pop_labels })
if args.ref_is_ancestral and args.alt_is_ancestral:
logger.info("Both --ref-is-ancestral and --alt-is-ancestral were specified; assuming REF is the ancestral allele.")
## get iterator over genomic windows (or the 'trivial window' of the whole VCF file)
if args.windows:
windows = BedTool(args.windows)
window_iter = windows
else:
window_iter = [None]
## initialize output
print("#pops={}".format(",".join(pop_labels)))
print("#dims={}".format(",".join(str(d-1) for d in ns)))
print("#folded={}".format(args.folded))
print("#ploidy={}".format(args.ploidy))
## loop on windows
logger.info("Tabulating SFS ...")
for window in window_iter:
## initialize SFS
sfs = np.zeros(ns, dtype = np.uint64)
if window is not None:
region = "{}:{}-{}".format(window.chrom, window.start+1, window.end)
else:
region = None
logger.info("\t... region: {}".format(region))
## loop on sites
for site in vcf(region = region):
# biallelic sites only
if len(site.ALT) > 1:
continue
# skip if no ancestral allele and we want folded spectrum
if args.ref_is_ancestral:
anc = site.REF
elif args.alt_is_ancestral:
anc = site.ALT[0]
else:
try:
anc = site.INFO.get("AA")
#logger.info("{}:{} -- {}".format(site.CHROM, site.POS, anc))
except:
continue
# count alleles in each population
idx = []
for ii,pop in enumerate(pop_labels):
geno = site.gt_types[ sample_idx[pop] ].astype(float)
geno[ geno == 3 ] = np.nan
#print(geno, file = sys.stderr)
geno[ np.logical_and(sample_ploidy[pop] < 2, geno == 1) ] = np.nan
if not site.REF == anc:
geno = sample_ploidy[pop] - geno
geno = (geno*(sample_ploidy[pop]/2)).round()
nder = int(np.nansum(geno))
nonmiss = np.sum(~np.isnan(geno))
nchrom = ns[ii]-1
denom = np.sum( sample_ploidy[pop][ ~np.isnan(geno) ] )
if denom:
frac = nder/denom
if args.folded:
bin = int(min(frac, 1-frac)*nchrom)
else:
bin = int(frac*nchrom)
else:
bin = 0
#print(nonmiss, nder, denom, bin, "\n", file = sys.stderr)
idx.append(bin)
sfs[ tuple(idx) ] += 1
if args.length:
ncalled = np.sum(sfs)
nremain = args.length - ncalled
sfs[ tuple([0]*len(pop_labels)) ] = nremain
## write sfs, row-wise like angsd
#print("#L={}".format(bpspan))
#print("#nsites={}".format(kept))
print(*sfs.flatten())
logger.info("Done.\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment