Skip to content

Instantly share code, notes, and snippets.

@flashton2003
Created October 21, 2022 09:22
Show Gist options
  • Select an option

  • Save flashton2003/50d645a60219c0e381874a1dd4355646 to your computer and use it in GitHub Desktop.

Select an option

Save flashton2003/50d645a60219c0e381874a1dd4355646 to your computer and use it in GitHub Desktop.
import os
import pprint
import dendropy as dpy
## maybe useful https://gist.github.com/jeetsukumaran/512075/16cfec3a433042287a3704beb24bc73033f757a6
def extract_subtree(tree, target_node, i, tree_handle, output_dir, country):
## so, because dendropy is a bit shit, need to copy the tree
## and get the node which we want to take as a sub-tree
## from the copied version.
tree2 = dpy.Tree(tree)
## what is the id of the target node (from the original tree)
target_label = target_node.annotations.values_as_dict()['label'][0].strip('"')
nd = None
## iterate through the copy until find the equivalent tree there.
for nd in tree2.postorder_node_iter():
if nd.is_internal():
nd_annot = nd.annotations.values_as_dict()
if 'label' in nd_annot:
## this is currently working as a hacky way to find the high posterior changes in distribution
## dont understand while children only seems to be len 1?
# print nd_annot['label'][0]
nd_label = nd_annot['label'][0].strip('"')
if nd_label == target_label:
break
## then make a subtree with the node from the copied tree
subtree = dpy.Tree(seed_node = nd)
output_handle = '.'.join(tree_handle.split('/')[-1].split('.')[0:-1]) + '.%s_subtree%s' % (country, i) + '.tree'
subtree.write(path = '%s/%s' % (output_dir, output_handle), schema = 'newick')
def check_outputdir(output_dir):
if not os.path.exists(output_dir):
os.system('mkdir %s' % output_dir)
def extract_subtrees(tree_handle, output_dir, country):
# this code:
# 1. parses the tree (nexus format)
# 2. for every internal node with a posterior probability of >= 0.5 that it represents a change in the phenotype distribition
# 3. and that has more than 5 descendents
# 4.
tree = dpy.Tree.get_from_path(tree_handle, 'nexus')
print('\t'.join(['subtree_id', 'posterior', 'num_leaves', 'identifying_strain', 'node_index']))
i = 0
for nd in tree.postorder_node_iter():
if nd.is_internal():
# do this, otherwise they assets of a class or something
nd_annot = nd.annotations.values_as_dict()
if 'label' in nd_annot:
## this is currently working as a hacky way to find the high posterior changes in distribution
## dont understand why children only seems to be len 1?
# print nd_annot['label'][0]
posterior = float(nd_annot['label'][0].strip('"').split('=')[-1])
# cutoff is 0.5
if posterior >= 0.5:
x = nd_annot['label'][0].split('|')[0].lstrip('"')
# print(x)
leaves = nd.leaf_nodes()
if len(leaves) >= 5:
i += 1
# take a copy of the tree (otherwise extracting the subtree removes it from the tree, doesn't just copy it)
tree2 = dpy.Tree(tree)
extract_subtree(tree2, nd, i, tree_handle, output_dir, country)
# sys.exit()
# print leaves[0].taxon
index = nd_annot['label'][0].strip('"').split('|')[0]
# print nd_annot
first_sample = str(leaves[0].taxon).strip('\'').split('{')[0]
print('\t'.join(map(str, ['%s_%s' % (country, i), posterior, len(leaves), first_sample, index])))
def get_posteriors(tree_handle, country, country_index_post, indices):
# we want to get hte posterior probabilities for each of the
# nodes in indices
tree = dpy.Tree.get_from_path(tree_handle, 'nexus')
for nd in tree.postorder_node_iter():
if nd.is_internal():
nd_annot = nd.annotations.values_as_dict()
if 'label' in nd_annot:
## this is currently working as a hacky way to find the high posterior changes in distribution
## dont understand why children only seems to be len 1?
# print nd_annot['label'][0]
posterior = float(nd_annot['label'][0].strip('"').split('=')[-1])
index = nd_annot['label'][0].strip('"').split('|')[0]
if index in indices:
country_index_post[country][index] = posterior
# print(country, index, posterior)
return country_index_post
def run_extract_subtrees(countries):
for country in countries:
# these will need to be valid paths on your machine
# sorry about the hardcoded paths, i know i'm a terrible human being!
tree_handle = f'/Users/flashton/Dropbox/mtb/all_tb_in_asia/phylo/results/2019.12.16/2019.12.10.all_l4_masked.{country}.treebreaker.nxs.tree'
output_dir = f'/Users/flashton/Dropbox/mtb/all_tb_in_asia/phylo/results/2019.12.16/{country}_subtrees_2'
check_outputdir(output_dir)
extract_subtrees(tree_handle, output_dir, country)
def run_get_posteriors(countries):
# run the get_posteriors function to get the posteriors for each node that was
# identified by at least one sample
# dict to hold the output
country_index_post = {}
# internal node indicies to look up
indices = {'index=2490', 'index=2511'}
# do this for each country
for country in countries:
tree_handle = f'/Users/flashton/Dropbox/mtb/all_tb_in_asia/phylo/results/2019.12.16/2019.12.10.all_l4_masked.{country}.treebreaker.nxs.tree'
# make a dict to hold the results {index:posterior} for each country
country_index_post[country] = {}
# get the {index:posterior} for this country analusis
country_index_post = get_posteriors(tree_handle, country, country_index_post, indices)
# print results
print('index', 'indo', 'thai', 'vn', 'seac', sep = '\t')
for index in indices:
print(index, country_index_post['indo'][index], country_index_post['thai'][index], country_index_post['vn'][index], country_index_post['seac'][index], sep = '\t')
countries = ['indo', 'thai', 'vn', 'seac']
# countries = ['indo']
if __name__ == '__main__':
run_extract_subtrees(countries)
# run_get_posteriors(countries)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment