
"""machine learning abstract Ops"""
import io
import json
import time
from abc import abstractmethod
from typing import Optional, List, Any
from zipfile import ZipFile

from .. import versioning
from ..pipelines import callbacks, ops
from . import serializers, MLMode


class BaseMLOp(ops.LoadableOp):
    """
    A BaseMLOp is an op designed specifically to wrap a machine learning model (whether for
    training or inference). You can initialize the op in three distinct :doc:`ml modes <../ml/ml>`:

    - `FIT` for training the model
    - `PREDICT` to perform inference
    - `FIT_PREDICT` to do both (train and predict on the same dataset)

    the usual workflow is to have a training and a prediction pipeline, and to:

    - execute the training pipeline
    - save the training pipeline
    - reload the prediction pipeline
    - use the prediction pipeline

    here is an example:

    .. testsetup::

        >>> from chariots.pipelines import Pipeline
        >>> from chariots.pipelines.nodes import Node
        >>> from chariots._helpers.doc_utils import IrisFullDataSet, PCAOp, LogisticOp
        >>> from chariots._helpers.test_helpers import FromArray

    first create your pipelines:

    .. doctest::

        >>> train = Pipeline([
        ...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
        ...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
        ...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
        ... ], 'train')
        >>> pred = Pipeline([
        ...     Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
        ...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['pred']),
        ...     Node(FromArray(), input_nodes=['pred'], output_nodes='__pipeline_output__')
        ... ], 'pred')

    .. testsetup::

        >>> import tempfile
        >>> import shutil
        >>> from chariots.pipelines import Pipeline, PipelinesServer
        >>> from chariots.testing import TestOpStoreClient, TestPipelinesClient
        >>> app_path = tempfile.mkdtemp()
        >>> op_store_client = TestOpStoreClient(app_path)
        >>> op_store_client.server.db.create_all()
        >>> app = PipelinesServer([train, pred], op_store_client=op_store_client, import_name="app")
        >>> client = TestPipelinesClient(app)

    and then to train your pipelines and make some predictions:

    .. doctest::

        >>> response = client.call_pipeline(train)
        >>> client.save_pipeline(train)
        >>> client.load_pipeline(pred)
        >>> response = client.call_pipeline(pred, [[1, 2, 3, 4]])
        >>> response.value
        [1]

    If you want to create a new MLOp class (to accommodate an unsupported framework, for
    instance), you need to define (see the illustrative sketch at the end of this module):

    - how to fit your op with the `fit` method
    - how to perform inference with your op with the `predict` method
    - how to initialize a new model with the `_init_model` method

    and optionally you can change the `serializer_cls` class attribute to change the
    serialization format of your model.

    :param op_callbacks: :doc:`OpCallbacks objects<../pipelines/callbacks>` to change the
                         behavior of the op by executing some action before or after the op's
                         execution
    """

    training_update_version = versioning.VersionType.PATCH
    serializer_cls = serializers.DillSerializer

    def __init__(self, mode: MLMode, op_callbacks: Optional[List[callbacks.OpCallBack]] = None):
        """
        :param mode: the mode to use when instantiating the op
        """
        super().__init__(op_callbacks)
        self._call_mode = mode
        self.serializer = self.serializer_cls()
        self._model = self._init_model()
        self._last_training_time = 0

    @property
    def allow_version_change(self):
        # we only want to check the version on prediction
        return self.mode != MLMode.PREDICT

    @property
    def mode(self) -> MLMode:
        """the mode this op was instantiated with"""
        return self._call_mode

    def execute(self, *args, **kwargs):
        """
        executes the model action that is required (fit, predict or both, depending on the mode
        the op was initialized with)
        """
        if self.mode == MLMode.FIT:
            self._fit(*args, **kwargs)
            return None
        if self.mode == MLMode.PREDICT:
            return self.predict(*args, **kwargs)
        if self.mode == MLMode.FIT_PREDICT:
            self._fit(*args, **kwargs)
            return self.predict(*args, **kwargs)
        raise ValueError('unknown mode for {}: {}'.format(type(self), self.mode))

    def _fit(self, *args, **kwargs):
        """
        method that wraps `fit` and performs the necessary follow-up actions (updating the
        version)
        """
        self.fit(*args, **kwargs)
        self._last_training_time = time.time()

    @abstractmethod
    def fit(self, *args, **kwargs):
        """
        fits the inner model of the op on data (in args and kwargs)

        this method must not return any data (use the FIT_PREDICT mode to predict on the same
        data the op was trained on)
        """

    @abstractmethod
    def predict(self, *args, **kwargs) -> Any:
        """
        the method used to do predictions/inference once the model has been fitted/loaded
        """

    @abstractmethod
    def _init_model(self):
        """
        method used to create a new (blank) model (used at initialisation)
        """

    @property
    def op_version(self):
        time_version = versioning.Version().update(self.training_update_version,
                                                   str(self._last_training_time).encode('utf-8'))
        return super().op_version + time_version

    def load(self, serialized_object: bytes):
        io_file = io.BytesIO(serialized_object)
        with ZipFile(io_file, 'r') as zip_file:
            self._model = self.serializer.deserialize_object(zip_file.read('model'))
            meta = json.loads(zip_file.read('_meta.json').decode('utf-8'))
            self._last_training_time = meta['train_time']

    def serialize(self) -> bytes:
        io_file = io.BytesIO()
        with ZipFile(io_file, 'w') as zip_file:
            zip_file.writestr('model', self.serializer.serialize_object(self._model))
            zip_file.writestr('_meta.json', json.dumps({'train_time': self._last_training_time}))
        return io_file.getvalue()
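

# --- Illustrative sketch (not part of the library) ----------------------------------------------
# The subclass below is a minimal example of the three hooks described in the class docstring
# (`_init_model`, `fit`, `predict`) for a hypothetical scikit-learn based op. The name
# `SGDClassifierOp` and the choice of `sklearn.linear_model.SGDClassifier` are assumptions made
# for the example only; any estimator exposing `fit`/`predict` could be wrapped the same way.


class SGDClassifierOp(BaseMLOp):
    """example op wrapping a scikit-learn `SGDClassifier` (illustration only)"""

    def _init_model(self):
        # build a fresh, untrained model; the import is kept local so this example does not add a
        # module-level dependency on scikit-learn
        from sklearn.linear_model import SGDClassifier
        return SGDClassifier()

    def fit(self, x_train, y_train):
        # train the inner model in place; `fit` must not return the data
        self._model.fit(x_train, y_train)

    def predict(self, x_pred):
        return self._model.predict(x_pred)


# Such an op would be instantiated with a mode, e.g. `SGDClassifierOp(MLMode.FIT)` in a training
# pipeline and `SGDClassifierOp(MLMode.PREDICT)` in the prediction pipeline, just like `PCAOp`
# and `LogisticOp` in the class docstring above.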