Created
August 31, 2017 13:13
-
-
Save nickp60/3353a69f6ce2ea3644fc601fdad6ca94 to your computer and use it in GitHub Desktop.
plotMSA.py: making entropy plots for msa's (with rDNA annotations)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| Gist of how to make similar plots to the entropy plots in riboSeed from an MSA. | |
| This assumes you want to annotate rDNAs, but you probably don't | |
| care about those. Adjust as needed. | |
| """ | |
| import os | |
| import datetime | |
| import time | |
| import argparse | |
| import sys | |
| import shutil | |
| import math | |
| #from pyutilsnrw import utils3_5 | |
| sys.path.append(os.path.join('..', 'riboSeed')) | |
| from pyutilsnrw.utils3_5 import set_up_logging, check_installed_tools | |
| from riboSnag import calc_entropy_msa, annotate_msa_conensus, \ | |
| plot_scatter_with_anno | |
| def get_args(): # pragma: no cover | |
| """get the arguments as a main parser with subparsers | |
| for named required arguments and optional arguments | |
| """ | |
| parser = argparse.ArgumentParser(description="riboSnag lite. Use this " + | |
| "to generate the plots made in riboSnag", | |
| add_help=False) | |
| parser.add_argument("msa", help="msa in fasta format") | |
| requiredNamed = parser.add_argument_group('required named arguments') | |
| requiredNamed.add_argument("-o", "--output", | |
| help="output directory; default: %(default)s", | |
| default=os.getcwd(), | |
| type=str, dest="output") | |
| # had to make this faux "optional" parse so that the named required ones | |
| # above get listed first | |
| optional = parser.add_argument_group('optional arguments') | |
| optional.add_argument("-n", "--name", | |
| help="rename the contigs with this prefix" + | |
| # "default: %(default)s", | |
| "default: date (YYYYMMDD)", | |
| default=None, dest="name", | |
| action="store", type=str) | |
| optional.add_argument("-v", "--verbosity", | |
| dest='verbosity', action="store", | |
| default=2, type=int, | |
| help="1 = debug(), 2 = info(), 3 = warning(), " + | |
| "4 = error() and 5 = critical(); " + | |
| "default: %(default)s") | |
| optional.add_argument("--barrnap_exe", dest="barrnap_exe", | |
| action="store", default="barrnap", | |
| help="Path to barrnap executable; " + | |
| "default: %(default)s") | |
| optional.add_argument("--kingdom", dest="kingdom", | |
| action="store", default="bac", | |
| choices=["mito", "euk", "arc", "bac"], | |
| help="kingdom for barrnap; " + | |
| "default: %(default)s") | |
| optional.add_argument("-s", "--seq_name", dest='seq_name', | |
| action="store", | |
| help="name of genome; " | |
| "default: inferred from file name, as many cases " + | |
| "involve multiple contigs, etc, making inference " + | |
| "from record intractable;" + | |
| "default: %(default)s", | |
| default="plotMSA", | |
| type=str) | |
| # had to make this explicitly to call it a faux optional arg | |
| optional.add_argument("-h", "--help", | |
| action="help", default=argparse.SUPPRESS, | |
| help="Displays this help message") | |
| args = parser.parse_args() | |
| return args | |
| ###################### From riboSeed's riboSnag ################ | |
| def calc_Shannon_entropy(matrix): | |
| """ $j$ has entropy $H(j)$ such that | |
| $H(j) = -sum_{i=(A,C,T,G)} p_i(j) log p_i(j)$ | |
| """ | |
| entropies = [] | |
| for instance in matrix: | |
| unique = set(instance) | |
| proportions = {} | |
| for i in unique: | |
| proportions[i] = sum([x == i for x in instance]) / len(instance) | |
| entropy = -sum([prob * (math.log(prob, math.e)) for | |
| prob in proportions.values()]) | |
| entropies.append(entropy) | |
| return entropies | |
| def calc_entropy_msa(msa_path): | |
| """givn a path to an MSA in FASTA format, this gets the | |
| entropy of each position in batches, so as to not gobble memory | |
| return list | |
| """ | |
| batch_size = 1000 # read seequences in chunks this long | |
| lengths = [] | |
| with open(msa_path) as fh: | |
| msa_seqs = list(SeqIO.parse(fh, 'fasta')) | |
| seq_names = [] | |
| for rec in msa_seqs: | |
| lengths.append(len(rec)) | |
| seq_names.append(rec.id) | |
| if not all([i == lengths[0] for i in lengths]): | |
| raise ValueError("Sequences must all be the same length!") | |
| entropies = [] | |
| tseq = [] | |
| # save memory by reading in chunks | |
| for batch in range(0, (math.ceil(lengths[0] / batch_size))): | |
| # get all sequences into an array | |
| seq_array = [] | |
| for nseq, record in enumerate(msa_seqs): | |
| seq_array.append( | |
| [x for x in record.seq[(batch * batch_size): | |
| ((batch + 1) * batch_size)]]) | |
| # transpose | |
| tseq_array = list(map(list, zip(*seq_array))) | |
| tseq.extend(tseq_array) | |
| entropies.extend(calc_Shannon_entropy(tseq_array)) | |
| # check length of sequence is the same as length of the entropies | |
| assert len(entropies) == lengths[0] | |
| return (entropies, seq_names, tseq) | |
| def annotate_msa_conensus(tseq_array, seq_file, barrnap_exe, | |
| kingdom="bact", | |
| pattern='product=(.+?)$', | |
| countdashcov=True, # include -'s in coverage | |
| collapseNs=False, # include -'s in consensus | |
| excludedash=False, | |
| logger=None): | |
| """ returns annotations (as gfflist),the consensus sequence as a | |
| list[base, cov], and named coords as a list | |
| TODO: The 'next_best' thing fails is an N is most frequent. Defaults to a T | |
| """ | |
| assert logger is not None, "Must use logger" | |
| if excludedash: | |
| logger.warning("CAUTION: excludedash selected. There is a known " + | |
| "bug in the 'next_best' thing fails if an " + | |
| "N is most frequent. Defaults to a T") | |
| consensus = [] | |
| nseqs = len(tseq_array[0]) | |
| logger.info("calc coverage for each of the %i positions", len(tseq_array)) | |
| for position in tseq_array: | |
| if all([x == position[0] for x in position]): | |
| if position[0] == '-': | |
| if collapseNs: | |
| continue | |
| elif not countdashcov: | |
| consensus.append([position[0], 0]) | |
| else: | |
| consensus.append([position[0], nseqs]) | |
| else: | |
| max_count = 0 # starting count | |
| nextbest_count = 0 | |
| best_nuc = None | |
| nextbest_nuc = None | |
| for nuc in set(position): | |
| count = sum([nuc == z for z in position]) | |
| # if max count, swap with max to nextbest and update max | |
| if count > max_count: | |
| nextbest_count = max_count | |
| max_count = count # update count if better | |
| nextbest_nuc = best_nuc | |
| best_nuc = nuc | |
| else: | |
| pass | |
| # contol whether gaps are allowed in consensus | |
| if ( | |
| all([x != '-' for x in position]) and | |
| best_nuc == '-' and | |
| excludedash): | |
| # if we dont want n's, choose nextbest | |
| if nextbest_nuc is None: | |
| nextbest_nuc = 't' # I hate myself for this | |
| consensus.append([nextbest_nuc, nextbest_count]) | |
| elif best_nuc == '-' and not countdashcov: | |
| consensus.append([best_nuc, 0]) | |
| else: | |
| consensus.append([best_nuc, max_count]) | |
| # if any are '-', replace with n's for barrnap | |
| seq = str(''.join([x[0] for x in consensus])).replace('-', 'n') | |
| # annotate seq | |
| with open(seq_file, 'w') as output: | |
| SeqIO.write(SeqRecord( | |
| Seq(seq, IUPAC.IUPACAmbiguousDNA())), output, "fasta"), | |
| barrnap_cmd = "{0} --kingdom {1} {2}".format(barrnap_exe, | |
| kingdom, seq_file) | |
| barrnap_gff = subprocess.run(barrnap_cmd, | |
| shell=sys.platform != "win32", | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| check=True) | |
| results_list = [x.split('\t') for x in | |
| barrnap_gff.stdout.decode("utf-8").split("\n")] | |
| ### make [name, [start_coord, end_coord]] list | |
| named_coords = [] | |
| for i in results_list: | |
| if i[0].startswith("#") or not len(i) == 9: | |
| logger.debug("skipping gff line: %s", i) | |
| continue | |
| m = re.search(pattern, i[8]) | |
| if m: | |
| found = m.group(1) | |
| named_coords.append([found, [int(i[3]), int(i[4])]]) | |
| if len(named_coords) == 0: | |
| raise ValueError(str("Error extracting coords from barrnap gff line" + | |
| " %s with pattern %s!" % (str(i), pattern))) | |
| return (results_list, consensus, named_coords) | |
| def plot_scatter_with_anno(data, | |
| consensus_cov, | |
| anno_list, | |
| names=["Position", "Entropy"], | |
| title="Shannon Entropy by Position", | |
| output_prefix="entropy_plot.png"): | |
| """Given annotation coords [feature, [start, end]], | |
| consensus cov list ['base', coverage_depth], | |
| entropy values (list) and consensus sequence | |
| (same length for consensus_cov and data, no funny business), | |
| plot out the entropies for each position, | |
| plot the annotations, and return 0 | |
| """ | |
| if len(consensus_cov) != len(data): | |
| raise ValueError("data and consensus different lengths!") | |
| df = pd.DataFrame({names[0]: range(1, len(data) + 1), | |
| names[1]: data}) # columns=names) | |
| df_con = pd.DataFrame(consensus_cov, columns=["base", | |
| "depth"]) | |
| cov_max_depth = max(df_con['depth']) | |
| fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, | |
| gridspec_kw={'height_ratios': [4, 1]}) | |
| colors = ['#ff4c05', '#FFFB07', '#04FF08', '#06B9FF', '#6505FF', '#FF012F', | |
| '#ff4c05', '#FFFB07', '#04FF08', '#06B9FF', '#6505FF', '#FF012F'] | |
| ax1.set_title(title, y=1.08) | |
| xmin, xmax = 0, len(data) | |
| ymin, ymax = -0.1, (max(data) * 1.2) | |
| ax1.set_xlim([xmin, xmax]) | |
| ax1.set_ylim([ymin, ymax]) | |
| yjust = -.1 | |
| for index, anno in enumerate(anno_list): | |
| rect1 = patches.Rectangle( | |
| (anno[1][0], # starting x | |
| ymin), # starting y | |
| anno[1][1] - anno[1][0], # rel x end | |
| ymax - ymin, # rel y end | |
| facecolor=mpl.colors.ColorConverter().to_rgba( | |
| colors[index], alpha=0.2), | |
| edgecolor=mpl.colors.ColorConverter().to_rgba( | |
| colors[index], alpha=0.2)) | |
| rect2 = patches.Rectangle( | |
| (anno[1][0], # starting x | |
| 1), # starting y | |
| anno[1][1] - anno[1][0], # rel x end | |
| cov_max_depth, # dont -1 beacuse start at 1 | |
| facecolor=mpl.colors.ColorConverter().to_rgba( | |
| colors[index], alpha=0.2), | |
| edgecolor=mpl.colors.ColorConverter().to_rgba( | |
| colors[index], alpha=0.2)) | |
| ax1.add_patch(rect1) | |
| ax2.add_patch(rect2) | |
| ax1.text((anno[1][0] + anno[1][1]) / 2, # x location | |
| ymax - 0.48 - yjust, # y location | |
| anno[0][0:20], # text first 20 char | |
| ha='center', color='red', weight='bold', fontsize=10) | |
| yjust = yjust * - 1 | |
| ax1.scatter(x=df["Position"], y=df["Entropy"], | |
| marker='o', color='black', s=2) | |
| # add smoothing for kicks | |
| df["fit"] = savitzky_golay(df["Entropy"].values, 351, 3) # window size 51, polynomial order 3 | |
| ax1.scatter(x=df["Position"], y=df["fit"], color='red', s=1) | |
| # | |
| ax1.set_ylabel('Shannon Entropy') | |
| ax1.get_yaxis().set_label_coords(-.05, 0.5) | |
| ax2.set_xlim([xmin, xmax]) | |
| ax2.invert_yaxis() | |
| ax2.set_ylabel('Consensus Coverage') | |
| ax2.set_xlabel('Position (bp)') | |
| ax2.get_yaxis().set_label_coords(-.05, 0.5) | |
| # ax2.set_ylim([1, cov_max_depth + 1]) #, 1]) | |
| ax2.bar(df_con.index, df_con["depth"], | |
| width=1, color='darkgrey', linewidth=0, edgecolor='darkgrey') | |
| # ax2.step(df_con.index, df_con["depth"], | |
| # where='mid', color='darkgrey') | |
| for ax in [ax1, ax2]: | |
| ax.spines['right'].set_visible(False) | |
| # Only show ticks on the left and bottom spines | |
| ax1.spines['top'].set_visible(False) | |
| ax2.spines['bottom'].set_visible(False) | |
| ax.yaxis.set_ticks_position('left') | |
| ax2.xaxis.set_ticks_position('bottom') | |
| ax1.xaxis.set_ticks_position('top') | |
| ax1.tick_params(axis='y', colors='dimgrey') | |
| ax2.tick_params(axis='y', colors='dimgrey') | |
| ax1.tick_params(axis='x', colors='dimgrey') | |
| ax2.tick_params(axis='x', colors='dimgrey') | |
| ax1.yaxis.label.set_color('black') | |
| ax2.yaxis.label.set_color('black') | |
| ax1.xaxis.label.set_color('black') | |
| ax2.xaxis.label.set_color('black') | |
| plt.tight_layout() | |
| fig.subplots_adjust(hspace=0) | |
| fig.set_size_inches(12, 7.5) | |
| fig.savefig(str(output_prefix + '.png'), dpi=(200)) | |
| fig.savefig(str(output_prefix + '.pdf'), dpi=(200)) | |
| return 0 | |
| if __name__ == "__main__": # pragma: no cover | |
| args = get_args() | |
| output_root = os.path.abspath(os.path.expanduser(args.output)) | |
| # Create output directory only if it does not exist | |
| try: | |
| os.makedirs(args.output) | |
| except FileExistsError: | |
| # '#' is needed in case streaming output eventually | |
| print("#Selected output directory %s exists" % | |
| args.output) | |
| print("exiting") | |
| sys.exit(1) | |
| log_path = os.path.join(output_root, | |
| str("{0}_makeMSA_log.txt".format( | |
| time.strftime("%Y%m%d%H%M")))) | |
| logger = set_up_logging(verbosity=args.verbosity, | |
| outfile=log_path, | |
| name=__name__) | |
| logger.debug("Usage:\n{0}\n".format(str(" ".join([x for x in sys.argv])))) | |
| logger.debug("All settings used:") | |
| for k, v in sorted(vars(args).items()): | |
| logger.debug("%s: %s", k, v) | |
| date = str(datetime.datetime.now().strftime('%Y%m%d')) | |
| # test whether executables are there | |
| executables = [args.barrnap_exe] | |
| test_ex = [check_installed_tools(x, logger=logger) for x in executables] | |
| if all(test_ex): | |
| logger.debug("All needed system executables found!") | |
| logger.debug(str([shutil.which(i) for i in executables])) | |
| # make MSA and calculate entropy | |
| seq_entropy, names, tseq = calc_entropy_msa(args.msa) | |
| gff, consensus_cov, annos = annotate_msa_conensus( | |
| tseq_array=tseq, | |
| pattern='product=(.+?)$', | |
| seq_file=os.path.join( | |
| output_root, | |
| "test_consensus.fasta"), | |
| barrnap_exe=args.barrnap_exe, | |
| countdashcov=False, # include -'s in coverage | |
| collapseNs=False, # include -'s in consensus | |
| excludedash=False, | |
| kingdom=args.kingdom, | |
| logger=logger) | |
| label_name = args.seq_name | |
| title = str("Shannon Entropy by Position\n" + | |
| label_name) | |
| return_code = plot_scatter_with_anno( | |
| data=seq_entropy, | |
| consensus_cov=consensus_cov, | |
| names=["Position", "Entropy"], | |
| title=title, | |
| anno_list=annos, | |
| output_prefix=os.path.join( | |
| output_root, | |
| "entropy_plot")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment