
Concept-Based Causal Analysis

Link to the user guide

A tool for extracting and analyzing causal concepts from tabular datasets.

This class provides methods to cluster data, evaluate discriminability of clusters, extract concept definitions, and estimate causal effects on different outcomes.
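
A minimal end-to-end sketch of the intended workflow. The import path mirrors the source location shown below, and discovery_df, natural_df, X_all and y_binary are placeholders for data you prepare yourself (the discovery and natural frames must carry an 'index' column):

from applybn.explainable.causal_analysis.concept_causal_effect import ConceptCausalExplainer

explainer = ConceptCausalExplainer()

# 1. Find discriminative clusters (concepts) in the discovery dataset
cluster_concepts = explainer.extract_concepts(discovery_df, natural_df)

# 2. Turn the concepts into a binary concept space over the full preprocessed dataset
A = explainer.generate_concept_space(X_all, cluster_concepts)

# 3. Attach a binary outcome named 'L_f' and estimate per-concept effects
D_c = A.copy()
D_c["L_f"] = y_binary
effects = explainer.estimate_causal_effects(D_c)

# 4. Visualize the estimated effects
explainer.plot_tornado(effects, title="Concept effects on L_f")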

Source code in applybn/explainable/causal_analysis/concept_causal_effect.py
class ConceptCausalExplainer:
    """A tool for extracting and analyzing causal concepts from tabular datasets.

    This class provides methods to cluster data, evaluate discriminability of clusters,
    extract concept definitions, and estimate causal effects on different outcomes.
    """

    @staticmethod
    def calculate_confidence_uncertainty(
        X: pd.DataFrame,
        y: pd.Series | np.ndarray,
        clf: ClassifierMixin | BaseEstimator,
    ) -> tuple:
        """Calculate model confidence and aleatoric uncertainty using DataIQ.

        Args:
            X: Feature matrix.
            y: Target labels or values.
            clf: A trained classifier that supports predict_proba or similar.

        Returns:
            A tuple (confidence, aleatoric_uncertainty) containing:
                - confidence: Model confidence scores.
                - aleatoric_uncertainty: Aleatoric uncertainty scores.
        """
        data_iq = DataIQSKLearn(X=X, y=y)
        data_iq.on_epoch_end(clf=clf, iteration=10)
        confidence = data_iq.confidence
        aleatoric_uncertainty = data_iq.aleatoric
        return confidence, aleatoric_uncertainty

    @staticmethod
    def perform_clustering(
        D: pd.DataFrame, num_clusters: int, random_state=42
    ) -> np.ndarray:
        """Perform KMeans clustering on the dataset.

        Args:
            D: The dataset for clustering (without index column).
            num_clusters: The number of clusters to form.
            random_state: Seed for KMeans.

        Returns:
            Array of cluster labels.
        """
        kmeans = KMeans(n_clusters=num_clusters, random_state=random_state)
        clusters = kmeans.fit_predict(D)
        return clusters

    @staticmethod
    def evaluate_discriminability(
        D: pd.DataFrame,
        N: pd.DataFrame,
        clusters: np.ndarray,
        auc_threshold: float,
        k_min_cluster_size: int,
        random_state=42,
    ) -> list:
        """Evaluate discriminability of clusters using an SVM and AUC.

        Args:
            D: The discovery dataset, expected to include an 'index' column.
            N: The negative (natural) dataset, expected to include an 'index' column.
            clusters: Cluster labels from perform_clustering.
            auc_threshold: A threshold for AUC to consider a cluster discriminative.
            k_min_cluster_size: Minimum cluster size for evaluation.
            random_state: Seed for splitting and the SVC.

        Returns:
            A list of dictionaries containing information about discriminative clusters.
        """
        discriminative_clusters = []
        cluster_labels = np.unique(clusters)

        for cluster_label in cluster_labels:
            cluster_indices = np.where(clusters == cluster_label)[0]
            if len(cluster_indices) >= k_min_cluster_size:
                # Prepare data for SVM classifier
                X_cluster = D.iloc[cluster_indices]
                y_cluster = np.ones(len(cluster_indices))
                X_N = N.drop(columns="index")  # Negative class is the natural dataset N
                y_N = np.zeros(len(X_N))
                # Combine datasets
                X_train = pd.concat([X_cluster.drop(columns="index"), X_N], axis=0)
                y_train = np.concatenate([y_cluster, y_N])
                # Split into training and validation sets
                X_train_svm, X_val_svm, y_train_svm, y_val_svm = train_test_split(
                    X_train, y_train, test_size=0.3, random_state=random_state
                )
                # Train SVM
                S_i = SVC(kernel="linear", probability=True, random_state=random_state)
                S_i.fit(X_train_svm, y_train_svm)
                # Evaluate discriminability using AUC
                y_scores = S_i.decision_function(X_val_svm)
                auc_score = roc_auc_score(y_val_svm, y_scores)
                logger.info(f"Cluster {cluster_label}: AUC = {auc_score:.4f}")
                if auc_score > auc_threshold:
                    # Discriminative cluster found
                    discriminative_clusters.append(
                        {
                            "classifier": S_i,
                            "cluster_label": cluster_label,
                            "cluster_indices": D.iloc[cluster_indices]["index"].values,
                            "auc_score": auc_score,
                        }
                    )
        return discriminative_clusters

    def extract_concepts(
        self,
        D: pd.DataFrame,
        N: pd.DataFrame,
        auc_threshold: float = 0.7,
        k_min_cluster_size: int = 100,
        max_clusters: int = 10,
        max_iterations: int = 10,
    ) -> list:
        """Extract concepts from a discovery dataset.

        Clusters the dataset incrementally and looks for discriminative clusters.

        Args:
            D: Discovery dataset with an 'index' column.
            N: Negative (natural) dataset with an 'index' column.
            auc_threshold: Threshold for AUC to declare a cluster discriminative.
            k_min_cluster_size: Minimum cluster size for evaluation.
            max_clusters: Maximum number of clusters to attempt.
            max_iterations: Maximum iterations for incremental clustering.

        Returns:
            A list of discriminative cluster dictionaries.
        """
        svm_classifiers = []
        cluster_concepts = []
        no_improvement_counter = 0
        num_clusters = 9
        iteration = 0

        while num_clusters <= max_clusters and iteration < max_iterations:
            iteration += 1
            logger.info(
                f"\nIteration {iteration}: Clustering with {num_clusters} clusters"
            )
            clusters = self.perform_clustering(D.drop(columns="index"), num_clusters)
            discriminative_clusters = self.evaluate_discriminability(
                D, N, clusters, auc_threshold, k_min_cluster_size
            )

            if discriminative_clusters:
                for concept in discriminative_clusters:
                    svm_classifiers.append(concept["classifier"])
                    cluster_concepts.append(concept)
                no_improvement_counter = 0
            else:
                no_improvement_counter += 1

            if no_improvement_counter >= 3:
                logger.info("No significant improvement in discriminability.")
                break

            num_clusters += 1

        return cluster_concepts

    @staticmethod
    def generate_concept_space(X: pd.DataFrame, cluster_concepts: list) -> pd.DataFrame:
        """Generate a binary concept space from the given cluster concepts.

        Args:
            X: The entire preprocessed dataset.
            cluster_concepts: A list of discriminative cluster dictionaries.

        Returns:
            A DataFrame with binary columns indicating concept membership.
        """
        A = pd.DataFrame(index=X.index)
        for idx, concept in enumerate(cluster_concepts):
            classifier = concept["classifier"]
            A_i_scores = classifier.decision_function(X)
            A[f"Concept_{idx}"] = (A_i_scores > 0).astype(int)
        return A

    @staticmethod
    def select_features_for_concept(
        concept_data: pd.DataFrame,
        other_data: pd.DataFrame,
        features: list,
        original_data: pd.DataFrame,
        lambda_reg: float = 0.1,
    ) -> dict:
        """Select features for a concept and extract value ranges or categories.

        Args:
            concept_data: Data points belonging to the concept.
            other_data: Remaining data points not in the concept.
            features: List of feature names in the preprocessed dataset.
            original_data: Original dataset (before one-hot encoding).
            lambda_reg: Regularization parameter to penalize variance or overlap.

        Returns:
            A dictionary mapping features to their type and range/categories.
        """
        selected_features = {}
        for feature in features:
            X_i_feature = concept_data[feature]
            X_minus_i_feature = other_data[feature]
            # For numerical features, calculate the mean difference
            if (
                feature in original_data.columns
                and original_data[feature].dtype != object
            ):
                mean_diff = abs(X_i_feature.mean() - X_minus_i_feature.mean())
                var_within = X_i_feature.var()
                score = mean_diff - lambda_reg * var_within
                if score > 0:
                    # Get value range from original data
                    original_feature = feature
                    indices = concept_data.index
                    X_i_orig_feature = original_data.loc[indices, original_feature]
                    value_range = (X_i_orig_feature.min(), X_i_orig_feature.max())
                    selected_features[original_feature] = {
                        "type": "numeric",
                        "range": value_range,
                    }
            else:
                # For one-hot encoded categorical features
                proportion_in_concept = X_i_feature.mean()
                proportion_in_others = X_minus_i_feature.mean()
                proportion_diff = proportion_in_concept - proportion_in_others
                score = abs(proportion_diff) - lambda_reg
                if score > 0:
                    # Map back to original feature and category
                    if "_" in feature:
                        original_feature = "_".join(feature.split("_")[:-1])
                        category = feature.split("_")[-1]
                        selected_features.setdefault(
                            original_feature, {"type": "categorical", "categories": []}
                        )
                        selected_features[original_feature]["categories"].append(
                            category
                        )
        return selected_features

    def extract_concept_meanings(
        self, D: pd.DataFrame, cluster_concepts: list, original_data: pd.DataFrame
    ) -> dict:
        """Extract the meanings (dominant features) of each concept.

        Args:
            D: Preprocessed discovery dataset with an 'index' column.
            cluster_concepts: List of discriminative cluster dictionaries.
            original_data: Original dataset (before one-hot encoding).

        Returns:
            A dictionary mapping concept names to their selected features and values.
        """
        selected_features_per_concept = {}
        features = D.drop(columns="index").columns.tolist()

        for idx, concept in enumerate(cluster_concepts):
            cluster_indices = concept["cluster_indices"]
            concept_data = D.set_index("index").loc[cluster_indices]
            other_indices = D.index.difference(concept_data.index)
            other_data = D.loc[other_indices]
            selected_features = self.select_features_for_concept(
                concept_data, other_data, features, original_data
            )
            concept_key = f"Concept_{idx}"
            selected_features_per_concept[concept_key] = selected_features
            logger.info(f"\n{concept_key} selected features and values:")
            for feature, details in selected_features.items():
                if details["type"] == "numeric":
                    logger.info(f"  {feature}: range {details['range']}")
                else:
                    logger.info(f"  {feature}: categories {details['categories']}")
        return selected_features_per_concept

    @staticmethod
    def estimate_causal_effects(D_c: pd.DataFrame) -> dict:
        """Estimate the causal effect of each concept on a binary outcome.

        Args:
            D_c: DataFrame where columns are concepts plus the outcome 'L_f' (binary).

        Returns:
            Dictionary of concept names to their estimated coefficients (logistic regression).
        """
        effects = {}
        outcome = "L_f"

        for concept in D_c.columns:
            if concept != outcome:
                # Prepare data
                X = D_c[[concept]].copy()
                # Control for other concepts
                other_concepts = [
                    col for col in D_c.columns if col not in [concept, outcome]
                ]
                if other_concepts:
                    X[other_concepts] = D_c[other_concepts]
                X = sm.add_constant(X)
                y = D_c[outcome]
                # Fit logistic regression
                model = sm.Logit(y, X).fit(disp=0)
                # Extract the coefficient for the concept
                coef = model.params[concept]
                effects[concept] = coef
                logger.info(
                    f"{concept}: Estimated Coefficient (Causal Effect) = {coef:.4f}"
                )
        return effects

    @staticmethod
    def estimate_causal_effects_on_continuous_outcomes(
        D_c: pd.DataFrame, outcome_name: str
    ) -> dict:
        """Estimate causal effects on continuous outcomes using econML's LinearDML or CausalForestDML.

        Args:
            D_c: DataFrame where columns include concepts and a continuous outcome.
            outcome_name: Name of the continuous outcome column.

        Returns:
            Dictionary of concept names to their estimated causal effect on the outcome.
        """
        from sklearn.ensemble import RandomForestRegressor

        effects = {}
        for concept in D_c.columns:
            if concept != outcome_name:
                # Define treatment and outcome
                T = D_c[[concept]].values.ravel()
                Y = D_c[outcome_name].values
                # Control for other concepts
                X_controls = D_c.drop(columns=[concept, outcome_name])

                # Simple heuristic for selecting estimator
                if X_controls.shape[0] > X_controls.shape[1] * 2:
                    est = LinearDML(
                        model_y=RandomForestRegressor(),
                        model_t=RandomForestRegressor(),
                        linear_first_stages=False,
                    )
                else:
                    logger.info(
                        f"Using CausalForestDML for {concept} due to high dimensionality in controls."
                    )
                    est = CausalForestDML()

                # Fit model and calculate treatment effect
                est.fit(Y, T, X=X_controls)
                treatment_effect = est.effect(X=X_controls)

                # Store the mean effect for the current concept
                effects[concept] = treatment_effect.mean()
                logger.info(
                    f"{concept}: Estimated Causal Effect = {treatment_effect.mean():.4f}"
                )

        return effects

    @staticmethod
    def plot_tornado(
        effects_dict: dict,
        title: str = "Tornado Plot",
        figsize: tuple[int, int] = (10, 6),
    ):
        """Visualize causal effects using a tornado plot.

        Args:
            effects_dict: Dictionary of {concept: effect_size}
            title: Title for the plot
            figsize: Figure dimensions
        """
        # Sort effects by absolute value
        sorted_effects = sorted(
            effects_dict.items(), key=lambda x: abs(x[1]), reverse=True
        )

        # Prepare data for plotting
        concepts = [k for k, v in sorted_effects]
        values = [v for k, v in sorted_effects]
        colors = [
            "#4C72B0" if v > 0 else "#DD8452" for v in values
        ]  # Blue for positive, orange for negative

        # Create plot
        plt.figure(figsize=figsize)
        y_pos = np.arange(len(concepts))

        # Create horizontal bars
        bars = plt.barh(y_pos, values, color=colors)

        # Add reference line and styling
        plt.axvline(0, color="black", linewidth=0.8)
        plt.yticks(y_pos, concepts)
        plt.xlabel("Causal Effect Size")
        plt.title(title)
        plt.gca().invert_yaxis()  # Largest effect at top

        # Add value labels
        for bar, value in zip(bars, values):
            if value > 0:
                ha = "left"
                xpos = min(value + 0.01, max(values) * 0.95)
            else:
                ha = "right"
                xpos = max(value - 0.01, min(values) * 0.95)
            plt.text(
                xpos,
                bar.get_y() + bar.get_height() / 2,
                f"{value:.3f}",
                ha=ha,
                va="center",
                color="black",
            )

        plt.tight_layout()
        plt.show()

calculate_confidence_uncertainty(X, y, clf) staticmethod

Calculate model confidence and aleatoric uncertainty using DataIQ.

Parameters:

    X (DataFrame, required): Feature matrix.
    y (Series | ndarray, required): Target labels or values.
    clf (ClassifierMixin | BaseEstimator, required): A trained classifier that supports predict_proba or similar.

Returns:

    tuple: A tuple (confidence, aleatoric_uncertainty) containing:
        - confidence: Model confidence scores.
        - aleatoric_uncertainty: Aleatoric uncertainty scores.
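
A hedged usage sketch: X and y are assumed to be the feature matrix and labels, and clf a classifier fitted on them in a form the underlying DataIQ implementation can inspect (the gradient-boosting choice here is only an illustration, not a requirement of the method):

from sklearn.ensemble import GradientBoostingClassifier

clf = GradientBoostingClassifier(n_estimators=10, random_state=0).fit(X, y)
confidence, aleatoric = ConceptCausalExplainer.calculate_confidence_uncertainty(X, y, clf)

# Possible downstream use: flag low-confidence samples as a discovery subset
discovery_mask = confidence < 0.8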

estimate_causal_effects(D_c) staticmethod

Estimate the causal effect of each concept on a binary outcome.

Parameters:

    D_c (DataFrame, required): DataFrame where columns are concepts plus the outcome 'L_f' (binary).

Returns:

    dict: Dictionary of concept names to their estimated coefficients (logistic regression).
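
A small self-contained sketch on synthetic binary concepts; the concept names and effect sizes are arbitrary, only the outcome column name 'L_f' is fixed by the method (ConceptCausalExplainer imported as in the sketch at the top of this page):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
D_c = pd.DataFrame({
    "Concept_0": rng.integers(0, 2, size=500),
    "Concept_1": rng.integers(0, 2, size=500),
})
# Binary outcome loosely driven by Concept_0
D_c["L_f"] = (0.8 * D_c["Concept_0"] - 0.5 * D_c["Concept_1"]
              + rng.normal(size=500) > 0).astype(int)

effects = ConceptCausalExplainer.estimate_causal_effects(D_c)
# effects maps each concept to its logistic-regression coefficient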

estimate_causal_effects_on_continuous_outcomes(D_c, outcome_name) staticmethod

Estimate causal effects on continuous outcomes using econML's LinearDML or CausalForestDML.

Parameters:

    D_c (DataFrame, required): DataFrame where columns include concepts and a continuous outcome.
    outcome_name (str, required): Name of the continuous outcome column.

Returns:

    dict: Dictionary of concept names to their estimated causal effect on the outcome.
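
A sketch, assuming A is the binary concept space from generate_concept_space, duration_values is a hypothetical continuous outcome aligned with its rows, and econML is installed:

D_c = A.copy()                       # binary concept columns
D_c["duration"] = duration_values    # hypothetical continuous outcome

effects = ConceptCausalExplainer.estimate_causal_effects_on_continuous_outcomes(
    D_c, outcome_name="duration"
)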

evaluate_discriminability(D, N, clusters, auc_threshold, k_min_cluster_size, random_state=42) staticmethod

Evaluate discriminability of clusters using an SVM and AUC.

Parameters:

    D (DataFrame, required): The discovery dataset, expected to include an 'index' column.
    N (DataFrame, required): The negative (natural) dataset, expected to include an 'index' column.
    clusters (ndarray, required): Cluster labels from perform_clustering.
    auc_threshold (float, required): A threshold for AUC to consider a cluster discriminative.
    k_min_cluster_size (int, required): Minimum cluster size for evaluation.
    random_state (default: 42): Seed for splitting and the SVC.

Returns:

    list: A list of dictionaries containing information about discriminative clusters.
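
This method is normally driven by extract_concepts, but it can also be called directly; a sketch assuming D and N are preprocessed frames that both carry an 'index' column:

clusters = ConceptCausalExplainer.perform_clustering(D.drop(columns="index"), num_clusters=8)
discriminative = ConceptCausalExplainer.evaluate_discriminability(
    D, N, clusters, auc_threshold=0.7, k_min_cluster_size=100
)
for info in discriminative:
    print(info["cluster_label"], round(info["auc_score"], 3))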

extract_concept_meanings(D, cluster_concepts, original_data)

Extract the meanings (dominant features) of each concept.

Parameters:

    D (DataFrame, required): Preprocessed discovery dataset with an 'index' column.
    cluster_concepts (list, required): List of discriminative cluster dictionaries.
    original_data (DataFrame, required): Original dataset (before one-hot encoding).

Returns:

    dict: A dictionary mapping concept names to their selected features and values.
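
A sketch reusing cluster_concepts from extract_concepts together with the original (not yet one-hot encoded) frame; the feature names in the comment are purely illustrative:

meanings = explainer.extract_concept_meanings(D, cluster_concepts, original_df)
# Hypothetical shape of the result:
# {"Concept_0": {"age": {"type": "numeric", "range": (35, 61)},
#                "occupation": {"type": "categorical", "categories": ["Tech-support"]}}}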

extract_concepts(D, N, auc_threshold=0.7, k_min_cluster_size=100, max_clusters=10, max_iterations=10)

Extract concepts from a discovery dataset.

Clusters the dataset incrementally and looks for discriminative clusters.

Parameters:

    D (DataFrame, required): Discovery dataset with an 'index' column.
    N (DataFrame, required): Negative (natural) dataset with an 'index' column.
    auc_threshold (float, default: 0.7): Threshold for AUC to declare a cluster discriminative.
    k_min_cluster_size (int, default: 100): Minimum cluster size for evaluation.
    max_clusters (int, default: 10): Maximum number of clusters to attempt.
    max_iterations (int, default: 10): Maximum iterations for incremental clustering.

Returns:

    list: A list of discriminative cluster dictionaries.
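
A typical call; discovery_df and natural_df are assumed to be preprocessed frames that each include an 'index' column:

explainer = ConceptCausalExplainer()
cluster_concepts = explainer.extract_concepts(
    discovery_df,
    natural_df,
    auc_threshold=0.7,
    k_min_cluster_size=100,
)
print(f"Extracted {len(cluster_concepts)} discriminative clusters")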

generate_concept_space(X, cluster_concepts) staticmethod

Generate a binary concept space from the given cluster concepts.

Parameters:

    X (DataFrame, required): The entire preprocessed dataset.
    cluster_concepts (list, required): A list of discriminative cluster dictionaries.

Returns:

    DataFrame: A DataFrame with binary columns indicating concept membership.
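
A sketch: X_all is assumed to be the full preprocessed feature matrix with the same columns the concept SVMs were trained on (i.e. without the 'index' column):

A = ConceptCausalExplainer.generate_concept_space(X_all, cluster_concepts)
# A holds one binary column per concept, e.g. A["Concept_0"] in {0, 1}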

perform_clustering(D, num_clusters, random_state=42) staticmethod

Perform KMeans clustering on the dataset.

Parameters:

    D (DataFrame, required): The dataset for clustering (without index column).
    num_clusters (int, required): The number of clusters to form.
    random_state (default: 42): Seed for KMeans.

Returns:

    ndarray: Array of cluster labels.
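
A direct call on a discovery frame that still carries its 'index' column (dropped here, as extract_concepts does internally):

labels = ConceptCausalExplainer.perform_clustering(D.drop(columns="index"), num_clusters=8)
# labels is a 1-D array of cluster ids aligned with the rows of D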

plot_tornado(effects_dict, title='Tornado Plot', figsize=(10, 6)) staticmethod

Visualize causal effects using a tornado plot.

Parameters:

    effects_dict (dict, required): Dictionary of {concept: effect_size}.
    title (str, default: 'Tornado Plot'): Title for the plot.
    figsize (tuple[int, int], default: (10, 6)): Figure dimensions.
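
A self-contained sketch with made-up effect sizes:

effects = {"Concept_0": 0.42, "Concept_1": -0.17, "Concept_2": 0.05}
ConceptCausalExplainer.plot_tornado(effects, title="Illustrative concept effects")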

select_features_for_concept(concept_data, other_data, features, original_data, lambda_reg=0.1) staticmethod

Select features for a concept and extract value ranges or categories.

Parameters:

    concept_data (DataFrame, required): Data points belonging to the concept.
    other_data (DataFrame, required): Remaining data points not in the concept.
    features (list, required): List of feature names in the preprocessed dataset.
    original_data (DataFrame, required): Original dataset (before one-hot encoding).
    lambda_reg (float, default: 0.1): Regularization parameter to penalize variance or overlap.

Returns:

    dict: A dictionary mapping features to their type and range/categories.
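
This helper is normally called through extract_concept_meanings; a direct call would look roughly like this, assuming concept_data and other_data are row subsets of the preprocessed frame D and original_df is the frame before one-hot encoding:

features = D.drop(columns="index").columns.tolist()
selected = ConceptCausalExplainer.select_features_for_concept(
    concept_data, other_data, features, original_df, lambda_reg=0.1
)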

Example