Skip to content

Instantly share code, notes, and snippets.

@thegreekjester
Last active November 1, 2021 14:38
Show Gist options
  • Select an option

  • Save thegreekjester/b8ad105a4442950c77d4fb5b10f002e4 to your computer and use it in GitHub Desktop.

Select an option

Save thegreekjester/b8ad105a4442950c77d4fb5b10f002e4 to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import pyspark
from itertools import product
# Experiment ids used to filter the events data.
experiment_list = ['20389491581']

# Root directory that is searched for parquet files.
# NOTE(review): 'some path' is appended to the working directory without a
# path separator -- presumably a placeholder to be replaced with a suffix
# that starts with '/'; confirm before running.
output_dir = os.getcwd() + 'some path'

# Build the Spark configuration one setting at a time; every SparkConf setter
# mutates the object in place, so no re-assignment is needed.
conf = pyspark.SparkConf()
conf.setMaster('local[*]')
conf.set('spark.executor.memory', '4G')
conf.set('spark.driver.memory', '45G')
conf.set('spark.driver.maxResultSize', '10G')

# Reuse an already-running SparkContext if there is one, else start a new one.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
sqlcontext = pyspark.SQLContext(sc)
def return_e3_dataframe(data_type: str = "events") -> pd.DataFrame:
    """Load every parquet file of one data type into a single pandas DataFrame.

    Walks ``output_dir + "/type=events"`` (or ``"/type=decisions"``), reads
    each file whose name ends in ``parquet`` through ``sqlcontext``, converts
    it to pandas and concatenates the per-file frames.

    Args:
        data_type: ``"events"`` selects the ``/type=events`` subtree; any
            other value selects ``/type=decisions``.

    Returns:
        For decisions, the concatenated frame, unfiltered.  For events, only
        the rows whose ``experiments`` cell holds at least one entry with
        ``is_holdback == False`` and an ``experiment_id`` that appears in
        ``experiment_list``.

    Raises:
        ValueError: if no parquet file exists under the selected directory.
    """
    subdir = "/type=events" if data_type == "events" else "/type=decisions"
    root = output_dir + subdir

    # Only the *file* names yielded by os.walk are considered.  Testing the
    # directory names as well would load a directory ending in 'parquet' as a
    # whole and then load the files inside it again, duplicating rows.
    # The path is built with '/' so the printed/loaded string stays the same
    # as before.
    parquet_paths = [
        dirpath + '/' + filename
        for dirpath, _dirnames, filenames in os.walk(root)
        for filename in filenames
        if filename.endswith('parquet')
    ]
    if not parquet_paths:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError("No parquet files found under {!r}".format(root))

    dataframes = []
    for path in parquet_paths:
        print(path)
        # No intermediate names are bound, so the Spark frame can be released
        # as soon as its pandas copy has been stored.
        dataframes.append(sqlcontext.read.parquet(path).toPandas())

    combined = pd.concat(dataframes)
    if subdir == "/type=decisions":
        return combined

    def _targets_listed_experiment(entries):
        """Return True if a non-holdback entry belongs to experiment_list."""
        return any(
            entry.experiment_id in experiment_list
            for entry in entries
            # `== False` is deliberate: unlike `not ...`, it also skips
            # entries whose is_holdback is None.
            if entry.is_holdback == False  # noqa: E712
        )

    return combined[combined.experiments.apply(_targets_listed_experiment)]
# Load the (experiment-filtered) events data as soon as the script runs.
e3_df = return_e3_dataframe(data_type="events")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment