Replies: 17 comments 10 replies
-
Some folks on my team have been doing this for a while and it's pretty ergonomic. A basic implementation is trivial:

```python
from kedro.pipeline import node

def make_node(**kwargs):
    return lambda func: node(func=func, **kwargs)
```
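For a feel of the mechanics without a Kedro installation, here is the same factory pattern against a stand-in `FakeNode` class (both `FakeNode` and `add` are illustrative names, not Kedro APIs):

```python
# Stand-in for kedro.pipeline.node, just to show the factory mechanics.
class FakeNode:
    def __init__(self, func, inputs, outputs):
        self.func, self.inputs, self.outputs = func, inputs, outputs

def make_node(**kwargs):
    # Returns a decorator that wraps a function into a node with the given config.
    return lambda func: FakeNode(func=func, **kwargs)

@make_node(inputs=["a", "b"], outputs="sum")
def add(a, b):
    return a + b

print(add.outputs)     # sum - the decorated name is now a node, not a function
print(add.func(2, 3))  # 5 - the original function is still reachable via .func
```

Note that after decoration the module-level name is bound to the node object, not the function, which is exactly the reuse concern raised later in the thread.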
-
Copying here my comments from #2408. As noted already, there's a problem here if you want to reuse the same function in multiple nodes, and when you start using modular pipelines. These are not insurmountable with the decorator approach, but it does get cumbersome. That said, I actually think there's potentially a lot of value in the decorator method for simple projects. Just like auto-registration of pipelines, anything we can do to make it easier for people to make a simple project is good in my book. Certainly just slapping a decorator on a function is very low friction. In fact, a while ago there was a QB tool that did something similar and took the simplification even further. I actually think there's a lot to be said for at least some aspects of this approach, although it won't ever replace the current system.
-
This is a reasonable default, but I suspect it is less scalable, as you are likely to use generic variable names.

This is my favourite part; it's sometimes annoying to search across 10 `pipeline.py` files.
-
Definitely agree with @antonymilne: this functionality would not replace the existing scheme, just act as sugar for what I suspect is a decent chunk of use cases. The possibility of automatically inferring names and arguments would be slick.
-
Just coming back to it again: it may be quite useful for smoothing the Kedro learning curve. I have been teaching some people about Kedro lately, and sometimes they get stuck on how to write a node. Then I ask, "forget about Kedro, how would you write it in a normal Python script?" Suddenly they can write the code properly and then just map it to Kedro. I suspect this happens more for beginners when they try to jump too fast and think about nodes. @astrojuanlu, have you ever encountered this? Is it worth checking with other people who teach Kedro?
-
Originally posted by @mle-els in #2726 (comment)
-
I think it's safe to say that the current approach will not go away for the time being, so the question would be whether to maintain this as an alternative, recommended approach. And from that perspective, it looks like a lot of documentation and maintainability effort, plus the potential confusion of users having to pick between the two approaches, all for little gain. There have been some very valid concerns about this approach, most importantly that it blends functions with nodes, which hampers reusability of said functions by coupling them to Kedro. I'm closing this issue; if you disagree and think this deserves further discussion, feel free to leave a comment.
-
I don't think this approach blends functions with nodes: if a function is not decorated, it's a simple function and can be used with whatever inputs and outputs. I think this actually helps create separation of concerns between functions and nodes. Let me try to illustrate this:

```python
# This is a regular, reusable function - can be used anywhere, with any inputs/outputs
def reusable_fn(x, y):
    return x + y

# This is explicitly a pipeline node
@node(inputs=["a", "b"], outputs="sum")
def pipeline_step(a, b):
    return reusable_fn(a, b)
```

I agree that at that point, creating a function that returns another decorated function is similarly convoluted to what it would simply be replacing:

```python
# Without decorator
node(
    func=pipeline_step,
    inputs=["a", "b"],
    outputs="sum",
)
```

but it streamlines the cases where a function serves a single node, and hence can be defined standalone. I only suggested this because I've seen it as a syntax in ZenML, Airflow tasks, etc., but I understand the concern about having two separate ways of doing things. If the decorator replaced the existing syntax entirely, that would be a different discussion. As @noklam said in Slack (https://kedro-org.slack.com/archives/C03RKP2LW64/p1737118683984249):
-
Decided to take a stab at what it looks like in a "real" (but very simple) pipeline. I used https://github.com/catherinenelson1/from_notebooks_to_scalable/blob/main/penguins_refactored.py as the example (the repo corresponding to Catherine's talk at PyCon US on going from notebooks to scalable systems), because I think that there's a case to be made for Kedro being the logical progression in the learning curve.

**Complete implementation**

```python
import joblib
import pandas as pd
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, Node
from kedro.runner import SequentialRunner
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.pickle import PickleDataset
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

_SENTINEL = object()


def node(
    func=None,
    inputs=_SENTINEL,
    outputs=_SENTINEL,
    *,
    name=None,
    tags=None,
    confirms=None,
    namespace=None,
) -> Node:
    if func is None:
        assert inputs is not _SENTINEL, "Inputs must be provided when using the decorator syntax."
        assert outputs is not _SENTINEL, "Outputs must be provided when using the decorator syntax."

        def wrapper(func):
            return Node(
                func,
                inputs,
                outputs,
                name=name,
                tags=tags,
                confirms=confirms,
                namespace=namespace,
            )

        return wrapper
    else:
        assert inputs is not _SENTINEL, "Inputs must be provided when using the function syntax."
        assert outputs is not _SENTINEL, "Outputs must be provided when using the function syntax."
        return Node(
            func,
            inputs,
            outputs,
            name=name,
            tags=tags,
            confirms=confirms,
            namespace=namespace,
        )


catalog = DataCatalog(
    {
        "penguins": CSVDataset(
            filepath="https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/penguins.csv"
        ),
        "encoder": PickleDataset(
            filepath="penguins_label_encoder.joblib",
            backend="joblib",
        ),
        "model": PickleDataset(
            filepath="penguins_model.joblib",
            backend="joblib",
        ),
    }
)


@node(inputs="penguins", outputs=["feature", "labels"])
def clean_data(penguins):
    # drop rows with missing values
    # return numpy arrays for features and labels
    df = penguins.dropna(subset=['species', 'bill_length_mm', 'bill_depth_mm',
                                 'flipper_length_mm', 'body_mass_g'])
    features = df[['bill_length_mm', 'bill_depth_mm',
                   'flipper_length_mm', 'body_mass_g']].to_numpy()
    labels = df['species'].to_numpy()
    return features, labels


@node(inputs=["feature", "labels"], outputs=["encoder", "X_train", "X_test", "y_train", "y_test"])
def preprocess_data(features, labels):
    # encode categorical variables
    # scale numerical features
    # split the data into train/test features and labels
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    encoder = LabelEncoder()
    labels_encoded = encoder.fit_transform(labels)
    X_train, X_test, y_train, y_test = train_test_split(features_scaled, labels_encoded, test_size=0.2, random_state=42)
    return encoder, X_train, X_test, y_train, y_test


@node(inputs=["X_train", "y_train", "X_test", "y_test"], outputs="model")
def train_model(X_train, y_train, X_test, y_test):
    # train a model and save it
    clf = LogisticRegression()
    clf.fit(X_train, y_train)
    # print the model's accuracy
    accuracy = clf.score(X_test, y_test)
    print(f'Model accuracy: {accuracy:.2f}')
    return clf


@node(inputs=["X_new", "model", "encoder"], outputs="predicted_class")
def predict_new_data(X_new, clf, encoder):
    # load the model and make predictions
    # return a string of the predicted class
    prediction = clf.predict(X_new)
    # decode the prediction
    predicted_class = encoder.inverse_transform(prediction)
    return predicted_class[0]


training_pipeline = Pipeline([clean_data, preprocess_data, train_model])

if __name__ == "__main__":
    runner = SequentialRunner()
    output_datasets = runner.run(pipeline=training_pipeline, catalog=catalog)
```

I think it works quite well (at least, as @antonymilne said long ago, for the simple projects without function reuse).
I didn't do this here (and I did use a different name). From an implementation perspective, if we wanted to do something like this, it seems fairly straightforward to retain the existing API. Putting my new user hat on, the part that still feels like a jump is the fact that these "disconnected" nodes get turned into a pipeline. This would still be much more explicit:

```python
@node
def clean_data(penguins):
    ...

@node
def preprocess_data(features, labels):
    ...

@node
def train_model(X_train, y_train, X_test, y_test):
    ...

@pipeline
def training_pipeline(penguins):
    features, labels = clean_data(penguins)
    encoder, X_train, X_test, y_train, y_test = preprocess_data(features, labels)
    model = train_model(X_train, y_train, X_test, y_test)
    return model  # Not sure this is necessary
```

Maybe somebody knows or can look it up—isn't this more like what the QB-internal Brix post @antonymilne was referring to looked like, too? I can't remember. Under the hood, here's what the AST of the pipeline function looks like:

**Example AST parsing**

```python
>>> import ast
>>> tree = ast.parse("""\
... def training_pipeline(penguins):
...     features, labels = clean_data(penguins)
...     encoder, X_train, X_test, y_train, y_test = preprocess_data(features, labels)
...     model = train_model(X_train, y_train, X_test, y_test)
...     return model  # Not sure this is necessary
... """)
>>> print(ast.dump(tree, indent=4))
Module(
    body=[
        FunctionDef(
            name='training_pipeline',
            args=arguments(
                args=[
                    arg(arg='penguins')]),
            body=[
                Assign(
                    targets=[
                        Tuple(
                            elts=[
                                Name(id='features', ctx=Store()),
                                Name(id='labels', ctx=Store())],
                            ctx=Store())],
                    value=Call(
                        func=Name(id='clean_data', ctx=Load()),
                        args=[
                            Name(id='penguins', ctx=Load())])),
                Assign(
                    targets=[
                        Tuple(
                            elts=[
                                Name(id='encoder', ctx=Store()),
                                Name(id='X_train', ctx=Store()),
                                Name(id='X_test', ctx=Store()),
                                Name(id='y_train', ctx=Store()),
                                Name(id='y_test', ctx=Store())],
                            ctx=Store())],
                    value=Call(
                        func=Name(id='preprocess_data', ctx=Load()),
                        args=[
                            Name(id='features', ctx=Load()),
                            Name(id='labels', ctx=Load())])),
                Assign(
                    targets=[
                        Name(id='model', ctx=Store())],
                    value=Call(
                        func=Name(id='train_model', ctx=Load()),
                        args=[
                            Name(id='X_train', ctx=Load()),
                            Name(id='y_train', ctx=Load()),
                            Name(id='X_test', ctx=Load()),
                            Name(id='y_test', ctx=Load())])),
                Return(
                    value=Name(id='model', ctx=Load()))])])
```

Because of the late binding, we also enable easy reuse. I'm happy to flesh this second approach out into a full PoC, but would love to get some initial reactions first!

Update: Here's a bit hacky (but functional) PoC with room for improvement:

**End-to-end with AST parsing**

```python
import ast
import inspect
from functools import partial

import joblib
import pandas as pd
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, Node
from kedro.runner import SequentialRunner
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.pickle import PickleDataset
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

_SENTINEL = object()

# FIXME(deepyaman): Populating a node registry is ugly and likely unnecessary.
# A better approach would be to rewrite the AST of the pipeline function to produce a
# `Pipeline` object directly.
_NODE_REGISTRY = {}


def node(
    func=None,
    inputs=_SENTINEL,
    outputs=_SENTINEL,
    *,
    name=None,
    tags=None,
    confirms=None,
    namespace=None,
) -> Node:
    if inputs is _SENTINEL and outputs is _SENTINEL:
        unbound_node = partial(node, func=func)
        _NODE_REGISTRY[func.__name__] = unbound_node
        return unbound_node
    else:
        assert func is not None, "Function must be provided when using the function syntax."
        assert inputs is not _SENTINEL, "Inputs must be provided when using the function syntax."
        assert outputs is not _SENTINEL, "Outputs must be provided when using the function syntax."
        return Node(
            func,
            inputs,
            outputs,
            name=name,
            tags=tags,
            confirms=confirms,
            namespace=namespace,
        )


class NodeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.nodes = []

    def visit_Assign(self, node):
        unbound_node = _NODE_REGISTRY[node.value.func.id]
        inputs = [arg.id for arg in node.value.args]
        outputs = (
            [elt.id for elt in node.targets[0].elts]
            if hasattr(node.targets[0], "elts")
            else node.targets[0].id
        )
        self.nodes.append(unbound_node(inputs=inputs, outputs=outputs))
        self.generic_visit(node)


def pipeline(pipe):
    source = inspect.getsource(pipe)
    tree = ast.parse(source)
    visitor = NodeVisitor()
    visitor.visit(tree)
    return Pipeline(visitor.nodes)


catalog = DataCatalog(
    {
        "penguins": CSVDataset(
            filepath="https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/penguins.csv"
        ),
        "encoder": PickleDataset(
            filepath="penguins_label_encoder.joblib",
            backend="joblib",
        ),
        "model": PickleDataset(
            filepath="penguins_model.joblib",
            backend="joblib",
        ),
    }
)


@node
def clean_data(penguins):
    # drop rows with missing values
    # return numpy arrays for features and labels
    df = penguins.dropna(subset=['species', 'bill_length_mm', 'bill_depth_mm',
                                 'flipper_length_mm', 'body_mass_g'])
    features = df[['bill_length_mm', 'bill_depth_mm',
                   'flipper_length_mm', 'body_mass_g']].to_numpy()
    labels = df['species'].to_numpy()
    return features, labels


@node
def preprocess_data(features, labels):
    # encode categorical variables
    # scale numerical features
    # split the data into train/test features and labels
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    encoder = LabelEncoder()
    labels_encoded = encoder.fit_transform(labels)
    X_train, X_test, y_train, y_test = train_test_split(features_scaled, labels_encoded, test_size=0.2, random_state=42)
    return encoder, X_train, X_test, y_train, y_test


@node
def train_model(X_train, y_train, X_test, y_test):
    # train a model and save it
    clf = LogisticRegression()
    clf.fit(X_train, y_train)
    # print the model's accuracy
    accuracy = clf.score(X_test, y_test)
    print(f'Model accuracy: {accuracy:.2f}')
    return clf


@node
def predict_new_data(X_new, clf, encoder):
    # load the model and make predictions
    # return a string of the predicted class
    prediction = clf.predict(X_new)
    # decode the prediction
    predicted_class = encoder.inverse_transform(prediction)
    return predicted_class[0]


@pipeline
def training_pipeline():
    features, labels = clean_data(penguins)
    encoder, X_train, X_test, y_train, y_test = preprocess_data(features, labels)
    model = train_model(X_train, y_train, X_test, y_test)


if __name__ == "__main__":
    runner = SequentialRunner()
    output_datasets = runner.run(pipeline=training_pipeline, catalog=catalog)
```
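The core of the AST trick above can be exercised in isolation: walk the `Assign` statements of a pipeline-style function and recover (function name, inputs, outputs) triples. A dependency-free sketch (plain tuples instead of Kedro `Node` objects; `extract_nodes` is a hypothetical helper name):

```python
import ast
import textwrap

def extract_nodes(source):
    """Recover (function name, inputs, outputs) triples from assignments."""
    tree = ast.parse(textwrap.dedent(source))
    nodes = []
    for stmt in ast.walk(tree):
        if isinstance(stmt, ast.Assign) and isinstance(stmt.value, ast.Call):
            inputs = [a.id for a in stmt.value.args]
            target = stmt.targets[0]
            outputs = (
                [elt.id for elt in target.elts]
                if isinstance(target, ast.Tuple)
                else target.id
            )
            nodes.append((stmt.value.func.id, inputs, outputs))
    return nodes

src = """
def training_pipeline():
    features, labels = clean_data(penguins)
    model = train_model(features, labels)
"""
print(extract_nodes(src))
# [('clean_data', ['penguins'], ['features', 'labels']), ('train_model', ['features', 'labels'], 'model')]
```

Since the function body is only parsed, never executed, the free variables (`penguins` etc.) don't need to exist at definition time, which is what enables the late binding mentioned above.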
-
Overall I think I like the 2nd approach better; the first one is mostly syntactic sugar and I don't think it provides much value other than saving a few keystrokes. The 2nd is better because the pipeline structure is visible at a glance:

```python
@pipeline
def training_pipeline():
    features, labels = clean_data(penguins)
    encoder, X_train, X_test, y_train, y_test = preprocess_data(features, labels)
    model = train_model(X_train, y_train, X_test, y_test)
```

What happens if you now have a namespaced dataset like `namespace.penguins`?
-
I haven't really thought about it, but any reason it wouldn't be workable?
-
I was thinking it's the opposite: you cannot have `namespace.penguin` in the pipeline code, because it will be interpreted as a missing attribute.
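Concretely, the AST-based extraction in the PoC reads `arg.id`, which only exists for bare names; a dotted dataset reference parses as an `Attribute` node instead, so the visitor would break on it (a small standalone check):

```python
import ast

# A dotted dataset reference inside a pipeline body parses as an
# Attribute node, not a Name node, so it has no `.id` field.
tree = ast.parse("features = clean_data(namespace.penguins)")
call = tree.body[0].value
arg = call.args[0]
print(type(arg).__name__)   # Attribute
print(hasattr(arg, "id"))   # False - so `arg.id` would raise AttributeError
```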
-
Sorry, I wasn't clear; that's what I meant by "Maybe you would just need to define your namespace; could be either as a variable or using a context manager or something":

```python
namespace = Namespace(name="namespace")
namespace.penguins
```

Alternatively, if you just want to make it a valid name, you can require something like `namespace__penguins`. For now, I haven't thought too much about this side of the spectrum, because my focus has been much more on the Jupyter notebook-using data scientist persona getting started, rather than the person getting into more complex use cases (e.g. I haven't even bothered thinking about dynamic pipelines). I think a lot of validation will need to be done regardless. Just to be clear, I'm also not proposing getting rid of the other syntax or anything; I think this "simplified" syntax should at least ease the journey and should be able to go quite far, but the more explicit syntax would still be there. I'll try to better illustrate the adoption journey as I've currently thought about it ahead of tomorrow.
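One hypothetical way the variable-based option could work: a tiny `Namespace` class whose attribute access mints qualified dataset names (illustrative only, not an existing Kedro API):

```python
class Namespace:
    """Hypothetical helper: attribute access yields a qualified dataset name."""

    def __init__(self, name):
        self._name = name

    def __getattr__(self, dataset):
        # Called only for attributes not found normally, i.e. dataset names.
        return f"{self._name}.{dataset}"

namespace = Namespace(name="namespace")
print(namespace.penguins)  # namespace.penguins
```

The AST-based pipeline parser would then see `namespace.penguins` as an attribute access on a known variable, rather than an undefined bare name.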
-
This issue was discussed in Tech Design earlier today. Here is the content of my (Marimo) slides: "Flattening the learning curve".
-
Previous comment related to issue #4892
-
This feels like an idea that would benefit from community feedback. What's the plan to gather that input to see if it actually resolves #4892? I'd also be curious to hear from @Galileo-Galilei on whether this would help with adoption in his organisation.
-
Hi, interesting discussion! I have used this kind of alternative pipeline definition, and everything that optionally reduces boilerplate is good IMO. Apart from the meta-programming fun, I have done it for reasons which are a bit different from the beginners-in-notebooks case.

Currently, I am mostly concerned with (2), but I believe a well-designed decorator feature can be helpful both for the beginners-in-notebooks case, as well as in cases where the use of Kedro can be in conflict with other constraints or developer standards. When I played around with implementing (2), I used very thin wrappers. (Note on (2): handling limited iteration and branching is challenging, but possible.) (Also +1 for parsing a script file to a pipeline.)
-
Introduction
ChatGPT once suggested this syntax to a user
Is this a syntax we'd like to add to Kedro?