CDP-1507: Modified spark configs to support delta lake, added delta read write utils package
vineela03 committed Dec 14, 2022
1 parent 0c2c5f0 · commit b71216b
Showing 4 changed files with 91 additions and 1 deletion.

Python requirements (delta-spark added as a dependency):

@@ -4,3 +4,4 @@ testfixtures==6.18.3
 PyYAML==6.0
 importlib-resources==5.4.0
 dotmap==1.3.25
+delta-spark==2.0.0
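
The Spark config changes mentioned in the commit title live in the other changed files, which are not shown in this view. As a reference point only, this is a minimal sketch of the session settings that delta-spark 2.0.0 documents for enabling Delta Lake; the app name is illustrative and may not match what this commit actually configures:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Standard Delta Lake settings: the Delta SQL extension and the Delta catalog implementation.
builder = (
    SparkSession.builder.appName("delta-utils-example")  # illustrative app name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip attaches the delta-core jars when Spark is installed via pip.
spark = configure_spark_with_delta_pip(builder).getOrCreate()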

New Delta read/write utils module (86 lines added):

@@ -0,0 +1,86 @@
from delta.tables import DeltaTable
import re

def write_partitioned_data_delta(self, dataframe, partition_name, partition_dates_to_override, write_mode,
                                 target_base_path):
    # Write a partitioned Delta table; replaceWhere limits the write to rows whose
    # partition value is in partition_dates_to_override, so only those dates are rewritten.
    return dataframe \
        .write.partitionBy(partition_name) \
        .format("delta") \
        .option("mergeSchema", "true") \
        .option("__partition_columns", partition_name) \
        .option("replaceWhere", "{} in ({})".format(
            partition_name, ', '.join(map(lambda x: "'{}'".format(x), partition_dates_to_override)))) \
        .mode(write_mode) \
        .save(target_base_path)
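
A hypothetical call to the helper above (not part of the commit; the DataFrame, dates, and path are illustrative). replaceWhere is applied on overwrite writes, so only the listed dates should be replaced while other partitions stay untouched:

# Hypothetical usage: rewrite just two date partitions of an events table.
write_partitioned_data_delta(
    None,                                   # self is not used by this helper
    dataframe=daily_df,                     # illustrative DataFrame
    partition_name="ds",
    partition_dates_to_override=["2022-12-01", "2022-12-02"],
    write_mode="overwrite",
    target_base_path="s3://my-bucket/tables/events",  # illustrative path
)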

def write_nonpartitioned_data_delta(self, dataframe, write_mode, target_base_path):
    # Write an unpartitioned Delta table, allowing new columns via mergeSchema.
    return dataframe \
        .write.format("delta") \
        .option("mergeSchema", "true") \
        .mode(write_mode) \
        .save(target_base_path)

def compact_delta_table_partitions(self, sparkSession, base_path, partition_name, dates, num_files):
    # Compact the given partition dates into num_files files. dataChange=false marks the rewrite
    # as a layout-only change, and replaceWhere restricts the overwrite to the selected dates.
    dates_in_clause = "{} in ({})".format(partition_name, ', '.join(map(lambda x: "'{}'".format(x), dates)))
    return sparkSession.read \
        .format("delta") \
        .load(base_path) \
        .where(dates_in_clause) \
        .repartition(num_files) \
        .write \
        .option("dataChange", "false") \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", dates_in_clause) \
        .save(base_path)
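
A sketch of how the compaction helper might be invoked (not part of the commit; all values illustrative). dataChange=false tells Delta the rewrite changes file layout rather than data, so streaming readers of the table can skip it, and repartition(num_files) coalesces the selected dates into that many output files in total:

# Hypothetical usage: compact two date partitions down to 8 files.
compact_delta_table_partitions(
    None,                                   # self is not used by this helper
    sparkSession=spark,
    base_path="s3://my-bucket/tables/events",
    partition_name="ds",
    dates=["2022-12-01", "2022-12-02"],
    num_files=8,
)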

def generate_delta_table(self, sparkSession, schema_name, table_name, s3location):
    # Create the schema if needed, then register a Delta table pointing at the S3 location.
    sparkSession.sql("create database if not exists {}".format(schema_name))
    qualified_table_name = f"{schema_name}.{table_name}"
    DeltaTable.createIfNotExists(sparkSession) \
        .tableName(qualified_table_name) \
        .location(s3location) \
        .execute()
    print(f"Delta table {qualified_table_name} generated")

def extract_delta_info_from_path(self, paths):
    # Split partition paths of the form <base_path>/<partition_name>=<date> into
    # (base_path, partition_name, list of dates).
    path = paths[0]
    path_reg_exp = r"(.*)/(.*)=(.*)"
    match_pattern_to_path = re.match(path_reg_exp, path)
    if match_pattern_to_path is None:
        raise Exception("Can not read {}: base path can not be extracted".format(", ".join(paths)))

    base_path = match_pattern_to_path.group(1)
    partition_name = match_pattern_to_path.group(2)
    dates = [re.match(path_reg_exp, p).group(3) for p in paths]
    print(base_path)
    print(partition_name)
    print(dates)
    return (base_path, partition_name, dates)
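
A small worked example of what the parser above extracts (the paths are illustrative, not from the commit):

paths = [
    "s3://my-bucket/tables/events/ds=2022-12-01",
    "s3://my-bucket/tables/events/ds=2022-12-02",
]
base_path, partition_name, dates = extract_delta_info_from_path(None, paths)
# base_path      -> "s3://my-bucket/tables/events"
# partition_name -> "ds"
# dates          -> ["2022-12-01", "2022-12-02"]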

def read_delta_from_s3(self, sparkSession, paths):
    # Read only the partition dates encoded in the given partition paths.
    (base_path, partition_name, dates) = extract_delta_info_from_path(self, paths)
    df = sparkSession.read \
        .format("delta") \
        .load(base_path) \
        .where("{} in ({})".format(partition_name, ', '.join(map(lambda x: "'{}'".format(x), dates))))
    print(df.count())
    return df

def delta_read_from_basepath(self, sparkSession, base_path):
    # Read the full Delta table at base_path, with no partition filtering.
    return sparkSession.read \
        .format("delta") \
        .load(base_path)

def read_delta_table(self, sparkSession, schema_name, table_name, partition_name, partition_dates):
    # Read the registered Delta table and keep only the requested partition dates.
    qualified_table_name = f"{schema_name}.{table_name}"
    return sparkSession.read.table(qualified_table_name) \
        .where("{} in ({})".format(partition_name, ', '.join(map(lambda x: "'{}'".format(x), partition_dates))))
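
Finally, a hedged end-to-end sketch of how these helpers could be combined once a Delta-enabled SparkSession exists; the schema, table, and path names are illustrative and not part of the commit:

# Register a Delta table over an existing S3 location, then read two partition dates back.
generate_delta_table(None, spark, "analytics", "events", "s3://my-bucket/tables/events")

df = read_delta_table(
    None, spark,
    schema_name="analytics",
    table_name="events",
    partition_name="ds",
    partition_dates=["2022-12-01", "2022-12-02"],
)
df.show()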