getml.pipeline

Contains handlers for all steps involved in a data science project after data preparation:

  • Automated feature learning
  • Automated feature selection
  • Training and evaluation of machine learning (ML) algorithms
  • Deployment of the fitted models
Example

We assume that you have already set up your preprocessors (refer to preprocessors), your feature learners (refer to feature_learning) as well as your feature selectors and predictors (refer to predictors, which can be used for prediction and feature selection).

You might also want to refer to DataFrame, View, DataModel, Container, Placeholder and StarSchema.

If you want to create features for a time series problem, the easiest way to do so is to use the TimeSeries abstraction.

Note that this example is taken from the robot notebook.

# All rows before row 10500 will be used for training.
split = getml.data.split.time(data_all, "rowid", test=10500)

time_series = getml.data.TimeSeries(
    population=data_all,
    time_stamps="rowid",
    split=split,
    lagged_targets=False,
    memory=30,
)

pipe = getml.Pipeline(
    data_model=time_series.data_model,
    feature_learners=[...],
    predictors=...
)

pipe.check(time_series.train)

pipe.fit(time_series.train)

pipe.score(time_series.test)

# To generate predictions on new data,
# it is sufficient to use a Container.
# You don't have to recreate the entire
# TimeSeries, because the abstract data model
# is stored in the pipeline.
container = getml.data.Container(
    population=population_new,
)

# Add the data as a peripheral table, for the
# self-join.
container.add(population=population_new)

predictions = pipe.predict(container.full)
Example

If your data can be organized in a simple star schema, you can use StarSchema. StarSchema unifies Container and DataModel. Note that this example is taken from the loans notebook.

# First, we insert our data into a StarSchema.
# population_train and population_test are either
# DataFrames or Views. The population table
# defines the statistical population of your
# machine learning problem and contains the
# target variables.
star_schema = getml.data.StarSchema(
    train=population_train,
    test=population_test
)

# meta, order and trans are either
# DataFrames or Views.
# Because this is a star schema,
# all joins take place on the population
# table.
star_schema.join(
    trans,
    on="account_id",
    time_stamps=("date_loan", "date")
)

star_schema.join(
    order,
    on="account_id",
)

star_schema.join(
    meta,
    on="account_id",
)

# Now you can insert your data model,
# your preprocessors, feature learners,
# feature selectors and predictors
# into the pipeline.
# Note that the pipeline only knows
# the abstract data model, but hasn't
# seen the actual data yet.
pipe = getml.Pipeline(
    data_model=star_schema.data_model,
    preprocessors=[mapping],
    feature_learners=[fast_prop],
    feature_selectors=[feature_selector],
    predictors=predictor,
)

# Now, we pass the actual data.
# This passes 'population_train' and the
# peripheral tables (meta, order and trans)
# to the pipeline.
pipe.check(star_schema.train)

pipe.fit(star_schema.train)

pipe.score(star_schema.test)
Example

StarSchema is simpler, but cannot be used for more complex data models. The general approach is to use Container and DataModel:

# First, we insert our data into a Container.
# population_train and population_test are either
# DataFrames or Views.
container = getml.data.Container(
    train=population_train,
    test=population_test
)

# meta, order and trans are either
# DataFrames or Views. They are given
# aliases, so we can refer to them in the
# DataModel.
container.add(
    meta=meta,
    order=order,
    trans=trans
)

# Freezing makes the container immutable.
# This is not required, but often a good idea.
container.freeze()

# The abstract data model is constructed
# using the DataModel class. A data model
# does not contain any actual data. It just
# defines the abstract relational structure.
dm = getml.data.DataModel(
    population_train.to_placeholder("population")
)

dm.add(getml.data.to_placeholder(
    meta=meta,
    order=order,
    trans=trans)
)

dm.population.join(
    dm.trans,
    on="account_id",
    time_stamps=("date_loan", "date")
)

dm.population.join(
    dm.order,
    on="account_id",
)

dm.population.join(
    dm.meta,
    on="account_id",
)

# Now you can insert your data model,
# your preprocessors, feature learners,
# feature selectors and predictors
# into the pipeline.
# Note that the pipeline only knows
# the abstract data model, but hasn't
# seen the actual data yet.
pipe = getml.Pipeline(
    data_model=dm,
    preprocessors=[mapping],
    feature_learners=[fast_prop],
    feature_selectors=[feature_selector],
    predictors=predictor,
)

# This passes 'population_train' and the
# peripheral tables (meta, order and trans)
# to the pipeline.
pipe.check(container.train)

pipe.fit(container.train)

pipe.score(container.test)
Technically, you don't have to use a Container at all. A Container is just syntactic sugar for passing the population table and a dictionary of peripheral tables directly:

pipe.check(
    population_train,
    {"meta": meta, "order": order, "trans": trans},
)

pipe.fit(
    population_train,
    {"meta": meta, "order": order, "trans": trans},
)

pipe.score(
    population_test,
    {"meta": meta, "order": order, "trans": trans},
)
You can also pass the peripheral tables as a list. The expected order can be read from the output of the pipeline's __repr__() method and is usually alphabetical:

pipe.check(
    population_train,
    [meta, order, trans],
)

pipe.fit(
    population_train,
    [meta, order, trans],
)

pipe.score(
    population_test,
    [meta, order, trans],
)
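
If you start from a dictionary of peripheral tables, the list form can be derived by sorting the aliases. The following is a minimal sketch, assuming the pipeline expects alphabetical order. `ordered_peripherals` is a hypothetical helper, not part of getML, and plain strings stand in for the actual DataFrames or Views. Compare the result with the pipeline's `__repr__()` output before relying on it.

```python
from typing import Any, Dict, List


def ordered_peripherals(tables: Dict[str, Any]) -> List[Any]:
    """Return the peripheral tables sorted by alias (hypothetical helper)."""
    # Assumption: the pipeline lists its peripheral tables alphabetically.
    return [tables[alias] for alias in sorted(tables)]


# Placeholder strings stand in for real DataFrames or Views.
peripheral = {"trans": "trans_df", "meta": "meta_df", "order": "order_df"}
as_list = ordered_peripherals(peripheral)
```

Here `as_list` follows the alias order meta, order, trans, which matches the list used in the calls above.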

delete

delete(name: str) -> None

If a pipeline named 'name' exists, it is deleted.

PARAMETER DESCRIPTION
name

Name of the pipeline.

TYPE: str

Source code in getml/pipeline/helpers2.py
def delete(name: str) -> None:
    """
    If a pipeline named 'name' exists, it is deleted.

    Args:
        name:
            Name of the pipeline.
    """

    if not isinstance(name, str):
        raise TypeError("'name' must be of type str")

    if exists(name):
        _make_dummy(name).delete()

exists

exists(name: str) -> bool

Returns True if a pipeline named 'name' exists.

PARAMETER DESCRIPTION
name

Name of the pipeline.

TYPE: str

RETURNS DESCRIPTION
bool

True if the pipeline exists, False otherwise.

Source code in getml/pipeline/helpers2.py
def exists(name: str) -> bool:
    """
    Returns true if a pipeline named 'name' exists.

    Args:
        name (str):
            Name of the pipeline.

    Returns:
            True if the pipeline exists, False otherwise.
    """
    if not isinstance(name, str):
        raise TypeError("'name' must be of type str")

    all_pipelines = list_pipelines()

    return name in all_pipelines

list_pipelines

list_pipelines() -> List[str]

Lists all pipelines present in the Engine.

Note that this function only lists pipelines which are part of the current project. See set_project for changing projects and pipelines for more details about the lifecycles of the pipelines.

To subsequently load one of them, use load.

RETURNS DESCRIPTION
List[str]

List containing the names of all pipelines.

Source code in getml/pipeline/helpers2.py
def list_pipelines() -> List[str]:
    """Lists all pipelines present in the Engine.

    Note that this function only lists pipelines which are part of the
    current project. See [`set_project`][getml.engine.set_project] for
    changing projects and [`pipelines`][getml.pipeline] for more details about
    the lifecycles of the pipelines.

    To subsequently load one of them, use
    [`load`][getml.pipeline.load].

    Returns:
        List containing the names of all pipelines.
    """

    cmd: Dict[str, Any] = {}

    cmd["type_"] = "list_pipelines"
    cmd["name_"] = ""

    with comm.send_and_get_socket(cmd) as sock:
        msg = comm.recv_string(sock)
        if msg != "Success!":
            comm.handle_engine_exception(msg)
        json_str = comm.recv_string(sock)

    return json.loads(json_str)["names"]
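
The final step of list_pipelines decodes a JSON string and returns its "names" entry. The decoding step in isolation can be sketched as follows. The payload and the pipeline names are invented for illustration, and the real Engine response may carry additional fields.

```python
import json
from typing import List


def parse_pipeline_names(json_str: str) -> List[str]:
    """Extract the list of pipeline names from a JSON response string."""
    return json.loads(json_str)["names"]


# Invented payload for illustration only.
sample_response = '{"names": ["pipe_a", "pipe_b"]}'
names = parse_pipeline_names(sample_response)
```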

load

load(name: str) -> Pipeline

Loads a pipeline from the getML Engine into Python.

PARAMETER DESCRIPTION
name

The name of the pipeline to be loaded.

TYPE: str

RETURNS DESCRIPTION
Pipeline

Pipeline that is a handler for the pipeline signified by name.

Source code in getml/pipeline/helpers2.py
def load(name: str) -> Pipeline:
    """Loads a pipeline from the getML Engine into Python.

    Args:
        name: The name of the pipeline to be loaded.

    Returns:
        Pipeline that is a handler for the pipeline signified by name.
    """

    return _make_dummy(name).refresh()
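
The load function shown above does not check whether the pipeline exists, and this section does not say what happens for an unknown name. One defensive pattern is to guard the call with exists. The following is a minimal sketch: `load_if_exists` is a hypothetical helper, and `stub` is an in-memory stand-in (an assumption for illustration) so that the example runs without a getML Engine.

```python
from types import SimpleNamespace
from typing import Any, Optional


def load_if_exists(api: Any, name: str) -> Optional[Any]:
    """Return the loaded pipeline, or None if no pipeline has that name.

    `api` is any object exposing exists(name) and load(name),
    for example the getml.pipeline module.
    """
    if api.exists(name):
        return api.load(name)
    return None


# In-memory stand-in; the string value mimics a pipeline handle.
_known = {"loans": "<pipeline loans>"}
stub = SimpleNamespace(
    exists=lambda name: name in _known,
    load=lambda name: _known[name],
)

found = load_if_exists(stub, "loans")
missing = load_if_exists(stub, "does_not_exist")
```

Returning None keeps the caller in control of the fallback, for example building and fitting a new pipeline when none has been stored yet.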