"""arbin MS SQL Server csv data"""
import pandas as pd
from dateutil.parser import parse
from cellpy import prms
from cellpy.parameters.internal_settings import HeaderDict, get_headers_normal
from cellpy.readers.core import Data, FileID
from cellpy.readers.instruments.base import BaseLoader
# Module-level flags read from the cellpy configuration at import time.
DEBUG_MODE = prms.Reader.diagnostics  # not used
ALLOW_MULTI_TEST_FILE = prms._allow_multi_test_file  # not used
# Maps data category to the file-name suffix used by the Arbin csv export.
# Not used yet - only supporting loading raw data (normal)
FILE_NAME_POST_LABEL = {
    "statistics_cycle": "_StatisticByCycle.CSV",
    "statistics_step": "_StatisticBySteps.CSV",
    "normal": "_Wb_1.CSV",
}
# Physical-unit labels as they appear inside the Arbin csv column headers.
unit_labels = {
    "time": "s",
    "current": "A",
    "voltage": "V",
    "power": "W",
    "capacity": "Ah",
    "energy": "Wh",
    "resistance": "Ohm",
    "temperature": "C",
}
# Composite unit labels for incremental (derivative) quantities,
# built from the base ``unit_labels`` above (e.g. "V/s", "Ah/V").
incremental_unit_labels = {
    "dv_dt": f"{unit_labels['voltage']}/{unit_labels['time']}",
    "dq_dv": f"{unit_labels['capacity']}/{unit_labels['voltage']}",
    "dv_dq": f"{unit_labels['voltage']}/{unit_labels['capacity']}",
}
# Renaming map (Arbin csv header -> cellpy-style name) for columns that
# cellpy does not process yet; they are renamed but otherwise left alone.
# Contains several headers not encountered yet in the Arbin SQL Server tables
not_implemented_in_cellpy_yet_renaming_dict = {
    f"Power({unit_labels['power']})": "power",
    f"ACR({unit_labels['resistance']})": "acr",
    f"dV/dt({incremental_unit_labels['dv_dt']})": "dv_dt",
    f"dQ/dV({incremental_unit_labels['dq_dv']})": "dq_dv",
    f"dV/dQ({incremental_unit_labels['dv_dq']})": "dv_dq",
}
class DataLoader(BaseLoader):
    """Class for loading csv-exported arbin-data from MS SQL server."""

    instrument_name = "arbin_sql_csv"

    def __init__(self, *args, **kwargs):
        """Cache the Arbin and cellpy column-header mappings."""
        # the column headers defined by Arbin
        self.arbin_headers_normal = self.get_headers_normal()
        # the column headers defined by cellpy
        self.cellpy_headers_normal = get_headers_normal()

    @staticmethod
    def get_raw_units():
        """Returns a dictionary with the units used in the raw data."""
        raw_units = dict()
        raw_units["current"] = "A"
        raw_units["charge"] = "Ah"
        raw_units["mass"] = "g"
        raw_units["voltage"] = "V"
        return raw_units

    @staticmethod
    def get_raw_limits():
        """returns a dictionary with resolution limits"""
        raw_limits = dict()
        raw_limits["current_hard"] = 0.000_000_000_000_1
        raw_limits["current_soft"] = 0.000_01
        raw_limits["stable_current_hard"] = 2.0
        raw_limits["stable_current_soft"] = 4.0
        raw_limits["stable_voltage_hard"] = 2.0
        raw_limits["stable_voltage_soft"] = 4.0
        raw_limits["stable_charge_hard"] = 0.001
        raw_limits["stable_charge_soft"] = 5.0
        raw_limits["ir_change"] = 0.00001
        return raw_limits

    # TODO: rename this (for all instruments) to e.g. load
    # TODO: implement more options (bad_cycles, ...)
    def loader(self, name, **kwargs):
        """returns a Data object with loaded data.

        Loads data from arbin SQL server db.

        Args:
            name (str): name of the file

        Returns:
            Data: a Data object
        """
        # self.name = name
        # self.copy_to_temporary()
        # NOTE(review): reads from self.temp_file_path / self.name, which are
        # presumably set up by the BaseLoader machinery before loader() runs
        # - confirm against the base class.
        data_df = self._query_csv(self.temp_file_path)
        data = Data()

        # metadata is unfortunately not available for csv dumps
        data.loaded_from = self.name
        data.channel_index = None
        data.test_ID = None
        data.test_name = self.name.name
        data.creator = None
        data.schedule_file_name = None
        data.start_datetime = None

        # Generating a FileID project:
        self.generate_fid()
        data.raw_data_files.append(self.fid)

        data.raw = data_df
        data.raw_data_files_length.append(len(data_df))
        data.summary = (
            pd.DataFrame()
        )  # creating an empty frame - loading summary is not implemented yet
        data = self._post_process(data)
        data = self.identify_last_data_point(data)
        return data

    def _post_process(self, data):
        """Rename raw columns to cellpy names, set the index and start time."""
        set_index = True
        rename_headers = True

        if rename_headers:
            # NOTE(review): normal_headers_renaming_dict is not defined in this
            # module as shown - it is expected to map cellpy header keys to the
            # Arbin csv column names; verify it exists at module level.
            columns = {}
            for key in self.arbin_headers_normal:
                old_header = normal_headers_renaming_dict[key]
                new_header = self.cellpy_headers_normal[key]
                columns[old_header] = new_header
            data.raw.rename(index=str, columns=columns, inplace=True)

            # auxiliary (e.g. temperature) channels get their own mapping
            new_aux_headers = self.get_headers_aux(data.raw)
            data.raw.rename(index=str, columns=new_aux_headers, inplace=True)

            # columns cellpy cannot process yet are still renamed for clarity
            data.raw.rename(
                index=str,
                columns=not_implemented_in_cellpy_yet_renaming_dict,
                inplace=True,
            )

        if set_index:
            hdr_data_point = self.cellpy_headers_normal.data_point_txt
            if data.raw.index.name != hdr_data_point:
                data.raw = data.raw.set_index(hdr_data_point, drop=False)

        # first timestamp in the file is taken as the start of the test
        hdr_date_time = self.arbin_headers_normal.datetime_txt
        data.start_datetime = parse(data.raw[hdr_date_time].iat[0])
        return data

    def _query_csv(self, name):
        """Read the csv dump into a DataFrame.

        NOTE! I am using the prms.Reader.sep prm as the delimiter.
        """
        data_df = pd.read_csv(name, sep=prms.Reader.sep)
        return data_df
def _test_csv_loader():
    """Ad-hoc developer check: load one Arbin csv dump through DataLoader.

    Uses a hard-coded local path, so it only runs on the developer's machine.
    """
    import pathlib

    datadir = pathlib.Path(
        r"C:\scripts\cellpy\dev_data\arbin_new\2021_02_02_standardageing_1C_25dC_1_2021_02_02_130709"
    )
    name = datadir / "2021_02_02_standardageing_1C_25dC_1_Channel_1_Wb_1.CSV"
    loader = DataLoader()
    loader.loader(name)
def _test_loader_from_outside():
    """Ad-hoc developer check: run the full cellpy pipeline on one csv dump.

    Loads a hard-coded local file, builds steps and summary, then saves the
    resulting cellpy file next to the raw data.
    """
    import pathlib

    import matplotlib.pyplot as plt
    from cellpy import cellreader

    data_dir = pathlib.Path(
        r"C:\scripts\cellpy\dev_data\arbin_new\2021_02_02_standardageing_1C_25dC_1_2021_02_02_130709"
    )
    raw_file = data_dir / "2021_02_02_standardageing_1C_25dC_1_Channel_1_Wb_1.CSV"

    cell = cellreader.CellpyCell()
    cell.set_instrument("arbin_sql_csv")
    cell.from_raw(raw_file)
    cell.set_mass(1000)
    cell.make_step_table()
    cell.make_summary()

    # (Exploratory snippets that dumped raw/steps/summary to csv and plotted
    # voltage vs. time were removed here; recover them from git history.)

    out_file = data_dir / "test_out"
    cell.save(out_file)
def _check_seamless_files():
    """Ad-hoc developer check for "seamless" runs split over several files.

    First loads a single csv dump on its own, then feeds a list of two files
    to one CellpyCell to exercise multi-file loading.
    """
    import pathlib

    import matplotlib.pyplot as plt
    from cellpy import cellreader, prms

    base_dir = pathlib.Path(
        r"\\ad.ife.no\dfs\Org\MPT-BAT-LAB\Processed\Experiments\seamless\Raw data_excel"
    )
    first_file = (
        base_dir
        / r"20210430_seam10_01_01_cc_01_2021_04_30_172207\20210430_seam10_01_01_cc_01_Channel_48_Wb_1.csv"
    )
    second_file = (
        base_dir
        / r"20210430_seam10_01_01_cc_01_2021_04_30_172207\20210430_seam10_01_01_cc_01_Channel_48_Wb_1.csv"
    )

    # Single-file run first.
    single = cellreader.CellpyCell()
    single.set_instrument("arbin_sql_csv")
    prms.Reader.sep = ";"
    single.from_raw(first_file)
    single.set_mass(0.016569)
    single.make_step_table()
    single.make_summary()

    # Then both files loaded into one cell object.
    multi = cellreader.CellpyCell()
    multi.set_instrument("arbin_sql_csv")
    multi.from_raw([first_file, second_file], mass=0.016569)


if __name__ == "__main__":
    _check_seamless_files()