Source code for iqdma.stats

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# stats.py
"""Modified DVHA-Stats for IQDM-PDF output"""
#
# Copyright (c) 2021 Dan Cutright
# This file is part of IQDM-Analytics, released under a MIT license.
#    See the file LICENSE included with this distribution, also
#    available at https://github.com/IQDM/IQDM-Analytics

from iqdma.importer import ReportImporter
import numpy as np
from iqdma.utilities_dvha_stats import import_data


class IQDMStats:
    """Modified DVHAStats class for IQDM-PDF output"""

    def __init__(
        self,
        report_file_path: str,
        charting_column: str,
        multi_val_policy: str,
        duplicate_detection: bool,
        parser: str,
    ):
        """Initialize ``IQDMStats``

        Parameters
        ----------
        report_file_path : str
            File path to CSV output from IQDM-PDF
        charting_column : str
            Column of y-axis data
        multi_val_policy : str
            Duplicate value policy from options
        duplicate_detection : bool
            If true, apply a multi_value policy from options
        parser : str
            CSV format
        """
        imported_data = ReportImporter(
            report_file_path, parser, duplicate_detection
        )
        self.multi_val_policy = multi_val_policy
        # The importer instance is callable and returns a dict with the
        # keys "uids", "data" and "x_axis" read below.
        data = imported_data(charting_column, self.multi_val_policy)

        self.uid_columns = imported_data.uid_col
        self.uid_data = data["uids"]
        self.criteria_columns = imported_data.criteria_col
        self.data, self.var_names = import_data(data["data"])
        self.x_axis = data["x_axis"]

    def get_index_description(self) -> tuple:
        """Get a dict of data and columns for :class:`.DataTable`

        Returns
        -------
        dict
            Keys are column names with values being a list of values.
            One row per variable plus a trailing "All" row.
        list
            Column names in order to be displayed
        """
        var_count = len(self.var_names)
        table = {key: [] for key in self.criteria_columns}
        table["Index"] = list(range(var_count))
        table["Reports"] = []
        columns = ["Index", "Reports"] + self.criteria_columns

        for i, var_name in enumerate(self.var_names):
            # Number of non-NaN observations for this variable
            column = self.data[:, i]
            table["Reports"].append(int(np.count_nonzero(~np.isnan(column))))
            # var_name is the criteria values joined with " && "
            for j, criteria in enumerate(var_name.split(" && ")):
                table[self.criteria_columns[j]].append(criteria)

        # Trailing summary row. Use the variable count rather than the loop
        # index so this also works when there are no variables.
        table["Index"].append(var_count)
        table["Reports"].append("All")
        for col in self.criteria_columns:
            table[col].append("N/A")

        return table, columns

    @property
    def variable_count(self):
        """Number of variables in data

        Returns
        ----------
        int
            Number of columns in data
        """
        return self.data.shape[1]

    def get_index_by_var_name(self, var_name):
        """Get the variable index by var_name

        Parameters
        ----------
        var_name : int, str
            The name (str) or index (int) of the variable of interest

        Returns
        ----------
        int
            The column index for the given var_name

        Raises
        ------
        AttributeError
            If ``var_name`` is neither a known name nor a valid index
        """
        if var_name in self.var_names:
            return self.var_names.index(var_name)

        # NumPy integers are accepted alongside built-in ints
        is_integer = isinstance(var_name, (int, np.integer))
        if is_integer and 0 <= var_name < self.variable_count:
            return int(var_name)

        msg = "%s is not a valid var_name\n%s" % (
            var_name,
            ",".join(self.var_names),
        )
        raise AttributeError(msg)

    def univariate_control_chart(
        self,
        var_name,
        std=3,
        ucl_limit=None,
        lcl_limit=None,
        range=None,
    ):
        """
        Calculate control limits for a standard univariate Control Chart

        Parameters
        ----------
        var_name : str, int
            The name (str) or index (int) of the variable to plot. Use
            "All" to chart a single series aggregated across all variables.
        std : int, float, optional
            Number of standard deviations used to calculate if a y-value is
            out-of-control
        ucl_limit : float, optional
            Limit the upper control limit to this value
        lcl_limit : float, optional
            Limit the lower control limit to this value
        range : sequence of int, optional
            Passed through to :class:`ControlChart`, which uses it as a
            1-based ``(start, end)`` pair to restrict the charted data

        Returns
        ----------
        stats.ControlChart
            stats.ControlChart class object
        """
        kwargs = {
            "std": std,
            "ucl_limit": ucl_limit,
            "lcl_limit": lcl_limit,
            "range": range,
        }

        if var_name == "All":
            # Collapse each row to one value with the NaN-aware reducer
            # matching the multi-value policy; anything other than
            # min/mean/max falls back to max.
            if self.multi_val_policy in {"min", "mean", "max"}:
                func = self.multi_val_policy
            else:
                func = "max"
            data = getattr(np, f"nan{func}")(self.data, axis=1)
            return ControlChart(data, **kwargs)

        index = self.get_index_by_var_name(var_name)
        return ControlChart(self.data[:, index], **kwargs)

    def univariate_control_charts(self, **kwargs):
        """
        Calculate Control charts for all variables

        Parameters
        ----------
        kwargs : any
            See univariate_control_chart for keyword parameters

        Returns
        ----------
        dict
            ControlChart class objects stored in a dictionary with
            var_names and indices as keys (can use var_name or index).
            The aggregated chart is stored under "All" and under the index
            following the last variable.
        """
        charts = {}
        for i, key in enumerate(self.var_names):
            chart = self.univariate_control_chart(key, **kwargs)
            charts[key] = chart
            charts[i] = chart

        all_chart = self.univariate_control_chart("All", **kwargs)
        charts["All"] = all_chart
        # Use the variable count rather than the loop index so this does
        # not fail with an unbound name when there are no variables.
        charts[len(self.var_names)] = all_chart
        return charts
class ControlChart:
    """Calculate control limits for a standard univariate Control Chart

    Parameters
    ----------
    y : list, np.ndarray
        Input data (1-D)
    std : int, float, optional
        Number of standard deviations used to calculate if a y-value is
        out-of-control.
    ucl_limit : float, optional
        Limit the upper control limit to this value
    lcl_limit : float, optional
        Limit the lower control limit to this value
    x : list, np.ndarray, optional
        x-axis values for ``y``. Defaults to 1, 2, ..., len(y).
    range : sequence of int, optional
        1-based ``(start, end)`` pair; when given, all calculations use
        only ``y[start - 1 : end]``
    """

    def __init__(
        self,
        y,
        std=3,
        ucl_limit=None,
        lcl_limit=None,
        x=None,
        range=None,
    ):
        """Initialization of a ControlChart"""
        # asarray leaves an existing ndarray untouched and converts lists
        # and tuples alike
        self.y = np.asarray(y)

        # Do not test ``x`` for truthiness: that raises ValueError for a
        # multi-element ndarray. Missing or empty x gets the default axis.
        if x is None or len(x) == 0:
            self.x = np.linspace(1, len(self.y), len(self.y))
        else:
            # chart_data calls .tolist() on x, so it must be an array
            self.x = np.asarray(x)

        self.std = std
        self.ucl_limit = ucl_limit
        self.lcl_limit = lcl_limit
        self.range = range

        # since moving range is calculated based on 2 consecutive points
        self.scalar_d = 1.128

    def __str__(self):
        """String representation of ControlChartData object"""
        msg = [
            "center_line: %0.3f" % self.center_line,
            "control_limits: %0.3f, %0.3f" % self.control_limits,
            "out_of_control: %s" % self.out_of_control,
        ]
        return "\n".join(msg)

    def __repr__(self):
        """Return the string representation"""
        return str(self)

    @property
    def x_ranged(self) -> np.ndarray:
        """Return ``x`` within ``range``

        Returns
        -------
        np.ndarray
            ``x`` data from ``range[0]`` to ``range[1]`` (1-based,
            inclusive), or all of ``x`` if ``range`` is None
        """
        if self.range is None:
            return self.x
        return self.x[self.range[0] - 1 : self.range[1]]

    @property
    def y_ranged(self) -> np.ndarray:
        """Return ``y`` within ``range``

        Returns
        -------
        np.ndarray
            ``y`` data from ``range[0]`` to ``range[1]`` (1-based,
            inclusive), or all of ``y`` if ``range`` is None
        """
        if self.range is None:
            return self.y
        return self.y[self.range[0] - 1 : self.range[1]]

    @property
    def center_line(self):
        """Center line of charting data (i.e., mean value)

        Returns
        ----------
        np.ndarray, np.nan
            Mean value of the non-NaN ranged y values with np.mean(), or
            np.nan if none remain
        """
        data = remove_nan(self.y_ranged)
        if len(data):
            return np.mean(data)
        return np.nan

    @property
    def avg_moving_range(self):
        """Avg moving range based on 2 consecutive points

        Returns
        ----------
        np.ndarray, np.nan
            Average moving range. Returns NaN if arr is empty.
        """
        return avg_moving_range(self.y_ranged, nan_policy="omit")

    @property
    def sigma(self):
        """UCL/LCL = center_line +/- sigma * std

        Returns
        ----------
        np.ndarray, np.nan
            sigma or np.nan if arr is empty
        """
        return self.avg_moving_range / self.scalar_d

    @property
    def control_limits(self):
        """Calculate the lower and upper control limits

        Returns
        ----------
        lcl : float
            Lower Control Limit (LCL)
        ucl : float
            Upper Control Limit (UCL)
        """
        cl = self.center_line
        sigma = self.sigma

        ucl = cl + self.std * sigma
        lcl = cl - self.std * sigma

        # Clamp the computed limits to the user-supplied bounds, if any
        if self.ucl_limit is not None and ucl > self.ucl_limit:
            ucl = self.ucl_limit
        if self.lcl_limit is not None and lcl < self.lcl_limit:
            lcl = self.lcl_limit

        return lcl, ucl

    @property
    def out_of_control(self):
        """Get the indices of out-of-control observations

        Returns
        ----------
        np.ndarray
            A sorted 1-D array of indices (into the ranged data) that are
            not between the lower and upper control limits
        """
        lcl, ucl = self.control_limits
        high = np.argwhere(self.y_ranged > ucl)
        low = np.argwhere(self.y_ranged < lcl)
        # np.unique flattens the stacked argwhere output and sorts it
        return np.unique(np.concatenate([high, low]))

    @property
    def out_of_control_high(self):
        """Get the indices of observations > ucl

        Returns
        ----------
        np.ndarray
            An array of indices (as returned by np.argwhere) that are
            greater than the upper control limit
        """
        _, ucl = self.control_limits
        return np.argwhere(self.y_ranged > ucl)

    @property
    def out_of_control_low(self):
        """Get the indices of observations < lcl

        Returns
        ----------
        np.ndarray
            An array of indices (as returned by np.argwhere) that are less
            than the lower control limit
        """
        lcl, _ = self.control_limits
        return np.argwhere(self.y_ranged < lcl)

    @property
    def chart_data(self):
        """JSON compatible dict for chart generation

        Returns
        ----------
        dict
            Data used for control chart visuals.
            Keys include 'x', 'y', 'out_of_control', 'center_line', 'lcl',
            'ucl'
        """
        lcl, ucl = self.control_limits
        return {
            "x": self.x_ranged.tolist(),
            "y": self.y_ranged.tolist(),
            "out_of_control": self.out_of_control.tolist(),
            "center_line": float(self.center_line),
            "lcl": float(lcl),
            "ucl": float(ucl),
        }
def avg_moving_range(arr, nan_policy="omit"):
    """Calculate the average moving range (over 2 consecutive points)

    Parameters
    ----------
    arr : array-like (1-D)
        Input array. Must be 1-dimensional.
    nan_policy : str, optional
        Value must be one of the following: {'propagate', 'raise', 'omit'}
        Defines how to handle when input contains nan. The following
        options are available (default is 'omit'):
        'propagate': returns nan
        'raise': throws an error
        'omit': performs the calculations ignoring nan values

    Returns
    ----------
    np.ndarray, np.nan
        Average moving range. Returns NaN if fewer than two values remain
        or if NaN is propagated.
    """
    arr = process_nan_policy(arr, nan_policy)

    # With "propagate" the helper hands back a scalar NaN instead of an
    # array; calling len() on it would raise TypeError.
    if np.ndim(arr) == 0:
        return np.nan

    # A moving range needs at least two points. Returning early avoids
    # taking the mean of an empty difference array (a RuntimeWarning).
    if len(arr) < 2:
        return np.nan

    return np.mean(np.absolute(np.diff(arr)))
def remove_nan(arr):
    """Remove indices from 1-D array with values of np.nan

    Parameters
    ----------
    arr : array-like (1-D)
        Input array. Must be 1-dimensional.

    Returns
    ----------
    np.ndarray
        arr with NaN values deleted
    """
    # Coerce first: a plain list cannot be indexed with a boolean mask.
    # An ndarray passes through asarray unchanged.
    arr = np.asarray(arr)
    return arr[~np.isnan(arr)]
def process_nan_policy(arr, nan_policy):
    """Apply a NaN-handling policy to a 1-D array

    Parameters
    ----------
    arr : array-like (1-D)
        Input array. Must be 1-dimensional.
    nan_policy : str
        Value must be one of the following: {'propagate', 'raise', 'omit'}
        Defines how to handle when input contains nan. The following
        options are available:
        'propagate': returns nan
        'raise': throws an error
        'omit': returns the input with nan values removed
        Any other value leaves the input unchanged.

    Returns
    ----------
    np.ndarray, np.nan
        Input array evaluated per nan_policy

    Raises
    ------
    NotImplementedError
        If ``nan_policy`` is 'raise' and the input contains NaN
    """
    # Coerce so lists/tuples work; an ndarray passes through unchanged
    arr = np.asarray(arr)
    nan_mask = np.isnan(arr)

    # The policy only matters when NaN is actually present
    if not nan_mask.any():
        return arr

    if nan_policy == "raise":
        msg = "NaN values are not supported for avg_moving_range"
        raise NotImplementedError(msg)
    if nan_policy == "propagate":
        return np.nan
    if nan_policy == "omit":
        return arr[~nan_mask]
    return arr