Methods for DataFrame manipulation
Databricks PySpark Utilities
# Creating an MD5 hash identification column for a DataFrame based on a set of key fields

from pyspark.sql import functions as F

def md5_hash(*cols):
    # Concatenate the key columns with a '||' separator and hash the result
    return F.md5(F.concat_ws('||', *[F.col(c) for c in cols]))

df = spark.createDataFrame([
    ('Connor', 'John', 'Terminator', 1990),
    ('McFly', 'George', 'Back To The Future', 1985)
], ['Surname', 'Name', 'Movie', 'Year'])

df.withColumn('CD_HASH', md5_hash('Surname', 'Name', 'Movie')).show()

# ---
+-------+------+------------------+----+--------------------+
|Surname|  Name|             Movie|Year|             CD_HASH|
+-------+------+------------------+----+--------------------+
| Connor|  John|        Terminator|1990|78e6d27afa22788d1...|
|  McFly|George|Back To The Future|1985|6774cc495e2cc90fc...|
+-------+------+------------------+----+--------------------+
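One common use for such a hash column, sketched below rather than taken from the gist, is change detection between two snapshots of the same table: rows whose key matches but whose hash differs have changed. The df_old / df_new names and the join on 'Surname' are illustrative assumptions.

# Sketch only (not part of the original gist): using the hash to detect changed rows
# between two snapshots. df_old / df_new and the 'Surname' join key are assumptions.
df_old = df.withColumn('CD_HASH', md5_hash('Surname', 'Name', 'Movie'))

df_new = (df
    .withColumn('Movie', F.when(F.col('Surname') == 'Connor', F.lit('Terminator 2'))
                          .otherwise(F.col('Movie')))
    .withColumn('CD_HASH', md5_hash('Surname', 'Name', 'Movie')))

changed = (df_new.alias('n')
    .join(df_old.alias('o'), on='Surname', how='inner')
    .where(F.col('n.CD_HASH') != F.col('o.CD_HASH'))
    .select('n.*'))

changed.show()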
# Creating a transformation to change all blank values to null

from pyspark.sql import functions as F

# Class for multiple generic transformation methods
class T:

    @staticmethod
    def blank_to_null(input_df):
        return input_df.select(
            *[F.when(F.trim(input_df[x]) == '', None).otherwise(input_df[x]).alias(x) for x in input_df.columns])

df = spark.createDataFrame([
    ('Connor', 'John', ' ', 1990),
    ('McFly', '', 'Back To The Future', 1985)
], ['Surname', 'Name', 'Movie', 'Year'])

df.transform(T.blank_to_null).show()

# ---
+-------+----+------------------+----+
|Surname|Name|             Movie|Year|
+-------+----+------------------+----+
| Connor|John|              null|1990|
|  McFly|null|Back To The Future|1985|
+-------+----+------------------+----+
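Because DataFrame.transform accepts any function from DataFrame to DataFrame, cleanups like this chain fluently. The upper_names helper below is a hypothetical illustration, not part of the original gist.

# Sketch only: chaining blank_to_null with another transformation via DataFrame.transform.
# upper_names is a hypothetical helper, not part of the original gist.
def upper_names(input_df):
    return input_df.withColumn('Name', F.upper(F.col('Name')))

df.transform(T.blank_to_null).transform(upper_names).show()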
# Creating a translation for a column based on a dictionary
# You can choose a default value if the key is not found

from pyspark.sql import functions as F
from pyspark.sql import Column

# Class for multiple generic transformation methods
class T:

    @staticmethod
    def translate(column, mapping, default = None):
        # Returns a customized function for the transformation
        def custom(input_df):
            if not isinstance(column, str):
                raise TypeError('column should be a str')
            if not isinstance(mapping, dict):
                raise TypeError('mapping should be a dict')
            if (default is not None) and (not isinstance(default, Column)):
                raise TypeError('default should be a Column or None (not used)')
            condition = F.when(F.lit(False), None)  # always-false initial condition
            for key, value in mapping.items():
                key = F.lit(key) if not isinstance(key, Column) else key
                value = F.lit(value) if not isinstance(value, Column) else value
                condition = condition.when(key == F.col(column), value)
            # Default value / keep original
            condition = condition.otherwise(
                default if default is not None else F.col(column)
            )
            return input_df.withColumn(column, condition)
        return custom

df = spark.createDataFrame([
    ('Cristiano Ronaldo', 'PT'),
    ('Neymar', 'BR'),
    ('Messi', 'AR'),
    ('Kylian', 'FR')
], ['Player', 'Country'])

translation = {
    'PT': 'Portugal',
    'AR': 'Argentina',
    'BR': 'Brazil'
}

df.transform(T.translate('Country', translation, F.lit('Unknown'))).show()
df.transform(T.translate('Country', translation)).show()

# ---
+-----------------+---------+
|           Player|  Country|
+-----------------+---------+
|Cristiano Ronaldo| Portugal|
|           Neymar|   Brazil|
|            Messi|Argentina|
|           Kylian|  Unknown|
+-----------------+---------+

+-----------------+---------+
|           Player|  Country|
+-----------------+---------+
|Cristiano Ronaldo| Portugal|
|           Neymar|   Brazil|
|            Messi|Argentina|
|           Kylian|       FR|
+-----------------+---------+
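Since translate() returns a function, several dictionary translations can be composed in one pipeline. The extra 'Position' column and its mapping below are illustrative assumptions, not part of the original gist.

# Sketch only: chaining two dictionary translations in a single pipeline.
# The 'Position' column and the positions mapping are illustrative assumptions.
positions = {'FW': 'Forward', 'MF': 'Midfielder'}

df2 = spark.createDataFrame([
    ('Neymar', 'BR', 'FW'),
    ('Kylian', 'FR', 'FW')
], ['Player', 'Country', 'Position'])

(df2
    .transform(T.translate('Country', translation, F.lit('Unknown')))
    .transform(T.translate('Position', positions))
    .show())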