Module skm_pyutils.merge
This module holds routines for merging output files and folders.
Expand source code
"""This module holds routine for merging outputs files and folders."""
import argparse
import os
import re
import shutil
import warnings
import numpy as np
from skm_pyutils.path import get_all_files_in_dir
def merge_files(in_dir, all_result_ext=None):
    """
    Copy result files from the sub-directories of in_dir into one folder.

    Every immediate sub-directory of ``in_dir`` (apart from the output
    folder itself) is searched recursively with ``get_all_files_in_dir``.
    Each file found is copied into ``in_dir/all_results_merged`` and named
    after its path relative to ``in_dir``, with the path separators
    replaced by ``--``. Files directly inside ``in_dir`` are not copied.

    Parameters
    ----------
    in_dir : str
        The path to where to start merging from.
    all_result_ext : str, optional
        The extension to look for, by default None. With None, the files
        found are restricted to common image types
        (.png, .jpg, .svg, .gif, .tiff).

    Returns
    -------
    None

    """
    merged_dir_name = "all_results_merged"
    image_exts = (".png", ".jpg", ".svg", ".gif", ".tiff")

    all_file_loc = os.path.join(in_dir, merged_dir_name)
    os.makedirs(all_file_loc, exist_ok=True)
    print("Copying all results into {}".format(all_file_loc))

    # Only sub-directories are scanned, and never the output folder itself,
    # otherwise earlier merge results would be copied again.
    dirs = [
        os.path.join(in_dir, o)
        for o in os.listdir(in_dir)
        if os.path.isdir(os.path.join(in_dir, o)) and o != merged_dir_name
    ]

    for d in dirs:
        all_files = get_all_files_in_dir(
            d, ext=all_result_ext, recursive=True, return_absolute=True
        )
        if all_result_ext is None:
            all_files = [
                f for f in all_files if os.path.splitext(f)[1] in image_exts
            ]

        for f in all_files:
            # relpath (rather than slicing off len(in_dir) characters) stays
            # correct when in_dir has a trailing separator or is relative
            # while the found files are absolute.
            rel_name = os.path.relpath(f, in_dir)
            out_name = os.path.join(
                all_file_loc, "--".join(rel_name.split(os.sep))
            )
            shutil.copy(f, out_name)
def _csv_numeric_rows(file_data, delim, data_start_col):
    """Return the data columns of a csv text as a 2D float array, or None."""
    # Quoted fields may contain the delimiter; collapse each to one token
    # (which parses as NaN) before splitting.
    cleaned = re.sub('"[^"]+"', "NAN", file_data)
    # The first line is taken to be the header; blank lines are ignored.
    rows = [
        line.split(delim)[data_start_col:]
        for line in cleaned.split("\n")[1:]
        if line != ""
    ]
    if not rows:
        return None

    # Size to the widest row so ragged files cannot index out of bounds;
    # cells missing from shorter rows stay at zero.
    width = max(len(row) for row in rows)
    data = np.zeros(shape=(len(rows), width))
    for row_idx, row in enumerate(rows):
        for col_idx, val in enumerate(row):
            try:
                data[row_idx, col_idx] = float(val.strip())
            except ValueError:
                data[row_idx, col_idx] = np.nan
    return data


def _csv_summary_rows(data, delim, data_start_col):
    """Return the "Average" and "Std" csv lines for data, or "" if all zero."""
    if np.sum(data) == 0:
        return ""

    # A final row whose data columns sum to zero is treated as a trailing
    # blank row.
    # NOTE(review): a genuine all-zero final row also triggers this error.
    if np.sum(data[-1]) == 0:
        raise RuntimeError("Excel sheet to merge has trailing blank lines")

    with warnings.catch_warnings():
        warnings.filterwarnings(action="ignore", message="Mean of empty slice")
        warnings.filterwarnings(
            action="ignore",
            message="Degrees of freedom <= 0 for slice.",
        )
        avg = np.nanmean(data, axis=0)
        std = np.nanstd(data, axis=0)

    # Pad the label so the values line up with the data columns.
    pad = delim * data_start_col
    avg_str = "Average" + pad + delim.join(str(val) for val in avg) + "\n"
    std_str = "Std" + pad + delim.join(str(val) for val in std) + "\n"
    return avg_str + std_str


def csv_merge(in_dir, keep_headers=True, insert_newline=True, stats=True, delim=","):
    """
    Merge all csv files recursively from in_dir into one file.

    The merged file is written to ``in_dir`` as
    ``merge--<basename of in_dir>.csv``. If that file already exists it is
    left out of the inputs and overwritten.

    Parameters
    ----------
    in_dir : str
        The directory to start the merging from.
    keep_headers : bool, optional
        Keep the first line of every csv file, by default True.
        If False, the first line is only kept for the first file.
    insert_newline : bool, optional
        Write a newline after each merged file, by default True
    stats : bool, optional
        Append "Average" and "Std" rows computed over the numerical values
        of each file (from the third column onwards), by default True
    delim : str, optional
        What delimiter to use, by default ","

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If the data columns of the last row of a file sum to zero while
        the file as a whole does not.

    """
    data_start_col = 2
    csv_files = get_all_files_in_dir(in_dir, ext="csv", recursive=True)
    o_name = os.path.join(in_dir, f"merge--{os.path.basename(in_dir)}.csv")
    print("Merging csv results into {}".format(o_name))

    # Exclude a previous merge result by identity, not by list position,
    # so it is never merged into itself whatever the listing order is.
    if os.path.isfile(o_name):
        csv_files = [f for f in csv_files if not os.path.samefile(f, o_name)]

    with open(o_name, "w") as output:
        for file_idx, f in enumerate(csv_files):
            with open(f, "r") as open_file:
                file_data = open_file.read()

            if keep_headers or file_idx == 0:
                to_copy = file_data
            else:
                to_copy = "\n".join(file_data.split("\n")[1:])
            output.write(to_copy)

            if stats:
                data = _csv_numeric_rows(file_data, delim, data_start_col)
                if data is not None:
                    summary = _csv_summary_rows(data, delim, data_start_col)
                    if summary:
                        # Start the summary on its own line even when the
                        # source file has no trailing newline.
                        if to_copy and not to_copy.endswith("\n"):
                            output.write("\n")
                        output.write(summary)

            if insert_newline:
                output.write("\n")
def cli():
    """
    Parse the command line and run the requested merge routines.

    Takes a positional ``directory`` plus the optional flags ``--do_csv``,
    ``--do_images`` and ``--image_extension``.

    Raises
    ------
    ValueError
        If arguments that the parser does not recognise are passed.

    """
    arg_parser = argparse.ArgumentParser(description="Command line arguments")
    arg_parser.add_argument(
        "directory", type=str, help="The directory to merge in"
    )
    # The two switches differ only in their names and help text.
    for long_flag, short_flag, help_text in (
        ("--do_csv", "-n", "should merge csv files"),
        ("--do_images", "-i", "should merge images"),
    ):
        arg_parser.add_argument(
            long_flag, short_flag, action="store_true", help=help_text
        )
    arg_parser.add_argument(
        "--image_extension",
        "-e",
        type=str,
        default=None,
        help="the image extension to look for (without .)",
    )

    options, leftovers = arg_parser.parse_known_args()
    if leftovers:
        raise ValueError("Unexpected arguments {}".format(leftovers))

    if options.do_csv:
        print("----------CSV MERGE-----------")
        csv_merge(options.directory)

    if options.do_images:
        print("----------IMAGE MERGE-----------")
        merge_files(options.directory, all_result_ext=options.image_extension)
# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    cli()
Functions
def cli()
-
Command line interface.
Expand source code
def cli():
    """
    Parse the command line and run the requested merge routines.

    Takes a positional ``directory`` plus the optional flags ``--do_csv``,
    ``--do_images`` and ``--image_extension``.

    Raises
    ------
    ValueError
        If arguments that the parser does not recognise are passed.

    """
    arg_parser = argparse.ArgumentParser(description="Command line arguments")
    arg_parser.add_argument(
        "directory", type=str, help="The directory to merge in"
    )
    # The two switches differ only in their names and help text.
    for long_flag, short_flag, help_text in (
        ("--do_csv", "-n", "should merge csv files"),
        ("--do_images", "-i", "should merge images"),
    ):
        arg_parser.add_argument(
            long_flag, short_flag, action="store_true", help=help_text
        )
    arg_parser.add_argument(
        "--image_extension",
        "-e",
        type=str,
        default=None,
        help="the image extension to look for (without .)",
    )

    options, leftovers = arg_parser.parse_known_args()
    if leftovers:
        raise ValueError("Unexpected arguments {}".format(leftovers))

    if options.do_csv:
        print("----------CSV MERGE-----------")
        csv_merge(options.directory)

    if options.do_images:
        print("----------IMAGE MERGE-----------")
        merge_files(options.directory, all_result_ext=options.image_extension)
def csv_merge(in_dir, keep_headers=True, insert_newline=True, stats=True, delim=',')
-
Merge all csv files recursively from in_dir into one file.
Parameters
in_dir : str
    The directory to start the merging from.
keep_headers : bool, optional
    Keep the headers in the csv file, by default True
insert_newline : bool, optional
    Write a newline after each merged file, by default True
stats : bool, optional
    Do average and std of numerical values, by default True
delim : str, optional
    What delimiter to use, by default ","
Returns
None
Expand source code
def _csv_numeric_rows(file_data, delim, data_start_col):
    """Return the data columns of a csv text as a 2D float array, or None."""
    # Quoted fields may contain the delimiter; collapse each to one token
    # (which parses as NaN) before splitting.
    cleaned = re.sub('"[^"]+"', "NAN", file_data)
    # The first line is taken to be the header; blank lines are ignored.
    rows = [
        line.split(delim)[data_start_col:]
        for line in cleaned.split("\n")[1:]
        if line != ""
    ]
    if not rows:
        return None

    # Size to the widest row so ragged files cannot index out of bounds;
    # cells missing from shorter rows stay at zero.
    width = max(len(row) for row in rows)
    data = np.zeros(shape=(len(rows), width))
    for row_idx, row in enumerate(rows):
        for col_idx, val in enumerate(row):
            try:
                data[row_idx, col_idx] = float(val.strip())
            except ValueError:
                data[row_idx, col_idx] = np.nan
    return data


def _csv_summary_rows(data, delim, data_start_col):
    """Return the "Average" and "Std" csv lines for data, or "" if all zero."""
    if np.sum(data) == 0:
        return ""

    # A final row whose data columns sum to zero is treated as a trailing
    # blank row.
    # NOTE(review): a genuine all-zero final row also triggers this error.
    if np.sum(data[-1]) == 0:
        raise RuntimeError("Excel sheet to merge has trailing blank lines")

    with warnings.catch_warnings():
        warnings.filterwarnings(action="ignore", message="Mean of empty slice")
        warnings.filterwarnings(
            action="ignore",
            message="Degrees of freedom <= 0 for slice.",
        )
        avg = np.nanmean(data, axis=0)
        std = np.nanstd(data, axis=0)

    # Pad the label so the values line up with the data columns.
    pad = delim * data_start_col
    avg_str = "Average" + pad + delim.join(str(val) for val in avg) + "\n"
    std_str = "Std" + pad + delim.join(str(val) for val in std) + "\n"
    return avg_str + std_str


def csv_merge(in_dir, keep_headers=True, insert_newline=True, stats=True, delim=","):
    """
    Merge all csv files recursively from in_dir into one file.

    The merged file is written to ``in_dir`` as
    ``merge--<basename of in_dir>.csv``. If that file already exists it is
    left out of the inputs and overwritten.

    Parameters
    ----------
    in_dir : str
        The directory to start the merging from.
    keep_headers : bool, optional
        Keep the first line of every csv file, by default True.
        If False, the first line is only kept for the first file.
    insert_newline : bool, optional
        Write a newline after each merged file, by default True
    stats : bool, optional
        Append "Average" and "Std" rows computed over the numerical values
        of each file (from the third column onwards), by default True
    delim : str, optional
        What delimiter to use, by default ","

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If the data columns of the last row of a file sum to zero while
        the file as a whole does not.

    """
    data_start_col = 2
    csv_files = get_all_files_in_dir(in_dir, ext="csv", recursive=True)
    o_name = os.path.join(in_dir, f"merge--{os.path.basename(in_dir)}.csv")
    print("Merging csv results into {}".format(o_name))

    # Exclude a previous merge result by identity, not by list position,
    # so it is never merged into itself whatever the listing order is.
    if os.path.isfile(o_name):
        csv_files = [f for f in csv_files if not os.path.samefile(f, o_name)]

    with open(o_name, "w") as output:
        for file_idx, f in enumerate(csv_files):
            with open(f, "r") as open_file:
                file_data = open_file.read()

            if keep_headers or file_idx == 0:
                to_copy = file_data
            else:
                to_copy = "\n".join(file_data.split("\n")[1:])
            output.write(to_copy)

            if stats:
                data = _csv_numeric_rows(file_data, delim, data_start_col)
                if data is not None:
                    summary = _csv_summary_rows(data, delim, data_start_col)
                    if summary:
                        # Start the summary on its own line even when the
                        # source file has no trailing newline.
                        if to_copy and not to_copy.endswith("\n"):
                            output.write("\n")
                        output.write(summary)

            if insert_newline:
                output.write("\n")
def merge_files(in_dir, all_result_ext=None)
-
Merge all files with the given extension recursively from in_dir.
Parameters
in_dir : str
    The path to where to start merging from.
all_result_ext : str, optional
    The extension to look for, by default None, in which case only common
    image files (.png, .jpg, .svg, .gif, .tiff) are merged
Returns
None
Expand source code
def merge_files(in_dir, all_result_ext=None):
    """
    Copy result files from the sub-directories of in_dir into one folder.

    Every immediate sub-directory of ``in_dir`` (apart from the output
    folder itself) is searched recursively with ``get_all_files_in_dir``.
    Each file found is copied into ``in_dir/all_results_merged`` and named
    after its path relative to ``in_dir``, with the path separators
    replaced by ``--``. Files directly inside ``in_dir`` are not copied.

    Parameters
    ----------
    in_dir : str
        The path to where to start merging from.
    all_result_ext : str, optional
        The extension to look for, by default None. With None, the files
        found are restricted to common image types
        (.png, .jpg, .svg, .gif, .tiff).

    Returns
    -------
    None

    """
    merged_dir_name = "all_results_merged"
    image_exts = (".png", ".jpg", ".svg", ".gif", ".tiff")

    all_file_loc = os.path.join(in_dir, merged_dir_name)
    os.makedirs(all_file_loc, exist_ok=True)
    print("Copying all results into {}".format(all_file_loc))

    # Only sub-directories are scanned, and never the output folder itself,
    # otherwise earlier merge results would be copied again.
    dirs = [
        os.path.join(in_dir, o)
        for o in os.listdir(in_dir)
        if os.path.isdir(os.path.join(in_dir, o)) and o != merged_dir_name
    ]

    for d in dirs:
        all_files = get_all_files_in_dir(
            d, ext=all_result_ext, recursive=True, return_absolute=True
        )
        if all_result_ext is None:
            all_files = [
                f for f in all_files if os.path.splitext(f)[1] in image_exts
            ]

        for f in all_files:
            # relpath (rather than slicing off len(in_dir) characters) stays
            # correct when in_dir has a trailing separator or is relative
            # while the found files are absolute.
            rel_name = os.path.relpath(f, in_dir)
            out_name = os.path.join(
                all_file_loc, "--".join(rel_name.split(os.sep))
            )
            shutil.copy(f, out_name)