# -*- coding: utf-8 -*-
import getpass
import glob
import logging
import os
import pathlib
import sys
import warnings
from collections import OrderedDict
from dataclasses import asdict, dataclass
from pprint import pprint
import box
import dotenv
import ruamel
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError
from cellpy.exceptions import ConfigFileNotRead, ConfigFileNotWritten
from cellpy.parameters import prms
from cellpy.parameters.internal_settings import OTHERPATHS
from cellpy.internals.core import OtherPath
DEFAULT_FILENAME_START = ".cellpy_prms_"
DEFAULT_FILENAME_END = ".conf"
USE_MY_DOCUMENTS = False
DEFAULT_FILENAME = DEFAULT_FILENAME_START + "default" + DEFAULT_FILENAME_END
# logger = logging.getLogger(__name__)
yaml = YAML()
[docs]def initialize():
"""initializes cellpy by reading the config file and the environment file"""
try:
_read_prm_file(_get_prm_file())
_load_env_file()
except FileNotFoundError:
warnings.warn("Could not find the config-file")
except UserWarning:
warnings.warn("Could not read the config-file")
def _load_env_file():
"""loads the environment file"""
env_file = pathlib.Path(prms.Paths.env_file)
env_file_in_user_dir = pathlib.Path.home() / prms.Paths.env_file
if env_file.is_file():
dotenv.load_dotenv(env_file)
elif env_file_in_user_dir.is_file():
dotenv.load_dotenv(env_file_in_user_dir)
else:
logging.debug("No .env file found")
[docs]def get_user_name():
"""get the username of the current user (cross-platform)"""
return getpass.getuser()
[docs]def create_custom_init_filename(user_name=None):
"""creates a custom prms filename"""
if user_name is None:
return DEFAULT_FILENAME_START + get_user_name() + DEFAULT_FILENAME_END
else:
return DEFAULT_FILENAME_START + user_name + DEFAULT_FILENAME_END
[docs]def get_user_dir_and_dst(init_filename=None):
"""gets the name of the user directory and full prm filepath"""
if init_filename is None:
init_filename = create_custom_init_filename()
user_dir = get_user_dir()
dst_file = user_dir / init_filename
return user_dir, dst_file
[docs]def get_user_dir():
"""gets the name of the user directory"""
# user_dir = pathlib.Path(os.path.abspath(os.path.expanduser("~")))
user_dir = pathlib.Path().home().resolve()
if os.name == "nt" and USE_MY_DOCUMENTS:
_user_dir = user_dir / "documents"
if _user_dir.is_dir():
user_dir = _user_dir
return user_dir
def _write_prm_file(file_name=None):
logging.debug("saving configuration to %s" % file_name)
config_dict = _pack_prms()
try:
with open(file_name, "w") as config_file:
yaml.allow_unicode = True
yaml.default_flow_style = False
yaml.explicit_start = True
yaml.explicit_end = True
yaml.dump(config_dict, config_file)
except YAMLError:
raise ConfigFileNotWritten
def _update_prms(config_dict):
"""updates the prms with the values in the config_dict"""
# config_dict is your current config
# _config_attr is the attribute in the prms module (i.e. the defaults)
logging.debug("updating parameters")
logging.debug(f"new prms: {config_dict}")
for key in config_dict:
if config_dict[key] is None:
logging.debug(f"{config_dict[key]} is None")
continue
if key == "Paths":
_config_attr = getattr(prms, key)
for k in config_dict[key]:
z = config_dict[key][k]
_txt = f"{k}: {z}"
if k.lower() == "db_filename":
# special hack because it is a filename and not a path
pass
elif k.lower() in OTHERPATHS:
logging.debug("converting to OtherPath")
# special hack because it is possibly an external location
z = OtherPath(
str(z)
).resolve() # v1.0.0: this is only resolving local paths
else:
logging.debug("converting to pathlib.Path")
z = pathlib.Path(z).resolve()
_txt += f" -> {z}"
logging.debug(_txt)
setattr(_config_attr, k, z)
elif hasattr(prms, key):
_config_attr = getattr(prms, key)
if _config_attr is None:
logging.debug(f"{_config_attr} is None")
continue
for k in config_dict[key]:
z = config_dict[key][k]
if isinstance(z, dict):
y = getattr(_config_attr, k)
z = box.Box({**y, **z})
if isinstance(z, ruamel.yaml.comments.CommentedMap):
z = box.Box(z)
setattr(_config_attr, k, z)
else:
logging.info("\n not-supported prm: %s" % key)
def _convert_instruments_to_dict(x):
# Converting instruments to dictionary (since it contains box.Box objects)
d = asdict(x)
for k, v in d.items():
try:
d[k] = v.to_dict()
except AttributeError:
pass
return d
def _convert_to_dict(x):
try:
dictionary = x.to_dict()
except AttributeError:
dictionary = asdict(x)
return dictionary
def _convert_paths_to_dict(x):
dictionary = {}
for k in x.keys():
# hack to get around the leading underscore (since they are properties):
if len(k) > 1 and k[0] == "_" and k.lower()[1:] in OTHERPATHS:
t = getattr(x, k).full_path
k = k[1:]
else:
t = str(getattr(x, k))
dictionary[k] = t
return dictionary
def _update_and_convert_to_dict(parameter_name):
"""check that all the parameters are correct in the prm-file"""
# update from old prm-file (before v1.0.0):
if parameter_name == "DbCols":
if hasattr(prms, "DbCols"):
db_cols = _convert_to_dict(prms.DbCols)
if db_cols is None:
return prms.DbColsClass()
for k in db_cols:
if isinstance(db_cols[k], (list, tuple)):
db_cols[k] = db_cols[k][0]
return db_cols
else:
return prms.DbColsClass()
def _pack_prms():
"""if you introduce new 'save-able' parameter dictionaries, then you have
to include them here"""
config_dict = {
"Paths": _convert_paths_to_dict(prms.Paths),
"FileNames": _convert_to_dict(prms.FileNames),
"Db": _convert_to_dict(prms.Db),
"DbCols": _update_and_convert_to_dict("DbCols"),
"CellInfo": _convert_to_dict(prms.CellInfo),
"Reader": _convert_to_dict(prms.Reader),
"Materials": _convert_to_dict(prms.Materials),
"Instruments": _convert_instruments_to_dict(prms.Instruments),
"Batch": _convert_to_dict(prms.Batch),
}
return config_dict
def _read_prm_file(prm_filename):
"""read the prm file"""
logging.debug("Reading config-file: %s" % prm_filename)
try:
with open(prm_filename, "r") as config_file:
prm_dict = yaml.load(config_file)
except YAMLError as e:
raise ConfigFileNotRead from e
else:
if isinstance(prm_dict, dict):
_update_prms(prm_dict)
else:
print(type(prm_dict))
def _read_prm_file_without_updating(prm_filename):
"""read the prm file but do not update the params"""
logging.debug("Reading config-file: %s" % prm_filename)
try:
with open(prm_filename, "r") as config_file:
prm_dict = yaml.load(config_file)
except YAMLError as e:
raise ConfigFileNotRead from e
return prm_dict
def __look_at(file_name):
with open(file_name, "r") as config_file:
t = yaml.load(config_file)
print(t)
def _get_prm_file(file_name=None, search_order=None):
"""returns name of the prm file"""
if file_name is not None:
if os.path.isfile(file_name):
return file_name
else:
logging.info("Could not find the prm-file")
default_name = prms._prm_default_name # NOQA
prm_globtxt = prms._prm_globtxt # NOQA
script_dir = os.path.abspath(os.path.dirname(__file__))
search_path = dict()
search_path["curdir"] = os.path.abspath(os.path.dirname(sys.argv[0]))
search_path["filedir"] = script_dir
search_path["user_dir"] = get_user_dir()
if search_order is None:
search_order = ["user_dir"] # ["curdir","filedir", "user_dir",]
else:
search_order = search_order
# The default name for the prm file is at the moment in the script-dir,@
# while default searching is in the user_dir (yes, I know):
prm_default = os.path.join(script_dir, default_name)
# -searching-----------------------
search_dict: OrderedDict[Any] = OrderedDict()
for key in search_order:
search_dict[key] = [None, None]
prm_directory = search_path[key]
default_file = os.path.join(prm_directory, default_name)
if os.path.isfile(default_file):
# noinspection PyTypeChecker
search_dict[key][0] = default_file
prm_globtxt_full = os.path.join(prm_directory, prm_globtxt)
user_files = glob.glob(prm_globtxt_full)
for f in user_files:
if os.path.basename(f) != os.path.basename(default_file):
search_dict[key][1] = f
break
# -selecting----------------------
prm_file = None
for key, file_list in search_dict.items():
if file_list[-1]:
prm_file = file_list[-1]
break
else:
if not prm_file:
prm_file = file_list[0]
if prm_file:
prm_filename = prm_file
else:
prm_filename = prm_default
return prm_filename
def _save_current_prms_to_user_dir():
# This should be put into the cellpy setup script
file_name = os.path.join(prms.user_dir, prms._prm_default_name) # NOQA
_write_prm_file(file_name)
[docs]def get_env_file_name():
"""returns the location of the env-file"""
return pathlib.Path(prms.Paths.env_file)
[docs]def info():
"""this function will show only the 'box'-type
attributes and their content in the cellpy.prms module"""
print("Convenience function for listing prms")
print(prms.__name__)
print(f"prm file (for current user): {_get_prm_file()}")
print()
for key, current_object in prms.__dict__.items():
if key.startswith("_") and not key.startswith("__") and prms._debug: # NOQA
print(f"Internal: {key} (type={type(current_object)}): {current_object}")
elif isinstance(current_object, box.Box):
print()
print(" OLD-TYPE PRM ".center(80, "="))
print(f"prms.{key}:")
print(80 * "-")
for subkey in current_object:
print(f"prms.{key}.{subkey} = ", f"{current_object[subkey]}")
print()
elif key == "Paths":
print(" NEW-TYPE PRM WITH OTHERPATHS ".center(80, "*"))
attributes = {
k: v for k, v in vars(current_object).items() if not k.startswith("_")
}
for attr in OTHERPATHS:
attributes[attr] = getattr(current_object, attr)
pprint(attributes, width=1)
elif isinstance(current_object, (prms.CellPyConfig, prms.CellPyDataConfig)):
print(" NEW-TYPE PRM ".center(80, "="))
attributes = {
k: v for k, v in vars(current_object).items() if not k.startswith("_")
}
print(f" {key} ".center(80, "="))
pprint(attributes, width=1)
print()
[docs]def main():
print(" STARTING THE ACTUAL SCRIPT ".center(80, "-"))
print("PRM FILE:")
f = _get_prm_file()
print(f)
print("READING:")
_read_prm_file(f)
print("PACKING:")
pprint(_pack_prms())
print("INFO:")
info()
print(prms)
pprint(str(prms.Batch), width=1)
print(prms.Batch.summary_plot_height_fractions)
if __name__ == "__main__":
main()