FastAPI
Provide generic prediction endpoint via FastAPI
A common way to communicate with resources is via REST APIs. In Python, FastAPI is a well-known web framework for building web APIs.
The following example shows how easily the pipelines in a project can be made accessible via FastAPI endpoints.
It is assumed that you have some basic knowledge of FastAPI and the getML framework.
Helpful resources to get started:
FastAPI get started
getML example notebooks
getML user guide
This integration example requires at least v1.4.0 of the getml package and at least Python 3.8.
Example Data
As an example project, first run the demo notebook "Loan default prediction", which creates a project named "loans" in the getML Engine.
Code Explained
First, import the necessary packages and create a FastAPI app named app. If the Engine isn't running yet (getml.engine.is_engine_alive()), launch it (getml.engine.launch()). The launch_browser=False option prevents the browser from opening when the Engine spins up. Then direct the Engine to load and set the previously created project "loans" (getml.engine.set_project()).
from typing import Dict, List, Optional, Union
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from uvicorn import run
from getml import engine, pipeline, Pipeline, DataFrame
app: FastAPI = FastAPI()
# Launch the Engine only if it is not already running.
if not engine.is_alive():
    engine.launch(launch_browser=False)
engine.set_project("loans")
Create the first GET endpoint, which returns a list of all pipelines present in the project (list_pipelines()). The list contains only the names of the pipelines, without additional metadata. For the sake of simplicity, pagination is left out of this tutorial.
@app.get("/pipeline")
async def get_pipeline() -> List[str]:
    return pipeline.list_pipelines()
The following is required to start the app with uvicorn. Run your Python code and test the endpoint via localhost:8080/pipeline.
if __name__ == "__main__":
    run(app, host="localhost", port=8080)
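Once the app is running, the endpoint can also be queried from a script instead of a browser. The following is a minimal client sketch that uses only the Python standard library. The helper names endpoint_url and fetch_json are hypothetical and not part of FastAPI or getML, and the base URL assumes the host and port passed to uvicorn above.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: the app runs on the host and port passed to uvicorn above.
BASE_URL = "http://localhost:8080"


def endpoint_url(*parts: str, base: str = BASE_URL) -> str:
    """Join path segments onto the base URL, percent-encoding each segment."""
    return "/".join([base.rstrip("/")] + [quote(p, safe="") for p in parts])


def fetch_json(url: str, timeout: float = 10.0):
    """Send a GET request and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, fetch_json(endpoint_url("pipeline")) should return the list of pipeline names produced by the endpoint above.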
To expand the functionality, add another informative GET endpoint for a single pipeline. The pipeline_id can be retrieved from the previously created GET endpoint. The existence of the pipeline can be checked using exists(). After validating its existence, the Engine must be directed to load the pipeline identified by the provided pipeline_id. Information of interest could be the names of the population and peripheral data frames, the applied preprocessors, the feature learners and selectors used, and the target predictors. This information can be retrieved from the pipeline's metadata member variable (pipeline_.metadata) and from the pipeline itself.
Again, this endpoint can be tested by running your code and invoking localhost:8080/pipeline/a1b2c3, assuming that the previously created pipeline has the id a1b2c3.
@app.get("/pipeline/{pipeline_id}")
async def get_pipeline_pipeline_id(pipeline_id: str) -> Dict[str, Union[str, List[str]]]:
    if not pipeline.exists(pipeline_id):
        raise HTTPException(status_code=404, detail=f'Pipeline {pipeline_id} not found.')
    pipeline_ = pipeline.load(pipeline_id)
    if pipeline_.metadata is None:
        raise HTTPException(status_code=409,
                            detail='The data schema is missing or pipeline is incomplete')
    meta_data = pipeline_.metadata
    metadata: Dict[str, Union[str, List[str]]] = {}
    metadata["data_model"] = meta_data.population.name
    metadata["peripheral"] = [_.name for _ in meta_data.peripheral]
    metadata["preprocessors"] = [_.type for _ in pipeline_.preprocessors]
    metadata["feature_learners"] = [_.type for _ in pipeline_.feature_learners]
    metadata["feature_selectors"] = [_.type for _ in pipeline_.feature_selectors]
    metadata["predictors"] = [_.type for _ in pipeline_.predictors]
    return metadata
To create the prediction endpoint, the data schema for the request body needs to be defined first. For a prediction, the getML Engine requires multiple data sets: the population data set (population) and any related peripheral data sets (peripheral), based on the data model of the pipeline. The peripheral data sets can be provided either as a list or as a dictionary. If a list is used, the order of the data sets needs to match the order returned by [_.name for _ in pipeline_.metadata.peripheral]. This information can also be retrieved by calling the previously created GET endpoint.
class PredictionBody(BaseModel):
    peripheral: Union[List[Dict[str, List]], Dict[str, Dict[str, List]]]
    population: Dict[str, List]
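The schema above only checks that each data set maps column names to lists. It does not check that all columns of one data set have the same number of rows, which a tabular data set presumably needs. The following is a minimal, framework-independent sketch of such a check. The function name column_length is hypothetical and not part of getML or pydantic.

```python
from typing import Dict, List


def column_length(data: Dict[str, List]) -> int:
    """Return the common number of rows; raise ValueError if columns differ."""
    lengths = {name: len(values) for name, values in data.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"Columns differ in length: {lengths}")
    # An empty data set has no columns and therefore zero rows.
    return next(iter(lengths.values()), 0)
```

Inside an endpoint, a ValueError raised by this check could be translated into an HTTPException with status code 400.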
Next, implement the POST endpoint, which accepts data and tasks the Engine with making a prediction. Validate that the pipeline exists, load the pipeline (load()), and validate that the pipeline has been finalized.
@app.post("/pipeline/{pipeline_id}/predict")
async def post_project_predict(pipeline_id: str, body: PredictionBody) -> Optional[List]:
    if not pipeline.exists(pipeline_id):
        raise HTTPException(status_code=404,
                            detail=f'Pipeline {pipeline_id} not found.')
    pipeline_: Pipeline = pipeline.load(pipeline_id)
    if pipeline_.metadata is None:
        raise HTTPException(status_code=409,
                            detail='The data schema is missing or pipeline is incomplete')
The request body should contain both the population and the peripheral data. Check that the population in the request body is not empty. Then create a data frame from the dictionary (from_dict()). The name of the data frame must not collide with an existing data frame in the pipeline, and the roles of the population can be obtained from the pipeline using pipeline_.metadata.population.roles.
    if not body.population:
        raise HTTPException(status_code=400, detail='Missing population data.')
    population_data_frame = DataFrame.from_dict(name='future',
                                                roles=pipeline_.metadata.population.roles,
                                                data=body.population)
The peripheral data can be submitted in the request body either as a list or as a dictionary. If the peripheral data sets are received as a dictionary, check that the names of all required peripheral data sets exist among the dictionary keys. If they are received as a list, check that the length of the list matches the number of peripheral data sets in the pipeline. Afterwards, create a list of data frames from the peripheral data. Again, ensure that the names of the created data frames do not collide with existing data frames, and use the roles defined in the pipeline for the peripheral data sets (pipeline_.metadata.peripheral[i].roles).
    peripheral_names = [_.name for _ in pipeline_.peripheral]
    if isinstance(body.peripheral, dict):
        # Dictionary input: every expected peripheral name must be present as a key.
        if set(peripheral_names) - set(body.peripheral.keys()):
            raise HTTPException(
                status_code=400,
                detail=f'Missing peripheral data, expected {peripheral_names}')
        peripheral_raw_data = body.peripheral
    else:
        # List input: the order must match the order of peripheral_names.
        if len(peripheral_names) != len(body.peripheral):
            raise HTTPException(
                status_code=400,
                detail=f"Expected {len(pipeline_.peripheral)} peripheral data frames.")
        peripheral_raw_data = dict(zip(peripheral_names, body.peripheral))
    peripheral_data_frames = [
        DataFrame.from_dict(name=name + '_predict',
                            data=peripheral_raw_data[name],
                            roles=pipeline_.metadata.peripheral[i].roles)
        for i, name in enumerate(peripheral_names)
    ]
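The list-or-dictionary handling described above can also be isolated from FastAPI and getML so that it can be unit-tested without a running Engine. The following is a sketch under that assumption. The function name normalize_peripheral and the Columns alias are hypothetical, and unlike the endpoint code this version raises ValueError and drops dictionary keys that the pipeline does not expect.

```python
from typing import Dict, List, Union

Columns = Dict[str, List]


def normalize_peripheral(
    expected_names: List[str],
    peripheral: Union[List[Columns], Dict[str, Columns]],
) -> Dict[str, Columns]:
    """Return the peripheral data keyed by name, in the expected order."""
    if isinstance(peripheral, dict):
        missing = [n for n in expected_names if n not in peripheral]
        if missing:
            raise ValueError(f"Missing peripheral data: {missing}")
        # Keys that are not expected are ignored.
        return {name: peripheral[name] for name in expected_names}
    if len(peripheral) != len(expected_names):
        raise ValueError(f"Expected {len(expected_names)} peripheral data sets.")
    # A list carries no names, so its order must match the expected order.
    return dict(zip(expected_names, peripheral))
```

Both input forms then yield the same dictionary, for example when called with the names ["order", "trans"] and either a two-element list or a dictionary containing those two keys.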
This leaves the actual call to the Engine to make a prediction (predict()) using the previously created population and peripheral data frames. The predicted target values are returned as a numpy array, which is converted to a list for the response.
    prediction = pipeline_.predict(
        population_table=population_data_frame,
        peripheral_tables=peripheral_data_frames
    )
    # Compare against None: the truth value of a multi-element numpy array is ambiguous.
    if prediction is not None:
        return prediction.tolist()
    raise HTTPException(status_code=500, detail="getML Engine didn't return a result.")
This endpoint can be called with a POST request on localhost:8080/pipeline/a1b2c3/predict, where the body must have the following form:
{
    "peripheral": [{
        "column_1": [2.4, 3.0, 1.2, 1.4, 2.2],
        "column_2": ["a", "b", "a", "b", "b"]
    }],
    "population": {
        "column_1": [0.2, 0.1],
        "column_2": ["a", "b"],
        "time_stamp": ["2010-01-01 12:30:00", "2010-01-01 23:30:00"]
    }
}
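A body of this form can be sent from a script as well. The following is a minimal sketch using only the Python standard library. The helper names build_predict_request and send are hypothetical, and the base URL and pipeline id passed in are assumed to match the running app.

```python
import json
from urllib.request import Request, urlopen


def build_predict_request(base_url: str, pipeline_id: str, body: dict) -> Request:
    """Create (but do not send) a POST request for the prediction endpoint."""
    return Request(
        url=f"{base_url}/pipeline/{pipeline_id}/predict",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: Request, timeout: float = 30.0):
    """Send the request and decode the JSON response body."""
    with urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Separating request construction from sending keeps the first step testable without a running server. With the app running, send(build_predict_request("http://localhost:8080", "a1b2c3", body)) should return the prediction list.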
Example JSON data can be extracted from the notebook by appending the following code snippet to the end of the notebook used to create the example data.
from typing import Union, Any
from datetime import datetime
from json import dumps
def handle_timestamp(x: Union[Any, datetime]):
    # Fallback for json.dumps: format datetimes as strings.
    # Any other unsupported type falls through and is serialized as null.
    if isinstance(x, datetime):
        return x.strftime(r'%Y-%m-%d %H:%M:%S')
pd_population_test = population_test.to_pandas()
# Select all rows that belong to the first account in the test set.
account_id = pd_population_test.iloc[0]["account_id"]
population_dict = pd_population_test[pd_population_test["account_id"] == account_id].to_dict()
population_json = dumps({k: list(v.values()) for k, v in population_dict.items()}, default=handle_timestamp)
pd_peripherals = {_.name: _.to_pandas() for _ in [order, trans, meta]}
peripheral_dict = {k: v[v["account_id"] == account_id].to_dict() for k, v in pd_peripherals.items()}
peripheral_json = dumps(
    {k: {vk: list(vv.values()) for vk, vv in v.items()} for k, v in peripheral_dict.items()},
    default=handle_timestamp)
population_json
peripheral_json
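The core of the snippet above is the conversion from the nested dictionary that pandas' to_dict() returns (column name mapped to row index mapped to value) into the column-to-list form the request body expects. The following sketch shows that step in isolation on a hand-written stand-in dictionary, so no notebook or pandas is needed. The values are made up for illustration, the names to_columns and encode_timestamp are hypothetical, and unlike handle_timestamp this fallback rejects unsupported types instead of serializing them as null.

```python
from datetime import datetime
from json import dumps

# Hand-written stand-in for the result of DataFrame.to_dict():
# each column maps row index -> value. The values are illustrative only.
nested = {
    "account_id": {0: 1, 3: 1},
    "date": {0: datetime(1995, 3, 24), 3: datetime(1996, 1, 5, 12, 30)},
}


def to_columns(nested_dict):
    """Drop the row index so that each column maps to a plain list."""
    return {col: list(rows.values()) for col, rows in nested_dict.items()}


def encode_timestamp(value):
    """json.dumps fallback: format datetimes, reject anything else."""
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    raise TypeError(f"Cannot serialize {type(value).__name__}")


payload = dumps(to_columns(nested), default=encode_timestamp)
```

Raising TypeError in the fallback is a design choice: it surfaces unexpected column types early rather than silently sending null values to the prediction endpoint.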
Conclusion
With only a few lines of code, it is possible to create a web API that makes project pipelines accessible and serves target predictions for the provided population and peripheral data.