drift_detection

skforecast.drift_detection._range_drift.RangeDriftDetector

RangeDriftDetector()

Detector of out-of-range values based on training feature ranges.

The detector is intentionally lightweight: it does not compute advanced drift statistics, because it is meant to check single observations during inference. This makes it suitable for real-time applications.

Parameters:

None. The constructor takes no arguments.

Attributes:

Name Type Description
series_names_in_ list

Names of the series used during training.

series_values_range_ dict

Range of values of the target series used during training.

exog_names_in_ list

Names of the exogenous variables used during training.

exog_values_range_ dict

Range of values of the exogenous variables used during training.

series_specific_exog_ bool

Indicates whether exogenous variables have different values across target series during training (i.e., exogenous is series-specific rather than global).

is_fitted bool

Whether the detector has been fitted to the training data.
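
As a rough illustration of how the series_specific_exog_ flag can be derived, the sketch below mirrors the check that appears in the fit source on this page (any exog key matching a fitted series name). Plain lists stand in for the pandas inputs, and the helper name is_series_specific is hypothetical, not part of skforecast.

```python
def is_series_specific(series_names: list, exog_keys: list) -> bool:
    """Return True when at least one exog key matches a fitted series name."""
    return any(key in series_names for key in exog_keys)


series_names = ["store_1", "store_2"]

# Exog keyed by series name: each series carries its own exogenous values.
specific = is_series_specific(series_names, ["store_1", "store_2"])

# Exog keyed by variable name: the same values are shared by every series.
shared = is_series_specific(series_names, ["temperature", "holiday"])
```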

Methods:

Name Description
fit

Fit detector, storing training ranges.

predict

Check if there is any value outside the training range for last_window and exog.

Source code in skforecast\drift_detection\_range_drift.py
def __init__(self) -> None:

    self.series_names_in_      = None
    self.series_values_range_  = None
    self.exog_names_in_        = None
    self.exog_values_range_    = None
    self.series_specific_exog_ = False
    self.is_fitted             = False

series_names_in_ instance-attribute

series_names_in_ = None

series_values_range_ instance-attribute

series_values_range_ = None

exog_names_in_ instance-attribute

exog_names_in_ = None

exog_values_range_ instance-attribute

exog_values_range_ = None

series_specific_exog_ instance-attribute

series_specific_exog_ = False

is_fitted instance-attribute

is_fitted = False

_repr_html_

_repr_html_()

HTML representation of the object. The "General Information" section is expanded by default.
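
Long lists of series or exogenous names are shortened before rendering: lists with more than 50 entries keep the first 25 and the last 25, separated by "...". A minimal sketch of that rule, using a hypothetical helper name (abbreviate_names) that does not exist in skforecast:

```python
def abbreviate_names(names: list, max_items: int = 50, keep: int = 25) -> str:
    """Join names with commas, eliding the middle of long lists."""
    if len(names) > max_items:
        names = names[:keep] + ["..."] + names[-keep:]
    return ", ".join(names)


short = abbreviate_names(["a", "b", "c"])

# 60 names exceed the limit, so only 25 + "..." + 25 entries are shown.
long_names = [f"series_{i}" for i in range(60)]
shown = abbreviate_names(long_names).split(", ")
```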

Source code in skforecast\drift_detection\_range_drift.py
def _repr_html_(self):
    """
    HTML representation of the object.
    The "General Information" section is expanded by default.
    """

    series_names_in_ = None
    if self.series_names_in_ is not None:
        if len(self.series_names_in_) > 50:
            series_names_in_ = self.series_names_in_[:25] + ["..."] + self.series_names_in_[-25:]
            series_names_in_ = ", ".join(series_names_in_)
        else:
            series_names_in_ = ", ".join(self.series_names_in_)

    exog_names_in_ = None
    if self.exog_names_in_ is not None:
        if len(self.exog_names_in_) > 50:
            exog_names_in_ = self.exog_names_in_[:25] + ["..."] + self.exog_names_in_[-25:]
            exog_names_in_ = ", ".join(exog_names_in_)
        else:
            exog_names_in_ = ", ".join(self.exog_names_in_)

    style, unique_id = get_style_repr_html(self.is_fitted)
    content = f"""
    <div class="container-{unique_id}">
        <p style="font-size: 1.5em; font-weight: bold; margin-block-start: 0.83em; margin-block-end: 0.83em;">{type(self).__name__}</p>
        <details open>
            <summary>General Information</summary>
            <ul>
                <li><strong>Fitted series:</strong> {series_names_in_}</li>
                <li><strong>Fitted exogenous:</strong> {exog_names_in_}</li>
                <li><strong>Series-specific exogenous:</strong> {self.series_specific_exog_}</li>
                <li><strong>Is fitted:</strong> {self.is_fitted}</li>
            </ul>
        </details>
        <details>
            <summary>Series value ranges</summary>
            <ul>
                {self.series_values_range_}
            </ul>
        </details>
        <details>
            <summary>Exogenous value ranges</summary>
            <ul>
                {self.exog_values_range_}
            </ul>
        </details>
        <p>
            <a href="https://skforecast.org/{__version__}/api/drift_detection.html#skforecast.drift_detection._range_drift.RangeDriftDetector">&#128712 <strong>API Reference</strong></a>
            &nbsp;&nbsp;
            <a href="https://skforecast.org/{__version__}/user_guides/drift-detection.html">&#128462 <strong>User Guide</strong></a>
        </p>
    </div>
    """

    return style + content

_get_features_range classmethod

_get_features_range(X)

Get a summary of the features in the DataFrame or Series. For numeric features, it returns the min and max values. For categorical features, it returns the unique values.

Parameters:

Name Type Description Default
X pandas Series, pandas DataFrame

Input data to summarize.

required

Returns:

Name Type Description
features_ranges (tuple, set, dict)

Feature ranges. If X is a Series, returns a tuple (min, max) for numeric data or a set of unique values for categorical data. If X is a DataFrame, returns a dictionary with column names as keys and their respective ranges (tuple or set) as values.
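
The return shapes described above can be sketched without pandas. This is a simplified stand-in, not the library code: lists replace Series, a dict of lists replaces a DataFrame, None replaces NaN, and a column counts as numeric when all its present values are numbers (pandas decides this from the dtype instead). The function names are hypothetical.

```python
from numbers import Number
from typing import Union


def feature_range(values: list) -> Union[tuple, set]:
    """Summarize one feature: (min, max) if numeric, else the set of seen values."""
    present = [v for v in values if v is not None]  # None stands in for NaN
    if present and all(isinstance(v, Number) for v in present):
        return (float(min(present)), float(max(present)))
    return set(present)


def features_range(table: dict) -> dict:
    """Summarize every column of a dict-of-lists table."""
    return {name: feature_range(column) for name, column in table.items()}


training = {
    "temperature": [12.5, 18.0, 9.3, None],
    "weekday": ["Mon", "Tue", "Mon", None],
}
ranges = features_range(training)
```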

Source code in skforecast\drift_detection\_range_drift.py
@classmethod
def _get_features_range(
    cls, 
    X: pd.Series | pd.DataFrame
) -> tuple | set | dict[str, tuple | set]:
    """
    Get a summary of the features in the DataFrame or Series. For numeric
    features, it returns the min and max values. For categorical features,
    it returns the unique values.

    Parameters
    ----------
    X : pandas Series, pandas DataFrame
        Input data to summarize.

    Returns
    -------
    features_ranges: tuple, set, dict
        Feature ranges. If X is a Series, returns a tuple (min, max) for numeric
        data or a set of unique values for categorical data. If X is a DataFrame,
        returns a dictionary with column names as keys and their respective ranges
        (tuple or set) as values.

    """

    if not isinstance(X, (pd.DataFrame, pd.Series)):
        raise TypeError("Input must be a pandas DataFrame or Series.")

    if isinstance(X, pd.Series):
        if pd.api.types.is_numeric_dtype(X):
            features_ranges = (float(X.min()), float(X.max()))
        else:
            features_ranges = set(X.dropna().unique())

    if isinstance(X, pd.DataFrame):
        num_cols = [
            col for col in X.columns if pd.api.types.is_numeric_dtype(X[col])
        ]
        cat_cols = [col for col in X.columns if col not in num_cols]

        features_ranges = {}
        features_ranges.update(
            {col: (float(X[col].min()), float(X[col].max())) for col in num_cols}
        )
        features_ranges.update(
            {col: set(X[col].dropna().unique()) for col in cat_cols}
        )

    return features_ranges

_check_feature_range classmethod

_check_feature_range(feature_range, X)

Check if there is any value outside the training range. For numeric features, it checks if the values are within the min and max range. For categorical features, it checks if the values are among the seen categories.

Parameters:

Name Type Description Default
feature_range (tuple, set)

Output from _get_features_range() for a single feature.

required
X Series

New data to validate

required

Returns:

Type Description
bool

True if there is any value outside the training range, False otherwise.
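
A minimal sketch of the check, assuming the same simplified representation (lists instead of Series, None instead of NaN). The type of the stored range selects the branch: a tuple means a numeric (min, max) comparison, a set means a membership test. The function name is hypothetical.

```python
from typing import Union


def is_out_of_range(feature_range: Union[tuple, set], values: list) -> bool:
    """Return True if any present value falls outside the training range."""
    present = [v for v in values if v is not None]
    if not present:
        return False  # nothing to compare
    if isinstance(feature_range, tuple):
        low, high = feature_range
        return min(present) < low or max(present) > high
    # Categorical: any value never seen during training counts as drift.
    return bool(set(present) - feature_range)


numeric_ok = is_out_of_range((9.3, 18.0), [10.0, 17.9])
numeric_drift = is_out_of_range((9.3, 18.0), [10.0, 21.4])
category_drift = is_out_of_range({"Mon", "Tue"}, ["Mon", "Sun"])
```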

Source code in skforecast\drift_detection\_range_drift.py
@classmethod
def _check_feature_range(
    cls,
    feature_range: tuple | set,
    X: pd.Series
) -> bool:
    """
    Check if there is any value outside the training range. For numeric features,
    it checks if the values are within the min and max range. For categorical features,
    it checks if the values are among the seen categories.

    Parameters
    ----------
    feature_range : tuple, set
        Output from _get_features_range() for a single feature.
    X : pd.Series
        New data to validate

    Returns
    -------
    bool
        True if there is any value outside the training range, False otherwise.

    """

    if isinstance(feature_range, tuple):
        return X.min() < feature_range[0] or X.max() > feature_range[1]
    else:
        unseen = set(X.dropna().unique()) - feature_range
        return bool(unseen)

_display_warnings classmethod

_display_warnings(
    not_compliant_feature, feature_range, series_name=None
)

Display warnings for features with values outside the training range.

Parameters:

Name Type Description Default
not_compliant_feature str

Name of the feature with values outside the training range.

required
feature_range tuple | set

Training range of the feature.

required
series_name str

Name of the series being checked, if applicable.

None

Returns:

Type Description
None
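
To show how such a warning can be emitted and inspected, here is a small sketch using only the standard warnings module. OutOfRangeWarning is a hypothetical stand-in for skforecast's FeatureOutOfRangeWarning, and build_message reproduces only the first sentence of each message format.

```python
import warnings
from typing import Optional, Union


class OutOfRangeWarning(UserWarning):
    """Hypothetical stand-in for skforecast's FeatureOutOfRangeWarning."""


def build_message(
    feature: str,
    feature_range: Union[tuple, set],
    series_name: Optional[str] = None,
) -> str:
    if isinstance(feature_range, tuple):
        # Numeric feature: report the training min and max.
        msg = (
            f"'{feature}' has values outside the range seen during training "
            f"[{feature_range[0]:.5f}, {feature_range[1]:.5f}]."
        )
    else:
        # Categorical feature: report the categories seen during training.
        msg = f"'{feature}' has values not seen during training. Seen values: {feature_range}."
    if series_name:
        msg = f"'{series_name}': " + msg
    return msg


# Record the warning instead of printing it, so it can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warnings.warn(build_message("temperature", (9.3, 18.0), "store_1"), OutOfRangeWarning)

message = str(caught[0].message)
```
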
Source code in skforecast\drift_detection\_range_drift.py
@classmethod
def _display_warnings(
    cls,
    not_compliant_feature: str,
    feature_range: tuple | set,
    series_name: str = None,
) -> None:
    """
    Display warnings for features with values outside the training range.

    Parameters
    ----------
    not_compliant_feature : str
        Name of the feature with values outside the training range.
    feature_range : tuple | set
        Training range of the feature.
    series_name : str, optional
        Name of the series being checked, if applicable.

    Returns
    -------
    None

    """

    if isinstance(feature_range, tuple):
        # Numeric
        msg = (
            f"'{not_compliant_feature}' has values outside the range seen during training "
            f"[{feature_range[0]:.5f}, {feature_range[1]:.5f}]. "
            f"This may affect the accuracy of the predictions."
        )
    else:
        # Categorical
        msg = (
            f"'{not_compliant_feature}' has values not seen during training. Seen values: "
            f"{feature_range}. This may affect the accuracy of the predictions."
        )

    if series_name:
        msg = f"'{series_name}': " + msg

    warnings.warn(msg, FeatureOutOfRangeWarning)

_summary classmethod

_summary(
    out_of_range_series,
    out_of_range_series_ranges,
    out_of_range_exog,
    out_of_range_exog_ranges,
)

Summarize the results of the range check.

Parameters:

Name Type Description Default
out_of_range_series list

List of series names that are out of range.

required
out_of_range_series_ranges list

List of ranges for the out-of-range series.

required
out_of_range_exog list

List of exogenous variable names that are out of range.

required
out_of_range_exog_ranges list

List of ranges for the out-of-range exogenous variables.

required

Returns:

Type Description
None
Source code in skforecast\drift_detection\_range_drift.py
@classmethod
def _summary(
    cls,
    out_of_range_series: list,
    out_of_range_series_ranges: list,
    out_of_range_exog: list,
    out_of_range_exog_ranges: list
) -> None:
    """
    Summarize the results of the range check.

    Parameters
    ----------
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_series_ranges : list
        List of ranges for the out-of-range series.
    out_of_range_exog : list
        List of exogenous variable names that are out of range.
    out_of_range_exog_ranges : list
        List of ranges for the out-of-range exogenous variables.

    Returns
    -------
    None

    """

    msg_series = ""
    if out_of_range_series:
        series_msgs = []
        for series, series_range in zip(
            out_of_range_series, out_of_range_series_ranges
        ):
            msg_temp = (
                f"'{series}' has values outside the observed range "
                f"[{series_range[0]:.5f}, {series_range[1]:.5f}]."
            )
            series_msgs.append(textwrap.fill(msg_temp, width=80))
        msg_series = "\n".join(series_msgs) + "\n"
    else:
        msg_series = "No series with out-of-range values found.\n"

    msg_exog = ""
    if out_of_range_exog:
        exog_msgs = []
        if isinstance(out_of_range_exog, list):
            for exog, exog_range in zip(out_of_range_exog, out_of_range_exog_ranges):
                if isinstance(exog_range, tuple):
                    # Numeric
                    msg_temp = (
                        f"'{exog}' has values outside the observed range "
                        f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                    )
                else:
                    # Categorical
                    msg_temp = (
                        f"'{exog}' has values not seen during training. Seen values: "
                        f"{exog_range}."
                    )
                exog_msgs.append(textwrap.fill(msg_temp, width=80))
        else:
            for key, value in out_of_range_exog.items():
                for exog, exog_range in zip(value, out_of_range_exog_ranges[key]):
                    if isinstance(exog_range, tuple):
                        # Numeric
                        msg_temp = (
                            f"'{exog}' has values outside the observed range "
                            f"[{exog_range[0]:.5f}, {exog_range[1]:.5f}]."
                        )
                    else:
                        # Categorical
                        msg_temp = (
                            f"'{exog}' has values not seen during training. Seen values: "
                            f"{exog_range}."
                        )
                    msg_temp = f"'{key}': " + msg_temp
                    exog_msgs.append(textwrap.fill(msg_temp, width=80))

        msg_exog = "\n".join(exog_msgs)
    else:
        msg_exog = "No exogenous variables with out-of-range values found."

    console = Console()
    content = (
        f"[bold]Series:[/bold]\n{msg_series}\n"
        f"[bold]Exogenous Variables:[/bold]\n{msg_exog}"
    )
    console.print(Panel(content, title="[bold]Out-of-range summary[/bold]", expand=False))

_normalize_input

_normalize_input(X, name, series_ids=None)

Convert pd.Series, pd.DataFrame or dict into a standardized dict of pd.Series or pd.DataFrames.

Parameters:

Name Type Description Default
X pandas Series, pandas DataFrame, dict

Input data to normalize.

required
name str

Name of the input being normalized. Used for error messages. Expected values are 'series', 'last_window' or 'exog'.

required
series_ids list

Series IDs to include in the normalization of exogenous variables.

None

Returns:

Name Type Description
X dict

Normalized input as a dictionary of pandas Series or DataFrames.
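
The idea of normalizing different input layouts into one dictionary keyed by series ID can be sketched without pandas. In this simplified analogy, a list of (series_id, timestamp, value) rows plays the role of a MultiIndex DataFrame and a dict of lists plays the role of a dict of Series. The function normalize is hypothetical and does not cover the wide-format or single-Series branches.

```python
from collections import defaultdict


def normalize(data, name: str) -> dict:
    """Return a dict mapping each series ID to its list of values."""
    if isinstance(data, dict):
        # Already keyed by series ID: only validate the values.
        for key, values in data.items():
            if not isinstance(values, list):
                raise TypeError(
                    f"All values in `{name}` must be lists. "
                    f"Review the value for key '{key}'."
                )
        return data

    # Long format: group rows by their first element, the series ID.
    grouped = defaultdict(list)
    for series_id, _timestamp, value in data:
        grouped[series_id].append(value)
    return dict(grouped)


rows = [
    ("a", "2024-01-01", 1.0),
    ("a", "2024-01-02", 2.0),
    ("b", "2024-01-01", 5.0),
]
from_rows = normalize(rows, name="series")
from_dict = normalize({"a": [1.0, 2.0]}, name="series")
```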

Source code in skforecast\drift_detection\_range_drift.py
def _normalize_input(
    self, 
    X: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame],
    name: str,
    series_ids: list[str] | None = None
) -> dict[str, pd.Series | pd.DataFrame]:
    """
    Convert pd.Series, pd.DataFrame or dict into a standardized dict of
    pd.Series or pd.DataFrames.

    Parameters
    ----------
    X : pandas Series, pandas DataFrame, dict
        Input data to normalize.
    name : str
        Name of the input being normalized. Used for error messages.
        Expected values are 'series', 'last_window' or 'exog'.
    series_ids : list, default None
        Series IDs to include in the normalization of exogenous variables.

    Returns
    -------
    X : dict
        Normalized input as a dictionary of pandas Series or DataFrames.

    """

    if isinstance(X, pd.Series):
        if not X.name:
            raise ValueError(
                f"{name} must have a name when a pandas Series is provided."
            )
        X = {X.name: X}

    elif isinstance(X, pd.DataFrame):
        if isinstance(X.index, pd.MultiIndex):
            if name in ["series", "last_window"]:
                col = X.columns[0]
                if len(X.columns) != 1:
                    warnings.warn(
                        f"`{name}` DataFrame has multiple columns. Only the "
                        f"first column, '{col}', will be used. Others ignored.",
                        IgnoredArgumentWarning,
                    )
                X = {
                    series_id: X.loc[series_id][col].rename(series_id)
                    for series_id in X.index.levels[0]
                }
            else:
                X = {series_id: X.loc[series_id] for series_id in X.index.levels[0]}
        else:
            if self.series_specific_exog_ and series_ids:
                X = {series_id: X.copy() for series_id in series_ids}
            else:
                X = X.to_dict(orient="series")

    elif isinstance(X, dict):
        for k, v in X.items():
            if not isinstance(v, (pd.Series, pd.DataFrame)):
                raise TypeError(
                    f"All values in `{name}` must be a pandas Series or DataFrame. "
                    f"Review the value for key '{k}'."
                )

    return X

fit

fit(series=None, exog=None, **kwargs)

Fit detector, storing training ranges.

Parameters:

Name Type Description Default
series pandas Series, pandas DataFrame, dict, aliases: `y`

Input time series data to fit the detector, ideally the same ones used to fit the forecaster.

None
exog pandas Series, pandas DataFrame, dict

Exogenous variables whose training ranges are stored, ideally the same ones used to fit the forecaster.

None

Returns:

Type Description
None
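
The `series` / `y` alias can be resolved with a small amount of keyword handling. The sketch below isolates that logic in a hypothetical helper (resolve_series), following the same rules as the fit source on this page: exactly one of the two names must be supplied.

```python
def resolve_series(series=None, **kwargs):
    """Return the training data passed either as `series` or as its alias `y`."""
    if series is None and kwargs.get("y") is None:
        raise ValueError("One of `series` or `y` must be provided.")
    if "y" in kwargs:
        if series is not None:
            raise ValueError("Cannot specify both `series` and `y`.")
        series = kwargs.pop("y")
    return series


from_series = resolve_series(series=[1.0, 2.0])
from_alias = resolve_series(y=[3.0])
```
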
Source code in skforecast\drift_detection\_range_drift.py
def fit(
    self,
    series: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.DataFrame | pd.Series | dict[str, pd.Series | pd.DataFrame] | None = None,
    **kwargs
) -> None:
    """
    Fit detector, storing training ranges.

    Parameters
    ----------
    series : pandas Series, pandas DataFrame, dict, aliases: `y`
        Input time series data to fit the detector, ideally the same ones
        used to fit the forecaster.
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variables to include in the forecaster.

    Returns
    -------
    None

    """

    if series is None and ('y' not in kwargs or kwargs['y'] is None):
        raise ValueError(
            "One of `series` or `y` must be provided."
        )
    if 'y' in kwargs:
        if series is not None:
            raise ValueError(
                "Cannot specify both `series` and `y`. Please provide only one of them."
            )
        series = kwargs.pop('y')

    if not isinstance(series, (pd.Series, pd.DataFrame, dict)):
        raise TypeError("Input must be a pandas Series, DataFrame or dict.")

    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError(
            "Exogenous variables must be a pandas Series, DataFrame or dict."
        )

    self.series_names_in_      = []
    self.series_values_range_  = {}
    self.exog_names_in_        = None
    self.exog_values_range_    = None
    self.series_specific_exog_ = False
    self.is_fitted             = False

    series = self._normalize_input(series, name="series")
    for key, value in series.items():
        self.series_names_in_.append(key)
        self.series_values_range_[key] = self._get_features_range(X=value)

    if exog is not None:

        exog = self._normalize_input(exog, name="exog")

        self.exog_names_in_ = []
        self.exog_values_range_ = {}
        for key, value in exog.items():
            if isinstance(value, pd.Series):
                self.exog_names_in_.append(key)
            else:
                self.exog_names_in_.extend(value.columns)
            self.exog_values_range_[key] = self._get_features_range(X=value)

        self.exog_names_in_ = list(dict.fromkeys(self.exog_names_in_))
        self.series_specific_exog_ = any(key in self.series_names_in_ for key in exog.keys())

    self.is_fitted = True

predict

predict(
    last_window=None,
    exog=None,
    verbose=True,
    suppress_warnings=False,
)

Check if there is any value outside the training range for last_window and exog.

Parameters:

Name Type Description Default
last_window pandas Series, pandas DataFrame, dict

Series values used to create the predictors (lags) needed in the first iteration of the prediction (t + 1).

None
exog pandas Series, pandas DataFrame, dict

Exogenous variable/s included as predictor/s.

None
verbose bool

Whether to print a summary of the check.

True
suppress_warnings bool

Whether to suppress warnings.

False

Returns:

Name Type Description
flag_out_of_range bool

True if there is any value outside the training range, False otherwise.

out_of_range_series list

List of series names that are out of range.

out_of_range_exog (list, dict)

Exogenous variables that are out of range.

  • If self.series_specific_exog_ is False: returns a list with the names of exogenous variables that are out of range (global exogenous).
  • If self.series_specific_exog_ is True: returns a dictionary where keys are series names and values are lists of out-of-range exogenous variables for each series.
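
The two shapes of out_of_range_exog can be illustrated with a simplified, numeric-only sketch. Dicts of lists stand in for the pandas inputs and the function name is hypothetical. As in the source on this page, series with no out-of-range variables are dropped from the dictionary result.

```python
from typing import Union


def out_of_range_exog(ranges: dict, new_exog: dict, series_specific: bool) -> Union[list, dict]:
    """Return a list (global exog) or a dict per series (series-specific exog)."""

    def drifted(col_ranges: dict, columns: dict) -> list:
        # Names of columns with any value outside the stored (min, max).
        return [
            col
            for col, values in columns.items()
            if min(values) < col_ranges[col][0] or max(values) > col_ranges[col][1]
        ]

    if not series_specific:
        return drifted(ranges, new_exog)

    per_series = {sid: drifted(ranges[sid], cols) for sid, cols in new_exog.items()}
    return {sid: cols for sid, cols in per_series.items() if cols}


global_result = out_of_range_exog(
    {"temp": (0.0, 30.0), "price": (1.0, 5.0)},
    {"temp": [10.0, 35.0], "price": [2.0]},
    series_specific=False,
)

specific_result = out_of_range_exog(
    {"store_1": {"temp": (0.0, 30.0)}, "store_2": {"temp": (5.0, 25.0)}},
    {"store_1": {"temp": [12.0]}, "store_2": {"temp": [28.0]}},
    series_specific=True,
)
```
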
Source code in skforecast\drift_detection\_range_drift.py
def predict(
    self,
    last_window: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    exog: pd.Series | pd.DataFrame | dict[str, pd.Series | pd.DataFrame] | None = None,
    verbose: bool = True,
    suppress_warnings: bool = False
) -> tuple[bool, list[str], list[str] | dict[str, list[str]]]:
    """
    Check if there is any value outside the training range for last_window and exog.

    Parameters
    ----------
    last_window : pandas Series, pandas DataFrame, dict, default None
        Series values used to create the predictors (lags) needed in the
        first iteration of the prediction (t + 1).
    exog : pandas Series, pandas DataFrame, dict, default None
        Exogenous variable/s included as predictor/s.
    verbose : bool, default True
        Whether to print a summary of the check.
    suppress_warnings : bool, default False
        Whether to suppress warnings.

    Returns
    -------
    flag_out_of_range : bool
        True if there is any value outside the training range, False otherwise.
    out_of_range_series : list
        List of series names that are out of range.
    out_of_range_exog : list, dict
        Exogenous variables that are out of range.

        - If `self.series_specific_exog_` is False: returns a list with the names
        of exogenous variables that are out of range (global exogenous).
        - If `self.series_specific_exog_` is True: returns a dictionary where
        keys are series names and values are lists of out-of-range exogenous
        variables for each series.

    """

    if not self.is_fitted:
        raise RuntimeError("Model is not fitted yet.")

    if not isinstance(last_window, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError(
            "`last_window` must be a pandas Series, DataFrame, dict or None."
        )

    if not isinstance(exog, (pd.Series, pd.DataFrame, dict, type(None))):
        raise TypeError(
            "`exog` must be a pandas Series, DataFrame, dict or None."
        )

    set_skforecast_warnings(suppress_warnings, action='ignore')

    flag_out_of_range = False

    out_of_range_series = []
    out_of_range_series_ranges = []
    if last_window is not None:
        last_window = self._normalize_input(last_window, name="last_window")
        for key, value in last_window.items():
            if isinstance(value, pd.Series):
                value = value.to_frame()
            for col in value.columns:
                if key not in self.series_names_in_:
                    warnings.warn(
                        f"'{key}' was not seen during training. Its range is unknown.",
                        UnknownLevelWarning
                    )
                    continue
                is_out_of_range = self._check_feature_range(
                    feature_range=self.series_values_range_[col], X=value[col]
                )
                if is_out_of_range:
                    flag_out_of_range = True
                    out_of_range_series.append(col)
                    out_of_range_series_ranges.append(self.series_values_range_[col])
                    self._display_warnings(
                        not_compliant_feature = col,
                        feature_range         = self.series_values_range_[col],
                        series_name           = None
                    )

    out_of_range_exog = {} if self.series_specific_exog_ else []
    out_of_range_exog_ranges = {} if self.series_specific_exog_ else []
    if exog is not None:
        series_ids = list(last_window.keys()) if last_window is not None else self.series_names_in_
        exog = self._normalize_input(exog, name="exog", series_ids=series_ids)
        for key, value in exog.items():

            if isinstance(value, pd.Series):
                value = value.to_frame()
            features_ranges = self.exog_values_range_.get(key, None)

            if self.series_specific_exog_:
                out_of_range_exog[key] = []
                out_of_range_exog_ranges[key] = []

            for col in value.columns:

                if not isinstance(features_ranges, dict):
                    features_ranges = {key: features_ranges}

                if col not in self.exog_names_in_:
                    warnings.warn(
                        f"'{col}' was not seen during training. Its range is unknown.",
                        MissingExogWarning,
                    )
                    continue

                is_out_of_range = self._check_feature_range(
                    feature_range=features_ranges[col], X=value[col]
                )

                if is_out_of_range:

                    flag_out_of_range = True
                    if self.series_specific_exog_:
                        out_of_range_exog[key].append(col)
                        out_of_range_exog_ranges[key].append(features_ranges[col])
                    else:
                        out_of_range_exog.append(col)
                        out_of_range_exog_ranges.append(features_ranges[col])

                    self._display_warnings(
                        not_compliant_feature = col,
                        feature_range         = features_ranges[col],
                        series_name           = key if self.series_specific_exog_ else None,
                    )

            if self.series_specific_exog_ and not out_of_range_exog[key]:
                out_of_range_exog.pop(key)
                out_of_range_exog_ranges.pop(key)

    if verbose:
        self._summary(
            out_of_range_series        = out_of_range_series,
            out_of_range_series_ranges = out_of_range_series_ranges,
            out_of_range_exog          = out_of_range_exog,
            out_of_range_exog_ranges   = out_of_range_exog_ranges
        )

    set_skforecast_warnings(suppress_warnings, action='default')

    return flag_out_of_range, out_of_range_series, out_of_range_exog

skforecast.drift_detection._population_drift.PopulationDriftDetector

PopulationDriftDetector(chunk_size=None, threshold=0.95)

A class to detect population drift between reference and new datasets. It computes the Kolmogorov-Smirnov (KS) test for numeric features, the Chi-Square test for categorical features, and the Jensen-Shannon (JS) distance for all features. It builds empirical distributions of these statistics from the reference data and uses quantile thresholds to decide whether new data has drifted.

This implementation is inspired by NannyML's DriftDetector. See Notes for details.

For an in-depth explanation of the underlying calculations, see https://skforecast.org/0.18.0/user_guides/drift-detection.html#deep-dive-into-temporal-drift-detection-in-time-series
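
For readers unfamiliar with the KS statistic, the following standard-library sketch computes the two-sample version: the largest absolute gap between the empirical CDFs of two samples. It illustrates the statistic itself and is not the detector's implementation, which relies on precomputed reference ECDFs.

```python
from bisect import bisect_right


def ks_statistic(reference: list, new: list) -> float:
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    ref_sorted, new_sorted = sorted(reference), sorted(new)

    def ecdf(sorted_sample: list, x: float) -> float:
        # Fraction of observations less than or equal to x.
        return bisect_right(sorted_sample, x) / len(sorted_sample)

    # The gap can only change at observed values, so checking those is enough.
    points = set(ref_sorted) | set(new_sorted)
    return max(abs(ecdf(ref_sorted, x) - ecdf(new_sorted, x)) for x in points)


reference = [1.0, 2.0, 3.0, 4.0]
same = ks_statistic(reference, [1.0, 2.0, 3.0, 4.0])
partial = ks_statistic(reference, [3.0, 4.0, 5.0, 6.0])
shifted = ks_statistic(reference, [11.0, 12.0, 13.0, 14.0])
```

A value of 0 means the two empirical distributions coincide; a value of 1 means they do not overlap at all.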

Parameters:

Name Type Description Default
chunk_size int, string, pandas DateOffset

Size of chunks for sequential drift analysis. If int, number of rows per chunk. If str (e.g., 'D' for daily, 'W' for weekly), time-based chunks assuming a datetime index. If None, analyzes the full dataset as a single chunk.

None
threshold float

The quantile threshold (between 0 and 1) for determining drift based on empirical distributions.

0.95
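
The interplay between chunk_size and threshold can be sketched as follows. This is an illustration under stated assumptions, not the library's procedure: each reference chunk is compared against the full reference, the statistic is a simple difference in means (a stand-in for KS, Chi-Square, or JS), and the quantile uses linear interpolation. All function names are hypothetical.

```python
from statistics import mean


def split_into_chunks(values: list, chunk_size: int) -> list:
    """Split a sequence into consecutive chunks of `chunk_size` rows."""
    return [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]


def empirical_quantile(sample: list, q: float) -> float:
    """Quantile with linear interpolation between order statistics."""
    ordered = sorted(sample)
    position = q * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (position - lower) * (ordered[upper] - ordered[lower])


def mean_shift(reference: list, chunk: list) -> float:
    """Stand-in drift statistic: absolute difference in means."""
    return abs(mean(chunk) - mean(reference))


reference = [float(v) for v in range(1, 13)]  # 1.0 .. 12.0
chunks = split_into_chunks(reference, chunk_size=3)

# Empirical distribution of the statistic across reference chunks.
reference_stats = [mean_shift(reference, chunk) for chunk in chunks]
threshold = empirical_quantile(reference_stats, q=0.95)

# A new chunk is flagged when its statistic exceeds the threshold.
drift_detected = mean_shift(reference, [20.0, 21.0, 22.0]) > threshold
no_drift = mean_shift(reference, [6.0, 7.0]) > threshold
```

A higher threshold makes the detector more conservative, since a new chunk must look more extreme than a larger share of the reference chunks before it is flagged.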

Attributes:

Name Type Description
chunk_size int, string, pandas DateOffset

Size of chunks for sequential drift analysis. If int, number of rows per chunk. If str (e.g., 'D' for daily, 'W' for weekly), time-based chunks assuming a datetime index. If None, analyzes the full dataset as a single chunk.

threshold float

The quantile threshold (between 0 and 1) for determining drift based on empirical distributions.

is_fitted_ bool

Indicates if the detector has been fitted with reference data.

ref_features_ list

List of features in the reference data.

empirical_dist_ks_ dict

Empirical distributions of KS test statistics for each numeric feature in reference data.

empirical_dist_chi2_ dict

Empirical distributions of Chi-Square test statistics for each categorical feature in reference data.

empirical_dist_js_ dict

Empirical distributions of Jensen-Shannon distance for each feature in reference data (numeric and categorical).

empirical_threshold_ks_ dict

Thresholds for KS statistics based on empirical distributions for each numeric feature in reference data.

empirical_threshold_chi2_ dict

Thresholds for Chi-Square statistics based on empirical distributions for each categorical feature in reference data.

empirical_threshold_js_ dict

Thresholds for Jensen-Shannon distance based on empirical distributions for each feature in reference data (numeric and categorical).

n_chunks_reference_data_ int

Number of chunks in the reference data used during fitting to compute empirical distributions.

ref_ecdf_ dict

Precomputed ECDFs for numeric features in the reference data.

ref_bins_edges_ dict

Precomputed bin edges for numeric features in the reference data.

ref_hist_ dict

Precomputed histograms for numeric features in the reference data.

ref_probs_ dict

Precomputed normalized value counts (probabilities) for each category of categorical features in the reference data.

ref_ranges_ dict

Min and max values for numeric features in the reference data.

ref_categories_ dict

Unique categories for categorical features in the reference data.

detectors_ dict

Dictionary of PopulationDriftDetector instances for each group when fitting/predicting on MultiIndex DataFrames.

series_names_in_ list

List of series IDs present during fitting when using MultiIndex DataFrames.
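
To make the precomputed attributes for numeric features more concrete, the following sketch shows the kind of information that can be derived from a reference sample without keeping the raw data around for every comparison. The data and the `ecdf_at` helper are hypothetical; the detector uses its own internal ECDF utility.

```python
import numpy as np

rng = np.random.default_rng(0)
ref = rng.normal(loc=0.0, scale=1.0, size=500)  # hypothetical reference feature

# Bin edges are chosen from the reference data only (Doane's rule).
bins_edges = np.histogram_bin_edges(ref, bins="doane")
# Normalized histogram: fraction of reference observations per bin.
ref_hist = np.histogram(ref, bins=bins_edges)[0] / len(ref)
# Min and max, used later to flag out-of-range values.
ref_range = (ref.min(), ref.max())

# Sorted values are enough to evaluate an ECDF at any point.
ref_sorted = np.sort(ref)

def ecdf_at(x):
    """Hypothetical helper: fraction of reference values <= x."""
    return np.searchsorted(ref_sorted, x, side="right") / len(ref_sorted)
```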

Notes

This implementation is inspired by NannyML's DriftDetector [1].

It is a lightweight version adapted for skforecast's needs:

- It does not store the raw reference data, only the necessary precomputed information to calculate the statistics efficiently during prediction.
- All empirical thresholds are calculated using the specified quantile from the empirical distributions obtained from the reference data chunks.
- It includes checks for out-of-range values in numeric features and new categories in categorical features.
- It supports multiple time series by fitting separate detectors for each series ID when provided with a MultiIndex DataFrame.

If you need more advanced features, such as multivariate drift detection or data quality checks, consider using NannyML directly: https://nannyml.readthedocs.io/en/stable/
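
The quantile-based threshold described in these notes can be sketched as follows. The per-chunk statistics and the new statistic are made-up numbers for illustration only:

```python
import numpy as np
import pandas as pd

# Hypothetical statistics, one per reference chunk; NaN marks a chunk
# where the statistic could not be computed.
empirical_dist = [0.10, 0.12, np.nan, 0.08, 0.30, 0.11]
threshold = 0.95

# Series.quantile skips NaN values by default.
empirical_threshold = pd.Series(empirical_dist).quantile(threshold)

new_statistic = 0.35  # statistic of a hypothetical new chunk
drift = new_statistic > empirical_threshold

print(round(empirical_threshold, 3))  # 0.264
print(drift)                          # True
```

A higher `threshold` moves the cut-off towards the largest statistic observed among the reference chunks, so fewer new chunks are flagged.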

References

[1] NannyML API Reference. https://nannyml.readthedocs.io/en/stable/tutorials/detecting_data_drift/univariate_drift_detection.html

Methods:

Name Description
fit

Fit the drift detector by calculating empirical distributions and thresholds from reference data.

predict

Predict drift in new data by comparing the estimated statistics to reference thresholds.

Source code in skforecast\drift_detection\_population_drift.py
def __init__(
    self, 
    chunk_size=None, 
    threshold=0.95
) -> None:

    self.ref_features_             = None
    self.is_fitted_                = False
    self.ref_ecdf_                 = {}
    self.ref_bins_edges_           = {}
    self.ref_hist_                 = {}
    self.ref_probs_                = {}
    self.ref_counts_               = {}
    self.empirical_dist_ks_        = {}
    self.empirical_dist_chi2_      = {}
    self.empirical_dist_js_        = {}
    self.empirical_threshold_ks_   = {}
    self.empirical_threshold_chi2_ = {}
    self.empirical_threshold_js_   = {}
    self.ref_ranges_               = {}
    self.ref_categories_           = {}
    self.n_chunks_reference_data_  = None
    self.detectors_                = {}    # NOTE: Only used for multiseries
    self.series_names_in_          = None  # NOTE: Only used for multiseries

    if not (0 < threshold < 1):
        raise ValueError(f"`threshold` must be between 0 and 1. Got {threshold}.")

    self.threshold = threshold

    error_msg = (
        "`chunk_size` must be a positive integer, a string compatible with "
        "pandas DateOffset (e.g., 'D', 'W', 'M'), a pandas DateOffset object, or None."
    )
    if not (isinstance(chunk_size, (int, str, pd.DateOffset, type(None)))):
        raise TypeError(f"{error_msg} Got {type(chunk_size)}.")

    if isinstance(chunk_size, str):
        try:
            chunk_size = pd.tseries.frequencies.to_offset(chunk_size)
        except ValueError:
            raise ValueError(f"{error_msg} Got {type(chunk_size)}.")

    if isinstance(chunk_size, int) and chunk_size <= 0:
        raise ValueError(f"{error_msg} Got {chunk_size}.")

    self.chunk_size = chunk_size
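
The normalization of `chunk_size` can be tried in isolation. The helper below is a hypothetical stand-alone function that mirrors the constructor checks shown above; only `pd.tseries.frequencies.to_offset` comes from pandas.

```python
import pandas as pd

def normalize_chunk_size(chunk_size):
    """Hypothetical helper mirroring the constructor's chunk_size checks."""
    if isinstance(chunk_size, str):
        # Raises ValueError for strings that are not valid frequency aliases.
        return pd.tseries.frequencies.to_offset(chunk_size)
    if isinstance(chunk_size, int) and chunk_size <= 0:
        raise ValueError(f"`chunk_size` must be a positive integer. Got {chunk_size}.")
    return chunk_size

print(normalize_chunk_size("D"))   # a pandas offset object
print(normalize_chunk_size(100))   # unchanged integer
print(normalize_chunk_size(None))  # None: single chunk
```

Because strings are converted once at construction time, later code only needs to distinguish between an integer, an offset object, and `None`.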

ref_features_ instance-attribute

ref_features_ = None

is_fitted_ instance-attribute

is_fitted_ = False

ref_ecdf_ instance-attribute

ref_ecdf_ = {}

ref_bins_edges_ instance-attribute

ref_bins_edges_ = {}

ref_hist_ instance-attribute

ref_hist_ = {}

ref_probs_ instance-attribute

ref_probs_ = {}

ref_counts_ instance-attribute

ref_counts_ = {}

empirical_dist_ks_ instance-attribute

empirical_dist_ks_ = {}

empirical_dist_chi2_ instance-attribute

empirical_dist_chi2_ = {}

empirical_dist_js_ instance-attribute

empirical_dist_js_ = {}

empirical_threshold_ks_ instance-attribute

empirical_threshold_ks_ = {}

empirical_threshold_chi2_ instance-attribute

empirical_threshold_chi2_ = {}

empirical_threshold_js_ instance-attribute

empirical_threshold_js_ = {}

ref_ranges_ instance-attribute

ref_ranges_ = {}

ref_categories_ instance-attribute

ref_categories_ = {}

n_chunks_reference_data_ instance-attribute

n_chunks_reference_data_ = None

detectors_ instance-attribute

detectors_ = {}

series_names_in_ instance-attribute

series_names_in_ = None

threshold instance-attribute

threshold = threshold

chunk_size instance-attribute

chunk_size = chunk_size

_repr_html_

_repr_html_()

HTML representation of the object. The "General Information" section is expanded by default.

Source code in skforecast\drift_detection\_population_drift.py
def _repr_html_(self):
    """
    HTML representation of the object.
    The "General Information" section is expanded by default.
    """

    style, unique_id = get_style_repr_html(self.is_fitted_)
    content = f"""
    <div class="container-{unique_id}">
        <p style="font-size: 1.5em; font-weight: bold; margin-block-start: 0.83em; margin-block-end: 0.83em;">{type(self).__name__}</p>
        <details>
            <summary>General Information</summary>
            <ul>
                <li><strong>Fitted features:</strong> {self.ref_features_}</li>
                <li><strong>Is fitted:</strong> {self.is_fitted_}</li>
            </ul>
        </details>
        <p>
            <a href="https://skforecast.org/{__version__}/api/drift_detection.html#skforecast.drift_detection._population_drift.PopulationDriftDetector">&#128712 <strong>API Reference</strong></a>
            &nbsp;&nbsp;
            <a href="https://skforecast.org/{__version__}/user_guides/drift-detection.html">&#128462 <strong>User Guide</strong></a>
        </p>
    </div>
    """

    return style + content

_fit

_fit(X)

Fit the drift detector by calculating empirical distributions and thresholds from reference data. The empirical distributions are computed by chunking the reference data according to the specified chunk_size and calculating the statistics for each chunk.

Parameters:

Name Type Description Default
X pandas DataFrame

Reference data used as the baseline for drift detection.

required
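
The idea of building an empirical distribution from the reference chunks can be sketched as below. This is an approximation under stated assumptions: it uses `scipy.stats.ks_2samp` as a stand-in for the internal ECDF-based helper, and the data and chunk size are hypothetical.

```python
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

rng = np.random.default_rng(42)
ref = pd.Series(rng.normal(size=300), name="feature")  # hypothetical reference data

chunk_size = 50
chunks = [ref.iloc[i : i + chunk_size] for i in range(0, len(ref), chunk_size)]

# One KS statistic per chunk: each chunk is compared with the full reference.
empirical_dist_ks = [ks_2samp(ref, chunk).statistic for chunk in chunks]

# The threshold is the chosen quantile of that empirical distribution.
empirical_threshold_ks = pd.Series(empirical_dist_ks).quantile(0.95)
```

The resulting threshold reflects how much a chunk of this size typically deviates from the whole reference set, which is why the same `chunk_size` is applied to new data.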
Source code in skforecast\drift_detection\_population_drift.py
def _fit(self, X) -> None:
    """
    Fit the drift detector by calculating empirical distributions and thresholds
    from reference data. The empirical distributions are computed by chunking
    the reference data according to the specified `chunk_size` and calculating
    the statistics for each chunk.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.

    """

    self.ref_features_             = []
    self.is_fitted_                = False
    self.ref_ecdf_                 = {}
    self.ref_bins_edges_           = {}
    self.ref_hist_                 = {}
    self.ref_probs_                = {}
    self.ref_counts_               = {}
    self.empirical_dist_ks_        = {}
    self.empirical_dist_chi2_      = {}
    self.empirical_dist_js_        = {}
    self.empirical_threshold_ks_   = {}
    self.empirical_threshold_chi2_ = {}
    self.empirical_threshold_js_   = {}
    self.ref_ranges_               = {}
    self.ref_categories_           = {}
    self.n_chunks_reference_data_  = None
    self.detectors_                = {}    # NOTE: Only used for multiseries
    self.series_names_in_          = None  # NOTE: Only used for multiseries

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, pd.offsets.DateOffset) and not isinstance(X.index, pd.DatetimeIndex):
            raise ValueError(
                "`chunk_size` is a pandas DateOffset but `X` does not have a DatetimeIndex."
            )

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, int):
            chunks_ref = [
                X.iloc[i : i + self.chunk_size]
                for i in range(0, len(X), self.chunk_size)
            ]
        elif isinstance(
            self.chunk_size, (str, pd.offsets.DateOffset)
        ) and isinstance(X.index, pd.DatetimeIndex):
            chunks_ref = [group for _, group in X.resample(self.chunk_size)]
    else:
        chunks_ref = [X]

    self.n_chunks_reference_data_ = len(chunks_ref)

    features = X.columns.tolist()
    for feature in features:
        is_numeric = pd.api.types.is_numeric_dtype(X[feature])
        ref = X[feature].dropna()
        if ref.empty:
            warnings.warn(
                f"Feature '{feature}' contains only NaN values in the reference dataset. "
                f"Drift detection skipped.",
                UnknownLevelWarning
            )
            continue

        self.empirical_dist_ks_[feature] = []
        self.empirical_dist_chi2_[feature] = []
        self.empirical_dist_js_[feature] = []
        self.ref_features_.append(feature)

        if is_numeric:
            # Precompute histogram with bins for Jensen-Shannon distance
            # This may not perfectly align with bins used in predict if new data
            # extends the range, but it provides a reasonable approximation
            # for efficiency.
            min_val = ref.min()
            max_val = ref.max()
            bins_edges = np.histogram_bin_edges(ref.astype("float64"), bins='doane')
            ref_hist = np.histogram(ref, bins=bins_edges)[0] / len(ref)
            self.ref_bins_edges_[feature] = bins_edges
            self.ref_hist_[feature] = ref_hist
            self.ref_ranges_[feature] = (min_val, max_val)

            # Precompute ECDF for Kolmogorov-Smirnov test
            self.ref_ecdf_[feature] = ecdf(ref)
        else:
            counts_raw = ref.value_counts()
            counts_norm = counts_raw / counts_raw.sum()
            self.ref_counts_[feature] = counts_raw
            self.ref_probs_[feature] = counts_norm
            self.ref_categories_[feature] = counts_raw.index.tolist()

        for chunk in chunks_ref:
            new = chunk[feature].dropna()
            if new.empty:
                continue
            ref = ref[~ref.index.isin(new.index)]
            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan

            if is_numeric:
                new_ecdf = ecdf(new)
                new_hist = np.histogram(new, bins=self.ref_bins_edges_[feature])[0] / len(new)
                # Handle out-of-bin data: if new data contains values outside the reference range,
                # they will not be counted in the histogram, leading to a sum < 1. To ensure
                # the histograms are comparable, we add an extra bin for "out-of-range" data with
                # the leftover probability mass in the new histogram and a corresponding zero bin
                # in the reference histogram.
                leftover = 1 - np.sum(new_hist)
                if leftover > 0:
                    new_hist = np.append(new_hist, leftover)
                    ref_hist_appended = np.append(self.ref_hist_[feature], 0)
                    js_distance = jensenshannon(ref_hist_appended, new_hist, base=2)
                else:
                    js_distance = jensenshannon(self.ref_hist_[feature], new_hist, base=2)

                ks_stat = ks_2samp_from_ecdf(
                    ecdf1=self.ref_ecdf_.get(feature),
                    ecdf2=new_ecdf,
                    alternative="two-sided"
                )
            else:
                new_probs = new.value_counts(normalize=True).sort_index()
                ref_probs = self.ref_probs_.get(feature)
                # Align categories and fill missing with 0
                all_cats = ref_probs.index.union(new_probs.index)
                ref_probs = ref_probs.reindex(all_cats, fill_value=0)
                new_probs = new_probs.reindex(all_cats, fill_value=0)
                js_distance = jensenshannon(ref_probs.to_numpy(), new_probs.to_numpy(), base=2)

                # Align categories and fill missing with 0
                new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                ref_counts = self.ref_counts_.get(feature).reindex(all_cats, fill_value=0).to_numpy()
                if new_counts.sum() > 0 and ref_counts.sum() > 0:
                    # Create contingency table with rows = [reference, new], columns = categories
                    contingency_table = np.array([ref_counts, new_counts])
                    chi2_stat = chi2_contingency(contingency_table)[0]

            self.empirical_dist_ks_[feature].append(ks_stat)
            self.empirical_dist_chi2_[feature].append(chi2_stat)
            self.empirical_dist_js_[feature].append(js_distance)

        # Calculate empirical thresholds using the specified quantile
        # Using pandas Series quantile method to handle NaNs properly and warnings
        self.empirical_threshold_ks_[feature] = pd.Series(
            self.empirical_dist_ks_[feature]
        ).quantile(self.threshold)
        self.empirical_threshold_chi2_[feature] = pd.Series(
            self.empirical_dist_chi2_[feature]
        ).quantile(self.threshold)
        self.empirical_threshold_js_[feature] = pd.Series(
            self.empirical_dist_js_[feature]
        ).quantile(self.threshold)

    self.is_fitted_ = True
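
The handling of out-of-bin values in the Jensen-Shannon computation can be reproduced on a small example. The arrays are hypothetical, and a fixed number of bins is used instead of Doane's rule to keep the numbers easy to follow:

```python
import numpy as np
from scipy.spatial.distance import jensenshannon

ref = np.array([1.0, 2.0, 2.5, 3.0, 4.0, 5.0])  # hypothetical reference values
new = np.array([2.0, 3.0, 4.5, 7.0, 8.0])       # two values above the reference max

bins_edges = np.histogram_bin_edges(ref, bins=4)  # edges: 1, 2, 3, 4, 5
ref_hist = np.histogram(ref, bins=bins_edges)[0] / len(ref)
new_hist = np.histogram(new, bins=bins_edges)[0] / len(new)

# Values outside the reference edges are not counted, so new_hist sums to < 1.
leftover = 1 - new_hist.sum()
if leftover > 0:
    new_hist = np.append(new_hist, leftover)  # extra "out-of-range" bin
    ref_hist = np.append(ref_hist, 0)         # matching empty bin in the reference

js_distance = jensenshannon(ref_hist, new_hist, base=2)
```

With `base=2` the distance lies between 0 and 1, and the extra bin ensures that mass falling outside the reference range increases the distance instead of being silently dropped.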

fit

fit(X)

Fit the drift detector by calculating empirical distributions and thresholds from reference data. The empirical distributions are computed by chunking the reference data according to the specified chunk_size and calculating the statistics for each chunk.

Parameters:

Name Type Description Default
X pandas DataFrame

Reference data used as the baseline for drift detection.

required
Source code in skforecast\drift_detection\_population_drift.py
def fit(self, X) -> None:
    """
    Fit the drift detector by calculating empirical distributions and thresholds
    from reference data. The empirical distributions are computed by chunking
    the reference data according to the specified `chunk_size` and calculating
    the statistics for each chunk.

    Parameters
    ----------
    X : pandas DataFrame
        Reference data used as the baseline for drift detection.

    """

    if not isinstance(X, pd.DataFrame):
        raise ValueError(
            f"`X` must be a pandas DataFrame. Got {type(X)} instead."
        )

    if isinstance(X.index, pd.MultiIndex):
        X = X.groupby(level=0)

        for idx, group in X:
            group = group.droplevel(0)
            self.detectors_[idx] = PopulationDriftDetector(
                                       chunk_size = self.chunk_size,
                                       threshold  = self.threshold
                                   )
            self.detectors_[idx]._fit(group)
    else:
        self._fit(X)

    self.is_fitted_ = True
    self.series_names_in_ = list(self.detectors_.keys()) if self.detectors_ else None
    self._collect_attributes()
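
The MultiIndex branch relies on grouping by the first index level and dropping it before fitting each per-series detector. A minimal sketch of that splitting step, on a hypothetical long-format DataFrame:

```python
import pandas as pd

# Hypothetical long-format data: the first index level is the series ID,
# the second level is the timestamp.
dates = pd.date_range("2024-01-01", periods=3, freq="D")
index = pd.MultiIndex.from_product(
    [["series_a", "series_b"], dates], names=["series_id", "datetime"]
)
X = pd.DataFrame({"feature": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]}, index=index)

per_series = {}
for series_id, group in X.groupby(level=0):
    # Drop the series level so each group has a plain DatetimeIndex,
    # which is what time-based chunking requires.
    per_series[series_id] = group.droplevel(0)

print(list(per_series))                            # ['series_a', 'series_b']
print(type(per_series["series_a"].index).__name__)  # DatetimeIndex
```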

_predict

_predict(X)

Predict drift in new data by comparing the estimated statistics to reference thresholds.

Parameters:

Name Type Description Default
X pandas DataFrame

New data to compare against the reference.

required

Returns:

Name Type Description
results pandas DataFrame

DataFrame with the drift detection results for each chunk.
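
For categorical features, the Chi-Square statistic is computed from a contingency table that aligns reference and new counts on the union of categories. A small sketch with hypothetical data:

```python
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency

ref = pd.Series(["a", "a", "a", "b", "b", "c"])  # hypothetical reference categories
new = pd.Series(["a", "b", "b", "d", "d"])       # "d" was never seen in the reference

ref_counts = ref.value_counts()
new_counts = new.value_counts()

# Align both samples on the union of categories; unseen ones get a zero count.
all_cats = ref_counts.index.union(new_counts.index)
ref_aligned = ref_counts.reindex(all_cats, fill_value=0).to_numpy()
new_aligned = new_counts.reindex(all_cats, fill_value=0).to_numpy()

# Rows = [reference, new], columns = categories.
contingency_table = np.array([ref_aligned, new_aligned])
chi2_stat = chi2_contingency(contingency_table)[0]
```

Since every category in the union appears in at least one of the two samples, no column of the table is entirely zero and the expected frequencies stay positive.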

Source code in skforecast\drift_detection\_population_drift.py
def _predict(self, X) -> pd.DataFrame:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.

    """

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, pd.offsets.DateOffset) and not isinstance(X.index, pd.DatetimeIndex):
            raise ValueError(
                "`chunk_size` is a pandas DateOffset but `X` does not have a DatetimeIndex."
            )

    if self.chunk_size is not None:
        if isinstance(self.chunk_size, int):
            chunks = [
                X.iloc[i:i + self.chunk_size]
                for i in range(0, len(X), self.chunk_size)
            ]
        else:
            chunks = [group for _, group in X.resample(self.chunk_size)]
    else:
        chunks = [X]

    results = []
    features = X.columns.tolist()
    for feature in features:
        if feature not in self.ref_features_:
            warnings.warn(
                f"Feature '{feature}' was not present during fitting. Drift detection "
                f"skipped for this feature.",
                UnknownLevelWarning
            )
            continue

        is_numeric = pd.api.types.is_numeric_dtype(X[feature])
        ref_bin_edges = self.ref_bins_edges_.get(feature, None)
        ref_hist = self.ref_hist_.get(feature, None)
        ref_probs = self.ref_probs_.get(feature, None)
        ref_counts = self.ref_counts_.get(feature, None)
        ref_ecdf = self.ref_ecdf_.get(feature, None)
        threshold_ks = self.empirical_threshold_ks_.get(feature, np.nan)
        threshold_chi2 = self.empirical_threshold_chi2_.get(feature, np.nan)
        threshold_js = self.empirical_threshold_js_.get(feature, np.nan)
        ref_range = self.ref_ranges_.get(feature, (np.nan, np.nan))

        for chunk_idx, chunk in enumerate(chunks):

            chunk_label = chunk_idx if self.chunk_size else "full"
            new = chunk[feature].dropna()
            ks_stat = np.nan
            chi2_stat = np.nan
            js_distance = np.nan
            is_out_of_range = np.nan

            if not new.empty:

                if is_numeric:
                    new_ecdf = ecdf(new)
                    # Compute histogram for new data using reference bin edges and normalize
                    new_hist = np.histogram(new, bins=ref_bin_edges)[0] / len(new)
                    # Handle out-of-bin data: if new data contains values outside the reference range,
                    # they will not be counted in the histogram, leading to a sum < 1. To ensure
                    # the histograms are comparable, we add an extra bin for "out-of-range" data with
                    # the leftover probability mass in the new histogram and a corresponding zero bin
                    # in the reference histogram.
                    leftover = 1 - np.sum(new_hist)
                    if leftover > 0:
                        new_hist = np.append(new_hist, leftover)
                        ref_hist_appended = np.append(ref_hist, 0)
                        js_distance = jensenshannon(ref_hist_appended, new_hist, base=2)
                    else:
                        js_distance = jensenshannon(ref_hist, new_hist, base=2)

                    ks_stat = ks_2samp_from_ecdf(
                        ecdf1=ref_ecdf,
                        ecdf2=new_ecdf,
                        alternative="two-sided"
                    )
                    is_out_of_range = (
                        np.min(new) < ref_range[0] or
                        np.max(new) > ref_range[1]
                    )
                else:
                    ref_categories = self.ref_categories_[feature]
                    ref_probs_ = ref_probs.reindex(ref_categories, fill_value=0).to_numpy()
                    # Map new data to reference categories
                    new_counts_dict = new.value_counts().to_dict()
                    new_counts_on_ref = [new_counts_dict.get(cat, 0) for cat in ref_categories]
                    new_probs = (
                        np.array(new_counts_on_ref) / len(new) if len(new) > 0
                        else np.zeros(len(ref_categories))
                    )
                    # Compute leftover (probability of new categories not in reference): if new data
                    # contains categories not seen in reference, they will not be counted in the
                    # histogram, leading to a sum < 1. To ensure the histograms are comparable,
                    # we add an extra bin for "new categories" with the leftover probability mass
                    # in the new histogram and a corresponding zero bin in the reference histogram.
                    leftover = 1 - np.sum(new_probs)
                    if leftover > 0:
                        new_probs = np.append(new_probs, leftover)
                        ref_probs_appended = np.append(ref_probs_, 0)
                        js_distance = jensenshannon(ref_probs_appended, new_probs, base=2)
                    else:
                        js_distance = jensenshannon(ref_probs_, new_probs, base=2)

                    all_cats = set(self.ref_categories_[feature]).union(set(new_counts_dict.keys()))
                    new_counts = new.value_counts().reindex(all_cats, fill_value=0).to_numpy()
                    ref_counts_aligned = ref_counts.reindex(all_cats, fill_value=0).to_numpy()
                    if new_counts.sum() > 0 and ref_counts_aligned.sum() > 0:
                        # Create contingency table: rows = [reference, new], columns = categories
                        contingency_table = np.array([ref_counts_aligned, new_counts])
                        chi2_stat = chi2_contingency(contingency_table)[0]

            results.append({
                "chunk": chunk_label,
                "chunk_start": chunk.index.min(),
                "chunk_end": chunk.index.max(),
                "feature": feature,
                "ks_statistic": ks_stat,
                "threshold_ks": threshold_ks,
                "chi2_statistic": chi2_stat,
                "threshold_chi2": threshold_chi2,
                "jensen_shannon": js_distance,
                "threshold_js": threshold_js,
                "reference_range": ref_range,
                "is_out_of_range": is_out_of_range,
            })

    results_df = pd.DataFrame(results)
    results_df['drift_ks_statistic'] = results_df['ks_statistic'] > results_df['threshold_ks']
    results_df['drift_chi2_statistic'] = results_df['chi2_statistic'] > results_df['threshold_chi2']
    results_df['drift_js'] = results_df['jensen_shannon'] > results_df['threshold_js']
    results_df['drift_detected'] = (
        results_df['drift_ks_statistic']
        | results_df['drift_chi2_statistic']
        | results_df['drift_js']
        | results_df['is_out_of_range']
    )

    return results_df
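
The final flags are simple comparisons between each statistic and its threshold. The sketch below uses hypothetical values and only two of the statistics to show how missing (NaN) entries behave:

```python
import numpy as np
import pandas as pd

# Hypothetical per-chunk results for one numeric and one categorical feature.
results = pd.DataFrame({
    "feature":        ["temp", "temp", "weekday"],
    "ks_statistic":   [0.10, 0.40, np.nan],
    "threshold_ks":   [0.25, 0.25, np.nan],
    "chi2_statistic": [np.nan, np.nan, 9.0],
    "threshold_chi2": [np.nan, np.nan, 5.0],
})

# Comparisons involving NaN evaluate to False, so a statistic that does not
# apply to a feature type never raises a flag on its own.
results["drift_ks_statistic"] = results["ks_statistic"] > results["threshold_ks"]
results["drift_chi2_statistic"] = results["chi2_statistic"] > results["threshold_chi2"]
results["drift_detected"] = results["drift_ks_statistic"] | results["drift_chi2_statistic"]

print(results["drift_detected"].tolist())  # [False, True, True]
```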

predict

predict(X)

Predict drift in new data by comparing the estimated statistics to reference thresholds. Two DataFrames are returned: the first contains detailed information for each chunk, and the second summarizes the chunks where drift has been detected.

Parameters:

Name Type Description Default
X pandas DataFrame

New data to compare against the reference.

required

Returns:

Name Type Description
results pandas DataFrame

DataFrame with the drift detection results for each chunk.

summary pandas DataFrame

Summary DataFrame with the total number and percentage of chunks with detected drift per feature (or per series_id and feature if MultiIndex).
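
The relationship between the two returned DataFrames can be illustrated with a hypothetical `results` table. The aggregation follows the same groupby pattern as the method's source, applied here to made-up rows:

```python
import pandas as pd

# Hypothetical detailed results: one row per (feature, chunk).
results = pd.DataFrame({
    "feature": ["temp", "temp", "temp", "temp", "humidity", "humidity"],
    "chunk": [0, 1, 2, 3, 0, 1],
    "drift_detected": [False, True, True, False, False, False],
})

summary = (
    results.groupby(["feature"])["drift_detected"]
    .agg(["sum", "mean"])  # sum = number of flagged chunks, mean = fraction
    .reset_index()
    .rename(columns={"sum": "n_chunks_with_drift", "mean": "pct_chunks_with_drift"})
)
summary["pct_chunks_with_drift"] = summary["pct_chunks_with_drift"] * 100
print(summary)
```

In this toy case `temp` has 2 of 4 chunks flagged (50%) and `humidity` has none.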

Source code in skforecast\drift_detection\_population_drift.py
def predict(self, X) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Predict drift in new data by comparing the estimated statistics to
    reference thresholds. Two DataFrames are returned: the first contains
    detailed information for each chunk, and the second summarizes the
    chunks where drift has been detected.

    Parameters
    ----------
    X : pandas DataFrame
        New data to compare against the reference.

    Returns
    -------
    results : pandas DataFrame
        DataFrame with the drift detection results for each chunk.
    summary : pandas DataFrame
        Summary DataFrame with the total number and percentage of chunks
        with detected drift per feature (or per series_id and feature if
        MultiIndex).

    """

    if not self.is_fitted_:
        raise NotFittedError(
            "This PopulationDriftDetector instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this estimator."
        )

    if not isinstance(X, pd.DataFrame):
        raise ValueError(f"`X` must be a pandas DataFrame. Got {type(X)} instead.")

    if isinstance(X.index, pd.MultiIndex):
        results = []
        for idx, group in X.groupby(level=0):
            group = group.droplevel(0)
            if idx not in self.detectors_:
                warnings.warn(
                    f"Series '{idx}' was not present during fitting. Drift detection skipped.",
                    UnknownLevelWarning
                )
                continue

            detector = self.detectors_[idx]
            result = detector._predict(group)
            result.insert(0, 'series_id', idx)
            results.append(result)

        results = pd.concat(results, ignore_index=True)
    else:
        results = self._predict(X)

    if results.columns[0] == 'series_id':
        summary = (
            results.groupby(['series_id', 'feature'])['drift_detected']
            .agg(['sum', 'mean'])
            .reset_index()
            .rename(columns={'sum': 'n_chunks_with_drift', 'mean': 'pct_chunks_with_drift'})
        )
    else:
        summary = (
            results.groupby(['feature'])['drift_detected']
            .agg(['sum', 'mean'])
            .reset_index()
            .rename(columns={'sum': 'n_chunks_with_drift', 'mean': 'pct_chunks_with_drift'})
        )
    summary['pct_chunks_with_drift'] = summary['pct_chunks_with_drift'] * 100

    return results, summary

_collect_attributes

_collect_attributes()

Collect attributes for representation and inspection and update the instance dictionary with the collected values. For multi-series (when detectors_ is populated), attributes are aggregated into nested dictionaries keyed by detector names. For single-series, attributes remain unchanged.

Parameters:

Name Type Description Default
self
required

Returns:

Type Description
None
Source code in skforecast\drift_detection\_population_drift.py
def _collect_attributes(self) -> None:
    """
    Collect attributes for representation and inspection and update the instance
    dictionary with the collected values. For multi-series (when detectors_ is
    populated), attributes are aggregated into nested dictionaries keyed by
    detector names. For single-series, attributes remain unchanged.

    Parameters
    ----------
    self

    Returns
    -------
    None

    """

    attr_names = [
        k 
        for k in self.__dict__.keys() 
        if k not in ['is_fitted_', 'detectors_', 'series_names_in_']
    ]

    if self.detectors_:
        for attr_name in attr_names:
            collected = {}
            for detector_key, detector in self.detectors_.items():
                collected[detector_key] = getattr(detector, attr_name, None)
            self.__dict__[attr_name] = deepcopy(collected)
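
The effect of this aggregation for multi-series data can be seen in a reduced example. `ToyDetector` is a hypothetical stand-in with a single fitted attribute, not the real class:

```python
from copy import deepcopy

class ToyDetector:
    """Hypothetical stand-in with one fitted attribute."""
    def __init__(self, ref_ranges=None):
        self.ref_ranges_ = ref_ranges or {}
        self.detectors_ = {}

parent = ToyDetector()
parent.detectors_ = {
    "series_a": ToyDetector({"temp": (0.0, 10.0)}),
    "series_b": ToyDetector({"temp": (5.0, 25.0)}),
}

# Aggregate each attribute into a dictionary keyed by series ID.
attr_names = [k for k in vars(parent) if k != "detectors_"]
for attr_name in attr_names:
    collected = {
        key: getattr(det, attr_name, None) for key, det in parent.detectors_.items()
    }
    setattr(parent, attr_name, deepcopy(collected))

print(parent.ref_ranges_["series_b"]["temp"])  # (5.0, 25.0)
```

After collection, an attribute on the parent is looked up first by series ID and then by feature, while the per-series detectors keep their own flat dictionaries.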