# -*- coding: utf-8 -*-
import getpass
import glob
import logging
import os
import pathlib
import sys
import warnings
from collections import OrderedDict
from dataclasses import asdict, dataclass
from pprint import pprint
import box
import dotenv
from rich import print
import ruamel
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError
from cellpy.exceptions import ConfigFileNotRead, ConfigFileNotWritten
from cellpy.parameters import prms
from cellpy.parameters.internal_settings import OTHERPATHS
from cellpy.internals.core import OtherPath
# Prefix and suffix used when building the name of a user specific
# configuration file (see create_custom_init_filename).
DEFAULT_FILENAME_START = ".cellpy_prms_"
DEFAULT_FILENAME_END = ".conf"

# When True (and running on Windows), get_user_dir prefers the "documents"
# sub-folder of the home directory if it exists.
USE_MY_DOCUMENTS = False

# Name of the default configuration file.
DEFAULT_FILENAME = DEFAULT_FILENAME_START + "default" + DEFAULT_FILENAME_END

# Template content written by _write_env_file.
ENVIRONMENT_EXAMPLE = """
# This is an example of an environment file for cellpy.
# The environment file is used to set environment variables
# that are used by cellpy.
# The environment file should be located in the user directory
# (i.e. the directory returned by pathlib.Path.home()).
# The default environment file is named .env_cellpy, but you can
# change this in your config file.
# The environment file should contain the following variables:
# CELLPY_PASSWORD=<password>
# CELLPY_KEY_FILENAME=<key_filename>
# CELLPY_HOST=<host>
# CELLPY_USER=<user>
"""
# logger = logging.getLogger(__name__)
def initialize():
    """Initialize cellpy by reading the config file and the environment file.

    The config file located by ``_get_prm_file`` is read first, then the
    environment file is loaded. A ``FileNotFoundError`` or ``UserWarning``
    raised along the way is turned into a warning instead of propagating.
    """
    try:
        _read_prm_file(_get_prm_file())
        _load_env_file()
    except FileNotFoundError:
        warnings.warn("Could not find the config-file")
    except UserWarning:
        # NOTE(review): _read_prm_file raises ConfigFileNotRead on YAML
        # errors; whether that exception is a UserWarning subclass cannot be
        # seen from this module - confirm that this handler is reachable.
        warnings.warn("Could not read the config-file")
def _load_env_file():
    """Load environment variables from the first env-file that exists.

    The path configured in ``prms.Paths.env_file`` is tried as given first,
    then relative to the home directory. If neither is a file, a debug
    message is logged and nothing is loaded.
    """
    candidates = (
        pathlib.Path(prms.Paths.env_file),
        pathlib.Path.home() / prms.Paths.env_file,
    )
    for candidate in candidates:
        if candidate.is_file():
            dotenv.load_dotenv(candidate)
            return
    logging.debug("No .env file found")
def get_user_name():
    """Return the username of the current user (cross-platform).

    Thin wrapper around ``getpass.getuser``.
    """
    return getpass.getuser()
def create_custom_init_filename(user_name=None):
    """Create the name of a user specific prms (configuration) file.

    Args:
        user_name: name to embed in the filename; when None the name
            returned by ``get_user_name`` is used.

    Returns:
        ``DEFAULT_FILENAME_START + user_name + DEFAULT_FILENAME_END``.
    """
    if user_name is None:
        user_name = get_user_name()
    return DEFAULT_FILENAME_START + user_name + DEFAULT_FILENAME_END
def get_user_dir_and_dst(init_filename=None):
    """Return the user directory and the full path of the prm file in it.

    Args:
        init_filename: name of the prm file; when None the name is created
            by ``create_custom_init_filename``.

    Returns:
        Tuple ``(user_dir, dst_file)`` where ``dst_file`` is
        ``user_dir / init_filename``.
    """
    if init_filename is None:
        init_filename = create_custom_init_filename()
    user_dir = get_user_dir()
    return user_dir, user_dir / init_filename
def get_user_dir():
    """Return the user directory as a resolved ``pathlib.Path``.

    This is the home directory, except on Windows (``os.name == "nt"``) with
    ``USE_MY_DOCUMENTS`` enabled, where the ``documents`` sub-folder is
    returned instead when it exists.
    """
    # home() is a classmethod; no need to instantiate a Path first.
    user_dir = pathlib.Path.home().resolve()
    if os.name == "nt" and USE_MY_DOCUMENTS:
        documents_dir = user_dir / "documents"
        if documents_dir.is_dir():
            user_dir = documents_dir
    return user_dir
def _write_prm_file(file_name=None):
    """Write the current parameters (as packed by ``_pack_prms``) to a YAML file.

    Args:
        file_name: path of the file to write.

    Raises:
        ConfigFileNotWritten: if the YAML library fails while dumping; the
            original ``YAMLError`` is attached as the cause.
    """
    logging.debug(f"saving configuration to {file_name}")
    config_dict = _pack_prms()
    try:
        with open(file_name, "w") as config_file:
            # Configure the module-level yaml object for a block-style
            # document with explicit start and end markers.
            yaml.allow_unicode = True
            yaml.default_flow_style = False
            yaml.explicit_start = True
            yaml.explicit_end = True
            yaml.dump(config_dict, config_file)
    except YAMLError as e:
        # Chain the cause (same pattern as in _read_prm_file) so that the
        # underlying YAML problem is not lost.
        raise ConfigFileNotWritten from e
# TODO: make this alive by setting it to not dev:
def _write_env_file(env_file_name=None):
    """Write the example environment file (``ENVIRONMENT_EXAMPLE``).

    Args:
        env_file_name: file to write to; when None the name is taken from
            ``get_env_file_name``.

    Any exception raised while writing is printed instead of propagated.
    """
    # Development switch: when True the content is only printed, not saved.
    dev = False
    env_file_name = get_env_file_name() if env_file_name is None else env_file_name
    logging.debug(f"saving environment arguments to {env_file_name}")

    if not dev:
        try:
            with open(env_file_name, "w") as handle:
                handle.write(ENVIRONMENT_EXAMPLE)
        except Exception as err:
            print(f"could not write to {env_file_name}")
            print(err)
        return

    print("---content----------------------------------------")
    print("* OBS! in dev-mode, file will not be saved!")
    print(ENVIRONMENT_EXAMPLE)
    print("--------------------------------------------------")
def _update_prms(config_dict, resolve_paths=True):
"""updates the prms with the values in the config_dict"""
# config_dict is your current config
# _config_attr is the attribute in the prms module (i.e. the defaults)
logging.debug("updating parameters")
logging.debug(f"new prms: {config_dict}")
for key in config_dict:
if config_dict[key] is None:
logging.debug(f"{config_dict[key]} is None")
continue
if key == "Paths":
_config_attr = getattr(prms, key)
for k in config_dict[key]:
z = config_dict[key][k]
_txt = f"{k}: {z}"
if k.lower() == "db_filename":
# special hack because it is a filename and not a path
pass
elif k.lower() in OTHERPATHS:
logging.debug("converting to OtherPath")
# special hack because it is possibly an external location
z = OtherPath(str(z))
if resolve_paths:
z = z.resolve() # v1.0.0: this is only resolving local paths
else:
logging.debug("converting to pathlib.Path")
z = pathlib.Path(z)
if resolve_paths:
z = z.resolve()
_txt += f" -> {z}"
logging.debug(_txt)
setattr(_config_attr, k, z)
elif hasattr(prms, key):
_config_attr = getattr(prms, key)
if _config_attr is None:
logging.debug(f"{_config_attr} is None")
continue
for k in config_dict[key]:
z = config_dict[key][k]
if isinstance(z, dict):
y = getattr(_config_attr, k)
z = box.Box({**y, **z})
if isinstance(z, ruamel.yaml.comments.CommentedMap):
z = box.Box(z)
setattr(_config_attr, k, z)
else:
logging.info("\n not-supported prm: %s" % key)
def _convert_instruments_to_dict(x):
# Converting instruments to dictionary (since it contains box.Box objects)
d = asdict(x)
for k, v in d.items():
try:
d[k] = v.to_dict()
except AttributeError:
pass
return d
def _convert_to_dict(x):
try:
dictionary = x.to_dict()
except AttributeError:
dictionary = asdict(x)
return dictionary
def _convert_paths_to_dict(x):
dictionary = {}
for k in x.keys():
# hack to get around the leading underscore (since they are properties):
if len(k) > 1 and k[0] == "_" and k.lower()[1:] in OTHERPATHS:
t = getattr(x, k).full_path
k = k[1:]
else:
t = str(getattr(x, k))
dictionary[k] = t
return dictionary
def _update_and_convert_to_dict(parameter_name):
"""check that all the parameters are correct in the prm-file"""
# update from old prm-file (before v1.0.0):
if parameter_name == "DbCols":
if hasattr(prms, "DbCols"):
db_cols = _convert_to_dict(prms.DbCols)
if db_cols is None:
return prms.DbColsClass()
for k in db_cols:
if isinstance(db_cols[k], (list, tuple)):
db_cols[k] = db_cols[k][0]
return db_cols
else:
return prms.DbColsClass()
def _pack_prms():
    """Collect all 'save-able' parameter sections in one dictionary.

    If you introduce new 'save-able' parameter sections, then you have to
    include them here.
    """
    packed = {"Paths": _convert_paths_to_dict(prms.Paths)}
    for section in ("FileNames", "Db"):
        packed[section] = _convert_to_dict(getattr(prms, section))
    packed["DbCols"] = _update_and_convert_to_dict("DbCols")
    for section in ("CellInfo", "Reader", "Materials"):
        packed[section] = _convert_to_dict(getattr(prms, section))
    packed["Instruments"] = _convert_instruments_to_dict(prms.Instruments)
    packed["Batch"] = _convert_to_dict(prms.Batch)
    return packed
def _read_prm_file(prm_filename, resolve_paths=True):
    """Read the prm file and update ``prms`` with its content.

    Args:
        prm_filename: path of the YAML configuration file.
        resolve_paths: forwarded to ``_update_prms``.

    Raises:
        ConfigFileNotRead: if the YAML library fails to parse the file.

    If the file does not contain a mapping (e.g. it is empty), nothing is
    updated and a warning is logged.
    """
    logging.debug("Reading config-file: %s" % prm_filename)
    try:
        with open(prm_filename, "r") as config_file:
            prm_dict = yaml.load(config_file)
    except YAMLError as e:
        raise ConfigFileNotRead from e

    if not isinstance(prm_dict, dict):
        # Report what was found instead of just printing the bare type.
        logging.warning(
            f"Config-file {prm_filename} does not contain a mapping "
            f"(got {type(prm_dict)}) - parameters not updated"
        )
        return

    _update_prms(prm_dict, resolve_paths=resolve_paths)
def _read_prm_file_without_updating(prm_filename):
    """Read the prm file and return its content without touching ``prms``.

    Raises:
        ConfigFileNotRead: if the YAML library fails to parse the file.
    """
    logging.debug("Reading config-file: %s" % prm_filename)
    try:
        with open(prm_filename, "r") as stream:
            return yaml.load(stream)
    except YAMLError as err:
        raise ConfigFileNotRead from err
def __look_at(file_name):
    """Load a YAML file and print the loaded content (debugging helper)."""
    with open(file_name, "r") as stream:
        content = yaml.load(stream)
    print(content)
def _get_prm_file(file_name=None, search_order=None):
"""returns name of the prm file"""
if file_name is not None:
if os.path.isfile(file_name):
return file_name
else:
logging.info("Could not find the prm-file")
default_name = prms._prm_default_name # NOQA
prm_globtxt = prms._prm_globtxt # NOQA
script_dir = os.path.abspath(os.path.dirname(__file__))
search_path = dict()
search_path["curdir"] = os.path.abspath(os.path.dirname(sys.argv[0]))
search_path["filedir"] = script_dir
search_path["user_dir"] = get_user_dir()
if search_order is None:
search_order = ["user_dir"] # ["curdir","filedir", "user_dir",]
else:
search_order = search_order
# The default name for the prm file is at the moment in the script-dir,@
# while default searching is in the user_dir (yes, I know):
prm_default = os.path.join(script_dir, default_name)
# -searching-----------------------
search_dict: OrderedDict = OrderedDict()
for key in search_order:
search_dict[key] = [None, None]
prm_directory = search_path[key]
default_file = os.path.join(prm_directory, default_name)
if os.path.isfile(default_file):
# noinspection PyTypeChecker
search_dict[key][0] = default_file
prm_globtxt_full = os.path.join(prm_directory, prm_globtxt)
user_files = glob.glob(prm_globtxt_full)
for f in user_files:
if os.path.basename(f) != os.path.basename(default_file):
search_dict[key][1] = f
break
# -selecting----------------------
prm_file = None
for key, file_list in search_dict.items():
if file_list[-1]:
prm_file = file_list[-1]
break
else:
if not prm_file:
prm_file = file_list[0]
if prm_file:
prm_filename = prm_file
else:
prm_filename = prm_default
return prm_filename
def _save_current_prms_to_user_dir():
    """Write the current parameters to the default prm file in ``prms.user_dir``."""
    # This should be put into the cellpy setup script
    _write_prm_file(os.path.join(prms.user_dir, prms._prm_default_name))  # NOQA
def get_env_file_name():
    """Return the location of the env-file as a ``pathlib.Path``.

    The value is taken from ``prms.Paths.env_file``.
    """
    # TODO: make this more robust - especially on posix systems strange
    #  things seem to happen.
    #  Default values in prms.py:
    #      user_dir = Path.home()
    #      env_file: Union[Path, str] = user_dir / ".env_cellpy"
    #  Observed when running setup on CI:
    #      location of env-file:    /Users/runner/work/cellpy/cellpy/.env_cellpy
    #      location of config-file: /Users/runner/.cellpy_prms_runner.conf
    #  Why are they not in the same directory? (Could it be that Path.home()
    #  behaves differently than expected?)
    return pathlib.Path(prms.Paths.env_file)
def info():
    """Print an overview of the content of the ``cellpy.prms`` module.

    Shows the location (and existence) of the prm file and the env file,
    followed by the 'box'-type attributes, the ``Paths`` attributes and the
    config-class attributes found in ``prms``. Internal (single underscore)
    attributes are only listed when ``prms._debug`` is set.
    """
    print(80 * "=")
    print(f"Listing the content of the prms module ({prms.__name__})")
    print(80 * "-")

    config_file = _get_prm_file()
    env_file = get_env_file_name()
    print(f" prm file (for current user): {config_file}")
    print(f" - exists: {os.path.isfile(config_file)}")
    print(f" env file (for current user): {env_file}")
    print(f" - exists: {os.path.isfile(env_file)}")
    print()

    for key, current_object in prms.__dict__.items():
        if key.startswith("_") and not key.startswith("__") and prms._debug:  # NOQA
            print(f"Internal: {key} (type={type(current_object)}): {current_object}")

        elif isinstance(current_object, box.Box):
            print()
            print(f" {key} [OLD-TYPE PRM] ".center(80, "-"))
            for subkey in current_object:
                print(f"prms.{key}.{subkey} = {current_object[subkey]}")
            print()

        elif key == "Paths":
            print(" Paths ".center(80, "-"))
            attributes = {
                k: v for k, v in vars(current_object).items() if not k.startswith("_")
            }
            # the OTHERPATHS entries are fetched with getattr, since
            # underscore-prefixed entries are filtered out above
            for attr in OTHERPATHS:
                attributes[attr] = getattr(current_object, attr)
            print(attributes)

        elif isinstance(current_object, (prms.CellPyConfig, prms.CellPyDataConfig)):
            attributes = {
                k: v for k, v in vars(current_object).items() if not k.startswith("_")
            }
            print(f" {key} ".center(80, "-"))
            print(attributes)
            print()
def _main():
    """Manual smoke test: locate, read, pack and list the parameters."""
    print(" STARTING THE ACTUAL SCRIPT ".center(80, "-"))

    print("PRM FILE:")
    prm_file = _get_prm_file()
    print(prm_file)

    print("READING:")
    _read_prm_file(prm_file)

    print("PACKING:")
    pprint(_pack_prms())

    print("INFO:")
    info()

    print(prms)
    pprint(str(prms.Batch), width=1)
    print(prms.Batch.summary_plot_height_fractions)


# Only run the smoke test when this module is executed as a script.
if __name__ == "__main__":
    _main()