What does modern machine learning engineering need?
scikit-learn for classical machine learning, and PyTorch or TensorFlow for deep learning, serve as the foundation for model training, while ONNX, TVM, and TensorRT make deep learning models easier to deploy into applications. But these are still not enough for modern machine learning engineering. Beyond them, we also need something akin to DevOps tooling: a tool for automatically training, versioning, deploying, tracking, and retraining models. This is where Metaflow and MLflow come in.
Metaflow provides a set of APIs for building machine learning workflows. Generally there are two important parts: the flow API for defining and running pipelines, and the Client API for inspecting past runs.
Before starting, install it with:
pip install metaflow
A Metaflow flow is a class that inherits from FlowSpec, with each step decorated by the step decorator.
from metaflow import FlowSpec, step, IncludeFile, Parameter, conda_base, conda, retry

# Specify the conda env: Python version plus libraries pinned to versions
@conda_base(python="3.10.5", libraries={"numpy": "1.15.4"})
class MyFlow(FlowSpec):
    # These become parameters of the CLI (Command Line Interface)
    my_file = IncludeFile(name="name", help="File", default="file.csv")
    my_param = Parameter(name="param", help="Hi", default="Songlin")

    @conda(libraries={"pandas": "1.3.3"})  # Step-level libraries and versions
    @step
    def start(self):
        # Every flow MUST have a **start** step
        import pandas as pd
        from io import StringIO
        # IncludeFile holds the file's contents as a string, so wrap it in StringIO
        self.df = pd.read_csv(StringIO(self.my_file))
        print(self.my_param)
        # Use self.next() to specify the next step
        self.next(self.mid)

    @step
    def mid(self):
        # An iterable to fan out over for parallel processing
        self.lst: list = [1, 2, 3]  # Note: must be self.lst, not a local lst
        # foreach runs the next step once per element; it takes the *name* of the attribute
        self.next(self.times2, foreach="lst")

    @retry  # Retry the step when it fails
    @step
    def times2(self):
        # self.input holds the element assigned to THIS task
        self.num = self.input * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        # The join step aggregates results from the parallel tasks
        self.lst = [inp.num for inp in inputs]
        self.next(self.end)

    # Every flow must have an **end** step
    @step
    def end(self):
        pass

# Make the flow runnable from the command line
if __name__ == "__main__":
    MyFlow()
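The foreach/join pair above is essentially a map followed by an aggregation. A plain-Python sketch of the same data flow (no Metaflow scheduling, just the semantics):

```python
# mid() fans out one task per element of lst, times2() transforms each
# element independently, and join() gathers the per-task results back
# into a single list.
lst = [1, 2, 3]

def times2(num):
    # One "task" per element; Metaflow would run these in parallel
    return num * 2

results = [times2(x) for x in lst]  # the foreach fan-out
joined = results                    # join(): [inp.num for inp in inputs]
print(joined)  # [2, 4, 6]
```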
So Metaflow provides pipeline construction, parallel processing, environment management, and CLI helpers for configuration.
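A note on the StringIO call in start(): IncludeFile hands the step the file's contents as a string rather than a path, so the string must be wrapped in a file-like object before parsing. The same pattern in isolation, using the stdlib csv module (the inline data here stands in for the included file):

```python
import csv
from io import StringIO

# Stand-in for self.my_file: IncludeFile yields the raw text of the file
contents = "name,score\nAda,90\nGrace,95\n"

# Wrap the string in StringIO so csv (or pandas.read_csv) can treat it as a file
rows = list(csv.DictReader(StringIO(contents)))
print(rows[0]["name"])  # Ada
```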
After doing so, the flow file itself becomes the CLI entry point:
python my.py show
python my.py run
(With the conda decorators above, pass --environment=conda before run.)
After running the pipeline, we may want to track the performance of each model. Everything in Metaflow is versioned, so we can track different versions of a model and compare them through the Client API.
This part of the API usually runs in a Jupyter Notebook.
from metaflow import Flow, get_metadata

print("Current metadata provider: %s" % get_metadata())  # Where run history is stored

run = Flow('MyFlow').latest_successful_run  # Get the latest successful run
print("Using run: %s" % str(run))           # The run's version identifier
lst: list = run.data.lst  # Read an artifact: this is the self.lst set in my.py

for run in Flow('MyFlow').runs():  # Iterate through all runs of this flow
    if run.successful:             # Keep only the successful ones
        print(run.data.lst)