Detector of out-of-range values based on training feature ranges.
The detector is intentionally lightweight: it does not compute advanced
drift statistics since it is used to check single observations during
inference. Suitable for real-time applications.
Indicates whether exogenous variables have different values across
target series during training (i.e., exogenous is series-specific
rather than global).
Get a summary of the features in the DataFrame or Series. For numeric
features, it returns the min and max values. For categorical features,
it returns the unique values.
Feature ranges. If X is a Series, returns a tuple (min, max) for numeric
data or a set of unique values for categorical data. If X is a DataFrame,
returns a dictionary with column names as keys and their respective ranges
(tuple or set) as values.
Source code in skforecast/drift_detection/_range_drift.py
@classmethoddef_get_features_range(cls,X:pd.Series|pd.DataFrame)->tuple|set|dict[str,tuple|set]:""" Get a summary of the features in the DataFrame or Series. For numeric features, it returns the min and max values. For categorical features, it returns the unique values. Parameters ---------- X : pandas Series, pandas DataFrame Input data to summarize. Returns ------- features_ranges: tuple, set, dict Feature ranges. If X is a Series, returns a tuple (min, max) for numeric data or a set of unique values for categorical data. If X is a DataFrame, returns a dictionary with column names as keys and their respective ranges (tuple or set) as values. """ifnotisinstance(X,(pd.DataFrame,pd.Series)):raiseTypeError("Input must be a pandas DataFrame or Series.")ifisinstance(X,pd.Series):ifpd.api.types.is_numeric_dtype(X):features_ranges=(float(X.min()),float(X.max()))else:features_ranges=set(X.dropna().unique())ifisinstance(X,pd.DataFrame):num_cols=[colforcolinX.columnsifpd.api.types.is_numeric_dtype(X[col])]cat_cols=[colforcolinX.columnsifcolnotinnum_cols]features_ranges={}features_ranges.update({col:(float(X[col].min()),float(X[col].max()))forcolinnum_cols})features_ranges.update({col:set(X[col].dropna().unique())forcolincat_cols})returnfeatures_ranges
Check if there is any value outside the training range. For numeric features,
it checks if the values are within the min and max range. For categorical features,
it checks if the values are among the seen categories.
@classmethoddef_check_feature_range(cls,feature_range:tuple|set,X:pd.Series)->bool:""" Check if there is any value outside the training range. For numeric features, it checks if the values are within the min and max range. For categorical features, it checks if the values are among the seen categories. Parameters ---------- feature_range : tuple, set Output from _get_features_range() for a single feature. X : pd.Series New data to validate Returns ------- bool True if there is any value outside the training range, False otherwise. """ifisinstance(feature_range,tuple):returnX.min()<feature_range[0]orX.max()>feature_range[1]else:unseen=set(X.dropna().unique())-feature_rangereturnbool(unseen)
@classmethod
def _display_warnings(
    cls,
    not_compliant_feature: str,
    feature_range: tuple | set,
    series_name: str = None,
) -> None:
    """
    Emit a `FeatureOutOfRangeWarning` for a feature whose values fall outside
    the training range (numeric) or include unseen categories (categorical).

    Parameters
    ----------
    not_compliant_feature : str
        Name of the feature with values outside the training range.
    feature_range : tuple, set
        Training range of the feature.
    series_name : str, default None
        Name of the series being checked, if applicable. When given, it is
        prepended to the warning message.

    Returns
    -------
    None
    """
    is_numeric = isinstance(feature_range, tuple)
    if is_numeric:
        msg = (
            f"'{not_compliant_feature}' has values outside the range seen during training "
            f"[{feature_range[0]:.5f}, {feature_range[1]:.5f}]. "
            f"This may affect the accuracy of the predictions."
        )
    else:
        msg = (
            f"'{not_compliant_feature}' has values not seen during training. Seen values: "
            f"{feature_range}. This may affect the accuracy of the predictions."
        )

    if series_name:
        msg = f"'{series_name}': " + msg

    warnings.warn(msg, FeatureOutOfRangeWarning)
@classmethod
def _summary(
    cls,
    out_of_range_series: list,
    out_of_range_series_ranges: list,
    out_of_range_exog: list,
    out_of_range_exog_ranges: list
) -> None:
    """
    Print a rich Panel summarizing the out-of-range check results for series
    and exogenous variables.

    Parameters
    ----------
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_series_ranges : list
        List of ranges for the out-of-range series.
    out_of_range_exog : list, dict
        Exogenous variable names that are out of range. A list for global
        exogenous, or a dict keyed by series name for series-specific exogenous.
    out_of_range_exog_ranges : list, dict
        Ranges for the out-of-range exogenous variables (same structure as
        `out_of_range_exog`).

    Returns
    -------
    None
    """
    if out_of_range_series:
        series_msgs = []
        for series, series_range in zip(out_of_range_series, out_of_range_series_ranges):
            msg_temp = (
                f"'{series}' has values outside the observed range "
                f"[{series_range[0]:.5f}, {series_range[1]:.5f}]."
            )
            series_msgs.append(textwrap.fill(msg_temp, width=80))
        msg_series = "\n".join(series_msgs) + "\n"
    else:
        msg_series = "No series with out-of-range values found.\n"

    if out_of_range_exog:
        exog_msgs = []
        if isinstance(out_of_range_exog, list):
            # Global exogenous: flat list of names and ranges.
            for exog, exog_range in zip(out_of_range_exog, out_of_range_exog_ranges):
                if isinstance(exog_range, tuple):  # Numeric
                    msg_temp = (
                        f"'{exog}' has values outside the observed range "
                        f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                    )
                else:  # Categorical
                    msg_temp = (
                        f"'{exog}' has values not seen during training. Seen values: "
                        f"{exog_range}."
                    )
                exog_msgs.append(textwrap.fill(msg_temp, width=80))
        else:
            # Series-specific exogenous: dict keyed by series name.
            for key, value in out_of_range_exog.items():
                for exog, exog_range in zip(value, out_of_range_exog_ranges[key]):
                    if isinstance(exog_range, tuple):  # Numeric
                        msg_temp = (
                            f"'{exog}' has values outside the observed range "
                            f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                        )
                    else:  # Categorical
                        msg_temp = (
                            f"'{exog}' has values not seen during training. Seen values: "
                            f"{exog_range}."
                        )
                    msg_temp = f"'{key}': " + msg_temp
                    exog_msgs.append(textwrap.fill(msg_temp, width=80))
        msg_exog = "\n".join(exog_msgs)
    else:
        msg_exog = "No exogenous variables with out-of-range values found."

    console = Console()
    content = (
        f"[bold]Series:[/bold]\n{msg_series}\n"
        f"[bold]Exogenous Variables:[/bold]\n{msg_exog}"
    )
    console.print(Panel(content, title="[bold]Out-of-range summary[/bold]", expand=False))
def_normalize_input(self,X:pd.Series|pd.DataFrame|dict[str,pd.Series|pd.DataFrame],name:str,series_ids:list[str]|None=None)->dict[str,pd.Series|pd.DataFrame]:""" Convert pd.Series, pd.DataFrame or dict into a standardized dict of pd.Series or pd.DataFrames. Parameters ---------- X : pandas Series, pandas DataFrame, dict Input data to normalize. name : str Name of the input being normalized. Used for error messages. Expected values are 'series', 'last_window' or 'exog'. series_ids : list, default None Series IDs to include in the normalization of exogenous variables. Returns ------- X : dict Normalized input as a dictionary of pandas Series or DataFrames. """ifisinstance(X,pd.Series):ifnotX.name:raiseValueError(f"{name} must have a name when a pandas Series is provided.")X={X.name:X}elifisinstance(X,pd.DataFrame):ifisinstance(X.index,pd.MultiIndex):ifnamein["series","last_window"]:col=X.columns[0]iflen(X.columns)!=1:warnings.warn(f"`{name}` DataFrame has multiple columns. Only the "f"first column, '{col}', will be used. Others ignored.",IgnoredArgumentWarning,)X={series_id:X.loc[series_id][col].rename(series_id)forseries_idinX.index.levels[0]}else:X={series_id:X.loc[series_id]forseries_idinX.index.levels[0]}else:ifself.series_specific_exog_andseries_ids:X={series_id:X.copy()forseries_idinseries_ids}else:X=X.to_dict(orient="series")elifisinstance(X,dict):fork,vinX.items():ifnotisinstance(v,(pd.Series,pd.DataFrame)):raiseTypeError(f"All values in `{name}` must be a pandas Series or DataFrame. "f"Review the value for key '{k}'.")returnX
def fit(
    self,
    series: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    **kwargs
) -> None:
    """
    Fit the detector, storing the training ranges of the series and, when
    provided, of the exogenous variables.

    Parameters
    ----------
    series : pandas Series, pandas DataFrame, dict, aliases: `y`
        Input time series data to fit the detector, ideally the same ones
        used to fit the forecaster.
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variables to include in the forecaster.

    Returns
    -------
    None
    """
    # Resolve the `y` alias: exactly one of `series` / `y` must be given.
    if series is None and ('y' not in kwargs or kwargs['y'] is None):
        raise ValueError("One of `series` or `y` must be provided.")
    if 'y' in kwargs:
        if series is not None:
            raise ValueError(
                "Cannot specify both `series` and `y`. Please provide only one of them."
            )
        series = kwargs.pop('y')

    if not isinstance(series, (pd.Series, pd.DataFrame, dict)):
        raise TypeError("Input must be a pandas Series, DataFrame or dict.")
    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("Exogenous variables must be a pandas Series, DataFrame or dict.")

    # Reset learned state before (re)fitting.
    self.series_names_in_ = []
    self.series_values_range_ = {}
    self.exog_names_in_ = None
    self.exog_values_range_ = None
    self.series_specific_exog_ = False
    self.is_fitted = False

    series = self._normalize_input(series, name="series")
    for series_id, data in series.items():
        self.series_names_in_.append(series_id)
        self.series_values_range_[series_id] = self._get_features_range(X=data)

    if exog is not None:
        exog = self._normalize_input(exog, name="exog")
        self.exog_names_in_ = []
        self.exog_values_range_ = {}
        for exog_id, data in exog.items():
            if isinstance(data, pd.Series):
                self.exog_names_in_.append(exog_id)
            else:
                self.exog_names_in_.extend(data.columns)
            self.exog_values_range_[exog_id] = self._get_features_range(X=data)
        # Deduplicate while preserving first-seen order.
        self.exog_names_in_ = list(dict.fromkeys(self.exog_names_in_))
        # Exog is series-specific when its keys match trained series names.
        self.series_specific_exog_ = any(
            exog_id in self.series_names_in_ for exog_id in exog.keys()
        )

    self.is_fitted = True
If self.series_specific_exog_ is False: returns a list with the names
of exogenous variables that are out of range (global exogenous).
If self.series_specific_exog_ is True: returns a dictionary where
keys are series names and values are lists of out-of-range exogenous
variables for each series.
Source code in skforecast/drift_detection/_range_drift.py
def predict(
    self,
    last_window: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    verbose: bool = True,
    suppress_warnings: bool = False
) -> tuple[bool, list[str], list[str] | dict[str, list[str]]]:
    """
    Check whether any value of `last_window` or `exog` falls outside the
    ranges observed during training.

    Parameters
    ----------
    last_window : pandas Series, pandas DataFrame, dict, default None
        Series values used to create the predictors (lags) needed in the
        first iteration of the prediction (t + 1).
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variable/s included as predictor/s.
    verbose : bool, default True
        Whether to print a summary of the check.
    suppress_warnings : bool, default False
        Whether to suppress warnings.

    Returns
    -------
    flag_out_of_range : bool
        True if there is any value outside the training range, False otherwise.
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_exog : list, dict
        Exogenous variables that are out of range.

        - If `self.series_specific_exog_` is False: a list with the names of
          out-of-range exogenous variables (global exogenous).
        - If `self.series_specific_exog_` is True: a dictionary where keys are
          series names and values are lists of out-of-range exogenous
          variables for each series.
    """
    if not self.is_fitted:
        raise RuntimeError("Model is not fitted yet.")
    if not isinstance(last_window, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("`last_window` must be a pandas Series, DataFrame, dict or None.")
    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("`exog` must be a pandas Series, DataFrame, dict or None.")

    set_skforecast_warnings(suppress_warnings, action='ignore')

    flag_out_of_range = False
    out_of_range_series = []
    out_of_range_series_ranges = []

    if last_window is not None:
        last_window = self._normalize_input(last_window, name="last_window")
        for key, data in last_window.items():
            if isinstance(data, pd.Series):
                data = data.to_frame()
            for col in data.columns:
                if key not in self.series_names_in_:
                    warnings.warn(
                        f"'{key}' was not seen during training. Its range is unknown.",
                        UnknownLevelWarning
                    )
                    continue
                is_out_of_range = self._check_feature_range(
                    feature_range=self.series_values_range_[col], X=data[col]
                )
                if is_out_of_range:
                    flag_out_of_range = True
                    out_of_range_series.append(col)
                    out_of_range_series_ranges.append(self.series_values_range_[col])
                    self._display_warnings(
                        not_compliant_feature=col,
                        feature_range=self.series_values_range_[col],
                        series_name=None
                    )

    # Containers depend on whether exog is series-specific (dict) or global (list).
    out_of_range_exog = {} if self.series_specific_exog_ else []
    out_of_range_exog_ranges = {} if self.series_specific_exog_ else []

    if exog is not None:
        series_ids = (
            list(last_window.keys()) if last_window is not None else self.series_names_in_
        )
        exog = self._normalize_input(exog, name="exog", series_ids=series_ids)
        for key, data in exog.items():
            if isinstance(data, pd.Series):
                data = data.to_frame()
            features_ranges = self.exog_values_range_.get(key, None)
            if self.series_specific_exog_:
                out_of_range_exog[key] = []
                out_of_range_exog_ranges[key] = []
            for col in data.columns:
                if not isinstance(features_ranges, dict):
                    # A single Series range (tuple/set); wrap it so it can be
                    # looked up by column name like the DataFrame case.
                    features_ranges = {key: features_ranges}
                if col not in self.exog_names_in_:
                    warnings.warn(
                        f"'{col}' was not seen during training. Its range is unknown.",
                        MissingExogWarning,
                    )
                    continue
                is_out_of_range = self._check_feature_range(
                    feature_range=features_ranges[col], X=data[col]
                )
                if is_out_of_range:
                    flag_out_of_range = True
                    if self.series_specific_exog_:
                        out_of_range_exog[key].append(col)
                        out_of_range_exog_ranges[key].append(features_ranges[col])
                    else:
                        out_of_range_exog.append(col)
                        out_of_range_exog_ranges.append(features_ranges[col])
                    self._display_warnings(
                        not_compliant_feature=col,
                        feature_range=features_ranges[col],
                        series_name=key if self.series_specific_exog_ else None,
                    )
            # Drop empty per-series entries so the dict only lists offenders.
            if self.series_specific_exog_ and not out_of_range_exog[key]:
                out_of_range_exog.pop(key)
                out_of_range_exog_ranges.pop(key)

    if verbose:
        self._summary(
            out_of_range_series=out_of_range_series,
            out_of_range_series_ranges=out_of_range_series_ranges,
            out_of_range_exog=out_of_range_exog,
            out_of_range_exog_ranges=out_of_range_exog_ranges
        )

    set_skforecast_warnings(suppress_warnings, action='default')

    return flag_out_of_range, out_of_range_series, out_of_range_exog
A class to detect population drift between reference and new datasets.
This implementation computes Kolmogorov-Smirnov (KS) test for numeric features,
Chi-Square test for categorical features, and Jensen-Shannon (JS) distance
for all features. It calculates empirical distributions of these statistics
from the reference data and uses quantile thresholds to determine drift in
new data.
This implementation is inspired by NannyML's DriftDetector. See Notes for
details.
Size of chunks for sequential drift analysis. If int, number of rows per
chunk. If str (e.g., 'D' for daily, 'W' for weekly), time-based chunks
assuming a datetime index. If None, analyzes the full dataset as a single
chunk.
Method for calculating thresholds from empirical distributions:
'std': Uses mean + threshold * std of the empirical distribution.
This is faster since it does not use leave-one-chunk-out.
'quantile': Uses the specified quantile of the empirical distribution.
Thresholds are computed using leave-one-chunk-out cross-validation to
avoid self-comparison bias. This is statistically more correct for
quantile-based thresholds but computationally more expensive.
Size of chunks for sequential drift analysis. If int, number of rows per
chunk. If str (e.g., 'D' for daily, 'W' for weekly), time-based chunks
assuming a datetime index. If None, analyzes the full dataset as a single
chunk.
List of series IDs present during fitting when using MultiIndex DataFrames.
Notes
This implementation is inspired by NannyML's DriftDetector [1]_.
It is a lightweight version adapted for skforecast's needs:
- It does not store the raw reference data, only the necessary precomputed
information to calculate the statistics efficiently during prediction.
- All empirical thresholds are calculated using the specified quantile from
the empirical distributions obtained from the reference data chunks.
- It includes checks for out of range values in numeric features and new
categories in categorical features.
- It supports multiple time series by fitting separate detectors for each
series ID when provided with a MultiIndex DataFrame.
If users require more advanced features, such as multivariate drift detection
or data quality checks, consider using NannyML
(https://nannyml.readthedocs.io/en/stable/) directly.
def__init__(self,chunk_size:int|str|None=None,threshold:int|float=3,threshold_method:str='std')->None:self.ref_features_=[]self.is_fitted=Falseself.ref_ecdf_={}self.ref_bins_edges_={}self.ref_hist_={}self.ref_probs_={}self.ref_counts_={}self.empirical_dist_ks_={}self.empirical_dist_chi2_={}self.empirical_dist_js_={}self.empirical_threshold_ks_={}self.empirical_threshold_chi2_={}self.empirical_threshold_js_={}self.ref_ranges_={}self.ref_categories_={}self.n_chunks_reference_data_=Noneself.detectors_={}# NOTE: Only used for multiseriesself.series_names_in_=None# NOTE: Only used for multiserieserror_msg=("`chunk_size` must be a positive integer, a string compatible with ""pandas frequencies (e.g., 'D', 'W', 'MS'), or None.")ifnot(isinstance(chunk_size,(int,str,pd.DateOffset,type(None)))):raiseTypeError(f"{error_msg} Got {type(chunk_size)}.")ifisinstance(chunk_size,str):try:chunk_size=pd.tseries.frequencies.to_offset(chunk_size)exceptValueError:raiseValueError(f"{error_msg} Got {type(chunk_size)}.")ifisinstance(chunk_size,int)andchunk_size<=0:raiseValueError(f"{error_msg} Got {chunk_size}.")self.chunk_size=chunk_sizevalid_threshold_methods=['quantile','std']ifthreshold_methodnotinvalid_threshold_methods:raiseValueError(f"`threshold_method` must be one of {valid_threshold_methods}. "f"Got '{threshold_method}'.")self.threshold_method=threshold_methodifthreshold_method=='quantile':ifnot(0<threshold<1):raiseValueError(f"When `threshold_method='quantile'`, `threshold` must be between "f"0 and 1. Got {threshold}.")else:ifthreshold<0:raiseValueError(f"When `threshold_method='std'`, `threshold` must be >= 0. "f"Got {threshold}.")self.threshold=threshold
def_reset_attributes(self)->None:""" Reset all fitted attributes to their initial state. """self.ref_features_=[]self.ref_ecdf_={}self.ref_bins_edges_={}self.ref_hist_={}self.ref_probs_={}self.ref_counts_={}self.empirical_dist_ks_={}self.empirical_dist_chi2_={}self.empirical_dist_js_={}self.empirical_threshold_ks_={}self.empirical_threshold_chi2_={}self.empirical_threshold_js_={}self.ref_ranges_={}self.ref_categories_={}self.n_chunks_reference_data_=Noneself.is_fitted=False
def_create_chunks(self,X:pd.DataFrame)->list[pd.DataFrame]:""" Split X into chunks based on chunk_size. Parameters ---------- X : pandas DataFrame Data to be chunked. Returns ------- chunks : list List of DataFrames, each representing a chunk of the data. """ifself.chunk_sizeisnotNone:ifisinstance(self.chunk_size,pd.offsets.DateOffset)andnotisinstance(X.index,pd.DatetimeIndex):raiseValueError("`chunk_size` is a pandas frequency but `X` does not have a DatetimeIndex.")ifself.chunk_sizeisnotNone:ifisinstance(self.chunk_size,int):chunks=[X.iloc[i:i+self.chunk_size]foriinrange(0,len(X),self.chunk_size)]else:chunks=[groupfor_,groupinX.resample(self.chunk_size)]else:chunks=[X]returnchunks
If new data contains values outside the reference range (numeric) or
unseen categories (categorical), they won't be counted in the probability
distribution, leading to a sum < 1. To ensure distributions are comparable,
we add an extra bin for "leftover" probability mass in the new distribution
and a corresponding zero bin in the reference distribution.
Parameters:
Name
Type
Description
Default
ref_probs
numpy ndarray
Probability distribution from reference data (histogram for numeric,
normalized counts for categorical).
def_compute_js_with_leftover(self,ref_probs:np.ndarray,new_probs:np.ndarray)->float:""" Compute Jensen-Shannon distance handling out-of-range/unseen values. If new data contains values outside the reference range (numeric) or unseen categories (categorical), they won't be counted in the probability distribution, leading to a sum < 1. To ensure distributions are comparable, we add an extra bin for "leftover" probability mass in the new distribution and a corresponding zero bin in the reference distribution. Parameters ---------- ref_probs : numpy ndarray Probability distribution from reference data (histogram for numeric, normalized counts for categorical). new_probs : numpy ndarray Probability distribution from new data. Returns ------- js_distance : float Jensen-Shannon distance between the two distributions. """leftover=1-np.sum(new_probs)ifleftover>0:new_probs_extended=np.append(new_probs,leftover)ref_probs_extended=np.append(ref_probs,0)js_distance=jensenshannon(ref_probs_extended,new_probs_extended,base=2)else:js_distance=jensenshannon(ref_probs,new_probs,base=2)returnjs_distance
Fit the drift detector by calculating empirical distributions and thresholds
from reference data. The empirical distributions are computed by chunking
the reference data according to the specified chunk_size and calculating
the statistics for each chunk.
Parameters:
Name
Type
Description
Default
X
pandas DataFrame
Reference data used as the baseline for drift detection.
required
Source code in skforecast/drift_detection/_population_drift.py
def _fit(self, X: pd.DataFrame) -> None:
    """
    Fit the drift detector by computing empirical distributions and thresholds
    from reference data. The reference data is split into chunks according to
    `chunk_size`, and the KS, Chi-square and Jensen-Shannon statistics are
    computed for each chunk to build the empirical distributions.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.

    Returns
    -------
    None
    """
    self._reset_attributes()
    chunks = self._create_chunks(X)
    self.n_chunks_reference_data_ = len(chunks)
    features = X.columns.tolist()

    for feature in features:
        is_numeric = pd.api.types.is_numeric_dtype(X[feature])
        ref = X[feature].dropna()
        if ref.empty:
            warnings.warn(
                f"Feature '{feature}' contains only NaN values in the reference dataset. "
                f"Drift detection skipped.", UnknownLevelWarning
            )
            continue

        self.empirical_dist_ks_[feature] = []
        self.empirical_dist_chi2_[feature] = []
        self.empirical_dist_js_[feature] = []
        self.ref_features_.append(feature)

        if is_numeric:
            # Precompute histogram with bins for Jensen-Shannon distance.
            # This may not perfectly align with bins used in predict if new
            # data extends the range, but it provides a reasonable
            # approximation for efficiency.
            bins_edges = np.histogram_bin_edges(ref.astype("float64"), bins='doane')
            ref_hist = np.histogram(ref, bins=bins_edges)[0] / len(ref)
            self.ref_bins_edges_[feature] = bins_edges
            self.ref_hist_[feature] = ref_hist
            self.ref_ranges_[feature] = (ref.min(), ref.max())
            # Precompute ECDF for the Kolmogorov-Smirnov test.
            self.ref_ecdf_[feature] = ecdf(ref)
        else:
            counts_raw = ref.value_counts()
            counts_norm = counts_raw / counts_raw.sum()
            self.ref_counts_[feature] = counts_raw
            self.ref_probs_[feature] = counts_norm
            self.ref_categories_[feature] = counts_raw.index.tolist()

        # NOTE: Precompute Leave One Chunk Out (LOO) indices once per feature
        # (only for the 'quantile' method).
        use_loo = self.threshold_method == 'quantile'
        if use_loo:
            chunk_indices = [chunk.index for chunk in chunks]
            # Indices of the reference DataFrame excluding, in turn, each
            # chunk (Leave One Out).
            loo_indices = [
                X.index.difference(chunk_indices[i]) for i in range(len(chunks))
            ]

        for i, chunk in enumerate(chunks):
            new = chunk[feature].dropna()
            if new.empty:
                continue

            # If 'quantile', use LOO and exclude the current chunk from the
            # reference data used to compute the statistics.
            if use_loo:
                ref_data_for_stats = X.loc[loo_indices[i], feature].dropna()
                if ref_data_for_stats.empty:
                    continue
            else:
                ref_data_for_stats = ref

            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan

            if is_numeric:
                # Compute ECDF and histogram for the reference data.
                ref_stats_ecdf = ecdf(ref_data_for_stats)
                ref_stats_hist = (
                    np.histogram(ref_data_for_stats, bins=self.ref_bins_edges_[feature])[0]
                    / len(ref_data_for_stats)
                )
                new_ecdf = ecdf(new)
                new_hist = (
                    np.histogram(new, bins=self.ref_bins_edges_[feature])[0] / len(new)
                )
                # Out-of-bin data in the chunk is handled by adding a leftover
                # bin so both histograms sum to 1 and remain comparable.
                js_distance = self._compute_js_with_leftover(
                    ref_probs=ref_stats_hist, new_probs=new_hist
                )
                ks_stat = ks_2samp_from_ecdf(
                    ecdf1=ref_stats_ecdf, ecdf2=new_ecdf, alternative="two-sided"
                )
            else:
                # Compute counts and probabilities for the reference data.
                if use_loo:
                    ref_stats_counts = ref_data_for_stats.value_counts()
                    ref_stats_probs = ref_stats_counts / ref_stats_counts.sum()
                else:
                    ref_stats_counts = self.ref_counts_[feature]
                    ref_stats_probs = self.ref_probs_[feature]
                new_probs = new.value_counts(normalize=True).sort_index()
                # Align categories and fill missing with 0.
                all_cats = ref_stats_probs.index.union(new_probs.index)
                ref_probs_aligned = ref_stats_probs.reindex(all_cats, fill_value=0)
                new_probs_aligned = new_probs.reindex(all_cats, fill_value=0)
                js_distance = jensenshannon(
                    ref_probs_aligned.to_numpy(), new_probs_aligned.to_numpy(), base=2
                )
                # Align counts the same way for the Chi-square test.
                new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                ref_counts_aligned = (
                    ref_stats_counts.reindex(all_cats, fill_value=0).to_numpy()
                )
                if new_counts.sum() > 0 and ref_counts_aligned.sum() > 0:
                    # Contingency table: rows = [reference, new], columns = categories.
                    contingency_table = np.array([ref_counts_aligned, new_counts])
                    chi2_stat = chi2_contingency(contingency_table)[0]

            self.empirical_dist_ks_[feature].append(ks_stat)
            self.empirical_dist_chi2_[feature].append(chi2_stat)
            self.empirical_dist_js_[feature].append(js_distance)

        if self.threshold_method == 'quantile':
            # Empirical thresholds at the specified quantile. pandas' quantile
            # handles NaNs without emitting warnings.
            self.empirical_threshold_ks_[feature] = (
                pd.Series(self.empirical_dist_ks_[feature]).quantile(self.threshold)
            )
            self.empirical_threshold_chi2_[feature] = (
                pd.Series(self.empirical_dist_chi2_[feature]).quantile(self.threshold)
            )
            self.empirical_threshold_js_[feature] = (
                pd.Series(self.empirical_dist_js_[feature]).quantile(self.threshold)
            )
        else:
            # Mean + k*std thresholds.
            ks_values = self.empirical_dist_ks_[feature]
            chi2_values = self.empirical_dist_chi2_[feature]
            js_values = self.empirical_dist_js_[feature]
            # Suppress RuntimeWarnings when all values are NaN.
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore', message='Mean of empty slice')
                warnings.filterwarnings('ignore', message='Degrees of freedom <= 0 for slice')
                self.empirical_threshold_ks_[feature] = (
                    np.nanmean(ks_values) + self.threshold * np.nanstd(ks_values, ddof=0)
                )
                self.empirical_threshold_chi2_[feature] = (
                    np.nanmean(chi2_values) + self.threshold * np.nanstd(chi2_values, ddof=0)
                )
                self.empirical_threshold_js_[feature] = (
                    np.nanmean(js_values) + self.threshold * np.nanstd(js_values, ddof=0)
                )

        # NOTE: Clip thresholds to their theoretical bounds.
        # KS statistic is bounded in [0, 1].
        if not np.isnan(self.empirical_threshold_ks_[feature]):
            self.empirical_threshold_ks_[feature] = np.clip(
                self.empirical_threshold_ks_[feature], 0, 1
            )
        # Jensen-Shannon distance is bounded in [0, 1].
        if not np.isnan(self.empirical_threshold_js_[feature]):
            self.empirical_threshold_js_[feature] = np.clip(
                self.empirical_threshold_js_[feature], 0, 1
            )
        # Chi-square statistic is bounded in [0, inf); only clip the lower bound.
        if not np.isnan(self.empirical_threshold_chi2_[feature]):
            self.empirical_threshold_chi2_[feature] = np.clip(
                self.empirical_threshold_chi2_[feature], 0, None
            )

    if self.n_chunks_reference_data_ < 10:
        warnings.warn(
            f"Only {self.n_chunks_reference_data_} chunks in reference data. "
            f"Empirical thresholds may not be reliable. Consider using more "
            f"data or smaller chunk_size."
        )

    self.is_fitted = True
Fit the drift detector by calculating empirical distributions and thresholds
from reference data. The empirical distributions are computed by chunking
the reference data according to the specified chunk_size and calculating
the statistics for each chunk.
Parameters:
Name
Type
Description
Default
X
pandas DataFrame
Reference data used as the baseline for drift detection.
If X is a regular DataFrame, a single detector is fitted for all data.
The index is assumed to be the temporal index and each column a feature.
If X has a MultiIndex, the first level is assumed to be the series ID
and the second level the temporal index. A separate detector is fitted for
each series.
required
Source code in skforecast/drift_detection/_population_drift.py
def fit(self, X) -> None:
    """
    Fit the drift detector by calculating empirical distributions and
    thresholds from reference data. The empirical distributions are computed
    by chunking the reference data according to the specified `chunk_size`
    and calculating the statistics for each chunk.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.

        - If `X` is a regular DataFrame, a single detector is fitted for all
        data. The index is assumed to be the temporal index and each column
        a feature.
        - If `X` has a MultiIndex, the first level is assumed to be the
        series ID and the second level the temporal index. A separate
        detector is fitted for each series.

    """
    self._reset_attributes()
    self.detectors_ = {}          # NOTE: Only used for multiseries
    self.series_names_in_ = None  # NOTE: Only used for multiseries

    if not isinstance(X, pd.DataFrame):
        raise ValueError(f"`X` must be a pandas DataFrame. Got {type(X)} instead.")

    if isinstance(X.index, pd.MultiIndex):
        # One independent detector per series (first index level).
        for series_id, series_data in X.groupby(level=0):
            sub_detector = PopulationDriftDetector(
                chunk_size       = self.chunk_size,
                threshold        = self.threshold,
                threshold_method = self.threshold_method
            )
            sub_detector._fit(series_data.droplevel(0))
            self.detectors_[series_id] = sub_detector
    else:
        self._fit(X)

    self.is_fitted = True
    self.series_names_in_ = list(self.detectors_) if self.detectors_ else None
    self._collect_attributes()
def _predict(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.

    """
    chunks = self._create_chunks(X)
    results = []
    features = X.columns.tolist()
    for feature in features:
        if feature not in self.ref_features_:
            # FIX: the original concatenated two f-strings without a separating
            # space, producing "skipped.for this feature.".
            warnings.warn(
                f"Feature '{feature}' was not present during fitting. "
                f"Drift detection skipped for this feature.",
                UnknownLevelWarning
            )
            continue

        is_numeric     = pd.api.types.is_numeric_dtype(X[feature])
        ref_bin_edges  = self.ref_bins_edges_.get(feature, None)
        ref_hist       = self.ref_hist_.get(feature, None)
        ref_probs      = self.ref_probs_.get(feature, None)
        ref_counts     = self.ref_counts_.get(feature, None)
        ref_ecdf       = self.ref_ecdf_.get(feature, None)
        ks_threshold   = self.empirical_threshold_ks_.get(feature, np.nan)
        chi2_threshold = self.empirical_threshold_chi2_.get(feature, np.nan)
        js_threshold   = self.empirical_threshold_js_.get(feature, np.nan)
        ref_range      = self.ref_ranges_.get(feature, (np.nan, np.nan))

        for chunk_idx, chunk in enumerate(chunks):
            chunk_label = chunk_idx if self.chunk_size else "full"
            new = chunk[feature].dropna()
            # Defaults: NaN statistics so empty chunks carry no drift signal.
            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan
            is_out_of_range = np.nan
            if not new.empty:
                if is_numeric:
                    new_ecdf = ecdf(new)
                    # Histogram of new data using the reference bin edges,
                    # normalized to probabilities.
                    new_hist = np.histogram(new, bins=ref_bin_edges)[0] / len(new)
                    js_distance = self._compute_js_with_leftover(
                        ref_probs=ref_hist, new_probs=new_hist
                    )
                    ks_stat = ks_2samp_from_ecdf(
                        ecdf1=ref_ecdf, ecdf2=new_ecdf, alternative="two-sided"
                    )
                    is_out_of_range = (
                        np.min(new) < ref_range[0] or np.max(new) > ref_range[1]
                    )
                else:
                    ref_categories = self.ref_categories_[feature]
                    ref_probs_ = ref_probs.reindex(ref_categories, fill_value=0).to_numpy()
                    # Map new data onto the reference categories.
                    new_counts_dict = new.value_counts().to_dict()
                    new_counts_on_ref = [new_counts_dict.get(cat, 0) for cat in ref_categories]
                    new_probs = (
                        np.array(new_counts_on_ref) / len(new)
                        if len(new) > 0
                        else np.zeros(len(ref_categories))
                    )
                    js_distance = self._compute_js_with_leftover(
                        ref_probs=ref_probs_, new_probs=new_probs
                    )
                    # FIX: reindex with a list instead of a set (set indexers are
                    # deprecated in pandas and their iteration order is not
                    # guaranteed). The chi-square statistic is invariant to the
                    # column order, so results are unchanged.
                    all_cats = list(
                        set(self.ref_categories_[feature]).union(new_counts_dict.keys())
                    )
                    new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                    ref_counts_aligned = ref_counts.reindex(all_cats, fill_value=0).to_numpy()
                    if new_counts.sum() > 0 and ref_counts_aligned.sum() > 0:
                        # Contingency table: rows = [reference, new], columns = categories
                        contingency_table = np.array([ref_counts_aligned, new_counts])
                        chi2_stat = chi2_contingency(contingency_table)[0]

            results.append({
                "chunk": chunk_label,
                "chunk_start": chunk.index.min(),
                "chunk_end": chunk.index.max(),
                "feature": feature,
                "ks_statistic": ks_stat,
                "ks_threshold": ks_threshold,
                "chi2_statistic": chi2_stat,
                "chi2_threshold": chi2_threshold,
                "js_statistic": js_distance,
                "js_threshold": js_threshold,
                "reference_range": ref_range,
                "is_out_of_range": is_out_of_range,
            })

    results_df = pd.DataFrame(results)
    results_df['drift_ks_statistic'] = results_df['ks_statistic'] > results_df['ks_threshold']
    results_df['drift_chi2_statistic'] = results_df['chi2_statistic'] > results_df['chi2_threshold']
    results_df['drift_js'] = results_df['js_statistic'] > results_df['js_threshold']
    # NOTE(review): `is_out_of_range` may be NaN for empty chunks; it is combined
    # with `|` below where NaN is not a strict boolean — confirm intended handling.
    results_df['drift_detected'] = (
        results_df['drift_ks_statistic']
        | results_df['drift_chi2_statistic']
        | results_df['drift_js']
        | results_df['is_out_of_range']
    )

    return results_df
Predict drift in new data by comparing the estimated statistics to
reference thresholds. Two dataframes are returned: the first with
detailed information for each chunk, the second with only the total
number of chunks where drift has been detected.
Parameters:
Name
Type
Description
Default
X
pandas DataFrame
New data to compare against the reference.
required
Returns:
Name
Type
Description
results
pandas DataFrame
DataFrame with the drift detection results for each chunk.
summary
pandas DataFrame
Summary DataFrame with the total number and percentage of chunks
with detected drift per feature (or per series_id and feature if
MultiIndex), and the list of chunk IDs where drift was detected.
Source code in skforecast/drift_detection/_population_drift.py
def predict(self, X) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds. Two dataframes are returned: the first with
    detailed information for each chunk, the second with the total number
    of chunks where drift has been detected.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.
    summary : pandas DataFrame
        Summary DataFrame with the total number and percentage of chunks
        with detected drift per feature (or per series_id and feature if
        MultiIndex), and the list of chunk IDs where drift was detected.

    """
    if not self.is_fitted:
        raise NotFittedError(
            "This PopulationDriftDetector instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this estimator."
        )
    if not isinstance(X, pd.DataFrame):
        raise ValueError(f"`X` must be a pandas DataFrame. Got {type(X)} instead.")

    if isinstance(X.index, pd.MultiIndex):
        per_series = []
        for series_id, series_data in X.groupby(level=0):
            series_data = series_data.droplevel(0)
            if series_id not in self.detectors_:
                warnings.warn(
                    f"Series '{series_id}' was not present during fitting. Drift detection skipped.",
                    UnknownLevelWarning
                )
                continue
            partial = self.detectors_[series_id]._predict(series_data)
            partial.insert(0, 'series_id', series_id)
            per_series.append(partial)
        results = pd.concat(per_series, ignore_index=True)
    else:
        results = self._predict(X)

    groupby_cols = (
        ['series_id', 'feature'] if results.columns[0] == 'series_id' else ['feature']
    )

    def _drifted_chunk_ids(mask):
        # Chunk ids of the rows in this group flagged as drifted.
        rows = results.loc[mask.index]
        return rows.loc[rows['drift_detected'], 'chunk'].tolist()

    summary = (
        results
        .groupby(groupby_cols)
        .agg(
            n_chunks_with_drift   = ('drift_detected', 'sum'),
            pct_chunks_with_drift = ('drift_detected', 'mean'),
            chunks_with_drift     = ('drift_detected', _drifted_chunk_ids),
        )
        .reset_index()
    )
    summary['pct_chunks_with_drift'] = summary['pct_chunks_with_drift'] * 100

    return results, summary
Collect attributes for representation and inspection and update the instance
dictionary with the collected values. For multi-series (when detectors_ is
populated), attributes are aggregated into nested dictionaries keyed by
detector names. For single-series, attributes remain unchanged.
Parameters:
Name
Type
Description
Default
self
required
Returns:
Type
Description
None
Source code in skforecast/drift_detection/_population_drift.py
def _collect_attributes(self) -> None:
    """
    Collect attributes for representation and inspection and update the
    instance dictionary with the collected values. For multi-series (when
    `detectors_` is populated), attributes are aggregated into nested
    dictionaries keyed by detector names. For single-series, attributes
    remain unchanged.

    Returns
    -------
    None

    """
    if not self.detectors_:
        # Single-series detector: attributes are left untouched.
        return

    excluded = ('is_fitted', 'detectors_', 'series_names_in_')
    # Snapshot the attribute names first; the loop below rebinds values in
    # self.__dict__.
    names_to_collect = [k for k in self.__dict__.keys() if k not in excluded]
    for name in names_to_collect:
        per_detector = {
            detector_key: getattr(detector, name, None)
            for detector_key, detector in self.detectors_.items()
        }
        self.__dict__[name] = deepcopy(per_detector)
def get_thresholds(self) -> pd.DataFrame:
    """
    Return a DataFrame with all computed thresholds per feature. For
    multi-series, returns thresholds per series_id and feature.

    Returns
    -------
    thresholds : pandas DataFrame
        DataFrame with the computed thresholds per feature (and per
        series_id if MultiIndex was used during fitting).

    """
    if not self.is_fitted:
        raise NotFittedError(
            "This PopulationDriftDetector instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this estimator."
        )

    # Multi-series case: `detectors_` holds one fitted detector per series_id.
    is_multiseries = bool(self.detectors_)
    columns = (["series_id"] if is_multiseries else []) + [
        "feature", "ks_threshold", "chi2_threshold", "js_threshold"
    ]

    rows = []
    if is_multiseries:
        for series_id, detector in self.detectors_.items():
            for feature in detector.ref_features_:
                rows.append((
                    series_id,
                    feature,
                    detector.empirical_threshold_ks_.get(feature),
                    detector.empirical_threshold_chi2_.get(feature),
                    detector.empirical_threshold_js_.get(feature),
                ))
    else:
        # Single-series case: thresholds live directly on this instance.
        for feature in self.ref_features_:
            rows.append((
                feature,
                self.empirical_threshold_ks_.get(feature),
                self.empirical_threshold_chi2_.get(feature),
                self.empirical_threshold_js_.get(feature),
            ))

    return pd.DataFrame(rows, columns=columns)