import abc
import logging
import os
import pathlib
import re
import tempfile
import time
import warnings
from dataclasses import asdict
from datetime import datetime
import numpy as np
import pandas as pd
from typing import List
from typing import Optional
from cellpy.parameters import prms
from cellpy.readers.core import BaseDbReader
# logger = logging.getLogger(__name__)
class DbSheetCols:
    """Column-name lookup built from ``prms.DbCols``.

    Note to developers: this should only be used for this Excel reader
    (it works, and that is its only reason to still exist).
    """

    def __init__(self):
        # Flatten the DbCols dataclass: (header, dtype)-like tuples
        # collapse to just the header string.
        db_cols_from_prms = asdict(prms.DbCols)
        self.keys = []
        self.headers = []
        for attribute, header in db_cols_from_prms.items():
            if isinstance(header, (list, tuple)):
                header = header[0]
            setattr(self, attribute, header)
            self.keys.append(attribute)
            self.headers.append(header)

    def __repr__(self):
        return f"<DbCols: {self.__dict__}>"
class Reader(BaseDbReader):
    def __init__(
        self,
        db_file=None,
        db_datadir=None,
        db_datadir_processed=None,
        db_frame=None,
        batch=None,
        batch_col_name=None,
    ):
        """Simple excel reader.

        Args:
            db_file (str, pathlib.Path): xlsx-file to read.
            db_datadir (str, pathlib.Path): path where raw data is located.
            db_datadir_processed (str, pathlib.Path): path where cellpy files are located.
            db_frame (pandas.DataFrame): use this instead of reading from xlsx-file.
            batch (str): batch name to use.
            batch_col_name (str): name of the column in the db-file that contains the batch name.
        """
        # Sheet layout (table name and row numbers) always comes from prms.
        self.db_sheet_table = prms.Db.db_table_name
        self.db_header_row = prms.Db.db_header_row
        self.db_unit_row = prms.Db.db_unit_row
        self.db_data_start_row = prms.Db.db_data_start_row
        self.db_search_start_row = prms.Db.db_search_start_row
        self.db_search_end_row = prms.Db.db_search_end_row
        self.db_sheet_cols = DbSheetCols()
        self.selected_batch = None

        # Fall back to the configured paths when not given explicitly.
        self.db_datadir = db_datadir if db_datadir else prms.Paths.rawdatadir
        self.db_datadir_processed = (
            db_datadir_processed if db_datadir_processed else prms.Paths.cellpydatadir
        )

        if db_file:
            self.db_path = os.path.dirname(db_file)
            self.db_filename = os.path.basename(db_file)
            self.db_file = db_file
        else:
            self.db_path = prms.Paths.db_path
            self.db_filename = prms.Paths.db_filename
            self.db_file = os.path.join(self.db_path, self.db_filename)

        self.headers = self.db_sheet_cols.headers

        if db_frame is not None:
            print("Using frame instead of file")
            self.table = db_frame.copy()
        else:
            self.skiprows, self.nrows = self._find_out_what_rows_to_skip()
            logging.debug("opening sheet")
            self.table = self._open_sheet()

        if batch:
            self.selected_batch = self.select_batch(
                batch, batch_col_name=batch_col_name
            )
        logging.debug("got table")
        logging.debug(self.table)
def __str__(self):
newline = "\n - "
txt = f"<Reader:: \n - {newline.join(self.__dict__)} \n>\n"
txt += "Reader.table.head():\n"
txt += str(self.table.head())
return txt
[docs] def select_batch(
self, batch, batch_col_name=None, case_sensitive=True, drop=True
) -> List[int]:
"""Selects the rows in column batch_col_number.
Args:
batch: batch to select
batch_col_name: column name to use for batch selection (default: DbSheetCols.batch).
case_sensitive: if True, the batch name must match exactly (default: True).
drop: if True, all un-selected rows are dropped from the table (default: True).
Returns:
List of row indices
"""
if self.selected_batch is None:
return self._select_batch(
batch,
batch_col_name=batch_col_name,
case_sensitive=case_sensitive,
drop=drop,
)
else:
return self.selected_batch
def _select_batch(self, batch, batch_col_name=None, case_sensitive=True, drop=True):
if not batch_col_name:
batch_col_name = self.db_sheet_cols.batch
logging.debug("selecting batch - %s" % batch)
sheet = self.table
identity = self.db_sheet_cols.id
exists_col_number = self.db_sheet_cols.exists
if case_sensitive:
criterion = sheet.loc[:, batch_col_name] == batch
else:
criterion = (sheet.loc[:, batch_col_name]).upper() == batch.upper()
exists = sheet.loc[:, exists_col_number] > 0
# This will crash if the col is not of dtype number
sheet = sheet[criterion & exists]
if drop:
self.table = sheet
return sheet.loc[:, identity].values.astype(int)
[docs] def from_batch(
self,
batch_name: str,
include_key: bool = False,
include_individual_arguments: bool = False,
) -> dict:
raise NotImplementedError("This method is not implemented for this reader")
@staticmethod
def _parse_argument_str(argument_str: str) -> Optional[dict]:
# the argument str must be on the form:
# "keyword-1=value-1;keyword-2=value2"
if argument_str is None:
return
sep = ";"
parts = [part.strip() for part in argument_str.split(sep=sep)]
sep = "="
arguments = {}
for p in parts:
k, v = p.split(sep=sep)
arguments[k.strip()] = v.strip()
return arguments
@staticmethod
def _extract_date_from_cell_name(
cell_name, strf="%Y%m%d", regexp=None, splitter="_", position=0, start=0, end=12
):
"""Extract date given a cell name (or filename).
Uses regexp if given to find date txt, if not it uses splitter if splitter is not None or "",
else start-stop. Uses strf to parse for date in date txt.
if regexp is "auto", regexp is interpreted from strf
Args:
cell_name (str): extract date from.
strf (str): datetime string formatter.
regexp (str | "auto"): regular expression.
splitter (str): split parts into sub-parts.
position (int): selected sub-part.
start (int): number of first character in the date part.
end (int): number of last character in the date part.
Returns:
datetime.datetime object
"""
if regexp is not None:
if regexp == "auto":
year_r = r"%Y"
month_r = r"%m"
day_r = r"%d"
regexp = strf.replace("\\", "\\\\").replace("-", r"\-")
regexp = (
regexp.replace(year_r, "[0-9]{4}")
.replace(month_r, "[0-9]{2}")
.replace(day_r, "[0-9]{2}")
)
regexp = f"({regexp})"
m = re.search(regexp, cell_name)
datestr = m[0]
elif splitter:
datestr = cell_name.split(splitter)[position]
else:
datestr = cell_name[start:end]
try:
date = datetime.strptime(datestr, strf)
except ValueError as e:
logging.debug(e)
return None
return date
# --------not fixed from here -------------------------------
def _find_out_what_rows_to_skip(self):
if self.db_search_start_row >= self.db_data_start_row:
start_row = self.db_search_start_row
else:
start_row = self.db_data_start_row
skiprows = set(range(start_row))
try:
skiprows.remove(self.db_header_row)
except KeyError:
logging.debug(
"Trying to remove header row number"
" from skiprow, but it is not in skiprow"
)
skiprows.union((self.db_unit_row,))
if self.db_search_end_row <= 0 or self.db_search_end_row is None:
nrows = None
else:
nrows = self.db_search_end_row - start_row
return skiprows, nrows
def _lookup_unit(self, label):
units = {
"int": np.int32,
"float": np.float64,
"str": str,
"bol": bool,
"cat": str,
}
return units.get(label.lower(), object)
[docs] def pick_table(self):
"""Pick the table and return a pandas.DataFrame."""
return self.table
@staticmethod
def _select_col(df, no):
"""select specific column"""
return df.loc[:, no]
def _open_sheet(self):
    """Opens sheets and returns it"""
    # Note 14.12.2020: xlrd has explicitly removed support for anything other than xls files
    # Solution: install openpyxl
    # df1=pd.read_excel(
    #     os.path.join(APP_PATH, "Data", "aug_latest.xlsm"),
    #     sheet_name=None,
    #     engine='openpyxl',
    # )
    table_name = self.db_sheet_table
    header_row = self.db_header_row
    nrows = self.nrows
    rows_to_skip = self.skiprows
    logging.debug(f"Trying to open the file {self.db_file}")
    logging.debug(f"Number of rows (no means all): {nrows}")
    logging.debug(f"Skipping the following rows: {rows_to_skip}")
    work_book = pd.ExcelFile(self.db_file, engine="openpyxl")
    try:
        sheet = work_book.parse(
            table_name,
            header=header_row,
            skiprows=rows_to_skip,
            nrows=nrows,
        )
    except ValueError as e:
        # NOTE(review): this fallback issues the exact same parse call as
        # above (no dtypes are passed any more), so the retry looks
        # redundant — presumably left over from when dtypes were supplied.
        # Confirm before removing.
        logging.debug(
            "Could not parse all the columns (ValueError) "
            "using given dtypes. Trying without dtypes."
        )
        logging.debug(str(e))
        sheet = work_book.parse(
            table_name, header=header_row, skiprows=rows_to_skip, nrows=nrows
        )
    return sheet
def _validate(self):
"""Checks that the db-file is ok
Returns:
True if OK, False if not.
"""
probably_good_to_go = True
sheet = self.table
identity = self.db_sheet_cols.id
# check if you have unique srnos
id_col = sheet.loc[:, identity]
if any(id_col.duplicated()):
warnings.warn(
"your database is corrupt: duplicates" " encountered in the srno-column"
)
logging.debug("srno duplicates:\n" + str(id_col.duplicated()))
probably_good_to_go = False
return probably_good_to_go
def _pick_info(self, serial_number, column_name):
row = self.select_serial_number_row(serial_number)
try:
x = self._select_col(row, column_name)
except KeyError:
warnings.warn(f"your database is missing the following key: {column_name}")
return None
else:
x = x.values
if len(x) == 1:
x = x[0]
return x
[docs] def select_serial_number_row(self, serial_number):
"""Select row for identification number serial_number
Args:
serial_number: serial number
Returns:
pandas.DataFrame
"""
sheet = self.table
col = self.db_sheet_cols.id
rows = sheet.loc[:, col] == serial_number
return sheet.loc[rows, :]
[docs] def select_all(self, serial_numbers):
"""Select rows for identification for a list of serial_number.
Args:
serial_numbers: list (or ndarray) of serial numbers
Returns:
pandas.DataFrame
"""
sheet = self.table
col = self.db_sheet_cols.id
rows = sheet.loc[:, col].isin(serial_numbers)
return sheet.loc[rows, :]
[docs] def print_serial_number_info(self, serial_number, print_to_screen=True):
"""Print information about the run.
Args:
serial_number: serial number.
print_to_screen: runs the print statement if True,
returns txt if not.
Returns:
txt if print_to_screen is False, else None.
"""
r = self.select_serial_number_row(serial_number)
if r.empty:
warnings.warn("missing serial number")
return
txt1 = 80 * "="
txt1 += "\n"
txt1 += f" serial number {serial_number}\n"
txt1 = 80 * "-"
txt1 += "\n"
txt2 = ""
for label, value in zip(r.columns, r.values[0]):
if label in self.headers:
txt1 += f"{label}: \t {value}\n"
else:
txt2 += f"({label}: \t {value})\n"
if print_to_screen:
print(txt1)
print(80 * "-")
print(txt2)
print(80 * "=")
return
else:
return txt1
[docs] def inspect_hd5f_fixed(self, serial_number):
column_name = self.db_sheet_cols.freeze
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def inspect_exists(self, serial_number):
column_name = self.db_sheet_cols.exists
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_label(self, serial_number):
column_name = self.db_sheet_cols.label
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_area(self, serial_number):
column_name = self.db_sheet_cols.area
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_cell_name(self, serial_number):
column_name = self.db_sheet_cols.cell_name
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_by_column_label(self, column_name, serial_number):
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_group(self, serial_number):
column_name = self.db_sheet_cols.group
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_cell_type(self, serial_number):
try:
column_name = self.db_sheet_cols.cell_type
insp = self._pick_info(serial_number, column_name)
return insp
except KeyError:
logging.warning(
"Could not read the cycle mode (using value from prms instead)"
)
logging.debug(f"cycle mode: {prms.Reader.cycle_mode}")
return prms.Reader.cycle_mode
[docs] def get_loading(self, serial_number):
column_name = self.db_sheet_cols.loading
insp = self._pick_info(serial_number, column_name)
return insp
[docs] def get_areal_loading(self, serial_number):
raise NotImplementedError
[docs] def get_args(self, serial_number: int) -> dict:
column_name = self.db_sheet_cols.argument
argument_str = self._pick_info(serial_number, column_name)
try:
argument = self._parse_argument_str(argument_str)
except Exception as e:
logging.warning(f"could not parse argument str:")
logging.warning(f"{argument_str}")
logging.warning(f"Error message: {e}")
return {}
return argument
[docs] def get_mass(self, serial_number):
column_name_mass = self.db_sheet_cols.mass_active
mass = self._pick_info(serial_number, column_name_mass)
return mass
[docs] def get_nom_cap(self, serial_number):
column_name = self.db_sheet_cols.nom_cap
return self._pick_info(serial_number, column_name)
[docs] def get_experiment_type(self, serial_number):
column_name = self.db_sheet_cols.experiment_type
return self._pick_info(serial_number, column_name)
[docs] def get_instrument(self, serial_number):
column_name = self.db_sheet_cols.instrument
return self._pick_info(serial_number, column_name)
[docs] def get_total_mass(self, serial_number):
column_name_mass = self.db_sheet_cols.mass_total
total_mass = self._pick_info(serial_number, column_name_mass)
return total_mass
[docs] def get_all(self):
return self.filter_by_col([self.db_sheet_cols.id, self.db_sheet_cols.exists])
[docs] def get_fileid(self, serial_number, full_path=True): # NOT USED
column_name = self.db_sheet_cols.file_name_indicator
if not full_path:
filename = self._pick_info(serial_number, column_name)
else:
filename = os.path.join(
self.db_datadir_processed, self._pick_info(serial_number, column_name)
)
return filename
[docs] @staticmethod
def intersect(lists):
# find serial_numbers that to belong to all snro-lists in lists
# where lists = [serial_numberlist1, snrolist2, ....]
if not isinstance(lists[0], (list, tuple)):
lists = [lists]
serial_numbers = [set(a) for a in lists]
serial_numbers = set.intersection(*serial_numbers)
return serial_numbers
[docs] @staticmethod
def union(lists):
serial_numbers = [set(a) for a in lists]
serial_numbers = set.union(*serial_numbers)
return serial_numbers
[docs] @staticmethod
def subtract(list1, list2):
list1 = set(list1)
list2 = set(list2)
serial_numbers = set.difference(list1, list2)
return serial_numbers
[docs] @staticmethod
def subtract_many(list1, lists):
list_of_sets = [set(a) for a in lists]
list1 = set(list1)
serial_numbers = set.difference(list1, *list_of_sets)
return serial_numbers
[docs] def filter_selected(self, serial_numbers):
if isinstance(serial_numbers, (int, float)):
serial_numbers = [serial_numbers]
new_serial_numbers = []
column_name = self.db_sheet_cols.selected
for serial_number in serial_numbers:
insp = self._pick_info(serial_number, column_name)
if insp:
new_serial_numbers.append(serial_number)
return new_serial_numbers
[docs] def filter_by_slurry(self, slurry, appender="_"):
"""Filters sheet/table by slurry name.
Input is slurry name or list of slurry names, for example 'es030' or
["es012","es033","es031"].
Args:
slurry (str or list of strings): slurry names.
appender (chr): char that surrounds slurry names.
Returns:
List of serial_number (ints).
"""
sheet = self.table
identity = self.db_sheet_cols.id
exists = self.db_sheet_cols.exists
cellname = self.db_sheet_cols.cell_name
search_string = ""
if not isinstance(slurry, (list, tuple)):
slurry = [slurry]
first = True
for slur in slurry:
s_s = appender + slur + appender
if first:
search_string = s_s
first = False
else:
search_string += "|"
search_string += s_s
criterion = sheet.loc[:, cellname].str.contains(search_string)
exists = sheet.loc[:, exists] > 0
sheet = sheet[criterion & exists]
return sheet.loc[:, identity].values.astype(int)
[docs] def filter_by_col(self, column_names):
"""filters sheet/table by columns (input is column header)
The routine returns the serial numbers with values>1 in the selected
columns.
Args:
column_names (list): the column headers.
Returns:
pandas.DataFrame
"""
if not isinstance(column_names, (list, tuple)):
column_names = [column_names]
sheet = self.table
identity = self.db_sheet_cols.id
exists = self.db_sheet_cols.exists
criterion = True
for column_name in column_names:
_criterion = sheet.loc[:, column_name] > 0
_exists = sheet.loc[:, exists] > 0
criterion = criterion & _criterion & _exists
return sheet.loc[criterion, identity].values.astype(int)
[docs] def filter_by_col_value(self, column_name, min_val=None, max_val=None):
"""filters sheet/table by column.
The routine returns the serial-numbers with min_val <= values >= max_val
in the selected column.
Args:
column_name (str): column name.
min_val (int): minimum value of serial number.
max_val (int): maximum value of serial number.
Returns:
pandas.DataFrame
"""
sheet = self.table
identity = self.db_sheet_cols.id
exists_col_number = self.db_sheet_cols.exists
exists = sheet.loc[:, exists_col_number] > 0
if min_val is not None and max_val is not None:
criterion1 = sheet.loc[:, column_name] >= min_val
criterion2 = sheet.loc[:, column_name] <= max_val
sheet = sheet[criterion1 & criterion2 & exists]
elif min_val is not None or max_val is not None:
if min_val is not None:
criterion = sheet.loc[:, column_name] >= min_val
if max_val is not None:
criterion = sheet.loc[:, column_name] <= max_val
# noinspection PyUnboundLocalVariable
sheet = sheet[criterion & exists]
else:
sheet = sheet[exists]
return sheet.loc[:, identity].values.astype(int)