Set up environment variables for Hadoop.
export HADOOP_VERSION=2.8.5
export HADOOP_HOME=${HOME}/hadoop-$HADOOP_VERSION
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=${HADOOP_HOME}/bin:$PATH
Download Hadoop files.
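For example, the release tarball can be pulled from the Apache archive and unpacked under ${HOME}, matching the HADOOP_HOME set above (a minimal sketch; swap in a closer mirror if you prefer):

# Download and unpack the Hadoop release referenced by HADOOP_VERSION.
curl -LO https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar -xzf hadoop-${HADOOP_VERSION}.tar.gz -C ${HOME}
# Sanity check: hadoop should now resolve via the PATH entry added above.
hadoop version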
DROP TABLE IF EXISTS target_tables;
-- One row per (query, target table) insert recorded in stl_insert.
CREATE TEMP TABLE target_tables AS (
    SELECT
        DISTINCT tbl AS target_table_id,
        sti.schema AS target_schema,
        sti.table AS target_table,
        sti.database AS cluster,
        query
    FROM stl_insert
    JOIN SVV_TABLE_INFO sti ON sti.table_id = tbl
);
SELECT
    c.table_catalog as cluster,
    c.table_schema as schema_name,
    c.table_name as table_name,
    c.column_name as col_name,
    c.data_type as col_type,
    pgcd.description as col_description,
    ordinal_position as col_sort_order
FROM
    INFORMATION_SCHEMA.COLUMNS c
    -- column comments live in pg_description, keyed by the table's relid
    -- and the column's ordinal position
    INNER JOIN pg_catalog.pg_statio_all_tables st
        ON c.table_schema = st.schemaname AND c.table_name = st.relname
    LEFT JOIN pg_catalog.pg_description pgcd
        ON pgcd.objoid = st.relid AND pgcd.objsubid = c.ordinal_position;
SELECT
    t.table_catalog as cluster,
    t.table_schema as schema_name,
    t.table_name as table_name,
    t.table_type as table_type,
    pt.tableowner as table_owner,
    pgtd.description as description
FROM
    information_schema.tables as t
    INNER JOIN pg_catalog.pg_statio_all_tables as st
        ON t.table_schema = st.schemaname AND t.table_name = st.relname
    -- table owner comes from pg_tables, table comments from pg_description
    INNER JOIN pg_catalog.pg_tables as pt
        ON t.table_schema = pt.schemaname AND t.table_name = pt.tablename
    LEFT JOIN pg_catalog.pg_description as pgtd
        ON pgtd.objoid = st.relid AND pgtd.objsubid = 0;
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType, IntegerType

def personality_udf(personality_mapping: dict):
    # Look up the integer id of a personality category; unknown categories map to -1.
    @udf(returnType=IntegerType())
    def inner(cat):
        return personality_mapping.get(cat, -1)
    return inner

def compute_gvector_udf(personality_mapping: dict):
    @udf(returnType=ArrayType(DoubleType()))
    def inner(ngvector, cgvector, suma):
# Install quinn>=0.3.1
from typing import Callable

import pyspark.sql.functions as F
from pyspark.sql import Window
from quinn.extensions.dataframe_ext import DataFrame

def with_idx(id_col: str, output_col: str) -> Callable[[DataFrame], DataFrame]:
    def inner(df: DataFrame) -> DataFrame:
        # Assign a 1-based index to each distinct id, ordered by the id column.
        window = Window.orderBy(id_col)
        unique_activity_ids = df \
            .select(id_col).distinct() \
            .withColumn(output_col, F.row_number().over(window))
trait SparkWriter {
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit
}

class SparkJsonWriter(path: String, partitions: Int = 1) extends SparkWriter {
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit = {
    df
      .coalesce(partitions)   // collapse to the requested number of output files
      .write
      .mode(mode)
      .json(path)
  }
}
object Filters {
  def filter(filters: Seq[Column])(df: DataFrame): DataFrame = {
    filters.foldLeft(df)((df, filter) => df.filter(filter))
  }
}

trait SparkReader {
  protected def execute(reader: DataFrameReader): DataFrame

  def read(schema: Option[StructType] = None, filters: Seq[Column] = Seq.empty)(implicit sparkSession: SparkSession): DataFrame = {
class ActivityInsightsJob(activityReader: SparkReader,
                          analyticsReader: SparkReader,
                          insightsWriter: SparkWriter
                         )(implicit val sparkSession: SparkSession) extends SparkTask {

  def run(): Unit = {
    val metricsDF = analyticsReader.read(Some(AnalyticsSchema))
      .transform(Events.isActivityImpression)
      .transform(Events.isActivityView)
      .transform(Events.isBookmarked)
export SPARK_OPTS='--packages org.apache.hadoop:hadoop-aws:2.8.5 --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider'
pyspark
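TemporaryAWSCredentialsProvider looks for session credentials in the fs.s3a.access.key, fs.s3a.secret.key, and fs.s3a.session.token Hadoop properties, so one way to supply them is as extra spark.hadoop.* flags appended to SPARK_OPTS before launching pyspark; a sketch with placeholder values:

# Placeholder values only; substitute the short-lived credentials issued by STS.
export SPARK_OPTS="$SPARK_OPTS \
  --conf spark.hadoop.fs.s3a.access.key=<aws-access-key-id> \
  --conf spark.hadoop.fs.s3a.secret.key=<aws-secret-access-key> \
  --conf spark.hadoop.fs.s3a.session.token=<aws-session-token>"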