import json
import logging
import os
import pathlib
import platform
import shutil
import tempfile
from typing import Union, Any
import warnings
from abc import ABC
import pandas as pd
from cellpy.exceptions import UnderDefined
from cellpy.parameters import prms
from cellpy.parameters.internal_settings import (
get_headers_journal,
keys_journal_session,
)
from cellpy.parameters.legacy.update_headers import (
headers_journal_v0 as hdr_journal_old,
)
from cellpy.readers import dbreader, json_dbreader
from cellpy.utils.batch_tools.batch_core import BaseJournal
from cellpy.utils.batch_tools.engines import simple_db_engine, sql_db_engine
from cellpy.utils.batch_tools import batch_helpers
hdr_journal = get_headers_journal()
trans_dict = {}
missing_keys = []
for key in hdr_journal:
if key in hdr_journal_old:
trans_dict[hdr_journal_old[key]] = hdr_journal[key]
else:
missing_keys.append(key)
# Default output directory name for batch processing
DEFAULT_OUTPUT_DIR_NAME = "dump"
[docs]
class LabJournal(BaseJournal, ABC):
def __init__(
self, db_reader="default", engine=None, batch_col=None, db_file=None, **kwargs
):
"""Journal for selected batch.
The journal contains pages (pandas.DataFrame) with prms for
each cell (one cell pr row).
Args:
db_reader: either default (a simple excel reader already
implemented in cellpy) or other db readers that implement
the needed API.
engine: defaults to simple_db_engine for parsing db using the
db_reader
self.pages = simple_db_engine(
self.db_reader, id_keys, **kwargs
)
db_file: the file name of the json file to use as the db (used by batbase_json_reader).
batch_col: the column name for the batch column in the db (used by simple_db_engine).
**kwargs: passed to the db_reader
"""
super().__init__()
self.db_reader = None
self.engine = None
self.batch_col = None
if db_reader is None:
return
if isinstance(db_reader, str):
if db_reader == "off":
self.db_reader = None
return
if db_reader == "default":
db_reader = prms.Db.db_type
if db_reader == "simple_excel_reader":
self.db_reader = dbreader.Reader()
self.engine = simple_db_engine
elif db_reader == "batbase_json_reader":
if db_file is None:
raise UnderDefined("db_file is not provided")
if not os.path.exists(db_file):
raise FileNotFoundError(f"The file {db_file} does not exist")
self.db_reader = json_dbreader.BatBaseJSONReader(json_file=db_file)
self.engine = simple_db_engine
elif db_reader == "custom_json_reader":
if db_file is None:
raise UnderDefined("db_file is not provided")
if not os.path.exists(db_file):
raise FileNotFoundError(f"The file {db_file} does not exist")
column_map = kwargs.pop("column_map", None)
self.db_reader = json_dbreader.CustomJSONReader(
json_file=db_file, column_map=column_map
)
self.engine = simple_db_engine
elif db_reader == "sql_db_reader":
raise NotImplementedError(
"sql_db_reader is not implemented yet"
) # self.db_reader = sql_dbreader.SqlReader() # self.engine = sql_db_engine
else:
raise UnderDefined(f"The db-reader '{db_reader}' is not supported")
else:
logging.debug(f"Remark! db_reader: {db_reader}")
self.db_reader = db_reader
if engine is None:
self.engine = simple_db_engine
logging.debug(f"engine: {self.engine}")
logging.debug(f"db_reader: {self.db_reader}")
logging.debug(f"batch_col: {batch_col}")
logging.debug(f"kwargs: {kwargs}")
self.batch_col = batch_col or "b01"
def _repr_html_(self):
if hasattr(self.db_reader, "_repr_html_"):
db_reader_txt = self.db_reader._repr_html_()
else:
db_reader_txt = self.db_reader.__str__()
txt = f"<h2>LabJournal-object</h2> id={hex(id(self))}"
txt += "<h3>Main attributes</h3>"
txt += f"""
<table>
<thead>
<tr>
<th>Attribute</th>
<th>Value</th>
</tr>
</thead>
<tbody>
<tr><td><b>name</b></td><td>{self.name}</td></tr>
<tr><td><b>project</b></td><td>{self.project}</td></tr>
<tr><td><b>file_name</b></td><td>{self.file_name}</td></tr>
<tr><td><b>db_reader</b></td><td>{db_reader_txt}</td></tr>
"""
if self.db_reader == "default" or isinstance(self.db_reader, dbreader.Reader):
txt += f"<tr><td><b>batch_col</b></td><td>{self.batch_col}</td></tr>"
txt += f"""
<tr><td><b>time_stamp</b></td><td>{self.time_stamp}</td></tr>
<tr><td><b>project_dir</b></td><td>{self.project_dir}</td></tr>
<tr><td><b>raw_dir</b></td><td>{self.raw_dir}</td></tr>
<tr><td><b>batch_dir</b></td><td>{self.batch_dir}</td></tr>
</tbody>
</table>
"""
txt += "<h3>Session info</h3>"
if self.session:
txt += f"<p>(type=dictionary | id={hex(id(self.session))})</p>"
for session_key in self.session:
txt += f"<p><b>{session_key}</b>: {self.session[session_key]}</p>"
txt += "<h3>Pages</h3>"
try:
txt += self.pages._repr_html_() # pylint: disable=protected-access
except AttributeError:
txt += "<p><b>pages</b><br> not found!</p>"
except ValueError:
txt += "<p><b>pages</b><br> not readable!</p>"
return txt
def _check_file_name(self, file_name, to_project_folder=False):
if file_name is None:
if not self.file_name:
self.generate_file_name(to_project_folder=to_project_folder)
file_name = pathlib.Path(self.file_name)
else:
file_name = pathlib.Path(file_name)
if to_project_folder:
# When saving to project folder, use prms.Paths.outdatadir
file_name = file_name.with_suffix(".json").name
out_data_dir = prms.Paths.outdatadir
project_dir = pathlib.Path(out_data_dir) / self.project
file_name = project_dir / file_name
self.file_name = file_name # updates object (maybe not smart)
return file_name
[docs]
def from_db(
self, project=None, name=None, batch_col=None, dbreader_kwargs=None, **kwargs
):
"""populate journal from db.
Args:
project (str): project name.
name (str): experiment name.
batch_col (int): batch column.
dbreader_kwargs: sent to simple db_reader.
**kwargs: sent to engine.
simple_db-engine -> filefinder.search_for_files:
run_name(str): run-file identification.
raw_extension(str): optional, extension of run-files (without the '.').
cellpy_file_extension(str): optional, extension for cellpy files
(without the '.').
raw_file_dir(path): optional, directory where to look for run-files
(default: read prm-file)
cellpy_file_dir(path): optional, directory where to look for
cellpy-files (default: read prm-file)
prm_filename(path): optional parameter file can be given.
file_name_format(str): format of raw-file names or a glob pattern
(default: YYYYMMDD_[name]EEE_CC_TT_RR) [not finished yet].
reg_exp(str): use regular expression instead (defaults to None) [not finished yet].
sub_folders (bool): perform search also in sub-folders.
file_list (list of str): perform the search within a given list
of filenames instead of searching the folder(s). The list should
not contain the full filepath (only the actual file names). If
you want to provide the full path, you will have to modify the
file_name_format or reg_exp accordingly.
pre_path (path or str): path to prepend the list of files selected
from the file_list.
Returns:
None
"""
logging.debug("creating journal from db")
if batch_col is None:
batch_col = self.batch_col
if project is not None:
self.project = project
if name is None:
name = self.name
else:
self.name = name
if dbreader_kwargs is None:
dbreader_kwargs = {}
logging.debug(
f"batch_name, batch_col, dbreader, dbreader_kwargs: {name}, {batch_col}, {self.db_reader}, {dbreader_kwargs}"
)
if self.db_reader is not None:
logging.debug("running db_reader.select_batch")
if isinstance(self.db_reader, dbreader.Reader): # Simple excel-db
id_keys = self.db_reader.select_batch(
name, batch_col, **dbreader_kwargs
)
# Check for duplicates in id_keys
if len(id_keys) != len(set(id_keys)):
duplicates = [x for x in id_keys if id_keys.count(x) > 1]
unique_duplicates = list(set(duplicates))
logging.warning(f"Found duplicate id_keys: {unique_duplicates}")
else:
logging.debug("No duplicates found in id_keys")
logging.debug(f"id_keys: {id_keys}")
self.pages = self.engine(self.db_reader, id_keys, **kwargs)
else:
logging.debug(
"creating journal pages using advanced reader methods (not simple excel-db)"
)
self.pages = self.engine(self.db_reader, batch_name=name, **kwargs)
if self.pages.empty:
logging.critical(
"EMPTY JOURNAL PAGES: are you sure you have provided correct input to batch?"
)
logging.critical(f"name: {name}")
logging.critical(f"project: {self.project}")
logging.critical(f"batch_col: {batch_col}")
else:
logging.debug("creating empty journal pages")
self.pages = pd.DataFrame()
self.generate_empty_session()
self.generate_folder_names()
self.paginate()
[docs]
def generate_empty_session(self):
self.session = {}
for item in keys_journal_session:
self.session[item] = None
@staticmethod
def _fix_cellpy_paths(p):
logging.debug("_fix_cellpy_paths does not work with OtherPaths yet")
# if platform.system() != "Windows":
# if p.find("\\") >= 0:
# # convert from win to posix
# p = pathlib.PureWindowsPath(p)
# else:
# if p.find("/") >= 0:
# # convert from posix to win
# p = pathlib.PurePosixPath(p)
# p = pathlib.Path(p)
return p
[docs]
def from_dict(self, info_dict, **kwargs):
pages_dict = info_dict.get("info_df", None)
if pages_dict is None:
logging.critical("could not find any pages in the journal")
raise UnderDefined
meta_dict = info_dict.get("metadata", None)
session = info_dict.get("session", None)
pages = self._clean_pages(pages_dict)
if session is None:
logging.debug("no session - generating empty one")
session = dict()
session, pages = self._clean_session(session, pages)
if hdr_journal.filename in pages.columns:
pages = pages.set_index(hdr_journal.filename)
self.pages = pages
self.session = session
self._prm_packer(meta_dict)
[docs]
@classmethod
def read_journal_jason_file(
cls, file_name: Union[pathlib.Path, str, bytes, bytearray], **kwargs
) -> tuple[pd.DataFrame, dict, dict]:
"""Loads a journal file in json format."""
is_raw_bytes = kwargs.pop("is_raw_bytes", False)
if is_raw_bytes:
logging.debug("json loader starting on raw bytes")
top_level_dict = json.loads(file_name)
else:
logging.debug(f"json loader starting on {file_name}")
with open(file_name, "r") as infile:
top_level_dict = json.load(infile)
pages_dict = top_level_dict["info_df"]
meta = top_level_dict["metadata"]
session = top_level_dict.get("session", None)
pages = pd.DataFrame(pages_dict)
if pages.empty:
logging.critical(
"could not find any pages in the journal (seems to be empty - no data)"
)
logging.critical(f"file_name: {file_name}")
logging.critical(
"Tips: If using the load function with allow_using_backup_journal=True, "
"try deleting the journal file and try again."
)
raise UnderDefined
pages = cls._clean_pages(pages)
if session is None:
logging.debug("no session - generating empty one")
session = dict()
session, pages = cls._clean_session(session, pages)
return pages, meta, session
[docs]
@classmethod
def read_journal_excel_file(cls, file_name, **kwargs):
sheet_names = {"meta": "meta", "pages": "pages", "session": "session"}
project = kwargs.pop("project", "NaN")
name = kwargs.pop("batch", pathlib.Path(file_name).stem)
_meta = {
"name": name,
"project": project,
"project_dir": pathlib.Path("."),
"batch_dir": pathlib.Path("."),
"raw_dir": pathlib.Path("."),
}
logging.debug(f"xlsx loader starting on {file_name}")
meta_sheet_name = sheet_names["meta"] # not tested yet
pages_sheet_name = sheet_names["pages"]
session_sheet_name = sheet_names["session"] # not tested yet
temporary_directory = tempfile.mkdtemp()
temporary_file_name = shutil.copy(file_name, temporary_directory)
try:
pages = pd.read_excel(
temporary_file_name, engine="openpyxl", sheet_name=pages_sheet_name
)
except KeyError:
print(f"Worksheet '{pages_sheet_name}' does not exist.")
return None
try:
session = pd.read_excel(
temporary_file_name,
sheet_name=session_sheet_name,
engine="openpyxl",
header=[0, 1],
)
except (KeyError, ValueError):
print(f"Worksheet '{session_sheet_name}' does not exist.")
session = None
try:
meta = pd.read_excel(
temporary_file_name, sheet_name=meta_sheet_name, engine="openpyxl"
)
except (KeyError, ValueError):
print(f"Worksheet '{meta_sheet_name}' does not exist.")
meta = None
if pages.empty:
logging.critical("could not find any pages in the journal")
raise UnderDefined
pages = cls._clean_pages(pages)
pages = pages.set_index(hdr_journal.filename)
if meta is None:
meta = _meta
else:
meta = cls._unpack_meta(meta) or _meta
if session is None:
logging.debug("no session - generating empty one")
session = dict()
else:
session = cls._unpack_session(session)
session, pages = cls._clean_session(session, pages)
return pages, meta, session
@classmethod
def _unpack_session(cls, session):
try:
bcn2 = {
n: sb["cycle_index"].to_list()
for n, sb in session["bad_cycles"].groupby("cell_name")
}
except KeyError:
bcn2 = []
try:
bc2 = list(session["bad_cells"]["cell_name"].dropna().values.flatten())
except KeyError:
bc2 = []
try:
s2 = list(session["starred"]["cell_name"].dropna().values.flatten())
except KeyError:
s2 = []
try:
n2 = list(session["notes"]["txt"].dropna().values.flatten())
except KeyError:
n2 = []
session = {"bad_cycles": bcn2, "bad_cells": bc2, "starred": s2, "notes": n2}
return session
@classmethod
def _unpack_meta(cls, meta):
try:
meta = meta.loc[:, ["parameter", "value"]]
except KeyError:
return
meta = meta.set_index("parameter")
return meta.to_dict()["value"]
@classmethod
def _clean_session(
cls, session: dict, pages: pd.DataFrame
) -> tuple[dict, pd.DataFrame]:
# TODO: clean up this method
# I wonder why I included pages as one of the arguments here...
if not session:
logging.critical("no session found in your journal file")
for item in keys_journal_session:
session[item] = session.get(item, None)
return session, pages
@classmethod
def _clean_pages(cls, pages: Union[pd.DataFrame, dict]) -> pd.DataFrame:
import ast
# TODO: clean up this method
# (and it is probably not suited for a class method anymore)
if isinstance(pages, dict):
pages = pd.DataFrame(pages)
logging.debug("removing empty rows")
pages = pages.dropna(how="all")
logging.debug("checking path-names")
try:
p = pages[hdr_journal.raw_file_names]
new_p = []
for f in p:
if isinstance(f, str):
try:
new_f = ast.literal_eval(f"'{f}'")
if isinstance(new_f, list):
f = new_f
except Exception as e:
logging.warning(e)
warnings.warn(f"Could not evaluate {f}")
new_p.append(f)
pages[hdr_journal.raw_file_names] = new_p
except KeyError:
print(
"Tried but failed in converting raw_file_names into an appropriate list"
)
try:
# TODO: the _fix_cellpy_paths method does not work with OtherPaths (yet?) - fix this
pages[hdr_journal.cellpy_file_name] = pages[
hdr_journal.cellpy_file_name
].apply(cls._fix_cellpy_paths)
except KeyError:
# assumes it is an old type journal file
print(f"The key '{hdr_journal.cellpy_file_name}' is missing!")
print("Assumes that this is an old-type journal file.")
try:
pages.rename(columns=trans_dict, inplace=True)
# TODO: the _fix_cellpy_paths method does not work with OtherPaths (yet?) - fix this
pages[hdr_journal.cellpy_file_name] = pages[
hdr_journal.cellpy_file_name
].apply(cls._fix_cellpy_paths)
logging.warning("old journal file - updating")
except KeyError:
print("Error! Could still not parse the pages.")
print(f"Missing key: {hdr_journal.cellpy_file_name}")
pages[hdr_journal.cellpy_file_name] = None
# only keep selected cells if keep column exists
if "keep" in pages.columns:
logging.debug("Journal contains 'keep' - selecting only 'keep' > 0.")
negatives = ["No", "no", "n", "False", "false", "f", "0", "0.0", 0, 0.0]
not_negative = ~pages.keep.isin(negatives)
pages = pages.loc[not_negative, :]
for column_name in missing_keys:
if column_name not in pages.columns:
logging.debug(f"wrong journal format - missing: {column_name}")
pages[column_name] = None
for column_name in hdr_journal:
if column_name not in pages.columns:
if column_name != hdr_journal.filename:
pages[column_name] = None
pages = batch_helpers.make_unique_groups(pages)
return pages
[docs]
def from_file(self, file_name=None, paginate=True, **kwargs):
"""Loads a DataFrame with all the needed info about the experiment"""
file_name = self._check_file_name(file_name)
logging.info(f"reading {file_name}")
if pathlib.Path(file_name).suffix.lower() == ".xlsx":
file_loader = self.read_journal_excel_file
else:
file_loader = self.read_journal_jason_file
try:
# Assuming that the file_loader performs the appropriate cleaning and fixing.
out = file_loader(file_name, **kwargs)
if out is None:
raise IOError(f"Error reading {file_name}.")
pages, meta_dict, session = out
except UnderDefined as e:
logging.critical(f"could not load {file_name}")
raise UnderDefined from e
logging.debug("got pages and meta_dict")
self.pages = pages
self.session = session
self.file_name = file_name
self._prm_packer(meta_dict)
if paginate:
self.generate_folder_names()
self.paginate()
[docs]
def from_frame(self, frame, name=None, project=None, paginate=None, **kwargs):
if name is not None:
self.name = name
if project is not None:
self.project = project
self.pages = self._clean_pages(frame)
for hdr in hdr_journal.values():
if hdr not in self.pages.columns:
self.pages[hdr] = None
if hdr_journal.filename in self.pages.columns:
self.pages = self.pages.set_index(hdr_journal.filename)
if paginate is None:
if self.name and self.project:
paginate = True
if paginate:
logging.info(f"paginating {project}/{name} ")
self.generate_folder_names()
self.paginate()
[docs]
def from_file_old(self, file_name=None):
"""Loads a DataFrame with all the needed info about the experiment"""
file_name = self._check_file_name(file_name)
with open(file_name, "r") as infile:
top_level_dict = json.load(infile)
pages_dict = top_level_dict["info_df"]
pages = pd.DataFrame(pages_dict)
pages[hdr_journal.cellpy_file_name] = pages[hdr_journal.cellpy_file_name].apply(
self._fix_cellpy_paths
)
self.pages = pages
self.file_name = file_name
self._prm_packer(top_level_dict["metadata"])
self.generate_folder_names()
self.paginate()
[docs]
def create_empty_pages(self, description=None):
if description is not None:
print(f"Creating from {type(description)} is not implemented yet")
logging.debug("Creating an empty journal")
logging.debug(f"name: {self.name}")
logging.debug(f"project: {self.project}")
col_names = list(hdr_journal.values())
pages = pd.DataFrame(columns=col_names)
pages.set_index(hdr_journal.filename, inplace=True)
return pages
[docs]
def duplicate_journal(self, folder=None) -> None:
"""Copy the journal to folder.
Args:
folder (str or pathlib.Path): folder to copy to (defaults to the
current folder).
"""
logging.debug(f"duplicating journal to folder {folder}")
journal_name = pathlib.Path(self.file_name)
if not journal_name.is_file():
logging.info("No journal saved")
return
new_journal_name = journal_name.name
if folder is not None:
new_journal_name = pathlib.Path(folder) / new_journal_name
try:
shutil.copy(journal_name, new_journal_name)
except shutil.SameFileError:
logging.debug("same file exception encountered")
[docs]
def to_file(
self,
file_name=None,
paginate=True,
to_project_folder=False,
duplicate_to_project_folder=False,
duplicate_to_local_folder=None, # Deprecated: for backward compatibility
):
"""Saves a DataFrame with all the needed info about the experiment.
Args:
file_name (str or pathlib.Path): journal file name (.json or .xlsx)
paginate (bool): make project folders
to_project_folder (bool): if True, save journal file to prms.Paths.outdatadir/project/
(default False, saves to current directory)
duplicate_to_project_folder (bool): if True, copy journal file to prms.Paths.outdatadir/project/
after saving to current directory (default False)
duplicate_to_local_folder (bool): Deprecated. For backward compatibility only.
If provided, it is ignored since the default behavior now saves to current directory.
Returns:
None
"""
# Backward compatibility: handle deprecated parameter
if duplicate_to_local_folder is not None:
warnings.warn(
"duplicate_to_local_folder is deprecated. "
"The default behavior now saves to current directory. "
"Use duplicate_to_project_folder=True to copy to project folder.",
DeprecationWarning,
stacklevel=2,
)
# Old parameter is ignored since new default (save to current directory) matches old behavior
file_name = self._check_file_name(
file_name, to_project_folder=to_project_folder
)
pages = self.pages
session = self.session
meta = self._prm_packer()
top_level_dict = {"info_df": pages, "metadata": meta, "session": session}
is_json = False
is_xlsx = False
if file_name.suffix == ".xlsx":
is_xlsx = True
if file_name.suffix == ".json":
is_json = True
if is_xlsx:
df_session = self._pack_session(session)
df_meta = self._pack_meta(meta)
try:
pages.index.name = "filename"
with pd.ExcelWriter(file_name, mode="w", engine="openpyxl") as writer:
pages.to_excel(writer, sheet_name="pages", engine="openpyxl")
# no index is not supported for multi-index (update to index=False when pandas implement it):
df_session.to_excel(writer, sheet_name="session", engine="openpyxl")
df_meta.to_excel(
writer, sheet_name="meta", engine="openpyxl", index=False
)
except PermissionError as e:
print(f"Could not load journal to xlsx ({e})")
if is_json:
def json_default_handler(obj):
"""Handle objects that can't be serialized by default JSON encoder."""
if isinstance(obj, pathlib.Path):
return str(obj)
elif hasattr(obj, "to_json"):
# For pandas DataFrames and Series
return json.loads(obj.to_json(default_handler=str))
else:
# Fallback to string representation
return str(obj)
jason_string = json.dumps(
top_level_dict,
default=json_default_handler,
)
with open(file_name, "w") as outfile:
outfile.write(jason_string)
self.file_name = file_name
logging.info(f"Saved file to {file_name}")
if paginate:
self.paginate()
if duplicate_to_project_folder:
# Copy to prms.Paths.outdatadir/project/ if requested
if self.project:
out_data_dir = prms.Paths.outdatadir
project_dir = pathlib.Path(out_data_dir) / self.project
project_dir.mkdir(parents=True, exist_ok=True)
target_file = project_dir / file_name.name
try:
shutil.copy(file_name, target_file)
logging.info(f"Copied journal to project folder: {target_file}")
except shutil.SameFileError:
logging.debug("same file exception encountered")
except Exception as e:
logging.warning(f"Could not copy journal to project folder: {e}")
@staticmethod
def _pack_session(session):
frames = []
keys = []
try:
l_bad_cycle_numbers = []
for k, v in session["bad_cycles"].items():
l_bad_cycle_numbers.append(pd.DataFrame(data=v, columns=[k]))
df_bad_cycle_numbers = (
pd.concat(l_bad_cycle_numbers, axis=1)
.melt(var_name="cell_name", value_name="cycle_index")
.dropna()
)
frames.append(df_bad_cycle_numbers)
keys.append("bad_cycles")
except (KeyError, AttributeError):
logging.debug("missing bad cycle numbers")
df_bad_cells = pd.DataFrame(session["bad_cells"], columns=["cell_name"])
frames.append(df_bad_cells)
keys.append("bad_cells")
df_starred = pd.DataFrame(session["starred"], columns=["cell_name"])
frames.append(df_starred)
keys.append("starred")
df_notes = pd.DataFrame(session["notes"], columns=["txt"])
frames.append(df_notes)
keys.append("notes")
session = pd.concat(frames, axis=1, keys=keys)
return session
@staticmethod
def _pack_meta(meta):
meta = pd.DataFrame(meta, index=[0]).melt(
var_name="parameter", value_name="value"
)
return meta
[docs]
def generate_folder_names(self, use_outdatadir=False):
"""Set appropriate folder names.
Args:
use_outdatadir (bool): if True, use prms.Paths.outdatadir as base for all directories,
otherwise use current_directory as base.
"""
logging.debug("creating folder names")
if use_outdatadir: # Deprecated: for backward compatibility only
if self.project and isinstance(self.project, (pathlib.Path, str)):
logging.debug("got project name")
logging.debug(self.project)
self.project_dir = os.path.join(prms.Paths.outdatadir, self.project)
else:
logging.critical(
"Could not create project dir (missing project definition)"
)
if self.name and self.project_dir:
self.batch_dir = os.path.join(self.project_dir, self.name)
self.raw_dir = os.path.join(self.batch_dir, "raw_data")
else:
logging.critical(
"Could not create batch_dir and raw_dir (missing batch name or project directory)"
)
else:
self.project_dir = pathlib.Path.cwd()
if self.project and isinstance(self.project, (pathlib.Path, str)):
self.legacy_project_dir = os.path.join(
prms.Paths.outdatadir, self.project
)
else:
logging.critical(
"Could not create legacy project directory (missing project definition)"
)
if self.name and self.project_dir:
self.batch_dir = (
pathlib.Path(self.project_dir) / DEFAULT_OUTPUT_DIR_NAME
)
self.raw_dir = self.batch_dir / "raw_data"
else:
logging.critical(
"Could not create batch_dir and raw_dir (missing batch name or project directory)"
)
logging.debug(f"batch dir: {self.batch_dir}")
logging.debug(f"project dir: {self.project_dir}")
logging.debug(f"raw dir: {self.raw_dir}")
def _pages_add_column(
self, column_name: str, column_data: Any = None, overwrite=False
):
if column_name in self.pages.columns and not overwrite:
logging.critical(f"column {column_name} already exists")
logging.critical("use overwrite=True to overwrite")
return
self.pages[column_name] = column_data
def _pages_remove_column(self, column_name: str) -> None:
if column_name in self.pages.columns:
self.pages.drop(column_name, axis=1, inplace=True)
else:
logging.critical(f"column {column_name} does not exist")
def _pages_rename_column(self, old_column_name: str, new_column_name: str) -> None:
if old_column_name in self.pages.columns:
self.pages.rename(columns={old_column_name: new_column_name}, inplace=True)
else:
logging.critical(f"column {old_column_name} does not exist")
def _pages_set_columns_first(self, column_names: list) -> None:
_column_names = []
number_of_columns = self.pages.shape[1]
for column_name in column_names:
if column_name not in self.pages.columns:
logging.critical(f"column {column_name} does not exist")
else:
_column_names.append(column_name)
column_names = _column_names + [
col for col in self.pages.columns if col not in _column_names
]
if number_of_columns != len(column_names):
logging.critical("number of columns changed")
else:
self.pages = self.pages[column_names]
def _pages_set_column_first(self, column_name: str) -> None:
if column_name not in self.pages.columns:
logging.critical(f"column {column_name} does not exist")
return
column_names = list(self.pages.columns)
column_names.remove(column_name)
column_names = [column_name] + column_names
self.pages = self.pages[column_names]
def _pages_set_value(self, cell_name, column_name, value):
if column_name not in self.pages.columns:
logging.critical(f"column {column_name} does not exist")
return
if cell_name not in self.pages.index:
logging.critical(f"cell {cell_name} does not exist")
return
self.pages.loc[cell_name, column_name] = value
[docs]
def select_all(self) -> None:
"""Select all cells."""
self.pages["selected"] = 1
[docs]
def unselect_all(self) -> None:
"""Unselect all cells."""
self.pages["selected"] = 0
[docs]
def select_group(self, group: int) -> None:
"""Toggle the selected status of a group of cells."""
if group not in self.pages[hdr_journal.group].unique():
logging.critical(f"group {group} does not exist")
return
self.pages.loc[self.pages[hdr_journal.group] == group, "selected"] = 1
[docs]
def unselect_group(self, group: int) -> None:
"""Toggle the selected status of a group of cells."""
if group not in self.pages[hdr_journal.group].unique():
logging.critical(f"group {group} does not exist")
return
self.pages.loc[self.pages[hdr_journal.group] == group, "selected"] = 0
[docs]
def toggle_selected(self, cell_name: str) -> None:
"""Toggle the selected status of a cell."""
if cell_name in self.pages.index:
if self.pages.loc[cell_name, "selected"]:
self.pages.loc[cell_name, "selected"] = 0
else:
self.pages.loc[cell_name, "selected"] = 1
[docs]
def select_by(self, criterion: str) -> None:
"""Select only cells based on criterion as used by the pandas query method."""
df = self.pages.copy()
try:
selected = df.query(criterion).index
df.loc[df.index.isin(selected), "selected"] = 1
df.loc[~df.index.isin(selected), "selected"] = 0
except Exception as e:
logging.critical(f"Could not select by {criterion}: {e}")
return
self.pages = df
[docs]
def select_distinct(self, column_name: str, value: Union[str, int, float]) -> None:
"""Select cells based on a column value."""
if column_name not in self.pages.columns:
logging.critical(f"column {column_name} does not exist")
return
criterion = self.pages[column_name] == value
self.pages.loc[criterion, "selected"] = 1
[docs]
def update_group_labels(self, group_labels: dict) -> None:
if hdr_journal.group in self.pages.columns:
self.pages[hdr_journal.group] = self.pages[hdr_journal.group].apply(
lambda x: group_labels.get(x, x)
)
def _cellpy_filenames_to_cellpy_directory(
self, cellpy_directory: pathlib.Path = None
) -> None:
if cellpy_directory is None:
cellpy_directory = prms.Paths.cellpydatadir
try:
pages = self.pages.copy()
pages[hdr_journal.cellpy_file_name] = self.pages[
hdr_journal.cellpy_file_name
].apply(lambda x: cellpy_directory / pathlib.Path(x).name)
self.pages = pages
except Exception as e:
logging.critical(f"Could not update cellpy file names: {e}")
def _cellpy_filenames_check(self, verbose: bool = False) -> None:
if hdr_journal.cellpy_file_name in self.pages.columns:
cellpy_files = self.pages[hdr_journal.cellpy_file_name]
for cellpy_file in cellpy_files:
if not pathlib.Path(cellpy_file).is_file():
logging.debug(f"Cellpy file {cellpy_file} not found")
if verbose:
print(f"Cellpy file {cellpy_file} not found")
else:
logging.debug(f"Cellpy file {cellpy_file} found")
if verbose:
print(f"Cellpy file {cellpy_file} found")
[docs]
def paginate(self) -> tuple[str, str, str]:
"""Make folders where we would like to put results etc."""
project_dir = self.project_dir
legacy_project_dir = self.legacy_project_dir
raw_dir = self.raw_dir
batch_dir = self.batch_dir
if project_dir is None:
raise UnderDefined("no project directory defined")
if raw_dir is None:
raise UnderDefined("no raw directory defined")
if batch_dir is None:
raise UnderDefined("no batch directory defined")
if legacy_project_dir is None:
logging.debug("no legacy project directory defined")
# create the folders (use makedirs to create parent directories if needed)
if not os.path.isdir(project_dir):
os.makedirs(project_dir, exist_ok=True)
logging.info(f"created folder {project_dir}")
if not os.path.isdir(batch_dir):
os.makedirs(batch_dir, exist_ok=True)
logging.info(f"created folder {batch_dir}")
if not os.path.isdir(raw_dir):
os.makedirs(raw_dir, exist_ok=True)
logging.info(f"created folder {raw_dir}")
if legacy_project_dir and not os.path.isdir(legacy_project_dir):
os.makedirs(legacy_project_dir, exist_ok=True)
logging.info(f"created folder {legacy_project_dir}")
self.project_dir = project_dir
self.batch_dir = batch_dir
self.raw_dir = raw_dir
return project_dir, batch_dir, raw_dir
[docs]
def generate_file_name(self, to_project_folder=False) -> None:
"""generate a suitable file name for the experiment
Args:
to_project_folder (bool): if True, save to prms.Paths.outdatadir/project/,
otherwise save to current directory (default False)
"""
if not self.name:
raise UnderDefined("batch name not given")
file_name = f"cellpy_batch_{self.name}.json"
if to_project_folder:
if not self.project:
raise UnderDefined("project name not given")
out_data_dir = prms.Paths.outdatadir
project_dir = pathlib.Path(out_data_dir) / self.project
self.file_name = project_dir / file_name
else:
# Default: save to current directory
self.file_name = pathlib.Path.cwd() / file_name
# v.1.3.0:
[docs]
def look_for_file(self):
pass
[docs]
def get_column(self, header):
"""populate new column from db"""
pass
[docs]
def get_cell(self, id_key):
"""get additional cell info from db"""
pass
[docs]
def remove_cell(self, cell_id):
pass
[docs]
def add_cell(self, cell_id, **kwargs):
"""Add a cell to the pages"""
pass
def _dev_journal_loading():
from cellpy import log
log.setup_logging(default_level="DEBUG")
journal_file = pathlib.Path(
"../../../testdata/batch_project/test_project.json"
).resolve()
assert journal_file.is_file()
logging.debug(f"reading journal file {journal_file}")
journal = LabJournal(db_reader=None)
journal.from_file(journal_file, paginate=False)
print(80 * "-")
print(journal.pages)
print(80 * "-")
print(journal.session)
# creating a mock session
bad_cycle_numbers = {
"20160805_test001_45_cc": [4, 337, 338],
"20160805_test001_47_cc": [7, 8, 9],
}
bad_cells = ["20160805_test001_45_cc"]
notes = {"date_stamp": "one comment for the road", "date_stamp2": "another comment"}
session = {
"bad_cycle_numbers": bad_cycle_numbers,
"bad_cells": bad_cells,
"notes": notes,
}
# journal.session = session
new_journal_name = journal_file.with_name(f"{journal_file.stem}_tmp.xlsx")
print(new_journal_name)
journal.to_file(file_name=new_journal_name, paginate=False, to_project_folder=False)
if __name__ == "__main__":
print(" running journal ".center(80, "-"))
_dev_journal_loading()
print(" finished ".center(80, "-"))