Last active
November 1, 2021 14:38
-
-
Save thegreekjester/b8ad105a4442950c77d4fb5b10f002e4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import os | |
| import pyspark | |
| from itertools import product | |
# Experiment IDs that return_e3_dataframe() keeps when filtering event rows.
experiment_list = ['20389491581']

# Root directory that holds the "type=events" / "type=decisions" partitions.
# NOTE: this is plain string concatenation -- 'some path' is appended to the
# current working directory exactly as written, with no separator inserted.
output_dir = os.getcwd() + 'some path'

# Local Spark configuration: use every local core, with explicit memory limits.
conf = pyspark.SparkConf().setMaster('local[*]').setAll([
    ('spark.executor.memory', '4G'),
    ('spark.driver.memory', '45G'),
    ('spark.driver.maxResultSize', '10G'),
])

# Reuse an already-running SparkContext if there is one, otherwise start one
# with the configuration above.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# SQL entry point used below to read the parquet files.
sqlcontext = pyspark.SQLContext(sc)
def return_e3_dataframe(data_type="events"):
    """Load every parquet file of one export partition into a pandas DataFrame.

    Walks ``output_dir + "/type=events"`` when ``data_type == "events"`` and
    ``output_dir + "/type=decisions"`` for any other value. Each file whose
    name ends in ``parquet`` is read through the module-level ``sqlcontext``,
    converted to pandas, and all resulting frames are concatenated.

    Args:
        data_type: ``"events"`` selects the events partition; any other value
            selects the decisions partition.

    Returns:
        For decisions, the concatenated DataFrame unchanged. For events, only
        the rows whose ``experiments`` column holds at least one entry with
        ``is_holdback == False`` and an ``experiment_id`` that appears in the
        module-level ``experiment_list``.

    Raises:
        ValueError: If no parquet file exists under the partition directory.
    """
    partition = "/type=events" if data_type == "events" else "/type=decisions"
    root = output_dir + partition

    frames = []
    # os.walk yields (dirpath, dirnames, filenames). Only *filenames* are
    # considered: handing Spark a directory whose name ends in "parquet" would
    # load its contents once as a dataset and then again file by file when the
    # walk descends into it, duplicating rows.
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            if not filename.endswith('parquet'):
                continue
            path = dirpath + '/' + filename
            print(path)
            frames.append(sqlcontext.read.parquet(path).toPandas())

    if not frames:
        # pd.concat([]) would raise a ValueError anyway; keep the exception
        # type but say what actually went wrong.
        raise ValueError("No parquet files found under %r" % root)

    pdf = pd.concat(frames)
    if partition == '/type=decisions':
        return pdf

    def _in_tracked_experiment(entries):
        """Return True if any non-holdback entry belongs to experiment_list."""
        # Presumably a null ``experiments`` cell surfaces as None after
        # toPandas(); treat it as "no experiments" instead of crashing.
        if entries is None:
            return False
        return any(
            entry.experiment_id in experiment_list
            for entry in entries
            # Deliberately "== False" rather than "not ...": an entry whose
            # is_holdback is None must stay excluded, as before.
            if entry.is_holdback == False  # noqa: E712
        )

    return pdf[pdf.experiments.apply(_in_tracked_experiment)]
# Load the events partition (the function's default), filtered to the
# experiments listed in experiment_list.
e3_df = return_e3_dataframe(data_type="events")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment