Skip to content

Оценки как внутренние компоненты детектора

Детекторы — это просто менеджеры над объектами оценок. Сами оценки задают способ, которым оценивается каждая строка в каждой подвыборке.

Warning

Оценки имеют ограниченную поддержку типов данных.

applybn реализует две основные группы оценок: на основе модели и на основе близости.

Подробнее читайте в Руководстве пользователя.


Оценка

Bases: ABC

An abstract base class for implementing scoring mechanisms.

Source code in applybn/anomaly_detection/scores/score.py
class Score(ABC):
    """
    Common abstract interface shared by every scoring mechanism.

    Concrete scores subclass this and provide their own ``score`` method.
    """

    def __init__(self, verbose: int = 1):
        """
        Remember the logging verbosity on the instance.

        Args:
            verbose: The verbosity level for logging. Default is 1.
        """
        self.verbose = verbose

    @abstractmethod
    def score(self, X: pd.DataFrame):
        """
        Compute scores for the given input data.

        This method is abstract: a subclass cannot be instantiated until it
        overrides it. The base body is empty and returns ``None``.

        Args:
            X: The input data to be scored.
        """
        ...

__init__(verbose=1)

Initializes the Score object.

Parameters:

Name Type Description Default
verbose int

The verbosity level for logging. Default is 1.

1
Source code in applybn/anomaly_detection/scores/score.py
def __init__(self, verbose: int = 1) -> None:
    """
    Initializes the Score object.

    Args:
        verbose: The verbosity level for logging, stored on the instance
            as ``self.verbose``. Default is 1.
    """
    self.verbose = verbose

score(X) abstractmethod

Abstract method to compute scores for the given input data.

Parameters:

Name Type Description Default
X DataFrame

The input data to be scored.

required

Raises:

Type Description
NotImplementedError

This method must be implemented by subclasses.

Source code in applybn/anomaly_detection/scores/score.py
@abstractmethod
def score(self, X: pd.DataFrame):
    """
    Abstract method to compute scores for the given input data.

    Marked with ``@abstractmethod``, so subclasses are expected to override
    it. The base body is a bare ``pass``: it returns ``None`` and does not
    itself raise.

    Args:
        X: The input data to be scored.
    """
    pass

Оценки на основе близости

Оценка локальных выбросов

Bases: ProximityBasedScore

A class for computing outlier scores using the Local Outlier Factor (LOF) algorithm.

Source code in applybn/anomaly_detection/scores/proximity_based.py
class LocalOutlierScore(ProximityBasedScore):
    """
    Proximity-based score backed by the Local Outlier Factor (LOF) estimator.
    """

    def __init__(self, proximity_steps: int = 5, verbose: int = 1, **kwargs):
        """
        Set up the score and keep the LOF keyword arguments for later use.

        Args:
            proximity_steps: Forwarded unchanged to the parent initializer. Default is 5.
            verbose: The verbosity level for logging. Default is 1.
            **kwargs: Keyword arguments handed to ``LocalOutlierFactor`` each
                time ``local_score`` is called.
        """
        super().__init__(verbose=verbose, proximity_steps=proximity_steps)
        self.params = kwargs

    def local_score(self, X: pd.DataFrame):
        """
        Fit a fresh LOF estimator on ``X`` and return its per-row outlier scores.

        Args:
            X: The input data.

        Returns:
            np.ndarray: The sign-flipped ``negative_outlier_factor_`` of the
            fitted estimator, so that larger values mean more abnormal rows.
        """
        lof = LocalOutlierFactor(**self.params)
        lof.fit(X)
        # Flip the sign: the estimator stores the factor negated.
        return -lof.negative_outlier_factor_

__init__(proximity_steps=5, verbose=1, **kwargs)

Initializes the LocalOutlierScore object.

Parameters:

Name Type Description Default
proximity_steps int

The number of proximity steps to perform. Default is 5.

5
verbose int

The verbosity level for logging. Default is 1.

1
**kwargs

Additional parameters for the Local Outlier Factor algorithm.

{}
Source code in applybn/anomaly_detection/scores/proximity_based.py
def __init__(self, proximity_steps: int = 5, verbose: int = 1, **kwargs) -> None:
    """
    Initializes the LocalOutlierScore object.

    Args:
        proximity_steps: The number of proximity steps to perform; passed on
            to the parent initializer. Default is 5.
        verbose: The verbosity level for logging; passed on to the parent
            initializer. Default is 1.
        **kwargs: Additional parameters for the Local Outlier Factor algorithm,
            stored as ``self.params``.
    """
    super().__init__(proximity_steps=proximity_steps, verbose=verbose)
    self.params = kwargs

local_score(X)

Computes the local outlier scores for the given data using the LOF algorithm.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required

Returns:

Type Description

np.ndarray: An array of negated `negative_outlier_factor_` values (the sign of the stored factor is flipped), where higher values indicate more abnormal data points.

Source code in applybn/anomaly_detection/scores/proximity_based.py
def local_score(self, X: pd.DataFrame):
    """
    Fit a fresh LOF estimator on ``X`` and return its per-row outlier scores.

    Args:
        X: The input data.

    Returns:
        np.ndarray: The sign-flipped ``negative_outlier_factor_`` of the
        fitted estimator, so that larger values mean more abnormal rows.
    """
    lof = LocalOutlierFactor(**self.params)
    lof.fit(X)
    # Flip the sign: the estimator stores the factor negated.
    return -lof.negative_outlier_factor_

Оценка на основе Isolation Forest

Bases: ProximityBasedScore

A class for computing outlier scores using the Isolation Forest algorithm.

Source code in applybn/anomaly_detection/scores/proximity_based.py
class IsolationForestScore(ProximityBasedScore):
    """
    Proximity-based score backed by the Isolation Forest estimator.
    """

    def __init__(self, proximity_steps: int = 5, verbose: int = 1, **kwargs):
        """
        Set up the score and keep the Isolation Forest keyword arguments.

        Args:
            proximity_steps: Forwarded unchanged to the parent initializer. Default is 5.
            verbose: The verbosity level for logging. Default is 1.
            **kwargs: Keyword arguments handed to ``IsolationForest`` each
                time ``local_score`` is called.
        """
        super().__init__(proximity_steps=proximity_steps, verbose=verbose)
        self.params = kwargs

    def local_score(self, X: pd.DataFrame):
        """
        Fit a fresh Isolation Forest on ``X`` and score the same rows with it.

        Args:
            X: The input data.

        Returns:
            np.ndarray: The sign-flipped ``decision_function`` output, so that
            larger values mean more abnormal rows.
        """
        forest = IsolationForest(**self.params)
        forest.fit(X)
        decision = forest.decision_function(X)
        # Flip the sign so that larger means more abnormal.
        return -decision

__init__(proximity_steps=5, verbose=1, **kwargs)

Initializes the IsolationForestScore object.

Parameters:

Name Type Description Default
**kwargs

Additional parameters for the Isolation Forest algorithm.

{}
Source code in applybn/anomaly_detection/scores/proximity_based.py
def __init__(self, proximity_steps: int = 5, verbose: int = 1, **kwargs) -> None:
    """
    Initializes the IsolationForestScore object.

    Args:
        proximity_steps: Passed on to the parent initializer. Default is 5.
        verbose: The verbosity level for logging; passed on to the parent
            initializer. Default is 1.
        **kwargs: Additional parameters for the Isolation Forest algorithm,
            stored as ``self.params``.
    """
    super().__init__(verbose=verbose, proximity_steps=proximity_steps)
    self.params = kwargs

local_score(X)

Computes the outlier scores for the given data using the Isolation Forest algorithm.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required

Returns:

Type Description

np.ndarray: An array of negated decision function values (the sign of `decision_function` is flipped), where higher values indicate more abnormal data points.

Source code in applybn/anomaly_detection/scores/proximity_based.py
def local_score(self, X: pd.DataFrame):
    """
    Fit a fresh Isolation Forest on ``X`` and score the same rows with it.

    Args:
        X: The input data.

    Returns:
        np.ndarray: The sign-flipped ``decision_function`` output, so that
        larger values mean more abnormal rows.
    """
    forest = IsolationForest(**self.params)
    forest.fit(X)
    decision = forest.decision_function(X)
    # Flip the sign so that larger means more abnormal.
    return -decision

Оценки на основе модели

Bases: Score

A generic score class that computes scores based on a provided model. Model must implement the predict_proba method.

Source code in applybn/anomaly_detection/scores/model_based.py
class ModelBasedScore(Score):
    """
    A generic score class that computes scores based on a provided model.
    Model must implement the predict_proba method.
    """

    def __init__(self, model):
        """
        Initializes the ModelBasedScore object.

        Args:
            model: The model used to compute probabilities for scoring.
        """
        super().__init__()
        self.model = model

    def score(self, X) -> np.ndarray:
        """
        Computes the score for the input data using the model's predicted probabilities.

        Args:
            X: The input data to be scored.

        Returns:
            np.ndarray: The predicted probabilities for the input data. An
            ``np.ndarray`` is returned as-is, a ``pd.Series`` via ``.values``,
            and any other result (list, DataFrame, ...) via ``np.asarray``.

        Raises:
            AttributeError: If the model has no ``predict_proba`` attribute.
        """
        if not hasattr(self.model, "predict_proba"):
            raise AttributeError("The model does not have a predict_proba method.")
        probas = self.model.predict_proba(X)

        if isinstance(probas, np.ndarray):
            return probas
        if isinstance(probas, pd.Series):
            return probas.values
        # Previously any other return type fell through and yielded None.
        return np.asarray(probas)

__init__(model)

Initializes the ModelBasedScore object.

Parameters:

Name Type Description Default
model

The model used to compute probabilities for scoring.

required
Source code in applybn/anomaly_detection/scores/model_based.py
def __init__(self, model) -> None:
    """
    Initializes the ModelBasedScore object.

    The parent initializer is called without arguments, so its own default
    for ``verbose`` applies.

    Args:
        model: The model used to compute probabilities for scoring; stored
            as ``self.model``.
    """
    super().__init__()
    self.model = model

score(X)

Computes the score for the input data using the model's predicted probabilities.

Parameters:

Name Type Description Default
X

The input data to be scored.

required

Returns:

Type Description
ndarray

np.ndarray: The predicted probabilities for the input data.

Source code in applybn/anomaly_detection/scores/model_based.py
def score(self, X) -> np.ndarray:
    """
    Computes the score for the input data using the model's predicted probabilities.

    Args:
        X: The input data to be scored.

    Returns:
        np.ndarray: The predicted probabilities for the input data. An
        ``np.ndarray`` is returned as-is, a ``pd.Series`` via ``.values``,
        and any other result (list, DataFrame, ...) via ``np.asarray``.

    Raises:
        AttributeError: If the model has no ``predict_proba`` attribute.
    """
    if not hasattr(self.model, "predict_proba"):
        raise AttributeError("The model does not have a predict_proba method.")
    probas = self.model.predict_proba(X)

    if isinstance(probas, np.ndarray):
        return probas
    if isinstance(probas, pd.Series):
        return probas.values
    # Previously any other return type fell through and yielded None.
    return np.asarray(probas)

Оценка на основе BN

Bases: Score

A score class based on a Bayesian network (BN).

Attributes:

Name Type Description
bn

The Bayesian network used for scoring.

encoding

The encoding for discrete variables.

child_nodes

The child nodes in the Bayesian network.

verbose

The verbosity level for logging.

Source code in applybn/anomaly_detection/scores/model_based.py
class BNBasedScore(Score):
    """
    A score class based on a Bayesian network (BN).

    For every node that has at least one parent, the score is the deviation of
    the observed value from the mean of the node's conditional distribution
    given the row's parent values.

    Attributes:
        bn: The Bayesian network used for scoring.
        encoding: The encoding for discrete variables.
        child_nodes: The child nodes in the Bayesian network.
        verbose: The verbosity level for logging.
    """

    def __init__(self, bn: bamt_network, encoding: dict, verbose: int = 1):
        """
        Initializes the BNBasedScore object.

        Args:
            bn: The Bayesian network used for scoring.
            encoding: The encoding for discrete variables. It is indexed as
                ``encoding[node_name][class_name]`` when scoring.
            verbose: The verbosity level for logging.
        """
        super().__init__(verbose=verbose)
        self.encoding = encoding
        self.bn = bn

        # A node is a "child" when it has at least one discrete or continuous parent.
        child_nodes = []
        for column in bn.nodes_names:
            if self.bn[column].disc_parents + self.bn[column].cont_parents:
                child_nodes.append(column)

        self.child_nodes = child_nodes

    def local_score(self, X: pd.DataFrame, node_name: str):
        """
        Computes the local score for a specific node in the Bayesian network.

        Each row is scored as ``observed - conditional_mean``; for ``"cont"``
        nodes the difference is additionally divided by the conditional std.

        Args:
            X: The input data.
            node_name: The name of the node to compute the score for.

        Returns:
            np.ndarray: An array of local scores for the specified node,
            reshaped to a single column (``reshape(-1, 1)``).
        """
        node = self.bn[node_name]
        diff = []
        parents = node.cont_parents + node.disc_parents
        parent_dtypes = X[parents].dtypes.to_dict()

        for i in X.index:
            node_value = X.loc[i, node_name]
            # Single-row frame of parent values, cast back to the column dtypes,
            # then flattened to a {parent: value} dict for the BN query.
            row_df = X.loc[[i], parents].astype(parent_dtypes)
            pvalues = row_df.to_dict("records")[0]
            cond_dist = self.bn.get_dist(node_name, pvals=pvalues)

            if "gaussian" in cond_dist.node_type:
                # Gaussian node: the distribution is unpacked as (mean, std).
                cond_mean, std = cond_dist.get()
            else:
                # Non-gaussian node: unpacked as (probabilities, class labels).
                probs, classes = cond_dist.get()
                # Map class labels to numbers so an expectation can be taken.
                # NOTE(review): a type other than "disc_num"/"disc" leaves
                # `classes_` unbound here - confirm this cannot happen.
                match self.bn.descriptor["types"][node_name]:
                    case "disc_num":
                        classes_ = [int(class_name) for class_name in classes]
                    case "disc":
                        classes_ = np.asarray(
                            [
                                self.encoding[node_name][class_name]
                                for class_name in classes
                            ]
                        )
                # Probability-weighted sum of the numeric class values.
                cond_mean = classes_ @ np.asarray(probs).T

            # NOTE(review): the "cont" case relies on `std` from the gaussian
            # branch above, and an unmatched type appends nothing for this row.
            match self.bn.descriptor["types"][node_name]:
                case "disc_num":
                    diff.append((node_value - cond_mean))
                case "disc":
                    diff.append(self.encoding[node_name][node_value] - cond_mean)
                case "cont":
                    diff.append((node_value - cond_mean) / std)

        return np.asarray(diff).reshape(-1, 1)

    def score(self, X: pd.DataFrame):
        """
        Computes the scores for all child nodes in the Bayesian network.

        Args:
            X: The input data.

        Returns:
            np.ndarray: A 2D array of scores for all child nodes, built by
            horizontally stacking one ``local_score`` column per child node.
        """
        # Wrap the node list in a progress bar only when verbosity asks for it.
        if self.verbose >= 1:
            model_iterator = tqdm(self.child_nodes, desc="Model")
        else:
            model_iterator = self.child_nodes

        model_factors = []
        for child_node in model_iterator:
            model_factors.append(self.local_score(X, child_node))

        return np.hstack(model_factors)

__init__(bn, encoding, verbose=1)

Initializes the BNBasedScore object.

Parameters:

Name Type Description Default
bn bamt_network

The Bayesian network used for scoring.

required
encoding dict

The encoding for discrete variables.

required
verbose int

The verbosity level for logging.

1
Source code in applybn/anomaly_detection/scores/model_based.py
def __init__(self, bn: bamt_network, encoding: dict, verbose: int = 1):
    """
    Store the network and encoding, then collect the nodes that have parents.

    Args:
        bn: The Bayesian network used for scoring.
        encoding: The encoding for discrete variables.
        verbose: The verbosity level for logging.
    """
    super().__init__(verbose=verbose)
    self.bn = bn
    self.encoding = encoding

    # Keep only nodes with at least one parent, discrete or continuous.
    self.child_nodes = [
        name
        for name in bn.nodes_names
        if self.bn[name].disc_parents + self.bn[name].cont_parents
    ]

local_score(X, node_name)

Computes the local score for a specific node in the Bayesian network.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required
node_name

The name of the node to compute the score for.

required

Returns:

Type Description

np.ndarray: An array of local scores for the specified node.

Source code in applybn/anomaly_detection/scores/model_based.py
def local_score(self, X: pd.DataFrame, node_name: str):
    """
    Computes the local score for a specific node in the Bayesian network.

    Each row is scored as ``observed - conditional_mean``; for ``"cont"``
    nodes the difference is additionally divided by the conditional std.

    Args:
        X: The input data.
        node_name: The name of the node to compute the score for.

    Returns:
        np.ndarray: An array of local scores for the specified node,
        reshaped to a single column (``reshape(-1, 1)``).
    """
    node = self.bn[node_name]
    diff = []
    parents = node.cont_parents + node.disc_parents
    parent_dtypes = X[parents].dtypes.to_dict()

    for i in X.index:
        node_value = X.loc[i, node_name]
        # Single-row frame of parent values, cast back to the column dtypes,
        # then flattened to a {parent: value} dict for the BN query.
        row_df = X.loc[[i], parents].astype(parent_dtypes)
        pvalues = row_df.to_dict("records")[0]
        cond_dist = self.bn.get_dist(node_name, pvals=pvalues)

        if "gaussian" in cond_dist.node_type:
            # Gaussian node: the distribution is unpacked as (mean, std).
            cond_mean, std = cond_dist.get()
        else:
            # Non-gaussian node: unpacked as (probabilities, class labels).
            probs, classes = cond_dist.get()
            # Map class labels to numbers so an expectation can be taken.
            # NOTE(review): a type other than "disc_num"/"disc" leaves
            # `classes_` unbound here - confirm this cannot happen.
            match self.bn.descriptor["types"][node_name]:
                case "disc_num":
                    classes_ = [int(class_name) for class_name in classes]
                case "disc":
                    classes_ = np.asarray(
                        [
                            self.encoding[node_name][class_name]
                            for class_name in classes
                        ]
                    )
            # Probability-weighted sum of the numeric class values.
            cond_mean = classes_ @ np.asarray(probs).T

        # NOTE(review): the "cont" case relies on `std` from the gaussian
        # branch above, and an unmatched type appends nothing for this row.
        match self.bn.descriptor["types"][node_name]:
            case "disc_num":
                diff.append((node_value - cond_mean))
            case "disc":
                diff.append(self.encoding[node_name][node_value] - cond_mean)
            case "cont":
                diff.append((node_value - cond_mean) / std)

    return np.asarray(diff).reshape(-1, 1)

score(X)

Computes the scores for all child nodes in the Bayesian network.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required

Returns:

Type Description

np.ndarray: A 2D array of scores for all child nodes.

Source code in applybn/anomaly_detection/scores/model_based.py
def score(self, X: pd.DataFrame):
    """
    Score every child node and place the per-node columns side by side.

    Args:
        X: The input data.

    Returns:
        np.ndarray: A 2D array with one ``local_score`` column per child node.
    """
    nodes = self.child_nodes
    if self.verbose >= 1:
        # Show a progress bar over the nodes when verbosity asks for it.
        nodes = tqdm(nodes, desc="Model")

    return np.hstack([self.local_score(X, name) for name in nodes])

Оценка на основе IQR

Bases: BNBasedScore

A score class that uses the Interquartile Range (IQR) for anomaly detection.

Source code in applybn/anomaly_detection/scores/model_based.py
class IQRBasedScore(BNBasedScore):
    """
    A score class that uses the Interquartile Range (IQR) for anomaly detection.

    A value inside the IQR fence of its conditional distribution scores 0;
    a value outside scores its distance to the nearest fence, scaled by a
    reference distance and capped at 1.
    """

    def __init__(
        self,
        bn: bamt_network,
        encoding: dict,
        iqr_sensivity: float = 1.0,
        verbose: int = 1,
    ):
        """
        Initializes the IQRBasedScore object.

        Args:
            bn: The Bayesian network used for scoring.
            encoding: The encoding for discrete variables.
            iqr_sensivity: The sensitivity factor for IQR-based scoring; the
                fence is ``[q25 - iqr * factor, q75 + iqr * factor]``.
            verbose: The verbosity level for logging.
        """
        super().__init__(bn=bn, encoding=encoding, verbose=verbose)
        self.iqr_sensivity = iqr_sensivity

    @staticmethod
    def score_iqr(
        upper: float, lower: float, y: float, max_distance: float, min_distance: float
    ):
        """
        Computes the IQR-based score for a given value.

        Args:
            upper: The upper bound of the IQR fence.
            lower: The lower bound of the IQR fence.
            y: The value to score.
            max_distance: Reference distance used when ``y`` is closest to ``upper``.
            min_distance: Reference distance used when ``y`` is closest to ``lower``.

        Raises:
            ValueError: If the closest value does not match either upper or lower
                bound (possible only when the equality checks fail, e.g. a NaN bound).

        Returns:
            float: 0 inside the fence (both bounds included), otherwise the
            distance to the nearest bound divided by the absolute reference
            distance, capped at 1.
        """
        # Both bounds count as "inside"; an exclusive lower bound used to fall
        # through to a 0 / ref_distance computation.
        if lower <= y <= upper:
            return 0

        closest_value = min([upper, lower], key=lambda x: abs(x - y))
        current_distance = abs(closest_value - y)

        if closest_value == upper:
            ref_distance = max_distance
        elif closest_value == lower:
            ref_distance = min_distance
        else:
            raise ValueError(
                "Unexpected state: closest_value does not match either upper or lower bound."
            )

        ref_distance = abs(ref_distance)
        if ref_distance == 0:
            # No scale to normalise by: any deviation saturates the score
            # instead of raising ZeroDivisionError.
            return 1 if current_distance > 0 else 0

        return min(1, current_distance / ref_distance)

    def local_score(self, X: pd.DataFrame, node_name: str):
        """
        Computes the local IQR-based score for a specific node.

        Args:
            X: The input data.
            node_name: The name of the node to compute the score for.

        Returns:
            np.ndarray: An array of local scores for the specified node,
            reshaped to a single column.
        """
        node = self.bn[node_name]
        parents = node.cont_parents + node.disc_parents
        parent_dtypes = X[parents].dtypes.to_dict()

        # The column extremes do not depend on the row, so compute them once
        # rather than on every iteration. The `1 *` factor is kept from the
        # original call site.
        node_column = X[node_name]
        max_distance = 1 * node_column.max()
        min_distance = 1 * node_column.min()

        scores = []
        for i in X.index:
            row_df = X.loc[[i], parents].astype(parent_dtypes)
            pvalues = row_df.to_dict("records")[0]
            dist = self.bn.get_dist(node_name, pvals=pvalues).get(with_gaussian=True)

            X_value = X.loc[i, node_name]
            q25 = dist.ppf(0.25)
            q75 = dist.ppf(0.75)
            iqr = q75 - q25

            lower_bound = q25 - iqr * self.iqr_sensivity
            upper_bound = q75 + iqr * self.iqr_sensivity

            scores.append(
                self.score_iqr(
                    upper_bound,
                    lower_bound,
                    X_value,
                    max_distance=max_distance,
                    min_distance=min_distance,
                )
            )

        return np.asarray(scores).reshape(-1, 1)

__init__(bn, encoding, iqr_sensivity=1.0, verbose=1)

Initializes the IQRBasedScore object.

Parameters:

Name Type Description Default
bn bamt_network

The Bayesian network used for scoring.

required
encoding dict

The encoding for discrete variables.

required
iqr_sensivity float

The sensitivity factor for IQR-based scoring.

1.0
verbose int

The verbosity level for logging.

1
Source code in applybn/anomaly_detection/scores/model_based.py
def __init__(
    self,
    bn: bamt_network,
    encoding: dict,
    iqr_sensivity: float = 1.0,
    verbose: int = 1,
) -> None:
    """
    Initializes the IQRBasedScore object.

    Args:
        bn: The Bayesian network used for scoring; passed to the parent initializer.
        encoding: The encoding for discrete variables; passed to the parent initializer.
        iqr_sensivity: The sensitivity factor for IQR-based scoring, stored
            as ``self.iqr_sensivity``.
        verbose: The verbosity level for logging; passed to the parent initializer.
    """
    super().__init__(bn=bn, encoding=encoding, verbose=verbose)
    self.iqr_sensivity = iqr_sensivity

local_score(X, node_name)

Computes the local IQR-based score for a specific node.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required
node_name

The name of the node to compute the score for.

required

Returns:

Type Description

np.ndarray: An array of local scores for the specified node.

Source code in applybn/anomaly_detection/scores/model_based.py
def local_score(self, X: pd.DataFrame, node_name: str):
    """
    Computes the local IQR-based score for a specific node.

    For every row the conditional distribution of the node given its parent
    values is queried, an IQR fence ``[q25 - iqr * s, q75 + iqr * s]`` (with
    ``s = self.iqr_sensivity``) is built from it, and the observed value is
    passed to ``score_iqr`` together with the column maximum and minimum.

    Args:
        X: The input data.
        node_name: The name of the node to compute the score for.

    Returns:
        np.ndarray: An array of local scores for the specified node,
        reshaped to a single column.
    """
    node = self.bn[node_name]
    parents = node.cont_parents + node.disc_parents
    parent_dtypes = X[parents].dtypes.to_dict()

    # The column extremes do not depend on the row, so compute them once
    # rather than on every iteration. The `1 *` factor is kept from the
    # original call site.
    node_column = X[node_name]
    max_distance = 1 * node_column.max()
    min_distance = 1 * node_column.min()

    scores = []
    for i in X.index:
        row_df = X.loc[[i], parents].astype(parent_dtypes)
        pvalues = row_df.to_dict("records")[0]
        dist = self.bn.get_dist(node_name, pvals=pvalues).get(with_gaussian=True)

        X_value = X.loc[i, node_name]
        q25 = dist.ppf(0.25)
        q75 = dist.ppf(0.75)
        iqr = q75 - q25

        lower_bound = q25 - iqr * self.iqr_sensivity
        upper_bound = q75 + iqr * self.iqr_sensivity

        scores.append(
            self.score_iqr(
                upper_bound,
                lower_bound,
                X_value,
                max_distance=max_distance,
                min_distance=min_distance,
            )
        )

    return np.asarray(scores).reshape(-1, 1)

score_iqr(upper, lower, y, max_distance, min_distance) staticmethod

Computes the IQR-based score for a given value.

Parameters:

Name Type Description Default
upper float

The upper bound of the IQR.

required
lower float

The lower bound of the IQR.

required
y float

The value to score.

required
max_distance float

The maximum distance for scaling.

required
min_distance float

The minimum distance for scaling.

required

Raises:

Type Description
ValueError

If the closest value does not match either upper or lower bound.

Returns:

Name Type Description
float

The IQR-based score.

Source code in applybn/anomaly_detection/scores/model_based.py
@staticmethod
def score_iqr(
    upper: float, lower: float, y: float, max_distance: float, min_distance: float
):
    """
    Computes the IQR-based score for a given value.

    Args:
        upper: The upper bound of the IQR.
        lower: The lower bound of the IQR.
        y: The value to score.
        max_distance: The maximum distance for scaling.
        min_distance: The minimum distance for scaling.

    Raises:
        ValueError: If the closest value does not match either upper or lower bound.

    Returns:
        float: The IQR-based score.
    """
    if lower < y <= upper:
        return 0

    closest_value = min([upper, lower], key=lambda x: abs(x - y))
    current_distance = abs(closest_value - y)

    if closest_value == upper:
        ref_distance = max_distance
    elif closest_value == lower:
        ref_distance = min_distance
    else:
        raise ValueError(
            "Unexpected state: closest_value does not match either upper or lower bound."
        )

    return min(1, current_distance / abs(ref_distance))

Оценка на основе отношения условных вероятностей

Bases: BNBasedScore

A score class that uses conditional probability ratios for anomaly detection.

Source code in applybn/anomaly_detection/scores/model_based.py
class CondRatioScore(BNBasedScore):
    """
    A score class that uses conditional probability ratios for anomaly detection.

    Each row is scored by the ratio of the observed value's marginal frequency
    in the sample to its conditional probability given the row's parent values,
    capped at 1.
    """

    def __init__(self, bn: bamt_network, encoding: dict, verbose: int = 1):
        """
        Initializes the CondRatioScore object.

        Args:
            bn: The Bayesian network used for scoring.
            encoding: The encoding for discrete variables.
            verbose: The verbosity level for logging.
        """
        super().__init__(bn=bn, encoding=encoding, verbose=verbose)

    def local_score(self, X: pd.DataFrame, node_name: str):
        """
        Computes the local conditional ratio score for a specific node.

        Args:
            X: The input data.
            node_name: The name of the node to compute the score for.

        Returns:
            np.ndarray: An array of local scores for the specified node,
            reshaped to a single column.
        """
        node = self.bn[node_name]
        diff = []
        parents = node.cont_parents + node.disc_parents
        parent_dtypes = X[parents].dtypes.to_dict()

        for i in X.index:
            row_df = X.loc[[i], parents].astype(parent_dtypes)
            pvalues = row_df.to_dict("records")[0]
            node_value = X.loc[i, node_name]
            cond_dist = self.bn.get_dist(node_name, pvals=pvalues).get()

            diff.append(self.score_proba_ratio(X[node_name], node_value, cond_dist))

        return np.asarray(diff).reshape(-1, 1)

    @staticmethod
    def score_proba_ratio(sample: pd.Series, X_value: str, cond_dist: tuple):
        """
        Computes the conditional probability ratio score.

        Args:
            sample: The sample data; the marginal probability of ``X_value``
                is its normalized frequency in this series.
            X_value: The value to score. It is looked up in ``sample`` as-is
                and in the distribution's value list as ``str(X_value)``.
            cond_dist: The conditional distribution as a
                ``(probabilities, values)`` pair of parallel sequences.

        Returns:
            float: ``min(1, marginal / conditional)``, or ``np.nan`` when the
            conditional probability is 0 or the ratio is not finite.
        """
        cond_probs, values = cond_dist
        marginal_prob = sample.value_counts(normalize=True)[X_value]
        cond_prob = cond_probs[values.index(str(X_value))]

        # A zero conditional probability has no meaningful ratio; bail out
        # before dividing so no divide-by-zero warning/error is triggered.
        if cond_prob == 0:
            return np.nan

        ratio = marginal_prob / cond_prob
        if not np.isfinite(ratio):
            return np.nan

        return min(1, ratio)

__init__(bn, encoding, verbose=1)

Initializes the CondRatioScore object.

Parameters:

Name Type Description Default
bn bamt_network

The Bayesian network used for scoring.

required
encoding dict

The encoding for discrete variables.

required
verbose int

The verbosity level for logging.

1
Source code in applybn/anomaly_detection/scores/model_based.py
def __init__(self, bn: bamt_network, encoding: dict, verbose: int = 1) -> None:
    """
    Initializes the CondRatioScore object.

    All arguments are forwarded unchanged to the parent initializer; no
    additional state is set here.

    Args:
        bn: The Bayesian network used for scoring.
        encoding: The encoding for discrete variables.
        verbose: The verbosity level for logging.
    """
    super(CondRatioScore, self).__init__(bn=bn, encoding=encoding, verbose=verbose)

local_score(X, node_name)

Computes the local conditional ratio score for a specific node.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required
node_name str

The name of the node to compute the score for.

required

Returns:

Type Description

np.ndarray: An array of local scores for the specified node.

Source code in applybn/anomaly_detection/scores/model_based.py
def local_score(self, X: pd.DataFrame, node_name: str):
    """
    Score every row of ``X`` for one node with the conditional ratio score.

    For each row the node's conditional distribution given that row's parent
    values is fetched from the network and passed to ``score_proba_ratio``
    together with the whole node column and the observed value.

    Args:
        X: The input data.
        node_name: The name of the node to compute the score for.

    Returns:
        np.ndarray: An array of local scores for the specified node,
        reshaped to a single column.
    """
    node = self.bn[node_name]
    parents = node.cont_parents + node.disc_parents
    parent_dtypes = X[parents].dtypes.to_dict()

    ratios = []
    for row_label in X.index:
        # One-row frame of parent values -> plain {parent: value} mapping.
        parent_row = X.loc[[row_label], parents].astype(parent_dtypes)
        pvalues = parent_row.to_dict("records")[0]
        observed = X.loc[row_label, node_name]
        conditional = self.bn.get_dist(node_name, pvals=pvalues).get()

        ratios.append(self.score_proba_ratio(X[node_name], observed, conditional))

    return np.asarray(ratios).reshape(-1, 1)

score_proba_ratio(sample, X_value, cond_dist) staticmethod

Computes the conditional probability ratio score.

Parameters:

Name Type Description Default
sample Series

The sample data.

required
X_value str

The value to score.

required
cond_dist tuple

The conditional distribution.

required

Returns:

Name Type Description
float

The conditional probability ratio score.

Source code in applybn/anomaly_detection/scores/model_based.py
@staticmethod
def score_proba_ratio(sample: pd.Series, X_value: str, cond_dist: tuple):
    """
    Computes the conditional probability ratio score.

    Args:
        sample: The sample data.
        X_value: The value to score.
        cond_dist: The conditional distribution.

    Returns:
        float: The conditional probability ratio score.
    """
    cond_probs, values = cond_dist
    marginal_prob = sample.value_counts(normalize=True)[X_value]

    index = values.index(str(X_value))
    cond_prob = cond_probs[index]

    if not np.isfinite(marginal_prob / cond_prob):
        return np.nan

    return min(1, marginal_prob / cond_prob)

Комбинированная оценка на основе IQR и отношения условных вероятностей

Bases: BNBasedScore

A score class that combines IQR-based scoring and probability ratio scoring for anomaly detection.

Source code in applybn/anomaly_detection/scores/model_based.py
class CombinedIQRandProbRatioScore(BNBasedScore):
    """
    A score class that combines IQR-based scoring and probability ratio scoring for anomaly detection.

    Rows of nodes whose conditional distribution is Gaussian are scored with
    the IQR-based scorer stored under ``scores["cont"]``; all other rows are
    scored with the probability ratio scorer stored under ``scores["disc"]``.
    """

    def __init__(
        self, bn: bamt_network, encoding: dict, scores: dict, verbose: int = 1
    ):
        """
        Initializes the CombinedIQRandProbRatioScore object.

        Args:
            bn: The Bayesian network used for scoring.
            encoding: The encoding for discrete variables.
            scores: A dictionary containing scoring objects for continuous
                (key ``"cont"``) and discrete (key ``"disc"``) variables.
            verbose: The verbosity level for logging.
        """
        super().__init__(bn=bn, encoding=encoding, verbose=verbose)
        self.scores = scores

    def local_score(self, X: pd.DataFrame, node_name: str):
        """
        Computes the local score for a specific node by combining IQR-based and probability ratio scoring.

        For every row the conditional distribution of the node given the
        row's parent values is requested from the network. Gaussian
        distributions are scored through ``score_iqr`` of the continuous
        scorer, using bounds derived from the 25% and 75% quantiles; other
        distributions are scored through ``score_proba_ratio`` of the
        discrete scorer.

        Args:
            X: The input data.
            node_name: The name of the node to compute the score for.

        Returns:
            np.ndarray: A column array (shape ``(n_rows, 1)``) of local scores for the specified node.
        """
        node = self.bn[node_name]
        cont_scorer = self.scores["cont"]
        iqr_sensivity = cont_scorer.iqr_sensivity
        parents = node.cont_parents + node.disc_parents
        parent_dtypes = X[parents].dtypes.to_dict()

        # The column extremes are the same for every row, so they are computed
        # at most once, and only when a Gaussian row actually needs them
        # (discrete columns never have their min/max evaluated).
        max_distance = min_distance = None

        scores = []
        for i in X.index:
            # Selecting a one-row frame and re-applying the column dtypes keeps
            # the parent values in their original per-column types.
            row_df = X.loc[[i], parents].astype(parent_dtypes)
            pvalues = row_df.to_dict("records")[0]
            X_value = X.loc[i, node_name]
            dist = self.bn.get_dist(node_name, pvals=pvalues)

            if "gaussian" not in dist.node_type:
                scores.append(
                    self.scores["disc"].score_proba_ratio(
                        X[node_name], X_value, dist.get()
                    )
                )
                continue

            dist = dist.get(with_gaussian=True)
            if dist.kwds["scale"] == 0:
                # Degenerate distribution: the row gets a zero score.
                scores.append(0)
                continue

            q25 = dist.ppf(0.25)
            q75 = dist.ppf(0.75)
            iqr = q75 - q25

            lower_bound = q25 - iqr * iqr_sensivity
            upper_bound = q75 + iqr * iqr_sensivity

            if max_distance is None:
                max_distance = 1 * X[node_name].max()
                min_distance = 1 * X[node_name].min()

            scores.append(
                cont_scorer.score_iqr(
                    upper_bound,
                    lower_bound,
                    X_value,
                    max_distance=max_distance,
                    min_distance=min_distance,
                )
            )

        return np.asarray(scores).reshape(-1, 1)

__init__(bn, encoding, scores, verbose=1)

Initializes the CombinedIQRandProbRatioScore object.

Parameters:

Name Type Description Default
bn bamt_network

The Bayesian network used for scoring.

required
encoding dict

The encoding for discrete variables.

required
scores dict

A dictionary containing scoring objects for continuous and discrete variables.

required
verbose int

The verbosity level for logging.

1
Source code in applybn/anomaly_detection/scores/model_based.py
def __init__(
    self, bn: bamt_network, encoding: dict, scores: dict, verbose: int = 1
):
    """
    Initializes the CombinedIQRandProbRatioScore object.

    ``bn``, ``encoding`` and ``verbose`` are forwarded to the parent
    initializer; ``scores`` is stored on the instance as ``self.scores``.

    Args:
        bn: The Bayesian network used for scoring.
        encoding: The encoding for discrete variables.
        scores: A dictionary containing scoring objects for continuous and discrete variables.
        verbose: The verbosity level for logging.
    """
    # Let the base class handle the network, encoding and verbosity settings.
    super(CombinedIQRandProbRatioScore, self).__init__(
        bn=bn, encoding=encoding, verbose=verbose
    )
    # Scorer objects kept for later use when computing local scores.
    self.scores = scores

local_score(X, node_name)

Computes the local score for a specific node by combining IQR-based and probability ratio scoring.

Parameters:

Name Type Description Default
X DataFrame

The input data.

required
node_name str

The name of the node to compute the score for.

required

Returns:

Type Description

ndarray

An array of local scores for the specified node.

Source code in applybn/anomaly_detection/scores/model_based.py
def local_score(self, X: pd.DataFrame, node_name: str):
    """
    Computes the local score for a specific node by combining IQR-based and probability ratio scoring.

    For every row the conditional distribution of the node given the row's
    parent values is requested from the network. Gaussian distributions are
    scored through ``score_iqr`` of the scorer stored under
    ``self.scores["cont"]``, using bounds derived from the 25% and 75%
    quantiles; other distributions are scored through ``score_proba_ratio``
    of the scorer stored under ``self.scores["disc"]``.

    Args:
        X: The input data.
        node_name: The name of the node to compute the score for.

    Returns:
        np.ndarray: A column array (shape ``(n_rows, 1)``) of local scores for the specified node.
    """
    node = self.bn[node_name]
    cont_scorer = self.scores["cont"]
    iqr_sensivity = cont_scorer.iqr_sensivity
    parents = node.cont_parents + node.disc_parents
    parent_dtypes = X[parents].dtypes.to_dict()

    # The column extremes are the same for every row, so they are computed at
    # most once, and only when a Gaussian row actually needs them (discrete
    # columns never have their min/max evaluated).
    max_distance = min_distance = None

    scores = []
    for i in X.index:
        # Selecting a one-row frame and re-applying the column dtypes keeps
        # the parent values in their original per-column types.
        row_df = X.loc[[i], parents].astype(parent_dtypes)
        pvalues = row_df.to_dict("records")[0]
        X_value = X.loc[i, node_name]
        dist = self.bn.get_dist(node_name, pvals=pvalues)

        if "gaussian" not in dist.node_type:
            scores.append(
                self.scores["disc"].score_proba_ratio(
                    X[node_name], X_value, dist.get()
                )
            )
            continue

        dist = dist.get(with_gaussian=True)
        if dist.kwds["scale"] == 0:
            # Degenerate distribution: the row gets a zero score.
            scores.append(0)
            continue

        q25 = dist.ppf(0.25)
        q75 = dist.ppf(0.75)
        iqr = q75 - q25

        lower_bound = q25 - iqr * iqr_sensivity
        upper_bound = q75 + iqr * iqr_sensivity

        if max_distance is None:
            max_distance = 1 * X[node_name].max()
            min_distance = 1 * X[node_name].min()

        scores.append(
            cont_scorer.score_iqr(
                upper_bound,
                lower_bound,
                X_value,
                max_distance=max_distance,
                min_distance=min_distance,
            )
        )

    return np.asarray(scores).reshape(-1, 1)