Detector of out-of-range values based on training feature ranges.
The detector is intentionally lightweight: it does not compute advanced
drift statistics, since it is used to check single observations during
inference. This makes it suitable for real-time applications.
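A minimal usage sketch of the detector, assuming it is exposed as `RangeDriftDetector` in `skforecast.drift_detection` (the class name and import path are assumptions inferred from the module path shown below; the `fit`/`predict` signatures follow the source in this section):

import pandas as pd
from skforecast.drift_detection import RangeDriftDetector  # assumed class name and import path

# Training data: a named series and one exogenous variable
series = pd.Series([10.0, 12.5, 11.3, 14.8], name="sales")
exog = pd.DataFrame({"temperature": [18.0, 21.5, 19.2, 23.1]})

detector = RangeDriftDetector()
detector.fit(series=series, exog=exog)

# At inference time, check the last window and the new exog against training ranges
last_window = pd.Series([15.2, 30.0], name="sales")     # 30.0 exceeds the training max
new_exog = pd.DataFrame({"temperature": [22.0, 40.0]})  # 40.0 exceeds the training max
flag, oor_series, oor_exog = detector.predict(
    last_window=last_window, exog=new_exog, verbose=True
)
print(flag, oor_series, oor_exog)  # expected: True ['sales'] ['temperature']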
Indicates whether exogenous variables have different values across
target series during training (i.e., the exogenous data is
series-specific rather than global).
Get a summary of the features in the DataFrame or Series. For numeric
features, it returns the min and max values. For categorical features,
it returns the unique values.
Feature ranges. If X is a Series, returns a tuple (min, max) for numeric
data or a set of unique values for categorical data. If X is a DataFrame,
returns a dictionary with column names as keys and their respective ranges
(tuple or set) as values.
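For illustration, a standalone sketch that mirrors the documented behavior (plain pandas, not the library method itself):

import pandas as pd

df = pd.DataFrame({
    "temperature": [18.0, 21.5, 19.2],
    "weekday": ["mon", "tue", "mon"],
})

# (min, max) for numeric columns, set of unique values for categorical columns
ranges = {
    col: (float(df[col].min()), float(df[col].max()))
         if pd.api.types.is_numeric_dtype(df[col])
         else set(df[col].dropna().unique())
    for col in df.columns
}
print(ranges)  # {'temperature': (18.0, 21.5), 'weekday': {'mon', 'tue'}}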
Source code in skforecast\drift_detection\_range_drift.py
@classmethod
def _get_features_range(
    cls,
    X: pd.Series | pd.DataFrame
) -> tuple | set | dict[str, tuple | set]:
    """
    Get a summary of the features in the DataFrame or Series. For numeric
    features, it returns the min and max values. For categorical features,
    it returns the unique values.

    Parameters
    ----------
    X : pandas Series, pandas DataFrame
        Input data to summarize.

    Returns
    -------
    features_ranges: tuple, set, dict
        Feature ranges. If X is a Series, returns a tuple (min, max) for numeric
        data or a set of unique values for categorical data. If X is a DataFrame,
        returns a dictionary with column names as keys and their respective ranges
        (tuple or set) as values.
    """

    if not isinstance(X, (pd.DataFrame, pd.Series)):
        raise TypeError("Input must be a pandas DataFrame or Series.")

    if isinstance(X, pd.Series):
        if pd.api.types.is_numeric_dtype(X):
            features_ranges = (float(X.min()), float(X.max()))
        else:
            features_ranges = set(X.dropna().unique())

    if isinstance(X, pd.DataFrame):
        num_cols = [col for col in X.columns if pd.api.types.is_numeric_dtype(X[col])]
        cat_cols = [col for col in X.columns if col not in num_cols]
        features_ranges = {}
        features_ranges.update(
            {col: (float(X[col].min()), float(X[col].max())) for col in num_cols}
        )
        features_ranges.update(
            {col: set(X[col].dropna().unique()) for col in cat_cols}
        )

    return features_ranges
Check if there is any value outside the training range. For numeric features,
it checks if the values are within the min and max range. For categorical features,
it checks if the values are among the seen categories.
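A toy illustration of this check (standalone sketch, not the library method):

import pandas as pd

numeric_range = (10.0, 20.0)        # (min, max) seen during training
categorical_range = {"mon", "tue"}  # categories seen during training

new_numeric = pd.Series([12.0, 25.0])
new_categorical = pd.Series(["mon", "fri"])

# Numeric: out of range if any value falls outside [min, max]
out_numeric = new_numeric.min() < numeric_range[0] or new_numeric.max() > numeric_range[1]
# Categorical: out of range if any unseen category appears
out_categorical = bool(set(new_categorical.dropna().unique()) - categorical_range)
print(out_numeric, out_categorical)  # True True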
@classmethod
def _check_feature_range(
    cls,
    feature_range: tuple | set,
    X: pd.Series
) -> bool:
    """
    Check if there is any value outside the training range. For numeric features,
    it checks if the values are within the min and max range. For categorical
    features, it checks if the values are among the seen categories.

    Parameters
    ----------
    feature_range : tuple, set
        Output from _get_features_range() for a single feature.
    X : pd.Series
        New data to validate.

    Returns
    -------
    bool
        True if there is any value outside the training range, False otherwise.
    """

    if isinstance(feature_range, tuple):
        return X.min() < feature_range[0] or X.max() > feature_range[1]
    else:
        unseen = set(X.dropna().unique()) - feature_range
        return bool(unseen)
@classmethod
def _display_warnings(
    cls,
    not_compliant_feature: str,
    feature_range: tuple | set,
    series_name: str = None,
) -> None:
    """
    Display warnings for features with values outside the training range.

    Parameters
    ----------
    not_compliant_feature : str
        Name of the feature with values outside the training range.
    feature_range : tuple | set
        Training range of the feature.
    series_name : str, optional
        Name of the series being checked, if applicable.

    Returns
    -------
    None
    """

    if isinstance(feature_range, tuple):  # Numeric
        msg = (
            f"'{not_compliant_feature}' has values outside the range seen during training "
            f"[{feature_range[0]:.5f}, {feature_range[1]:.5f}]. "
            f"This may affect the accuracy of the predictions."
        )
    else:  # Categorical
        msg = (
            f"'{not_compliant_feature}' has values not seen during training. Seen values: "
            f"{feature_range}. This may affect the accuracy of the predictions."
        )

    if series_name:
        msg = f"'{series_name}': " + msg

    warnings.warn(msg, FeatureOutOfRangeWarning)
@classmethod
def _summary(
    cls,
    out_of_range_series: list,
    out_of_range_series_ranges: list,
    out_of_range_exog: list,
    out_of_range_exog_ranges: list
) -> None:
    """
    Summarize the results of the range check.

    Parameters
    ----------
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_series_ranges : list
        List of ranges for the out-of-range series.
    out_of_range_exog : list
        List of exogenous variable names that are out of range.
    out_of_range_exog_ranges : list
        List of ranges for the out-of-range exogenous variables.

    Returns
    -------
    None
    """

    msg_series = ""
    if out_of_range_series:
        series_msgs = []
        for series, series_range in zip(out_of_range_series, out_of_range_series_ranges):
            msg_temp = (
                f"'{series}' has values outside the observed range "
                f"[{series_range[0]:.5f}, {series_range[1]:.5f}]."
            )
            series_msgs.append(textwrap.fill(msg_temp, width=80))
        msg_series = "\n".join(series_msgs) + "\n"
    else:
        msg_series = "No series with out-of-range values found.\n"

    msg_exog = ""
    if out_of_range_exog:
        exog_msgs = []
        if isinstance(out_of_range_exog, list):
            for exog, exog_range in zip(out_of_range_exog, out_of_range_exog_ranges):
                if isinstance(exog_range, tuple):  # Numeric
                    msg_temp = (
                        f"'{exog}' has values outside the observed range "
                        f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                    )
                else:  # Categorical
                    msg_temp = (
                        f"'{exog}' has values not seen during training. Seen values: "
                        f"{exog_range}."
                    )
                exog_msgs.append(textwrap.fill(msg_temp, width=80))
        else:
            for key, value in out_of_range_exog.items():
                for exog, exog_range in zip(value, out_of_range_exog_ranges[key]):
                    if isinstance(exog_range, tuple):  # Numeric
                        msg_temp = (
                            f"'{exog}' has values outside the observed range "
                            f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                        )
                    else:  # Categorical
                        msg_temp = (
                            f"'{exog}' has values not seen during training. Seen values: "
                            f"{exog_range}."
                        )
                    msg_temp = f"'{key}': " + msg_temp
                    exog_msgs.append(textwrap.fill(msg_temp, width=80))
        msg_exog = "\n".join(exog_msgs)
    else:
        msg_exog = "No exogenous variables with out-of-range values found."

    console = Console()
    content = (
        f"[bold]Series:[/bold]\n{msg_series}\n"
        f"[bold]Exogenous Variables:[/bold]\n{msg_exog}"
    )
    console.print(Panel(content, title="[bold]Out-of-range summary[/bold]", expand=False))
def _normalize_input(
    self,
    X: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame],
    name: str,
    series_ids: list[str] | None = None
) -> dict[str, pd.Series | pd.DataFrame]:
    """
    Convert pd.Series, pd.DataFrame or dict into a standardized dict of
    pd.Series or pd.DataFrames.

    Parameters
    ----------
    X : pandas Series, pandas DataFrame, dict
        Input data to normalize.
    name : str
        Name of the input being normalized. Used for error messages.
        Expected values are 'series', 'last_window' or 'exog'.
    series_ids : list, default None
        Series IDs to include in the normalization of exogenous variables.

    Returns
    -------
    X : dict
        Normalized input as a dictionary of pandas Series or DataFrames.
    """

    if isinstance(X, pd.Series):
        if not X.name:
            raise ValueError(f"{name} must have a name when a pandas Series is provided.")
        X = {X.name: X}
    elif isinstance(X, pd.DataFrame):
        if isinstance(X.index, pd.MultiIndex):
            if name in ["series", "last_window"]:
                col = X.columns[0]
                if len(X.columns) != 1:
                    warnings.warn(
                        f"`{name}` DataFrame has multiple columns. Only the "
                        f"first column, '{col}', will be used. Others ignored.",
                        IgnoredArgumentWarning,
                    )
                X = {
                    series_id: X.loc[series_id][col].rename(series_id)
                    for series_id in X.index.levels[0]
                }
            else:
                X = {series_id: X.loc[series_id] for series_id in X.index.levels[0]}
        else:
            if self.series_specific_exog_ and series_ids:
                X = {series_id: X.copy() for series_id in series_ids}
            else:
                X = X.to_dict(orient="series")
    elif isinstance(X, dict):
        for k, v in X.items():
            if not isinstance(v, (pd.Series, pd.DataFrame)):
                raise TypeError(
                    f"All values in `{name}` must be a pandas Series or DataFrame. "
                    f"Review the value for key '{k}'."
                )

    return X
def fit(
    self,
    series: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    **kwargs
) -> None:
    """
    Fit detector, storing training ranges.

    Parameters
    ----------
    series : pandas Series, pandas DataFrame, dict, aliases: `y`
        Input time series data to fit the detector, ideally the same ones
        used to fit the forecaster.
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variables to include in the forecaster.

    Returns
    -------
    None
    """

    if series is None and ('y' not in kwargs or kwargs['y'] is None):
        raise ValueError("One of `series` or `y` must be provided.")

    if 'y' in kwargs:
        if series is not None:
            raise ValueError(
                "Cannot specify both `series` and `y`. Please provide only one of them."
            )
        series = kwargs.pop('y')

    if not isinstance(series, (pd.Series, pd.DataFrame, dict)):
        raise TypeError("Input must be a pandas Series, DataFrame or dict.")
    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("Exogenous variables must be a pandas Series, DataFrame or dict.")

    self.series_names_in_ = []
    self.series_values_range_ = {}
    self.exog_names_in_ = None
    self.exog_values_range_ = None
    self.series_specific_exog_ = False
    self.is_fitted = False

    series = self._normalize_input(series, name="series")
    for key, value in series.items():
        self.series_names_in_.append(key)
        self.series_values_range_[key] = self._get_features_range(X=value)

    if exog is not None:
        exog = self._normalize_input(exog, name="exog")
        self.exog_names_in_ = []
        self.exog_values_range_ = {}
        for key, value in exog.items():
            if isinstance(value, pd.Series):
                self.exog_names_in_.append(key)
            else:
                self.exog_names_in_.extend(value.columns)
            self.exog_values_range_[key] = self._get_features_range(X=value)
        self.exog_names_in_ = list(dict.fromkeys(self.exog_names_in_))
        self.series_specific_exog_ = any(
            key in self.series_names_in_ for key in exog.keys()
        )

    self.is_fitted = True
If self.series_specific_exog_ is False: returns a list with the names
of exogenous variables that are out of range (global exogenous).
If self.series_specific_exog_ is True: returns a dictionary where
keys are series names and values are lists of out-of-range exogenous
variables for each series.
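Illustrative shapes of this return value (feature and series names are hypothetical):

# Global exogenous (self.series_specific_exog_ is False): a flat list
out_of_range_exog = ["temperature", "holiday"]

# Series-specific exogenous (self.series_specific_exog_ is True): a dict keyed by series
out_of_range_exog = {
    "store_1": ["temperature"],
    "store_2": ["temperature", "holiday"],
}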
Source code in skforecast\drift_detection\_range_drift.py
def predict(
    self,
    last_window: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    verbose: bool = True,
    suppress_warnings: bool = False
) -> tuple[bool, list[str], list[str] | dict[str, list[str]]]:
    """
    Check if there is any value outside the training range for last_window and exog.

    Parameters
    ----------
    last_window : pandas Series, pandas DataFrame, dict, default None
        Series values used to create the predictors (lags) needed in the
        first iteration of the prediction (t + 1).
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variable/s included as predictor/s.
    verbose : bool, default False
        Whether to print a summary of the check.
    suppress_warnings : bool, default False
        Whether to suppress warnings.

    Returns
    -------
    flag_out_of_range : bool
        True if there is any value outside the training range, False otherwise.
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_exog : list, dict
        Exogenous variables that are out of range.

        - If `self.series_specific_exog_` is False: returns a list with the names
          of exogenous variables that are out of range (global exogenous).
        - If `self.series_specific_exog_` is True: returns a dictionary where
          keys are series names and values are lists of out-of-range exogenous
          variables for each series.
    """

    if not self.is_fitted:
        raise RuntimeError("Model is not fitted yet.")
    if not isinstance(last_window, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("`last_window` must be a pandas Series, DataFrame, dict or None.")
    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError("`exog` must be a pandas Series, DataFrame, dict or None.")

    set_skforecast_warnings(suppress_warnings, action='ignore')

    flag_out_of_range = False
    out_of_range_series = []
    out_of_range_series_ranges = []
    if last_window is not None:
        last_window = self._normalize_input(last_window, name="last_window")
        for key, value in last_window.items():
            if isinstance(value, pd.Series):
                value = value.to_frame()
            for col in value.columns:
                if key not in self.series_names_in_:
                    warnings.warn(
                        f"'{key}' was not seen during training. Its range is unknown.",
                        UnknownLevelWarning
                    )
                    continue
                is_out_of_range = self._check_feature_range(
                    feature_range=self.series_values_range_[col], X=value[col]
                )
                if is_out_of_range:
                    flag_out_of_range = True
                    out_of_range_series.append(col)
                    out_of_range_series_ranges.append(self.series_values_range_[col])
                    self._display_warnings(
                        not_compliant_feature=col,
                        feature_range=self.series_values_range_[col],
                        series_name=None
                    )

    out_of_range_exog = {} if self.series_specific_exog_ else []
    out_of_range_exog_ranges = {} if self.series_specific_exog_ else []
    if exog is not None:
        series_ids = list(last_window.keys()) if last_window is not None else self.series_names_in_
        exog = self._normalize_input(exog, name="exog", series_ids=series_ids)
        for key, value in exog.items():
            if isinstance(value, pd.Series):
                value = value.to_frame()
            features_ranges = self.exog_values_range_.get(key, None)
            if self.series_specific_exog_:
                out_of_range_exog[key] = []
                out_of_range_exog_ranges[key] = []
            for col in value.columns:
                if not isinstance(features_ranges, dict):
                    features_ranges = {key: features_ranges}
                if col not in self.exog_names_in_:
                    warnings.warn(
                        f"'{col}' was not seen during training. Its range is unknown.",
                        MissingExogWarning,
                    )
                    continue
                is_out_of_range = self._check_feature_range(
                    feature_range=features_ranges[col], X=value[col]
                )
                if is_out_of_range:
                    flag_out_of_range = True
                    if self.series_specific_exog_:
                        out_of_range_exog[key].append(col)
                        out_of_range_exog_ranges[key].append(features_ranges[col])
                    else:
                        out_of_range_exog.append(col)
                        out_of_range_exog_ranges.append(features_ranges[col])
                    self._display_warnings(
                        not_compliant_feature=col,
                        feature_range=features_ranges[col],
                        series_name=key if self.series_specific_exog_ else None,
                    )
            if self.series_specific_exog_ and not out_of_range_exog[key]:
                out_of_range_exog.pop(key)
                out_of_range_exog_ranges.pop(key)

    if verbose:
        self._summary(
            out_of_range_series=out_of_range_series,
            out_of_range_series_ranges=out_of_range_series_ranges,
            out_of_range_exog=out_of_range_exog,
            out_of_range_exog_ranges=out_of_range_exog_ranges
        )

    set_skforecast_warnings(suppress_warnings, action='default')

    return flag_out_of_range, out_of_range_series, out_of_range_exog
A class to detect population drift between reference and new datasets.
This implementation computes the Kolmogorov-Smirnov (KS) test for numeric
features, the Chi-Square test for categorical features, and the
Jensen-Shannon (JS) distance for all features. It calculates empirical
distributions of these statistics from the reference data and uses
quantile thresholds to determine drift in new data.
This implementation is inspired by NannyML's DriftDetector. See Notes for
details.
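A minimal usage sketch, assuming the class is importable from `skforecast.drift_detection` (the import path is an assumption; the constructor and `fit`/`predict` signatures follow the source shown below):

import numpy as np
import pandas as pd
from skforecast.drift_detection import PopulationDriftDetector  # assumed import path

rng = np.random.default_rng(42)
idx_ref = pd.date_range("2024-01-01", periods=365, freq="D")
idx_new = pd.date_range("2025-01-01", periods=90, freq="D")

X_ref = pd.DataFrame({"sales": rng.normal(100, 10, len(idx_ref))}, index=idx_ref)
X_new = pd.DataFrame({"sales": rng.normal(130, 10, len(idx_new))}, index=idx_new)  # shifted mean

detector = PopulationDriftDetector(chunk_size="W", threshold=0.95)
detector.fit(X_ref)
results, summary = detector.predict(X_new)
print(summary[["feature", "n_chunks_with_drift", "pct_chunks_with_drift"]])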
Size of chunks for sequential drift analysis. If int, number of rows per
chunk. If str (e.g., 'D' for daily, 'W' for weekly), time-based chunks
assuming a datetime index. If None, analyzes the full dataset as a single
chunk.
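For instance, all of the following are valid values (a sketch based on the accepted types described above; the import path is an assumption):

import pandas as pd
from skforecast.drift_detection import PopulationDriftDetector  # assumed import path

det_rows    = PopulationDriftDetector(chunk_size=500)                      # 500 rows per chunk
det_weekly  = PopulationDriftDetector(chunk_size="W")                      # weekly chunks (DatetimeIndex required)
det_monthly = PopulationDriftDetector(chunk_size=pd.DateOffset(months=1))  # monthly chunks
det_single  = PopulationDriftDetector(chunk_size=None)                     # whole dataset as a single chunk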
List of series IDs present during fitting when using MultiIndex DataFrames.
Notes
This implementation is inspired by NannyML's DriftDetector [1]_.
It is a lightweight version adapted for skforecast's needs:
- It does not store the raw reference data, only the necessary precomputed
information to calculate the statistics efficiently during prediction.
- All empirical thresholds are calculated using the specified quantile of
the empirical distributions obtained from the reference data chunks (see
the sketch after these notes).
- It includes checks for out-of-range values in numeric features and new
categories in categorical features.
- It supports multiple time series by fitting separate detectors for each
series ID when provided with a MultiIndex DataFrame.
If you require more advanced features, such as multivariate drift detection
or data quality checks, consider using NannyML
(https://nannyml.readthedocs.io/en/stable/) directly.
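The quantile-based thresholds mentioned in the notes work, in essence, like the following simplified sketch using the KS statistic (the actual detector uses precomputed ECDFs and removes each chunk from its own reference):

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
reference = pd.Series(rng.normal(0, 1, 1000))

# Split the reference data into chunks and compute a per-chunk KS statistic
# against the full reference sample.
chunks = np.array_split(reference, 10)
ks_per_chunk = [ks_2samp(reference, chunk).statistic for chunk in chunks]

# The empirical threshold is the chosen quantile of the per-chunk distribution
threshold = pd.Series(ks_per_chunk).quantile(0.95)
print(threshold)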
def __init__(
    self,
    chunk_size=None,
    threshold=0.95
) -> None:

    self.ref_features_ = None
    self.is_fitted_ = False
    self.ref_ecdf_ = {}
    self.ref_bins_edges_ = {}
    self.ref_hist_ = {}
    self.ref_probs_ = {}
    self.ref_counts_ = {}
    self.empirical_dist_ks_ = {}
    self.empirical_dist_chi2_ = {}
    self.empirical_dist_js_ = {}
    self.empirical_threshold_ks_ = {}
    self.empirical_threshold_chi2_ = {}
    self.empirical_threshold_js_ = {}
    self.ref_ranges_ = {}
    self.ref_categories_ = {}
    self.n_chunks_reference_data_ = None
    self.detectors_ = {}          # NOTE: Only used for multiseries
    self.series_names_in_ = None  # NOTE: Only used for multiseries

    if not (0 < threshold < 1):
        raise ValueError(f"`threshold` must be between 0 and 1. Got {threshold}.")
    self.threshold = threshold

    error_msg = (
        "`chunk_size` must be a positive integer, a string compatible with "
        "pandas DateOffset (e.g., 'D', 'W', 'M'), a pandas DateOffset object, or None."
    )
    if not (isinstance(chunk_size, (int, str, pd.DateOffset, type(None)))):
        raise TypeError(f"{error_msg} Got {type(chunk_size)}.")
    if isinstance(chunk_size, str):
        try:
            chunk_size = pd.tseries.frequencies.to_offset(chunk_size)
        except ValueError:
            raise ValueError(f"{error_msg} Got {type(chunk_size)}.")
    if isinstance(chunk_size, int) and chunk_size <= 0:
        raise ValueError(f"{error_msg} Got {chunk_size}.")
    self.chunk_size = chunk_size
Fit the drift detector by calculating empirical distributions and thresholds
from reference data. The empirical distributions are computed by chunking
the reference data according to the specified chunk_size and calculating
the statistics for each chunk.
Parameters:

X : pandas DataFrame, required
    Reference data used as the baseline for drift detection.
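A simplified sketch of the chunking step described above: integer chunk sizes slice by row count, while frequency strings or offsets resample a DatetimeIndex (the helper name is hypothetical):

import pandas as pd

def split_into_chunks(X: pd.DataFrame, chunk_size) -> list[pd.DataFrame]:
    # Simplified version of the chunking used when fitting the detector
    if chunk_size is None:
        return [X]
    if isinstance(chunk_size, int):
        return [X.iloc[i:i + chunk_size] for i in range(0, len(X), chunk_size)]
    # Frequency string or DateOffset: requires a DatetimeIndex
    return [group for _, group in X.resample(chunk_size)]

idx = pd.date_range("2024-01-01", periods=21, freq="D")
X = pd.DataFrame({"sales": range(21)}, index=idx)
print(len(split_into_chunks(X, 7)))    # 3 chunks of 7 rows
print(len(split_into_chunks(X, "W")))  # weekly chunks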
Source code in skforecast\drift_detection\_population_drift.py
def _fit(self, X) -> None:
    """
    Fit the drift detector by calculating empirical distributions and thresholds
    from reference data. The empirical distributions are computed by chunking
    the reference data according to the specified `chunk_size` and calculating
    the statistics for each chunk.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.
    """

    self.ref_features_ = []
    self.is_fitted_ = False
    self.ref_ecdf_ = {}
    self.ref_bins_edges_ = {}
    self.ref_hist_ = {}
    self.ref_probs_ = {}
    self.ref_counts_ = {}
    self.empirical_dist_ks_ = {}
    self.empirical_dist_chi2_ = {}
    self.empirical_dist_js_ = {}
    self.empirical_threshold_ks_ = {}
    self.empirical_threshold_chi2_ = {}
    self.empirical_threshold_js_ = {}
    self.ref_ranges_ = {}
    self.ref_categories_ = {}
    self.n_chunks_reference_data_ = None
    self.detectors_ = {}          # NOTE: Only used for multiseries
    self.series_names_in_ = None  # NOTE: Only used for multiseries

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, pd.offsets.DateOffset) and not isinstance(X.index, pd.DatetimeIndex):
            raise ValueError(
                "`chunk_size` is a pandas DateOffset but `X` does not have a DatetimeIndex."
            )

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, int):
            chunks_ref = [
                X.iloc[i:i + self.chunk_size]
                for i in range(0, len(X), self.chunk_size)
            ]
        elif isinstance(self.chunk_size, (str, pd.offsets.DateOffset)) and isinstance(X.index, pd.DatetimeIndex):
            chunks_ref = [group for _, group in X.resample(self.chunk_size)]
    else:
        chunks_ref = [X]

    self.n_chunks_reference_data_ = len(chunks_ref)

    features = X.columns.tolist()
    for feature in features:
        is_numeric = pd.api.types.is_numeric_dtype(X[feature])
        ref = X[feature].dropna()
        if ref.empty:
            warnings.warn(
                f"Feature '{feature}' contains only NaN values in the reference dataset. "
                f"Drift detection skipped.",
                UnknownLevelWarning
            )
            continue

        self.empirical_dist_ks_[feature] = []
        self.empirical_dist_chi2_[feature] = []
        self.empirical_dist_js_[feature] = []
        self.ref_features_.append(feature)

        if is_numeric:
            # Precompute histogram with bins for Jensen-Shannon distance
            # This may not perfectly align with bins used in predict if new data
            # extends the range, but it provides a reasonable approximation
            # for efficiency.
            min_val = ref.min()
            max_val = ref.max()
            bins_edges = np.histogram_bin_edges(ref.astype("float64"), bins='doane')
            ref_hist = np.histogram(ref, bins=bins_edges)[0] / len(ref)
            self.ref_bins_edges_[feature] = bins_edges
            self.ref_hist_[feature] = ref_hist
            self.ref_ranges_[feature] = (min_val, max_val)
            # Precompute ECDF for Kolmogorov-Smirnov test
            self.ref_ecdf_[feature] = ecdf(ref)
        else:
            counts_raw = ref.value_counts()
            counts_norm = counts_raw / counts_raw.sum()
            self.ref_counts_[feature] = counts_raw
            self.ref_probs_[feature] = counts_norm
            self.ref_categories_[feature] = counts_raw.index.tolist()

        for chunk in chunks_ref:
            new = chunk[feature].dropna()
            if new.empty:
                continue
            ref = ref[~ref.index.isin(new.index)]

            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan
            if is_numeric:
                new_ecdf = ecdf(new)
                new_hist = np.histogram(new, bins=self.ref_bins_edges_[feature])[0] / len(new)
                # Handle out-of-bin data: if new data contains values outside the reference range,
                # they will not be counted in the histogram, leading to a sum < 1. To ensure
                # the histograms are comparable, we add an extra bin for "out-of-range" data with
                # the leftover probability mass in the new histogram and a corresponding zero bin
                # in the reference histogram.
                leftover = 1 - np.sum(new_hist)
                if leftover > 0:
                    new_hist = np.append(new_hist, leftover)
                    ref_hist_appended = np.append(self.ref_hist_[feature], 0)
                    js_distance = jensenshannon(ref_hist_appended, new_hist, base=2)
                else:
                    js_distance = jensenshannon(self.ref_hist_[feature], new_hist, base=2)
                ks_stat = ks_2samp_from_ecdf(
                    ecdf1=self.ref_ecdf_.get(feature),
                    ecdf2=new_ecdf,
                    alternative="two-sided"
                )
            else:
                new_probs = new.value_counts(normalize=True).sort_index()
                ref_probs = self.ref_probs_.get(feature)
                # Align categories and fill missing with 0
                all_cats = ref_probs.index.union(new_probs.index)
                ref_probs = ref_probs.reindex(all_cats, fill_value=0)
                new_probs = new_probs.reindex(all_cats, fill_value=0)
                js_distance = jensenshannon(ref_probs.to_numpy(), new_probs.to_numpy())

                # Align categories and fill missing with 0
                new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                ref_counts = self.ref_counts_.get(feature).reindex(all_cats, fill_value=0).to_numpy()
                if new_counts.sum() > 0 and ref_counts.sum() > 0:
                    # Create contingency table with rows = [reference, new], columns = categories
                    contingency_table = np.array([ref_counts, new_counts])
                    chi2_stat = chi2_contingency(contingency_table)[0]

            self.empirical_dist_ks_[feature].append(ks_stat)
            self.empirical_dist_chi2_[feature].append(chi2_stat)
            self.empirical_dist_js_[feature].append(js_distance)

        # Calculate empirical thresholds using the specified quantile
        # Using pandas Series quantile method to handle NaNs properly and warnings
        self.empirical_threshold_ks_[feature] = pd.Series(
            self.empirical_dist_ks_[feature]
        ).quantile(self.threshold)
        self.empirical_threshold_chi2_[feature] = pd.Series(
            self.empirical_dist_chi2_[feature]
        ).quantile(self.threshold)
        self.empirical_threshold_js_[feature] = pd.Series(
            self.empirical_dist_js_[feature]
        ).quantile(self.threshold)

    self.is_fitted_ = True
Fit the drift detector by calculating empirical distributions and thresholds
from reference data. The empirical distributions are computed by chunking
the reference data according to the specified chunk_size and calculating
the statistics for each chunk.
Parameters:

X : pandas DataFrame, required
    Reference data used as the baseline for drift detection.
Source code in skforecast\drift_detection\_population_drift.py
def fit(self, X) -> None:
    """
    Fit the drift detector by calculating empirical distributions and thresholds
    from reference data. The empirical distributions are computed by chunking
    the reference data according to the specified `chunk_size` and calculating
    the statistics for each chunk.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.
    """

    if not isinstance(X, pd.DataFrame):
        raise ValueError(f"`X` must be a pandas DataFrame. Got {type(X)} instead.")

    if isinstance(X.index, pd.MultiIndex):
        X = X.groupby(level=0)
        for idx, group in X:
            group = group.droplevel(0)
            self.detectors_[idx] = PopulationDriftDetector(
                chunk_size=self.chunk_size, threshold=self.threshold
            )
            self.detectors_[idx]._fit(group)
    else:
        self._fit(X)

    self.is_fitted_ = True
    self.series_names_in_ = list(self.detectors_.keys()) if self.detectors_ else None
    self._collect_attributes()
def _predict(self, X) -> pd.DataFrame:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.
    """

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, pd.offsets.DateOffset) and not isinstance(X.index, pd.DatetimeIndex):
            raise ValueError(
                "`chunk_size` is a pandas DateOffset but `X` does not have a DatetimeIndex."
            )

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, int):
            chunks = [
                X.iloc[i:i + self.chunk_size]
                for i in range(0, len(X), self.chunk_size)
            ]
        else:
            chunks = [group for _, group in X.resample(self.chunk_size)]
    else:
        chunks = [X]

    results = []
    features = X.columns.tolist()
    for feature in features:
        if feature not in self.ref_features_:
            warnings.warn(
                f"Feature '{feature}' was not present during fitting. Drift detection skipped "
                f"for this feature.",
                UnknownLevelWarning
            )
            continue

        is_numeric = pd.api.types.is_numeric_dtype(X[feature])
        ref_bin_edges = self.ref_bins_edges_.get(feature, None)
        ref_hist = self.ref_hist_.get(feature, None)
        ref_probs = self.ref_probs_.get(feature, None)
        ref_counts = self.ref_counts_.get(feature, None)
        ref_ecdf = self.ref_ecdf_.get(feature, None)
        threshold_ks = self.empirical_threshold_ks_.get(feature, np.nan)
        threshold_chi2 = self.empirical_threshold_chi2_.get(feature, np.nan)
        threshold_js = self.empirical_threshold_js_.get(feature, np.nan)
        ref_range = self.ref_ranges_.get(feature, (np.nan, np.nan))

        for chunk_idx, chunk in enumerate(chunks):
            chunk_label = chunk_idx if self.chunk_size else "full"
            new = chunk[feature].dropna()

            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan
            is_out_of_range = np.nan
            if not new.empty:
                if is_numeric:
                    new_ecdf = ecdf(new)
                    # Compute histogram for new data using reference bin edges and normalize
                    new_hist = np.histogram(new, bins=ref_bin_edges)[0] / len(new)
                    # Handle out-of-bin data: if new data contains values outside the reference range,
                    # they will not be counted in the histogram, leading to a sum < 1. To ensure
                    # the histograms are comparable, we add an extra bin for "out-of-range" data with
                    # the leftover probability mass in the new histogram and a corresponding zero bin
                    # in the reference histogram.
                    leftover = 1 - np.sum(new_hist)
                    if leftover > 0:
                        new_hist = np.append(new_hist, leftover)
                        ref_hist_appended = np.append(ref_hist, 0)
                        js_distance = jensenshannon(ref_hist_appended, new_hist, base=2)
                    else:
                        js_distance = jensenshannon(ref_hist, new_hist, base=2)
                    ks_stat = ks_2samp_from_ecdf(
                        ecdf1=ref_ecdf, ecdf2=new_ecdf, alternative="two-sided"
                    )
                    is_out_of_range = (
                        np.min(new) < ref_range[0] or np.max(new) > ref_range[1]
                    )
                else:
                    ref_categories = self.ref_categories_[feature]
                    ref_probs_ = ref_probs.reindex(ref_categories, fill_value=0).to_numpy()
                    # Map new data to reference categories
                    new_counts_dict = new.value_counts().to_dict()
                    new_counts_on_ref = [new_counts_dict.get(cat, 0) for cat in ref_categories]
                    new_probs = (
                        np.array(new_counts_on_ref) / len(new)
                        if len(new) > 0
                        else np.zeros(len(ref_categories))
                    )
                    # Compute leftover (probability of new categories not in reference): if new data
                    # contains categories not seen in reference, they will not be counted in the
                    # histogram, leading to a sum < 1. To ensure the histograms are comparable,
                    # we add an extra bin for "new categories" with the leftover probability mass
                    # in the new histogram and a corresponding zero bin in the reference histogram.
                    leftover = 1 - np.sum(new_probs)
                    if leftover > 0:
                        new_probs = np.append(new_probs, leftover)
                        ref_probs_appended = np.append(ref_probs_, 0)
                        js_distance = jensenshannon(ref_probs_appended, new_probs, base=2)
                    else:
                        js_distance = jensenshannon(ref_probs_, new_probs, base=2)

                    all_cats = set(self.ref_categories_[feature]).union(set(new_counts_dict.keys()))
                    new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                    ref_counts_aligned = ref_counts.reindex(all_cats, fill_value=0).to_numpy()
                    if new_counts.sum() > 0 and ref_counts_aligned.sum() > 0:
                        # Create contingency table: rows = [reference, new], columns = categories
                        contingency_table = np.array([ref_counts_aligned, new_counts])
                        chi2_stat = chi2_contingency(contingency_table)[0]

            results.append({
                "chunk": chunk_label,
                "chunk_start": chunk.index.min(),
                "chunk_end": chunk.index.max(),
                "feature": feature,
                "ks_statistic": ks_stat,
                "threshold_ks": threshold_ks,
                "chi2_statistic": chi2_stat,
                "threshold_chi2": threshold_chi2,
                "jensen_shannon": js_distance,
                "threshold_js": threshold_js,
                "reference_range": ref_range,
                "is_out_of_range": is_out_of_range,
            })

    results_df = pd.DataFrame(results)
    results_df['drift_ks_statistic'] = results_df['ks_statistic'] > results_df['threshold_ks']
    results_df['drift_chi2_statistic'] = results_df['chi2_statistic'] > results_df['threshold_chi2']
    results_df['drift_js'] = results_df['jensen_shannon'] > results_df['threshold_js']
    results_df['drift_detected'] = (
        results_df['drift_ks_statistic']
        | results_df['drift_chi2_statistic']
        | results_df['drift_js']
        | results_df['is_out_of_range']
    )

    return results_df
Predict drift in new data by comparing the estimated statistics to
reference thresholds. Two DataFrames are returned: the first with
detailed information for each chunk, the second with only the total
number of chunks where drift has been detected.
Parameters:

X : pandas DataFrame, required
    New data to compare against the reference.

Returns:

results : pandas DataFrame
    DataFrame with the drift detection results for each chunk.
summary : pandas DataFrame
    Summary DataFrame with the total number and percentage of chunks
    with detected drift per feature (or per series_id and feature if
    MultiIndex).
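Continuing the `detector` and `X_new` objects from the usage sketch earlier in this section, the two returned DataFrames can be inspected as follows (column names taken from the source below; values are data-dependent):

results, summary = detector.predict(X_new)

# Per-chunk detail: one row per (chunk, feature)
print(results[["chunk", "feature", "ks_statistic", "threshold_ks",
               "jensen_shannon", "threshold_js", "drift_detected"]].head())

# Aggregated view: number and percentage of drifting chunks per feature
print(summary)  # columns: feature, n_chunks_with_drift, pct_chunks_with_drift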
Source code in skforecast\drift_detection\_population_drift.py
def predict(self, X) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds. Two DataFrames are returned: the first with
    detailed information for each chunk, the second with only the total
    number of chunks where drift has been detected.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.
    summary : pandas DataFrame
        Summary DataFrame with the total number and percentage of chunks
        with detected drift per feature (or per series_id and feature if
        MultiIndex).
    """

    if not self.is_fitted_:
        raise NotFittedError(
            "This PopulationDriftDetector instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this estimator."
        )
    if not isinstance(X, pd.DataFrame):
        raise ValueError(f"`X` must be a pandas DataFrame. Got {type(X)} instead.")

    if isinstance(X.index, pd.MultiIndex):
        results = []
        for idx, group in X.groupby(level=0):
            group = group.droplevel(0)
            if idx not in self.detectors_:
                warnings.warn(
                    f"Series '{idx}' was not present during fitting. Drift detection skipped.",
                    UnknownLevelWarning
                )
                continue
            detector = self.detectors_[idx]
            result = detector._predict(group)
            result.insert(0, 'series_id', idx)
            results.append(result)
        results = pd.concat(results, ignore_index=True)
    else:
        results = self._predict(X)

    if results.columns[0] == 'series_id':
        summary = (
            results
            .groupby(['series_id', 'feature'])['drift_detected']
            .agg(['sum', 'mean'])
            .reset_index()
            .rename(columns={'sum': 'n_chunks_with_drift', 'mean': 'pct_chunks_with_drift'})
        )
    else:
        summary = (
            results
            .groupby(['feature'])['drift_detected']
            .agg(['sum', 'mean'])
            .reset_index()
            .rename(columns={'sum': 'n_chunks_with_drift', 'mean': 'pct_chunks_with_drift'})
        )
    summary['pct_chunks_with_drift'] = summary['pct_chunks_with_drift'] * 100

    return results, summary
Collect attributes for representation and inspection and update the instance
dictionary with the collected values. For multi-series (when detectors_ is
populated), attributes are aggregated into nested dictionaries keyed by
detector names. For single-series, attributes remain unchanged.
Parameters:

self

Returns:

None
Source code in skforecast\drift_detection\_population_drift.py
def _collect_attributes(self) -> None:
    """
    Collect attributes for representation and inspection and update the instance
    dictionary with the collected values. For multi-series (when detectors_ is
    populated), attributes are aggregated into nested dictionaries keyed by
    detector names. For single-series, attributes remain unchanged.

    Parameters
    ----------
    self

    Returns
    -------
    None
    """

    attr_names = [
        k for k in self.__dict__.keys()
        if k not in ['is_fitted_', 'detectors_', 'series_names_in_']
    ]

    if self.detectors_:
        for attr_name in attr_names:
            collected = {}
            for detector_key, detector in self.detectors_.items():
                collected[detector_key] = getattr(detector, attr_name, None)
            self.__dict__[attr_name] = deepcopy(collected)