"""arbin MS SQL Server data for MITS 7.0"""
import datetime
import logging
import os
import platform
import shutil
import sys
import tempfile
import time
import warnings
import sqlalchemy
import urllib
import numpy as np
import pandas as pd
import pyodbc
from dateutil.parser import parse
from cellpy import prms
from cellpy.parameters.internal_settings import HeaderDict, get_headers_normal
from cellpy.readers.core import (
Data,
FileID,
check64bit,
humanize_bytes,
xldate_as_datetime,
)
from cellpy.readers.instruments.base import BaseLoader
# TODO: @muhammad - get more meta data from the SQL db
# TODO: @jepe - update the batch functionality (including filefinder)
# TODO: @muhammad - make routine for "setting up the SQL Server" so that it is accessible and document it
[docs]
DEBUG_MODE = prms.Reader.diagnostics # not used
[docs]
ALLOW_MULTI_TEST_FILE = prms._allow_multi_test_file # not used
[docs]
SEARCH_FOR_ODBC_DRIVERS = prms._search_for_odbc_driver # not used
[docs]
SQL_SERVER = prms.Instruments.Arbin["SQL_server"]
[docs]
SQL_UID = prms.Instruments.Arbin["SQL_UID"]
[docs]
SQL_PWD = prms.Instruments.Arbin["SQL_PWD"]
[docs]
SQL_DRIVER = prms.Instruments.Arbin["SQL_Driver"]
# Names of the tables in the SQL Server db that is used by cellpy
# Not used anymore - maybe use a similar dict for the SQL table names (they are hard-coded at the moment)
[docs]
TABLE_NAMES = {
"normal": "Channel_Normal_Table",
"global": "Global_Table",
"statistic": "Channel_Statistic_Table",
"aux_global": "Aux_Global_Data_Table",
"aux": "Auxiliary_Table",
}
# Contains several headers not encountered yet in the Arbin SQL Server tables
# Contains several headers not encountered yet in the Arbin SQL Server tables
# Arbin SQL Server table headers (for both data df and stats df)
# --------------------------------------------------------------
# Test_ID
# Channel_ID
# Date_Time
# Data_Point
# Test_Time
# Step_Time
# Cycle_ID
# Step_ID
# Current
# Voltage
# Charge_Capacity
# Discharge_Capacity
# Charge_Energy
# Discharge_Energy
# Data_Flags
# Test_Name
# (all these headers are now implemented in the internal_settings
[docs]
def from_arbin_to_datetime(n):
if isinstance(n, int):
n = str(n)
ms_component = n[-7:]
date_time_component = n[:-7]
temp = f"{date_time_component}.{ms_component}"
datetime_object = datetime.datetime.fromtimestamp(float(temp))
time_in_str = datetime_object.strftime("%y-%m-%d %H:%M:%S:%f")
return time_in_str
[docs]
class DataLoader(BaseLoader):
"""Class for loading arbin-data from MS SQL server."""
_is_db = True
def __init__(self, *args, **kwargs):
"""initiates the ArbinSQLLoader class"""
self.arbin_headers_normal = (
get_headers_normal()
) # the column headers defined by Arbin
self.cellpy_headers_normal = (
get_headers_normal()
) # the column headers defined by cellpy
self.arbin_headers_global = self.get_headers_global()
self.arbin_headers_aux_global = self.get_headers_aux_global()
self.arbin_headers_aux = self.get_headers_aux()
self.current_chunk = 0 # use this to set chunks to load
self.server = SQL_SERVER
@staticmethod
@staticmethod
@staticmethod
@staticmethod
@staticmethod
[docs]
def get_raw_units():
raw_units = dict()
raw_units["current"] = "A"
raw_units["charge"] = "Ah"
raw_units["mass"] = "g"
raw_units["voltage"] = "V"
return raw_units
@staticmethod
[docs]
def get_raw_limits():
"""returns a dictionary with resolution limits"""
raw_limits = dict()
raw_limits["current_hard"] = 0.000_000_000_000_1
raw_limits["current_soft"] = 0.000_01
raw_limits["stable_current_hard"] = 2.0
raw_limits["stable_current_soft"] = 4.0
raw_limits["stable_voltage_hard"] = 2.0
raw_limits["stable_voltage_soft"] = 4.0
raw_limits["stable_charge_hard"] = 0.001
raw_limits["stable_charge_soft"] = 5.0
raw_limits["ir_change"] = 0.00001
return raw_limits
[docs]
def loader(self, name, **kwargs):
"""returns a Data object with loaded data.
Loads data from arbin SQL server db.
Args:
name (str): name of the test
Returns:
new_tests (list of data objects)
"""
warnings.warn(
"This loader is under development and might be missing some features."
)
# new_tests = []
# chonmj: seems to be broken at the moment. cellreader assumes a
# datatype "loader", not a list. removing the list for now.
# jepegit: Correct, this is no longer supported.
data_df, meta_data = self._query_sql()
aux_data_df = None # Needs to be implemented
# init data
# selecting only one value (might implement id selection later)
test_id = meta_data["Test_ID"].iloc[0]
id_name = f"{SQL_SERVER}:{self.name}:{test_id}"
channel_id = meta_data["IV_Ch_ID"][0]
data = Data()
data.loaded_from = id_name
data.channel_index = channel_id
data.test_ID = test_id
data.test_name = self.name
# The following metadata is not implemented yet for SQL loader:
data.channel_number = None
data.item_ID = None
# Implemented metadata:
data.schedule_file_name = meta_data["Schedule_File_Name"][0]
# TODO: convert to datetime:
data.start_datetime = meta_data["First_Start_DateTime"][0]
data.creator = meta_data["Creator"][0]
# Generating a FileID project:
self.generate_fid()
data.raw_data_files.append(self.fid)
data.raw = data_df
data.raw_data_files_length.append(len(data_df))
data = self._post_process(data)
# TODO: implement this:
# data = self.identify_last_data_point(data)
return data
def _post_process(self, data, **kwargs):
# TODO: move this to parent
logging.debug(f"{kwargs=}")
fix_datetime = kwargs.pop("fix_datetime", True)
set_index = kwargs.pop("set_index", True)
rename_headers = kwargs.pop("rename_headers", True)
extract_start_datetime = kwargs.pop("extract_start_datetime", True)
# TODO: insert post-processing and div tests here
# - check dtypes
# Remark that we also set index during saving the file to hdf5 if
# it is not set.
from pprint import pprint
if rename_headers:
logging.debug("rename headers: True")
columns = {}
for key in self.arbin_headers_normal:
old_header = normal_headers_renaming_dict.get(key, None)
new_header = self.cellpy_headers_normal[key]
if old_header:
columns[old_header] = new_header
logging.debug(
f"processing cellpy normal header key '{key}':"
f" old_header='{old_header}' -> new_header='{new_header}'"
)
logging.debug(f"renaming dict: {columns}")
data.raw.rename(index=str, columns=columns, inplace=True)
try:
columns = {}
for key, old_header in summary_headers_renaming_dict.items():
try:
columns[old_header] = self.cellpy_headers_normal[key]
except KeyError:
columns[old_header] = old_header.lower()
data.summary.rename(index=str, columns=columns, inplace=True)
except Exception as e:
logging.debug(f"Could not rename summary df ::\n{e}")
if fix_datetime:
logging.debug("fix date_time: true")
h_datetime = self.cellpy_headers_normal.datetime_txt
logging.debug("converting to datetime format")
data.raw[h_datetime] = data.raw[h_datetime].apply(from_arbin_to_datetime)
h_datetime = h_datetime
if h_datetime in data.summary:
data.summary[h_datetime] = data.summary[h_datetime].apply(
from_arbin_to_datetime
)
# if set_index:
# hdr_data_point = self.cellpy_headers_normal.data_point_txt
# if data.raw.index.name != hdr_data_point:
# data.raw = data.raw.set_index(hdr_data_point, drop=False)
if extract_start_datetime:
hdr_date_time = self.arbin_headers_normal.datetime_txt
data.start_datetime = parse("20" + data.raw[hdr_date_time].iat[0][:-7])
return data
def _query_sql(self):
# TODO: refactor and include optional SQL arguments
name = self.name
name_str = f"('{name}', '')"
# prepare engine
params = urllib.parse.quote_plus(
f"DRIVER={SQL_DRIVER};"
f"SERVER={SQL_SERVER};"
f"DATABASE=ArbinMasterData;"
f"UID={SQL_UID};"
f"PWD={SQL_PWD}"
)
# Create engine to SQL server using SQLAlchemy (mssql+pyodbc)
con_url = "mssql+pyodbc:///?odbc_connect={}".format(params)
engine = sqlalchemy.create_engine(con_url)
# Initial query to obtain metadata info on cell with 'name'
master_q = (
"SELECT ArbinMasterData.dbo.TestIVChList_Table.*, "
"ArbinMasterData.dbo.TestList_Table.* FROM "
"ArbinMasterData.dbo.TestIVChList_Table "
"JOIN ArbinMasterData.dbo.TestList_Table "
"ON ArbinMasterData.dbo.TestIVChList_Table.Test_ID = "
"ArbinMasterData.dbo.TestList_Table.Test_ID "
f"WHERE ArbinMasterData.dbo.TestList_Table.Test_Name IN {name_str}"
)
with engine.connect() as connection:
meta_data = pd.read_sql(master_q, connection)
# drop duplicate columns
meta_data = meta_data.loc[:, ~meta_data.columns.duplicated()].copy()
# query data
datas_df = []
for index, row in meta_data.iterrows():
# TODO: use variables - see above
# TODO: consider to use f-strings
# MITS 7 organizes raw channel data by Channel_ID and Date_Time, so the
# data query requires that we filter by these tables from the events table.
# Also, Date_Time is 7 orders higher than standard datetime, hence the additional
# zeroes in the query.
data_query = (
"SELECT " + str(row["Database_Name"]) + f".dbo.Channel_RawData_Table.* "
"FROM " + str(row["Database_Name"]) + ".dbo.Channel_RawData_Table "
" WHERE "
+ str(row["Database_Name"])
+ ".dbo.Channel_RawData_Table.Channel_ID = "
+ str(row["IV_Ch_ID"])
+ " AND "
+ str(row["Database_Name"])
+ ".dbo.Channel_RawData_Table.Date_Time >= "
+ str(row["First_Start_DateTime"])
+ str("0000000")
+ " AND "
+ str(row["Database_Name"])
+ ".dbo.Channel_RawData_Table.Date_Time <= "
+ str(row["Last_End_DateTime"])
+ str("0000000")
)
with engine.connect() as connection:
raw_df = pd.read_sql(data_query, connection)
# sort dataframe via pivot table:
datas_df.append(
raw_df.pivot(
index="Date_Time", columns="Data_Type", values="Data_Value"
).reset_index()
)
# convert column headers to strings
datas_df[index].columns = datas_df[index].columns.astype(str)
# TODO: rename columns
# 21: PV_Voltage
# 22: PV_Current
# 23: PV_Charge_Capacity
# 24: PV_Discharge_Capacity
# 25: PV_Charge_Energy
# 26: PV_Discharge_Energy
# 27: PV_dVdt
# 30: PV_InternalResistance
# Full column key found in 'SQL Table IDs.txt' file.
data_df = pd.concat(datas_df, axis=0)
return data_df, meta_data
def _check_sql_loader(server: str = None, tests: list = None):
test_name = tuple(tests) + ("",) # neat trick :-)
print(f"** test str: {test_name}")
con_str = "Driver={SQL Server};Server=" + server + ";Trusted_Connection=yes;"
master_q = (
"SELECT Database_Name, Test_Name FROM "
"ArbinMasterData.dbo.TestList_Table WHERE "
f"ArbinMasterData.dbo.TestList_Table.Test_Name IN {test_name}"
)
conn = pyodbc.connect(con_str)
print("** connected to server")
sql_query = pd.read_sql_query(master_q, conn)
print("** SQL query:")
print(sql_query)
for index, row in sql_query.iterrows():
# Muhammad, why is it a loop here?
print(f"** index: {index}")
print(f"** row: {row}")
data_query = (
"SELECT "
+ str(row["Database_Name"])
+ ".dbo.IV_Basic_Table.*, ArbinPro8MasterInfo.dbo.TestList_Table.Test_Name "
"FROM " + str(row["Database_Name"]) + ".dbo.IV_Basic_Table "
"JOIN ArbinPro8MasterInfo.dbo.TestList_Table "
"ON "
+ str(row["Database_Name"])
+ ".dbo.IV_Basic_Table.Test_ID = ArbinPro8MasterInfo.dbo.TestList_Table.Test_ID "
"WHERE ArbinPro8MasterInfo.dbo.TestList_Table.Test_Name IN "
+ str(test_name)
)
stat_query = (
"SELECT "
+ str(row["Database_Name"])
+ ".dbo.StatisticData_Table.*, ArbinPro8MasterInfo.dbo.TestList_Table.Test_Name "
"FROM " + str(row["Database_Name"]) + ".dbo.StatisticData_Table "
"JOIN ArbinPro8MasterInfo.dbo.TestList_Table "
"ON "
+ str(row["Database_Name"])
+ ".dbo.StatisticData_Table.Test_ID = ArbinPro8MasterInfo.dbo.TestList_Table.Test_ID "
"WHERE ArbinPro8MasterInfo.dbo.TestList_Table.Test_Name IN "
+ str(test_name)
)
print(f"** data query: {data_query}")
print(f"** stat query: {stat_query}")
# if looping, maybe these should be concatenated?
data_df = pd.read_sql_query(data_query, conn)
stat_df = pd.read_sql_query(stat_query, conn)
return data_df, stat_df
def _check_query():
import pathlib
name = ["20201106_HC03B1W_1_cc_01"]
dd, ds = check_sql_loader(SQL_SERVER, name)
out = pathlib.Path(r"C:\scripts\notebooks\Div")
dd.to_clipboard()
input("x")
ds.to_clipboard()
def _check_loader():
print(" Testing connection to arbin sql server ".center(80, "-"))
sql_loader = DataLoader()
name = "20201106_HC03B1W_1_cc_01"
cell = sql_loader.loader(name)
return cell
def _check_loader_from_outside():
import matplotlib.pyplot as plt
from cellpy import cellreader
name = "20200820_CoFBAT_slurry07B_01_cc_01"
c = cellreader.CellpyData()
c.set_instrument("arbin_sql")
# print(c)
c.from_raw(name)
# print(c)
c.make_step_table()
c.make_summary()
# print(c)
raw = c.cell.raw
steps = c.cell.steps
summary = c.cell.summary
raw.to_csv(r"C:\scripting\trash\raw.csv", sep=";")
steps.to_csv(r"C:\scripting\trash\steps.csv", sep=";")
summary.to_csv(r"C:\scripting\trash\summary.csv", sep=";")
n = c.get_number_of_cycles()
print(f"number of cycles: {n}")
cycle = c.get_cap(1, method="forth")
print(cycle.head())
cycle.plot(x="capacity", y="voltage")
plt.show()
def _check_get():
import cellpy
name = "20200820_CoFBAT_slurry07B_01_cc_01"
c = cellpy.get(name, instrument="arbin_sql")
print(c)
if __name__ == "__main__":
# test_query()
# cell = test_loader()
_check_get()