-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcopy_csv.py
More file actions
58 lines (46 loc) · 2.47 KB
/
copy_csv.py
File metadata and controls
58 lines (46 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import shutil
from pathlib import Path
import pandas as pd
from lir.aggregation import Aggregation, ContextAwareDict, config_parser, partial, pop_field
from lir.data.io import search_path
from lir.util import check_type
class CopyCSV(Aggregation):
    """Aggregation that copies a CSV file into a target directory, optionally keeping only selected columns.

    The copy is performed in `close()`; `report()` is a no-op.

    Attributes
    ----------
    source_file: The source CSV file, as returned by `search_path` for the configured path.
    target_dir (Path): The directory where the new CSV file will be saved. Given by the config_parser, meaning
        it is not set by the user in the configuration. It is created on construction if it does not exist.
    columns (list[str]): A list of column names to copy from the source CSV. If empty, all columns will be copied.
    new_file_name (Path): The full path of the file that will be written, i.e. `target_dir` joined with the
        requested file name, or with the source file's name when no name (None or empty) was given.
    """

    def __init__(self, source_file: str, target_dir: str, columns: list[str], new_file_name: str | None):
        """Resolve the source and output paths and make sure the target directory exists.

        Parameters
        ----------
        source_file: Path of the CSV file to copy; resolved through `search_path`.
        target_dir: Directory in which the copy is written; created (including parents) if missing.
        columns: Column names to keep; an empty list means the file is copied unchanged.
        new_file_name: File name of the copy inside `target_dir`; None or an empty string means the
            name of the source file is reused.
        """
        # NOTE(review): assumes search_path returns a Path-like object exposing `.name` -- confirm.
        self.source_file = search_path(Path(source_file))
        self.target_dir = Path(target_dir)
        # Ensure the target directory exists or create it
        self.target_dir.mkdir(parents=True, exist_ok=True)
        self.columns = columns
        # An empty name is treated like a missing one: joining "" onto the directory would yield the
        # directory itself, which cannot be written to as a CSV file.
        self.new_file_name = self.target_dir / (new_file_name or self.source_file.name)

    def report(self, data) -> None:
        """Do nothing. Required by parent class."""

    def close(self) -> None:
        """Close the aggregation by writing the copy of the CSV file.

        This method has the logic of this class. Without a column selection the source file is copied
        as-is. With a column selection the file is loaded, reduced to the requested columns (in the
        requested order) and written without an index column.

        Raises
        ------
        KeyError: If a requested column is not present in the source CSV.
        """
        if not self.columns:
            shutil.copy(self.source_file, self.new_file_name)
            return

        # Read every cell as a plain string and disable NA detection so the values are written back
        # exactly as they appear in the source (no dropped leading zeros, no int -> float conversion,
        # no "NA"-like strings turned into empty cells).
        df = pd.read_csv(self.source_file, dtype=str, keep_default_na=False)
        df[self.columns].to_csv(self.new_file_name, index=False)
@config_parser()
def copy_csv(config: ContextAwareDict, output_dir: str) -> CopyCSV:
    """Build a CopyCSV aggregation from its configuration section.

    The following fields are read from `config` through `pop_field`:

    - "file": path of the source CSV file.
    - "columns": list of column names to keep; defaults to an empty list and is validated with
      `check_type` against `list`.
    - "new_file_name": name of the copied file; not required.

    `output_dir` is passed on as the target directory of the aggregation.
    """
    # The fields are read in the order file -> columns -> new_file_name.
    return CopyCSV(
        pop_field(config, "file"),
        output_dir,
        columns=pop_field(config, "columns", default=[], validate=partial(check_type, list)),
        new_file_name=pop_field(config, "new_file_name", required=False),
    )