plotMSA.py: making entropy plots for MSAs (with rDNA annotations)
#!/usr/bin/env python
"""
Gist showing how to make plots similar to riboSeed's entropy plots from an MSA.
This assumes you want to annotate rDNAs, but you probably don't
care about those. Adjust as needed.
"""
import os
import datetime
import time
import argparse
import sys
import shutil
import math
#from pyutilsnrw import utils3_5
sys.path.append(os.path.join('..', 'riboSeed'))
# imports needed by the riboSnag functions copied below
import re
import subprocess
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Alphabet import IUPAC
from pyutilsnrw.utils3_5 import set_up_logging, check_installed_tools
# note: the three riboSnag functions imported here are also copied below,
# so the local definitions are the ones that actually run
from riboSnag import calc_entropy_msa, annotate_msa_conensus, \
    plot_scatter_with_anno
# savitzky_golay is used for the smoothed line in plot_scatter_with_anno;
# it is assumed to come from riboSnag as in riboSeed
# (scipy.signal.savgol_filter is a drop-in alternative)
from riboSnag import savitzky_golay
def get_args():  # pragma: no cover
    """get the arguments as a main parser with argument groups
    for named required arguments and optional arguments
    """
    parser = argparse.ArgumentParser(description="riboSnag lite. Use this " +
                                     "to generate the plots made in riboSnag",
                                     add_help=False)
    parser.add_argument("msa", help="msa in fasta format")
    requiredNamed = parser.add_argument_group('required named arguments')
    requiredNamed.add_argument("-o", "--output",
                               help="output directory; default: %(default)s",
                               default=os.getcwd(),
                               type=str, dest="output")
    # this faux "optional" group is needed so that the named required
    # arguments above get listed first
    optional = parser.add_argument_group('optional arguments')
    optional.add_argument("-n", "--name",
                          help="rename the contigs with this prefix; " +
                          # "default: %(default)s",
                          "default: date (YYYYMMDD)",
                          default=None, dest="name",
                          action="store", type=str)
    optional.add_argument("-v", "--verbosity",
                          dest='verbosity', action="store",
                          default=2, type=int,
                          help="1 = debug(), 2 = info(), 3 = warning(), " +
                          "4 = error() and 5 = critical(); " +
                          "default: %(default)s")
    optional.add_argument("--barrnap_exe", dest="barrnap_exe",
                          action="store", default="barrnap",
                          help="Path to barrnap executable; " +
                          "default: %(default)s")
    optional.add_argument("--kingdom", dest="kingdom",
                          action="store", default="bac",
                          choices=["mito", "euk", "arc", "bac"],
                          help="kingdom for barrnap; " +
                          "default: %(default)s")
    optional.add_argument("-s", "--seq_name", dest='seq_name',
                          action="store",
                          help="name of the genome to use in the plot " +
                          "title; not inferred from the records, as many " +
                          "cases involve multiple contigs, etc, making " +
                          "inference from the records intractable; " +
                          "default: %(default)s",
                          default="plotMSA",
                          type=str)
    # -h is added explicitly so it appears under the faux "optional" group
    optional.add_argument("-h", "--help",
                          action="help", default=argparse.SUPPRESS,
                          help="Displays this help message")
    args = parser.parse_args()
    return args
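# Example invocation (illustrative; the alignment file name and output
# directory are hypothetical, the flags match get_args above):
#   python plotMSA.py my_alignment.fasta -o entropy_out --kingdom bac -s "E. coli rDNA"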
###################### From riboSeed's riboSnag ################
def calc_Shannon_entropy(matrix):
    r""" each position $j$ has entropy $H(j)$ such that
    $H(j) = -\sum_{i \in \{A,C,T,G\}} p_i(j) \log p_i(j)$
    """
    entropies = []
    for instance in matrix:
        unique = set(instance)
        proportions = {}
        for i in unique:
            proportions[i] = sum([x == i for x in instance]) / len(instance)
        entropy = -sum([prob * (math.log(prob, math.e)) for
                        prob in proportions.values()])
        entropies.append(entropy)
    return entropies
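# Quick illustrative sanity check (not part of the original riboSnag code):
# a 50/50 split column has entropy -2 * (0.5 * ln 0.5) = ln 2 ~= 0.693,
# while a fully conserved column has entropy 0.
assert math.isclose(calc_Shannon_entropy([["A", "A", "T", "T"]])[0],
                    math.log(2))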
def calc_entropy_msa(msa_path):
    """given a path to an MSA in FASTA format, this gets the
    entropy of each position in batches, so as to not gobble memory;
    returns (entropy per position, sequence names, transposed columns)
    """
    batch_size = 1000  # read sequences in chunks this long
    lengths = []
    with open(msa_path) as fh:
        msa_seqs = list(SeqIO.parse(fh, 'fasta'))
    seq_names = []
    for rec in msa_seqs:
        lengths.append(len(rec))
        seq_names.append(rec.id)
    if not all([i == lengths[0] for i in lengths]):
        raise ValueError("Sequences must all be the same length!")
    entropies = []
    tseq = []
    # save memory by reading in chunks
    for batch in range(0, (math.ceil(lengths[0] / batch_size))):
        # get all sequences into an array
        seq_array = []
        for nseq, record in enumerate(msa_seqs):
            seq_array.append(
                [x for x in record.seq[(batch * batch_size):
                                       ((batch + 1) * batch_size)]])
        # transpose
        tseq_array = list(map(list, zip(*seq_array)))
        tseq.extend(tseq_array)
        entropies.extend(calc_Shannon_entropy(tseq_array))
    # check length of sequence is the same as length of the entropies
    assert len(entropies) == lengths[0]
    return (entropies, seq_names, tseq)
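# Illustrative usage (the file name here is hypothetical): calc_entropy_msa
# returns one entropy value per alignment column, the record ids, and the
# transposed columns, e.g.:
#   entropies, seq_names, tseq = calc_entropy_msa("my_alignment.fasta")
#   # len(entropies) == alignment length; len(tseq[0]) == number of sequences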
def annotate_msa_conensus(tseq_array, seq_file, barrnap_exe,
                          kingdom="bac",
                          pattern='product=(.+?)$',
                          countdashcov=True,  # count -'s toward coverage
                          collapseNs=False,  # if True, drop all-gap columns
                          excludedash=False,  # if True, keep gaps out of consensus
                          logger=None):
    """ returns annotations (as a gff list), the consensus sequence as a
    list of [base, cov], and named coords as a list
    TODO: the 'next_best' lookup fails if an N is most frequent; it
    defaults to a T
    """
    assert logger is not None, "Must use logger"
    if excludedash:
        logger.warning("CAUTION: excludedash selected. There is a known " +
                       "bug where the 'next_best' lookup fails if an " +
                       "N is most frequent; it defaults to a T")
    consensus = []
    nseqs = len(tseq_array[0])
    logger.info("calc coverage for each of the %i positions", len(tseq_array))
    for position in tseq_array:
        if all([x == position[0] for x in position]):
            if position[0] == '-':
                if collapseNs:
                    continue
                elif not countdashcov:
                    consensus.append([position[0], 0])
                else:
                    consensus.append([position[0], nseqs])
            else:
                # uniform non-gap column: full coverage (keeps the consensus
                # the same length as the entropy list)
                consensus.append([position[0], nseqs])
        else:
            max_count = 0  # starting count
            nextbest_count = 0
            best_nuc = None
            nextbest_nuc = None
            for nuc in set(position):
                count = sum([nuc == z for z in position])
                # if max count, swap max to nextbest and update max
                if count > max_count:
                    nextbest_count = max_count
                    max_count = count  # update count if better
                    nextbest_nuc = best_nuc
                    best_nuc = nuc
                else:
                    pass
            # control whether gaps are allowed in the consensus; this only
            # applies when at least one non-gap is present, so a next-best
            # base can exist
            if (
                    any([x != '-' for x in position]) and
                    best_nuc == '-' and
                    excludedash):
                # if we don't want gaps, choose the next best base
                if nextbest_nuc is None:
                    nextbest_nuc = 't'  # I hate myself for this
                consensus.append([nextbest_nuc, nextbest_count])
            elif best_nuc == '-' and not countdashcov:
                consensus.append([best_nuc, 0])
            else:
                consensus.append([best_nuc, max_count])
    # if any are '-', replace with n's for barrnap
    seq = str(''.join([x[0] for x in consensus])).replace('-', 'n')
    # annotate seq
    with open(seq_file, 'w') as output:
        SeqIO.write(SeqRecord(Seq(seq, IUPAC.IUPACAmbiguousDNA()),
                              id="consensus"),  # give the record an id for the FASTA header
                    output, "fasta")
    barrnap_cmd = "{0} --kingdom {1} {2}".format(barrnap_exe,
                                                 kingdom, seq_file)
    barrnap_gff = subprocess.run(barrnap_cmd,
                                 shell=sys.platform != "win32",
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 check=True)
    results_list = [x.split('\t') for x in
                    barrnap_gff.stdout.decode("utf-8").split("\n")]
    ### make [name, [start_coord, end_coord]] list
    named_coords = []
    for i in results_list:
        if i[0].startswith("#") or not len(i) == 9:
            logger.debug("skipping gff line: %s", i)
            continue
        m = re.search(pattern, i[8])
        if m:
            found = m.group(1)
            named_coords.append([found, [int(i[3]), int(i[4])]])
    if len(named_coords) == 0:
        raise ValueError(str("Error extracting coords from barrnap gff line" +
                             " %s with pattern %s!" % (str(i), pattern)))
    return (results_list, consensus, named_coords)
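# Illustrative call (names, paths, and coordinates are hypothetical): the
# consensus comes back as [base, coverage] pairs, and named_coords as
# [product, [start, end]] per barrnap hit, e.g. ['16S ribosomal RNA', [12, 1552]].
#   gff, consensus, coords = annotate_msa_conensus(
#       tseq_array=tseq, seq_file="consensus.fasta",
#       barrnap_exe="barrnap", kingdom="bac", logger=logger)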
def plot_scatter_with_anno(data,
                           consensus_cov,
                           anno_list,
                           names=["Position", "Entropy"],
                           title="Shannon Entropy by Position",
                           output_prefix="entropy_plot"):
    """Given annotation coords [feature, [start, end]],
    consensus cov list ['base', coverage_depth],
    entropy values (list) and consensus sequence
    (same length for consensus_cov and data, no funny business),
    plot out the entropies for each position,
    plot the annotations, and return 0
    """
    if len(consensus_cov) != len(data):
        raise ValueError("data and consensus different lengths!")
    df = pd.DataFrame({names[0]: range(1, len(data) + 1),
                       names[1]: data})  # columns=names)
    df_con = pd.DataFrame(consensus_cov, columns=["base",
                                                  "depth"])
    cov_max_depth = max(df_con['depth'])
    fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True,
                                   gridspec_kw={'height_ratios': [4, 1]})
    colors = ['#ff4c05', '#FFFB07', '#04FF08', '#06B9FF', '#6505FF', '#FF012F',
              '#ff4c05', '#FFFB07', '#04FF08', '#06B9FF', '#6505FF', '#FF012F']
    ax1.set_title(title, y=1.08)
    xmin, xmax = 0, len(data)
    ymin, ymax = -0.1, (max(data) * 1.2)
    ax1.set_xlim([xmin, xmax])
    ax1.set_ylim([ymin, ymax])
    yjust = -.1
    for index, anno in enumerate(anno_list):
        rect1 = patches.Rectangle(
            (anno[1][0],  # starting x
             ymin),  # starting y
            anno[1][1] - anno[1][0],  # rel x end
            ymax - ymin,  # rel y end
            facecolor=mpl.colors.ColorConverter().to_rgba(
                colors[index], alpha=0.2),
            edgecolor=mpl.colors.ColorConverter().to_rgba(
                colors[index], alpha=0.2))
        rect2 = patches.Rectangle(
            (anno[1][0],  # starting x
             1),  # starting y
            anno[1][1] - anno[1][0],  # rel x end
            cov_max_depth,  # don't subtract 1, because it starts at 1
            facecolor=mpl.colors.ColorConverter().to_rgba(
                colors[index], alpha=0.2),
            edgecolor=mpl.colors.ColorConverter().to_rgba(
                colors[index], alpha=0.2))
        ax1.add_patch(rect1)
        ax2.add_patch(rect2)
        ax1.text((anno[1][0] + anno[1][1]) / 2,  # x location
                 ymax - 0.48 - yjust,  # y location
                 anno[0][0:20],  # text, first 20 characters
                 ha='center', color='red', weight='bold', fontsize=10)
        yjust = yjust * - 1
    ax1.scatter(x=df["Position"], y=df["Entropy"],
                marker='o', color='black', s=2)
    # add smoothing for kicks
    df["fit"] = savitzky_golay(df["Entropy"].values, 351, 3)  # window size 351, polynomial order 3
    ax1.scatter(x=df["Position"], y=df["fit"], color='red', s=1)
    #
    ax1.set_ylabel('Shannon Entropy')
    ax1.get_yaxis().set_label_coords(-.05, 0.5)
    ax2.set_xlim([xmin, xmax])
    ax2.invert_yaxis()
    ax2.set_ylabel('Consensus Coverage')
    ax2.set_xlabel('Position (bp)')
    ax2.get_yaxis().set_label_coords(-.05, 0.5)
    # ax2.set_ylim([1, cov_max_depth + 1]) #, 1])
    ax2.bar(df_con.index, df_con["depth"],
            width=1, color='darkgrey', linewidth=0, edgecolor='darkgrey')
    # ax2.step(df_con.index, df_con["depth"],
    #          where='mid', color='darkgrey')
    for ax in [ax1, ax2]:
        ax.spines['right'].set_visible(False)
        # Only show ticks on the left and bottom spines
        ax1.spines['top'].set_visible(False)
        ax2.spines['bottom'].set_visible(False)
        ax.yaxis.set_ticks_position('left')
    ax2.xaxis.set_ticks_position('bottom')
    ax1.xaxis.set_ticks_position('top')
    ax1.tick_params(axis='y', colors='dimgrey')
    ax2.tick_params(axis='y', colors='dimgrey')
    ax1.tick_params(axis='x', colors='dimgrey')
    ax2.tick_params(axis='x', colors='dimgrey')
    ax1.yaxis.label.set_color('black')
    ax2.yaxis.label.set_color('black')
    ax1.xaxis.label.set_color('black')
    ax2.xaxis.label.set_color('black')
    plt.tight_layout()
    fig.subplots_adjust(hspace=0)
    fig.set_size_inches(12, 7.5)
    fig.savefig(str(output_prefix + '.png'), dpi=(200))
    fig.savefig(str(output_prefix + '.pdf'), dpi=(200))
    return 0
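# Minimal sketch of calling the plotting function directly, reusing the
# hypothetical variables from the examples above; it writes
# <output_prefix>.png and <output_prefix>.pdf and returns 0.
#   plot_scatter_with_anno(data=entropies, consensus_cov=consensus,
#                          anno_list=coords,
#                          title="Shannon Entropy by Position",
#                          output_prefix="entropy_plot")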
if __name__ == "__main__":  # pragma: no cover
    args = get_args()
    output_root = os.path.abspath(os.path.expanduser(args.output))
    # create the output directory; exit if it already exists
    try:
        os.makedirs(output_root)
    except FileExistsError:
        # the leading '#' keeps this message comment-safe if output is
        # ever streamed downstream
        print("#Selected output directory %s exists" %
              args.output)
        print("exiting")
        sys.exit(1)
    log_path = os.path.join(output_root,
                            str("{0}_makeMSA_log.txt".format(
                                time.strftime("%Y%m%d%H%M"))))
    logger = set_up_logging(verbosity=args.verbosity,
                            outfile=log_path,
                            name=__name__)
    logger.debug("Usage:\n{0}\n".format(str(" ".join([x for x in sys.argv]))))
    logger.debug("All settings used:")
    for k, v in sorted(vars(args).items()):
        logger.debug("%s: %s", k, v)
    date = str(datetime.datetime.now().strftime('%Y%m%d'))
    # test whether executables are there
    executables = [args.barrnap_exe]
    test_ex = [check_installed_tools(x, logger=logger) for x in executables]
    if all(test_ex):
        logger.debug("All needed system executables found!")
        logger.debug(str([shutil.which(i) for i in executables]))
    # calculate entropy from the MSA
    seq_entropy, names, tseq = calc_entropy_msa(args.msa)
    gff, consensus_cov, annos = annotate_msa_conensus(
        tseq_array=tseq,
        pattern='product=(.+?)$',
        seq_file=os.path.join(
            output_root,
            "test_consensus.fasta"),
        barrnap_exe=args.barrnap_exe,
        countdashcov=False,  # exclude -'s from coverage
        collapseNs=False,  # keep -'s in the consensus
        excludedash=False,
        kingdom=args.kingdom,
        logger=logger)
    label_name = args.seq_name
    title = str("Shannon Entropy by Position\n" +
                label_name)
    return_code = plot_scatter_with_anno(
        data=seq_entropy,
        consensus_cov=consensus_cov,
        names=["Position", "Entropy"],
        title=title,
        anno_list=annos,
        output_prefix=os.path.join(
            output_root,
            "entropy_plot"))