Skip to content

Instantly share code, notes, and snippets.

@pinei
Last active June 24, 2022 23:23
Show Gist options
  • Select an option

  • Save pinei/89a4b5e55c0eb37f58f6a2facd131546 to your computer and use it in GitHub Desktop.

Select an option

Save pinei/89a4b5e55c0eb37f58f6a2facd131546 to your computer and use it in GitHub Desktop.
Databricks PySpark Utilities

Methods for DataFrame manipulation

# Creating an MD5 hash identification column for a DataFrame based on a set of key fields
from pyspark.sql.functions import md5, concat_ws, col

def md5_hash(*cols):
    """Return an MD5 hash Column computed from the named key columns.

    The columns are joined with '||' as a separator before hashing, so
    adjacent values cannot run together and collide (e.g. ('ab','c') vs
    ('a','bc')).

    Args:
        *cols: one or more column names (str) to include in the hash key.

    Returns:
        A pyspark Column containing the 32-char hex MD5 digest.
    """
    # BUG FIX: the original used F.col(...) but never imported
    # pyspark.sql.functions as F, raising NameError — use col directly.
    key_columns = [col(name) for name in cols]
    return md5(concat_ws('||', *key_columns))

df = spark.createDataFrame([
    ('Connor', 'John', 'Terminator', 1990),
    ('McFly', 'George', 'Back To The Future', 1985)
], ['Surname', 'Name', 'Movie', 'Year'])

df.withColumn('CD_HASH', md5_hash('Surname', 'Name', 'Movie')).show()
# ---
+-------+------+------------------+----+--------------------+
|Surname| Name| Movie|Year| CD_HASH|
+-------+------+------------------+----+--------------------+
| Connor| John| Terminator|1990|78e6d27afa22788d1...|
| McFly|George|Back To The Future|1985|6774cc495e2cc90fc...|
+-------+------+------------------+----+--------------------+
# Creating a transformation to change all blank values to null
from pyspark.sql import functions as F

# Class for multiple generic transformation methods
class T:

    @staticmethod
    def blank_to_null(input_df):
        """Replace blank string values (empty or whitespace-only) with null.

        Every column of `input_df` is checked; values whose trimmed form is
        the empty string become null, all other values pass through unchanged.
        Intended for use with DataFrame.transform().
        """
        def nullify(name):
            # A value is "blank" when trimming leaves the empty string.
            is_blank = F.trim(input_df[name]) == ''
            return F.when(is_blank, None).otherwise(input_df[name]).alias(name)

        return input_df.select(*[nullify(name) for name in input_df.columns])

df = spark.createDataFrame([
    ('Connor', 'John', ' ', 1990),
    ('McFly', '', 'Back To The Future', 1985)
], ['Surname', 'Name', 'Movie', 'Year'])

df.transform(T.blank_to_null).show()
# ---
+-------+----+------------------+----+
|Surname|Name| Movie|Year|
+-------+----+------------------+----+
| Connor|John| null|1990|
| McFly|null|Back To The Future|1985|
+-------+----+------------------+----+
# Creating a translation to a column based on a dictionary
# You can choose a default value if the key was not found
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame, Column

# Class for multiple generic transformation methods
class T:

    @staticmethod
    def translate(column, mapping, default = None):
        """Build a DataFrame.transform() function that maps a column's values.

        Args:
            column: name of the column to translate (str).
            mapping: dict of {source_value: translated_value}; keys and values
                may be plain literals or pyspark Columns.
            default: Column to use when no mapping key matches; when None the
                original column value is kept.

        Returns:
            A function (DataFrame -> DataFrame) suitable for
            DataFrame.transform().

        Raises:
            TypeError: if column, mapping, or default has the wrong type.
        """
        # FIX: validate arguments eagerly so a bad call fails here, at
        # T.translate(...) time, instead of only later when the returned
        # function is applied to a DataFrame.
        if (not isinstance(column, str)):
            raise TypeError('column should be str')
        if (not isinstance(mapping, dict)):
            raise TypeError('mapping should be a dict')
        if (default is not None) and (not isinstance(default, Column)):
            raise TypeError('default should be a Column or None (not used)')

        # Returns a customized function for the transformation
        def custom(input_df):
            # Always-false seed condition so .when() clauses can be chained.
            condition = F.when(F.lit(False) == True, None)
            for key, value in mapping.items():
                key = key if isinstance(key, Column) else F.lit(key)
                value = value if isinstance(value, Column) else F.lit(value)
                condition = condition.when(key == F.col(column), value)
            # Default value / keep original
            condition = condition.otherwise(
                default if default is not None else F.col(column)
            )
            return input_df.withColumn(column, condition)
        return custom
df = spark.createDataFrame(
    [
        ('Cristiano Ronaldo', 'PT'),
        ('Neymar', 'BR'),
        ('Messi', 'AR'),
        ('Kylian', 'FR'),
    ],
    ['Player', 'Country'],
)

# Country code -> full name; 'FR' is deliberately absent from the mapping.
translation = {
    'PT': 'Portugal',
    'AR': 'Argentina',
    'BR': 'Brazil',
}

# With an explicit default: unmapped codes are replaced by 'Unknown'.
df.transform(T.translate('Country', translation, F.lit('Unknown'))).show()
# Without a default: unmapped codes keep their original value ('FR').
df.transform(T.translate('Country', translation)).show()
# ---
+-----------------+---------+
| Player| Country|
+-----------------+---------+
|Cristiano Ronaldo| Portugal|
| Neymar| Brazil|
| Messi|Argentina|
| Kylian| Unknown|
+-----------------+---------+
+-----------------+---------+
| Player| Country|
+-----------------+---------+
|Cristiano Ronaldo| Portugal|
| Neymar| Brazil|
| Messi|Argentina|
| Kylian| FR|
+-----------------+---------+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment