Skip to content

Instantly share code, notes, and snippets.

@alienzj
Created September 10, 2024 06:38
Show Gist options
  • Select an option

  • Save alienzj/bcb06db67d910ef152cd8e19ea7bfd68 to your computer and use it in GitHub Desktop.

Select an option

Save alienzj/bcb06db67d910ef152cd8e19ea7bfd68 to your computer and use it in GitHub Desktop.
Pandas and Polars
def extract_norm_profile_pandas_version(profile_ori, column_name, profiler, samples_index, outdir):
    '''
    Aggregate a taxonomic profile at every rank, normalise it per sample and
    write one table per rank (pandas implementation).

    For each rank configured for ``profiler`` the lineage strings found in
    ``column_name`` are reduced to a taxon name with ``extract_taxa_name``,
    the sample columns are summed per taxon name, and every sample column is
    divided by its own total. Samples whose normalised column does not sum to
    a positive value (e.g. all-zero samples) are dropped from that rank's
    table. Each table is written to ``<outdir>/<profiler>.<rank>.parquet``
    and ``<outdir>/<profiler>.<rank>.tsv``.

    Args:
        profile_ori: pandas DataFrame holding the lineage column and the
            per-sample abundance columns.
        column_name: Name of the column that contains the lineage strings.
        profiler: Profiler identifier. ``"phanta_pmag"`` and
            ``"phanta_vmag"`` select the ``|<rank>_`` markers; any other
            value falls back to the GTDB-style ``d__`` / ``;p__`` markers.
        samples_index: Position of the first sample column; every column
            from that position onwards is treated as a sample.
        outdir: Directory for the output files; created when missing.

    Returns:
        dict mapping each rank to the ``(rows, columns)`` shape of the table
        written for that rank.

    References:
        https://ictv.global/
        International Committee on Taxonomy of Viruses: ICTV, Official Taxonomic Resources
        https://gtdb.ecogenomic.org/
        GENOME TAXONOMY DATABASE
        596,859 genomes
        Release 09-RS220 (24th April 2024)
    '''
    samples_list = list(profile_ori.columns)[samples_index:]

    # Rank -> marker handed to extract_taxa_name. Dict insertion order is the
    # order in which the ranks are processed.
    if profiler == "phanta_vmag":
        taxa_dict = {
            "superkingdom": "|superkingdom_",
            "clade": "|clade_",
            "kingdom": "|kingdom_",
            "phylum": "|phylum_",
            "class": "|class_",
            "order": "|order_",
            "family": "|family_",
            "genus": "|genus_",
            "species": "|species_",
        }
    elif profiler == "phanta_pmag":
        taxa_dict = {
            "superkingdom": "|superkingdom_",
            "phylum": "|phylum_",
            "class": "|class_",
            "order": "|order_",
            "family": "|family_",
            "genus": "|genus_",
            "species": "|species_",
        }
    else:
        # profiler == "mpa4", GTDB format (also the fallback for other values)
        taxa_dict = {
            "superkingdom": "d__",
            "phylum": ";p__",
            "class": ";c__",
            "order": ";o__",
            "family": ";f__",
            "genus": ";g__",
            "species": ";s__",
        }

    # os.path.join accepts an empty outdir (current directory); makedirs
    # does not, so only create the directory when a name was given.
    if outdir:
        os.makedirs(outdir, exist_ok=True)

    # Loop invariants: only the lineage column and the sample columns are
    # needed, so the full profile is never copied per rank.
    lineages = profile_ori[column_name]
    sample_values = profile_ori.loc[:, samples_list]

    abun_dict = {}
    for taxa_level, marker in taxa_dict.items():
        # Map the single lineage column instead of applying row-wise over
        # the whole frame.
        taxa_names = lineages.map(
            lambda lineage, marker=marker: extract_taxa_name(lineage, marker)
        )
        abun_df = sample_values.assign(**{taxa_level: taxa_names})
        abun_df = abun_df.groupby(taxa_level).sum()

        # Relative abundance per sample; 0/0 for empty samples yields NaN,
        # which is turned into 0 so those samples can be filtered below.
        abun_df = abun_df.div(abun_df.sum(axis=0), axis="columns")
        abun_df = abun_df.reset_index().fillna(0)

        samples_list_keep = [
            sample for sample in samples_list if abun_df[sample].sum() > 0
        ]
        abun_df = abun_df.loc[:, [taxa_level] + samples_list_keep]

        output_file = os.path.join(outdir, f"{profiler}.{taxa_level}.parquet")
        abun_df.to_parquet(output_file)
        output_file = os.path.join(outdir, f"{profiler}.{taxa_level}.tsv")
        abun_df.to_csv(output_file, sep="\t", index=False)

        abun_dict[taxa_level] = abun_df.shape
    return abun_dict
def extract_norm_profile_polars_version(profile_ori, column_name, profiler, samples_index, outdir):
    '''
    Aggregate a taxonomic profile at every rank, normalise it per sample and
    write one table per rank (polars implementation).

    For each rank configured for ``profiler`` the lineage strings found in
    ``column_name`` are reduced to a taxon name with ``extract_taxa_name``,
    the sample columns are summed per taxon name, samples whose total is not
    positive are dropped, and every remaining sample column is divided by
    its own total. Rows are sorted by taxon name so the written files are
    reproducible between runs. Each table is written to
    ``<outdir>/<profiler>.<rank>.parquet`` and
    ``<outdir>/<profiler>.<rank>.tsv``.

    Rows whose extracted taxon name is null form a group of their own.

    Args:
        profile_ori: polars DataFrame holding the lineage column and the
            per-sample abundance columns.
        column_name: Name of the column that contains the lineage strings.
        profiler: Profiler identifier. ``"phanta_pmag"`` and
            ``"phanta_vmag"`` select the ``|<rank>_`` markers; any other
            value falls back to the GTDB-style ``d__`` / ``;p__`` markers.
        samples_index: Position of the first sample column; every column
            from that position onwards is treated as a sample.
        outdir: Directory for the output files; created when missing.

    Returns:
        dict mapping each rank to the ``(rows, columns)`` shape of the table
        written for that rank.

    References:
        https://ictv.global/
        International Committee on Taxonomy of Viruses: ICTV, Official Taxonomic Resources
        https://gtdb.ecogenomic.org/
        GENOME TAXONOMY DATABASE
        596,859 genomes
        Release 09-RS220 (24th April 2024)
    '''
    samples_list = profile_ori.columns[samples_index:]

    # Rank -> marker handed to extract_taxa_name. Dict insertion order is the
    # order in which the ranks are processed.
    if profiler == "phanta_vmag":
        taxa_dict = {
            "superkingdom": "|superkingdom_",
            "clade": "|clade_",
            "kingdom": "|kingdom_",
            "phylum": "|phylum_",
            "class": "|class_",
            "order": "|order_",
            "family": "|family_",
            "genus": "|genus_",
            "species": "|species_",
        }
    elif profiler == "phanta_pmag":
        taxa_dict = {
            "superkingdom": "|superkingdom_",
            "phylum": "|phylum_",
            "class": "|class_",
            "order": "|order_",
            "family": "|family_",
            "genus": "|genus_",
            "species": "|species_",
        }
    else:
        # profiler == "mpa4", GTDB format (also the fallback for other values)
        taxa_dict = {
            "superkingdom": "d__",
            "phylum": ";p__",
            "class": ";c__",
            "order": ";o__",
            "family": ";f__",
            "genus": ";g__",
            "species": ";s__",
        }

    # os.path.join accepts an empty outdir (current directory); makedirs
    # does not, so only create the directory when a name was given.
    if outdir:
        os.makedirs(outdir, exist_ok=True)

    abun_dict = {}
    for taxa_level, marker in taxa_dict.items():
        # return_dtype is given explicitly so polars does not have to infer
        # the output type of the Python callback.
        taxa_name = (
            pl.col(column_name)
            .map_elements(
                lambda lineage, marker=marker: extract_taxa_name(lineage, marker),
                return_dtype=pl.Utf8,
            )
            .alias(taxa_level)
        )
        abun_df = (
            profile_ori.with_columns(taxa_name)
            .select(pl.col([taxa_level] + samples_list))
            .group_by(taxa_level)
            .agg(pl.col(samples_list).sum())
            # group_by does not guarantee any row order; sort so that the
            # written tables are deterministic.
            .sort(taxa_level)
        )

        # `or 0` treats a null total like an empty sample.
        samples_list_keep = [
            sample for sample in samples_list
            if (abun_df[sample].sum() or 0) > 0
        ]
        abun_df = abun_df.select(pl.col([taxa_level] + samples_list_keep))

        # Relative abundance per sample; skipped when no sample survived.
        if samples_list_keep:
            abun_df = abun_df.with_columns(
                pl.col(samples_list_keep) / pl.col(samples_list_keep).sum()
            )

        output_file = os.path.join(outdir, f"{profiler}.{taxa_level}.parquet")
        abun_df.write_parquet(output_file)
        output_file = os.path.join(outdir, f"{profiler}.{taxa_level}.tsv")
        abun_df.write_csv(output_file, separator="\t")

        abun_dict[taxa_level] = abun_df.shape
    return abun_dict
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment