"""Core end-to-end pipeline, foreshadow."""
import random
import warnings
from typing import List, NoReturn, Union
import numpy as np
import pandas as pd
from sklearn.exceptions import DataConversionWarning
from sklearn.pipeline import Pipeline
from foreshadow.base import BaseEstimator
from foreshadow.cachemanager import CacheManager
from foreshadow.concrete.internals.cleaners.customizable_base import (
CustomizableBaseCleaner,
)
from foreshadow.estimators.auto import AutoEstimator
from foreshadow.estimators.estimator_wrapper import EstimatorWrapper
from foreshadow.intents import IntentType
from foreshadow.logging import logging
from foreshadow.preparer import DataPreparer
from foreshadow.utils import (
AcceptedKey,
ConfigKey,
Override,
ProblemType,
check_df,
)
# Silence scikit-learn's DataConversionWarning. This runs at import time, so
# the filter is installed as a side effect of importing this module.
warnings.filterwarnings(action="ignore", category=DataConversionWarning)
class Foreshadow(BaseEstimator):
    """An end-to-end pipeline to preprocess and tune a machine learning model.

    Example:
        >>> shadow = Foreshadow(problem_type=ProblemType.CLASSIFICATION)

    Args:
        problem_type: The kind of problem to solve. Must be
            ``ProblemType.CLASSIFICATION`` or ``ProblemType.REGRESSION``;
            any other value raises ``ValueError``.
        random_state (optional): Seed applied to ``random`` and
            ``numpy.random`` before fitting, and forwarded to the
            automatically created estimator.
        n_jobs (optional): Passed to ``configure_multiprocessing`` and
            forwarded to the automatically created estimator. Defaults to 1.
        estimator (optional): Estimator instance to fit on processed data.
            When None, an
            :obj:`AutoEstimator <foreshadow.estimators.AutoEstimator>` is
            created in its place.
        allowed_seconds (optional): Forwarded as ``allowed_seconds`` to the
            automatically created estimator. Defaults to 300.
        auto_estimator_kwargs (optional): Forwarded as ``estimator_kwargs``
            to the automatically created estimator. Mutually exclusive with
            ``estimator``.

    """
def __init__(
    self,
    problem_type,
    random_state=None,
    n_jobs=1,
    estimator=None,
    allowed_seconds=300,
    auto_estimator_kwargs=None,
):
    """Validate the configuration and build the preparers and estimator.

    Args:
        problem_type: ``ProblemType.CLASSIFICATION`` or
            ``ProblemType.REGRESSION``.
        random_state: Seed stored on the instance and forwarded to the
            automatically created estimator.
        n_jobs: Passed to ``configure_multiprocessing`` and forwarded to
            the automatically created estimator.
        estimator: Estimator to use. None requests an ``AutoEstimator``.
        allowed_seconds: Forwarded to the automatically created estimator.
        auto_estimator_kwargs: Forwarded as ``estimator_kwargs`` to the
            automatically created estimator.

    Raises:
        ValueError: If ``problem_type`` is not supported, if both
            ``estimator`` and ``auto_estimator_kwargs`` are supplied, or
            if the ``estimator`` setter rejects the given estimator.

    """
    supported_types = (ProblemType.CLASSIFICATION, ProblemType.REGRESSION)
    if problem_type not in supported_types:
        raise ValueError(
            "Unknown Problem Type {}. Please choose from {} "
            "or {}".format(
                problem_type,
                ProblemType.CLASSIFICATION,
                ProblemType.REGRESSION,
            )
        )
    # Reject contradictory arguments before any preparer or cache manager is
    # built, so an invalid call leaves no half-initialised instance behind.
    if estimator is not None and auto_estimator_kwargs is not None:
        raise ValueError(
            "estimator and auto_estimator_kwargs are mutually exclusive"
        )

    self.problem_type = problem_type
    self.random_state = random_state
    self.n_jobs = n_jobs

    self._X_preparer = DataPreparer(cache_manager=CacheManager())
    self.configure_multiprocessing(self.n_jobs)
    # The attribute name is misspelled, but the ``y_preparer`` property reads
    # exactly this name, so it is kept as is.
    self._y_preprarer = DataPreparer(
        cache_manager=CacheManager(),
        y_var=True,
        problem_type=self.problem_type,
    )

    # These must be set before ``self.estimator``: the estimator setter reads
    # them when it has to build an AutoEstimator.
    self.allowed_seconds = allowed_seconds
    self.auto_estimator_kwargs = auto_estimator_kwargs
    self.estimator = estimator

    self.pipeline = None
    self.data_columns = None
    self.has_fitted = False

    if self.y_preparer is not None:
        self.estimator_wrapper = EstimatorWrapper(
            self.estimator, self.y_preparer
        )
    else:
        self.estimator_wrapper = self.estimator
@property
def X_preparer(self):  # noqa
    """Preparer object that performs feature engineering on X data.

    Read-only: no setter is defined for this property in this block.

    Returns:
        the object stored in ``self._X_preparer``

    .. # noqa: I201

    """
    x_side = self._X_preparer
    return x_side
@property
def y_preparer(self):  # noqa
    """Preparer object that performs scaling and encoding on Y data.

    Read-only: no setter is defined for this property in this block.

    Returns:
        the object stored in ``self._y_preprarer`` (the attribute name is
        spelled this way where it is assigned)

    .. # noqa: I201

    """
    y_side = self._y_preprarer
    return y_side
@property
def estimator(self):  # noqa
    """Estimator object used to fit the preprocessed data.

    :getter: Returns the stored estimator.

    :setter: Accepts a ``BaseEstimator`` instance. Passing None builds an
        :obj:`AutoEstimator <foreshadow.estimators.AutoEstimator>` from
        the instance configuration; any other value raises ``ValueError``.

    Returns:
        the estimator object

    .. # noqa: I201

    """
    return self._estimator


@estimator.setter
def estimator(self, e):
    if e is None:
        # Let the automatic estimator add its own preprocessing only when
        # this instance has no X preparer.
        needs_preprocessors = self.X_preparer is None
        self._estimator = AutoEstimator(
            self.problem_type,
            auto="tpot",
            include_preprocessors=needs_preprocessors,
            allowed_seconds=self.allowed_seconds,
            random_state=self.random_state,
            n_jobs=self.n_jobs,
            estimator_kwargs=self.auto_estimator_kwargs,
        )
        return
    if not isinstance(e, BaseEstimator):
        raise ValueError("Invalid value passed as estimator")
    self._estimator = e
def _reset(self):
    """Return the instance to its unfitted state before a new fit.

    Re-seeds ``random`` and ``numpy.random`` when ``random_state`` is set,
    clears the fitted pipeline and the fitted flag, and drops any leftover
    ``tuner`` / ``opt_instance`` attributes.
    """
    if self.random_state is not None:
        random.seed(self.random_state)
        np.random.seed(self.random_state)
    # Assign None instead of deleting the attribute: the predict-time
    # validation checks ``self.pipeline is None`` and would otherwise fail
    # with AttributeError if a later fit aborts half-way.
    self.pipeline = None
    self.has_fitted = False
    # Remove each stale attribute independently, since either may be absent.
    for stale_name in ("tuner", "opt_instance"):
        if hasattr(self, stale_name):
            delattr(self, stale_name)
def fit(self, data_df, y_df):
    """Fit the Foreshadow instance using the provided input data.

    The instance is reset first, both inputs go through ``check_df``, the
    training column names are recorded, and a two-step pipeline (X preparer
    followed by the estimator wrapper) is built and fitted.

    Args:
        data_df (:obj:`DataFrame <pandas.DataFrame>`): The input feature(s)
        y_df (:obj:`DataFrame <pandas.DataFrame>`): The response feature(s)

    Returns:
        :obj:`Foreshadow`: The fitted instance.

    """
    self._reset()
    features = check_df(data_df)
    target = check_df(y_df)
    # Remembered so that predict-time input can be checked against it.
    self.data_columns = features.columns.values.tolist()
    stages = [
        ("X_preparer", self.X_preparer),
        ("estimator_wrapper", self.estimator_wrapper),
    ]
    self.pipeline = Pipeline(stages)
    self.pipeline.fit(features, target)
    self.has_fitted = True
    return self
def _prepare_predict(self, pred_cols):
    """Validate prior to predicting.

    Args:
        pred_cols (:obj:`Index pandas.Index`): the columns of the data
            passed in for prediction

    Raises:
        ValueError: Pipeline not fit yet
        ValueError: Predict must have the same columns as train

    """
    # ``getattr`` keeps the documented ValueError even when the ``pipeline``
    # attribute has been deleted from the instance.
    if getattr(self, "pipeline", None) is None:
        raise ValueError("Foreshadow has not been fit yet")
    # Order matters: the comparison is list equality against the training
    # column list.
    if pred_cols.values.tolist() != self.data_columns:
        raise ValueError(
            "Predict must have the same columns as train columns"
        )
def predict(self, data_df):
    """Use the trained estimator to predict the response variable.

    The input goes through ``check_df`` and its columns are validated
    against the training columns before the pipeline is asked to predict.

    Args:
        data_df (:obj:`DataFrame <pandas.DataFrame>`): The input feature(s)

    Returns:
        :obj:`DataFrame <pandas.DataFrame>`: The response feature(s) \
            (transformed if necessary)

    """
    features = check_df(data_df)
    self._prepare_predict(features.columns)
    predictions = self.pipeline.predict(features)
    return predictions
def predict_proba(self, data_df):
    """Use the trained estimator to predict the response variable.

    Uses the predicted confidences instead of binary predictions. The input
    is validated the same way as for ``predict``.

    Args:
        data_df (:obj:`DataFrame <pandas.DataFrame>`): The input feature(s)

    Returns:
        :obj:`DataFrame <pandas.DataFrame>`: The probability associated \
            with each response feature

    """
    features = check_df(data_df)
    self._prepare_predict(features.columns)
    probabilities = self.pipeline.predict_proba(features)
    return probabilities
def score(self, data_df, y_df=None, sample_weight=None):
    """Use the trained estimator to compute the evaluation score.

    The scoring method is defined by the selected estimator.

    Args:
        data_df (:obj:`DataFrame <pandas.DataFrame>`): The input feature(s)
        y_df (:obj:`DataFrame <pandas.DataFrame>`, optional): The response
            feature(s)
        sample_weight (:obj:`numpy.ndarray`, optional): The weights to be
            used when scoring each sample

    Returns:
        float: A computed prediction fitness score

    """
    features = check_df(data_df)
    target = check_df(y_df)
    self._prepare_predict(features.columns)
    fitness = self.pipeline.score(features, target, sample_weight)
    return fitness
def get_params(self, deep=True):
    """Get params for this object. See super.

    The parent's parameters are extended with the ``data_columns`` recorded
    on this instance.

    Args:
        deep: True to recursively call get_params, False to not.

    Returns:
        params for this object.

    """
    collected = super().get_params(deep=deep)
    collected["data_columns"] = self.data_columns
    return collected
def set_params(self, **params):
    """Set params for this object. See super.

    ``data_columns`` is handled here and removed before delegating, so the
    parent never receives it.

    Args:
        **params: params to set.

    Returns:
        See super.

    """
    # Only touch ``data_columns`` when the caller supplied it; updating an
    # unrelated parameter must not wipe the recorded training columns.
    if "data_columns" in params:
        self.data_columns = params.pop("data_columns")
    return super().set_params(**params)
def get_intent(self, column_name: str) -> Union[str, None]:
    """Retrieve the intent of a column.

    Args:
        column_name: the column name

    Returns:
        str: the intent of the column, or None (after logging a message)
        when the column is not found in the cache manager

    """
    # Intents are read from the X preparer's cache manager, so only columns
    # that have been processed are visible.
    intent_store = self.X_preparer.cache_manager
    if not self._has_column_in_cache_manager(column_name):
        logging.info(
            "No intent exists for column {}. Either the column "
            "doesn't exist or foreshadow object has not "
            "been fitted yet.".format(column_name)
        )
        return None
    return intent_store[AcceptedKey.INTENT][column_name]
def list_intent(self, column_names: List[str]) -> List[str]:
    """Retrieve the intent of a list of columns.

    Each name is looked up with ``get_intent``; results keep input order.

    Args:
        column_names: a list of columns

    Returns:
        The list of intents

    """
    intents = []
    for name in column_names:
        intents.append(self.get_intent(name))
    return intents
def _has_column_in_cache_manager(self, column: str) -> Union[bool, None]:
    """Check if the column exists in the cache manager.

    If the foreshadow object has not been trained, a message is logged and
    False is returned.

    Args:
        column: the column name

    Returns:
        Whether a column exists in the cache manager

    """
    if self.has_fitted:
        known_intents = self.X_preparer.cache_manager[AcceptedKey.INTENT]
        return column in known_intents
    logging.info(
        "The foreshadow object is not trained yet. Please make sure "
        "the column {} exist to ensure the override takes "
        "effect.".format(column)
    )
    return False
def override_intent(self, column_name: str, intent: str) -> None:
    """Override the intent of a particular column.

    The override is written to the X preparer's cache manager, both under
    the override key and as the column's current intent.

    Args:
        column_name: the column to override
        intent: the user supplied intent

    Raises:
        ValueError: Invalid intent type, or (once fitted) invalid column
            to override.

    """
    if not IntentType.is_valid(intent):
        raise ValueError(
            "Invalid intent type {}. "
            "Supported intent types are {}.".format(
                intent, IntentType.list_intents()
            )
        )
    # The helper is always called, even before fitting: in that case it
    # logs a reminder and the override is accepted without validation.
    column_known = self._has_column_in_cache_manager(column_name)
    if not column_known and self.has_fitted:
        raise ValueError("Invalid Column {}".format(column_name))
    # Update the intent
    cache_manager = self.X_preparer.cache_manager
    override_key = "_".join([Override.INTENT, column_name])
    cache_manager[AcceptedKey.OVERRIDE][override_key] = intent
    cache_manager[AcceptedKey.INTENT][column_name] = intent
def set_processed_data_export_path(
    self, data_path: str, is_train: bool
) -> None:
    """Set path to export data before feeding the data to the estimator.

    The path is stored in the config section of the X preparer's cache
    manager, under the training or the test key depending on ``is_train``.

    Args:
        data_path: the data path string
        is_train: whether this is for training data

    """
    if is_train:
        key = ConfigKey.PROCESSED_TRAINING_DATA_EXPORT_PATH
    else:
        key = ConfigKey.PROCESSED_TEST_DATA_EXPORT_PATH
    self.X_preparer.cache_manager[AcceptedKey.CONFIG][key] = data_path
def pickle_fitted_pipeline(self, path: str) -> None:
    """Pickle the fitted pipeline with the best pipeline estimator.

    When the estimator is an ``AutoEstimator`` that exposes a fitted
    pipeline, that pipeline replaces ``self.estimator`` (and the estimator
    held by the pipeline's second step) before ``self.pipeline`` is dumped.

    Args:
        path: the pickle file path

    Raises:
        ValueError: pipeline not fitted.

    """
    if not self.has_fitted:
        logging.error("No pipeline has been fitted yet.")
        raise ValueError("The pipeline has not been fitted yet.")

    import pickle

    if isinstance(self.estimator, AutoEstimator):
        # Tolerate a missing attribute as well as an explicit None.
        best_pipeline = getattr(
            self.estimator.estimator, "fitted_pipeline_", None
        )
        if best_pipeline is not None:
            self.estimator = best_pipeline
            # Rebinding self.estimator does not update the reference held
            # inside the pipeline, which still points to the old object.
            # steps[1][1] is the pipeline's second step; update its
            # estimator field to match.
            self.pipeline.steps[1][1].estimator = self.estimator

    with open(path, "wb") as fopen:
        pickle.dump(self.pipeline, fopen)
def register_customized_data_cleaner(
    self, data_cleaners: List
) -> None:
    """**EXPERIMENTAL** Allow user to register a customized data cleaner.

    Every entry is validated first; the list is stored in the X preparer's
    cache manager only if all entries pass.

    Args:
        data_cleaners: customized data cleaners (classes, not instances)

    Raises:
        ValueError: data cleaner must be a child class of the base cleaner.

    """
    for cleaner in data_cleaners:
        # ``issubclass`` raises TypeError for non-class arguments, so check
        # for a class first and report every bad entry as ValueError.
        is_valid_cleaner = isinstance(cleaner, type) and issubclass(
            cleaner, CustomizableBaseCleaner
        )
        if not is_valid_cleaner:
            raise ValueError(
                "cleaner {} must be a child class of the {} class.".format(
                    str(cleaner), CustomizableBaseCleaner.__name__
                )
            )
    self.X_preparer.cache_manager[AcceptedKey.CUSTOMIZED_TRANSFORMERS][
        ConfigKey.CUSTOMIZED_CLEANERS
    ] = data_cleaners
def get_data_summary(self) -> Union[pd.DataFrame, None]:
    """Get summary statistics and identified intent for training dataset.

    Returns:
        a DataFrame, in which each column represent the summary of a
        column, or None (after logging a message) when the object has not
        been fitted yet

    """
    if not self.has_fitted:
        logging.info("The foreshadow object is not trained yet.")
        return None
    X_summary = self.X_preparer.cache_manager[AcceptedKey.SUMMARY]
    y_summary = self.y_preparer.cache_manager[AcceptedKey.SUMMARY]
    # Work on a copy so the summary cached for X is not modified by adding
    # the y column on every call.
    # NOTE(review): assumes both summaries are pandas DataFrames - confirm.
    combined = X_summary.copy()
    combined[y_summary.columns[0]] = y_summary
    return combined