What does modern machine learning engineering need?

scikit-learn for classical machine learning, and PyTorch or TensorFlow for deep learning, serve as the foundation for model training, while ONNX, TVM, and TensorRT make deep learning models easier to deploy to applications. But they are still not enough for modern machine learning engineering. On top of them, we also need a DevOps-like tool: one for automatically training, versioning, deploying, tracking, and retraining models. Hence we have Metaflow and MLflow.

Metaflow

Metaflow provides a set of APIs for building machine learning workflows. Generally, there are two important parts:

  1. Flow API
  2. Client API

Before starting, install it with:

pip install metaflow
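
To sanity-check the installation, a minimal flow can be run end to end. Here is a sketch (the filename hello.py and the class name HelloFlow are arbitrary):

from metaflow import FlowSpec, step

class HelloFlow(FlowSpec):
	@step
	def start(self):
		print("Metaflow works")
		self.next(self.end)

	@step
	def end(self):
		pass

if __name__ == "__main__":
	HelloFlow()

Running python hello.py run should execute both steps and record the run.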

Flow API

A Metaflow flow is a class that inherits from FlowSpec, with each step decorated by the @step decorator.

from metaflow import FlowSpec, step, IncludeFile, Parameter, conda_base, conda, retry

# Specify the conda environment: Python version plus pinned library versions
@conda_base(python="3.10.5", libraries={"numpy": "1.23.0"})
class MyFlow(FlowSpec):
	# These become command-line parameters of the flow (CLI flags)
	my_file = IncludeFile(name="name", help="File", default="file.csv")
	my_param = Parameter(name="param", help="Hi", default="Songlin")

	@conda(libraries={"pandas": "1.3.3"})  # Step-level libraries and versions
	@step
	def start(self):
		# Every flow MUST have a start step
		import pandas as pd
		from io import StringIO
		# IncludeFile yields the file's contents, so wrap them in StringIO
		self.df = pd.read_csv(StringIO(self.my_file))
		print(self.my_param)
		# Use self.next() to specify the next step
		self.next(self.mid)

	@step
	def mid(self):
		# A list to fan out over; it must be an artifact (self.lst), not a local
		self.lst: list = [1, 2, 3]
		# foreach runs times2 once per element, in parallel;
		# it takes the artifact's name as a string
		self.next(self.times2, foreach="lst")

	@retry  # Retry this step when it fails
	@step
	def times2(self):
		# self.input holds the element assigned to THIS parallel task
		self.num = self.input * 2
		# All branches continue to the join step
		self.next(self.join)

	@step
	def join(self, inputs):
		# A join step aggregates the parallel branches via its inputs argument
		self.lst = [inp.num for inp in inputs]
		self.next(self.end)

	@step
	def end(self):
		# Every flow MUST have an end step
		pass

# Make the flow runnable from the command line
if __name__ == "__main__":
	MyFlow()

So Metaflow gives us pipeline construction, parallel execution, environment management, and CLI helpers for configuration.
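
Besides foreach fan-outs, self.next() also supports static branching, where a fixed set of steps runs in parallel and is merged by a join step. A minimal sketch (the step names fit_a/fit_b and the score values are illustrative):

from metaflow import FlowSpec, step

class BranchFlow(FlowSpec):
	@step
	def start(self):
		# Passing several steps to self.next() creates a static branch
		self.next(self.fit_a, self.fit_b)

	@step
	def fit_a(self):
		self.score = 0.91  # placeholder metric
		self.next(self.join)

	@step
	def fit_b(self):
		self.score = 0.88  # placeholder metric
		self.next(self.join)

	@step
	def join(self, inputs):
		# With static branches, each branch is addressable by its step name
		self.best = max(inputs.fit_a.score, inputs.fit_b.score)
		self.next(self.end)

	@step
	def end(self):
		print(self.best)

if __name__ == "__main__":
	BranchFlow()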

After that, we can use the CLI to inspect and run the flow:

python my.py show
python my.py run
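
Since MyFlow pins its dependencies with @conda_base, the run needs the conda environment enabled via --environment=conda. The CLI flags for parameters mirror the names declared in Parameter and IncludeFile above (here --param and --name):

python my.py --environment=conda run --param "Songlin" --name file.csv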

After running the pipeline, we may want to track the performance of each model. Everything in Metaflow is versioned, so we can inspect different versions of a model and compare them through the Client API.

Client API

This part of the API usually runs in a Jupyter notebook.

from metaflow import Flow, get_metadata
print("Current metadata provider: %s" % get_metadata())  # Where run history is stored

run = Flow('MyFlow').latest_successful_run  # Get the latest successful run
print("Using run: %s" % str(run))  # Shows the run's id, i.e. its version
lst: list = run.data.lst  # Access an artifact of the run
# This run.data.lst is the self.lst set in my.py

for run in Flow('MyFlow').runs():  # Iterate through all runs of the flow
	if run.successful:  # Only consider runs that finished successfully
		print(run.data.lst)
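
Beyond run-level artifacts, the Client API can also drill into individual steps and tasks. A sketch, assuming the MyFlow run from above:

from metaflow import Flow

run = Flow('MyFlow').latest_successful_run
step = run['times2']      # Index a run by step name
for task in step:         # A foreach step has one task per list element
	print(task.data.num)  # The self.num artifact of that parallel task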