Python: [DRAFT] workflow composability design #2366
base: main
Conversation
eavanvalkenburg
left a comment
Some very interesting things in here. I agree that option 2 makes the most sense, but I think the port idea might make some other scenarios easier and should be considered.
```python
analysis = (
    ConcurrentBuilder()
    .participants([operation, compliance])
    .as_connection()
```
Is this even needed? Couldn't the next step just accept an unbuilt WorkflowBuilder of any type and call as_connection itself? The function doesn't have params, so that should be easy. Alternatively, the prefix could be set in as_connection, and then you can just connect.
Or even have prefix default to the name of the workflow, making it a bit easier to keep track of.
```python
)

builder = WorkflowBuilder()
analysis_handle = builder.connect(analysis, prefix="analysis")  # merge + handle
```
So while I understand the connect verbiage, why not keep it a bit simpler and just use add_workflow, which allows either a WorkflowConnection or a WorkflowBuilder? (That probably also means making sure the workflow builders are all based on a single base class with the as_connection logic.)
I've now got a shared mixin-style surface (as_connection, with add_workflow as the single verb) rather than forcing all high-level builders into a single inheritance base, because:
- High-level builders already have distinct ctor shapes and internal state; pushing them under one base would introduce a breaking constructor contract or a leaky abstract layer we'd need to retrofit across Concurrent/Sequential/GroupChat/Magentic/Handoff.
- The behavior we need is tiny (expose .as_connection(prefix=...) and route through WorkflowBuilder), which we can provide uniformly via a mixin or thin delegations without reshaping the class hierarchy.
- add_workflow accepts WorkflowBuilder, Workflow, or WorkflowConnection, so callers get the simple API while we keep existing builder implementations intact.
- This keeps the public surface additive and avoids a sweeping inheritance refactor now; if we later find more shared behavior, we can introduce a common base with backward-compatible constructors.
- Composition helpers use contracts to fail fast or select the right canned adapter.
- Pros: predictable type-safe bridges and better error messages. Cons: adds small surface area but aligns with existing adapter executors already used inside SequentialBuilder.

## Option 4: Port-Based Interfaces and Extension Points
This would be an interesting feature as well, making it very explicit which inputs and outputs there are, and also allowing hosted workflows to be instantiated with different endpoints for the different input ports.
- Keep checkpointing, request/response handling, and observability semantics intact across composed graphs.

## Current State
- High-level builders (ConcurrentBuilder, SequentialBuilder, group chat variants) emit a finished Workflow; the graph is immutable and cannot be extended directly.
I think we need to make sure that the responsibilities of our abstractions are clearly defined:
- A Workflow instance: built from a workflow builder and immutable.
- A WorkflowBuilder instance: created by the user and mutable.
This makes the composition story more streamlined and less ambiguous. It also potentially aligns better with the declarative story: the declarative language is essentially a builder language.
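That split can be sketched with toy stand-in classes (the real framework classes are richer; this only illustrates the mutable-builder/frozen-workflow contract):

```python
from types import MappingProxyType


class WorkflowBuilder:
    """Mutable: created by users, accumulates topology."""

    def __init__(self):
        self._edges: dict[str, list[str]] = {}

    def add_edge(self, source: str, target: str) -> "WorkflowBuilder":
        self._edges.setdefault(source, []).append(target)
        return self

    def build(self) -> "Workflow":
        # Freeze the topology: the Workflow only ever sees read-only views.
        return Workflow({k: tuple(v) for k, v in self._edges.items()})


class Workflow:
    """Immutable: built from a builder, cannot be extended directly."""

    def __init__(self, edges):
        self.edges = MappingProxyType(edges)
```

Attempting to assign into `workflow.edges` raises `TypeError`, so the only way to change topology is to go back through a builder.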
Adding a terminology section.
- Type compatibility is enforced by WorkflowBuilder during validation, but only within a single builder instance; cross-workflow composition relies on developers hand-wiring compatible adapters.

## Requirements
- Compose multiple workflows (built from any builder) as first-class nodes inside a parent graph.
Have we not achieved this via subworkflows?
Or you mean the workflow must not be wrapped in an executor?
## Option 1: WorkflowExecutor-Centric Composition Helpers
- Add a fluent creation path for WorkflowExecutor to remove manual boilerplate:
  - `Workflow.as_executor(id: str | None = None, *, allow_direct_output: bool = False, adapter: Callable[[Any], Any] | None = None) -> WorkflowExecutor`
This is just syntactic sugar for what we already have:

```python
workflow_executor = WorkflowExecutor(workflow, id="abc", ...)
```
Agreed - this is syntactic sugar. The doc is exploring different approaches; we'll select what makes sense based on the full discussion.
```python
builder = WorkflowBuilder()
normalize_handle = builder.add_connection(normalize_connection, prefix="prep")
summarize_handle = builder.add_connection(summarize_connection, prefix="summary")
builder.connect(normalize_handle.output_points[0], summarize_handle.start_id)
```
There is an implicit assumption we are making here: users must know the internals of the workflow builders in order to compose them. For example, they need to know which executors, and which connections between them, the builder is going to create in order to make the correct connections between the output points and starting points.
This makes the whole connection object another layer of complexity, and it's probably unnecessary.
Say we have two builders we want to merge, and we know what's inside them. We can simply do the following:

```python
workflow_builder_a = ...
workflow_builder_b = ...
new_builder = (
    workflow_builder_a
    .merge(workflow_builder_b)
    .add_edge(workflow_builder_a.get_executor("..."), workflow_builder_b.get_executor("..."))
)
```

All the checkpoint and start-executor configuration is preserved after merging, but overrides are allowed because the workflow builder comes with all the APIs.
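To make the shape of that merge proposal concrete, here is a self-contained toy model; `Executor`, `WorkflowBuilder`, `merge`, and `get_executor` below are hypothetical stand-ins, not the real builder:

```python
class Executor:
    def __init__(self, id: str):
        self.id = id


class WorkflowBuilder:
    def __init__(self):
        self._executors: dict[str, Executor] = {}
        self._edges: list[tuple[str, str]] = []

    def add_executor(self, executor: Executor) -> "WorkflowBuilder":
        self._executors[executor.id] = executor
        return self

    def get_executor(self, id: str) -> Executor:
        return self._executors[id]

    def add_edge(self, source: Executor, target: Executor) -> "WorkflowBuilder":
        self._edges.append((source.id, target.id))
        return self

    def merge(self, other: "WorkflowBuilder") -> "WorkflowBuilder":
        # Absorb the other builder's executors and edges into this one;
        # later calls can still override start/checkpoint configuration.
        self._executors.update(other._executors)
        self._edges.extend(other._edges)
        return self
```

Usage then mirrors the snippet above: merge builder B into builder A, then wire a bridging edge between two executors the caller already knows by id.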
## Option 3: Typed Adapters and Contracts
- Provide first-class adapter executors to bridge mismatched but intentionally compatible types instead of ad-hoc callbacks:
  - `builder.add_adapter(source, target, adapter_fn)` sugar that injects a small Executor running adapter_fn; validated via is_type_compatible on adapter outputs.
We can achieve this by adding a new executor that transforms the data in the merged workflow builder.
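A small sketch of that idea, with hypothetical names (`AdapterExecutor`, `add_adapter`): a tiny executor that transforms data, spliced between two mismatched nodes in the merged graph's edge list:

```python
from typing import Any, Callable


class AdapterExecutor:
    """Executor whose only job is to run adapter_fn on each incoming message."""

    def __init__(self, id: str, adapter_fn: Callable[[Any], Any]):
        self.id = id
        self.adapter_fn = adapter_fn

    def execute(self, message: Any) -> Any:
        return self.adapter_fn(message)


def add_adapter(
    edges: list[tuple[str, str]],
    source: str,
    target: str,
    adapter_fn: Callable[[Any], Any],
) -> AdapterExecutor:
    # Splice an adapter node between source and target:
    # source -> adapter -> target replaces the direct source -> target edge.
    adapter = AdapterExecutor(f"{source}->{target}:adapter", adapter_fn)
    if (source, target) in edges:
        edges.remove((source, target))
    edges.extend([(source, adapter.id), (adapter.id, target)])
    return adapter
```

Because the adapter is an ordinary executor node, the builder's existing type validation can check `adapter_fn`'s output against the target's accepted types, rather than special-casing callbacks on edges.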
- Executors expose `ports: dict[str, PortSpec]` where PortSpec includes direction (in/out), type set, and optional semantics tag (`conversation`, `aggregate`, `request`, `control`).
- Builders produce a `WorkflowPorts` manifest identifying exposed ports (entry, exit, extension points) instead of only start/terminal nodes.
- New APIs: `builder.connect(source=(node_id, "out:conversation"), target=(node_id, "in:conversation"))` with validation on port types/semantics.
Do we really want to introduce this level of complexity to the lower-level APIs?
The PortSpec feels a bit opinionated. And how does an executor get the port specs?
That's fair. This option is intentionally more complex to explore the opposite end of the design spectrum. It helps us understand the trade-offs between implicit (current state) and explicit (port-based) approaches.
```python
    self._checkpoint_storage = checkpoint_storage
    return self

def as_connection(self) -> WorkflowConnection:
```
Should we have all the high-level workflow builders inherit from WorkflowBuilder instead?
```python
    return WorkflowAgent(workflow=self, name=name)

def as_connection(self, prefix: str | None = None) -> "WorkflowConnection":
```
This goes back to my comments in the ADR.
This does feel like just a copy of the workflow builder.
The intent is to let an immutable Workflow be reused in composition without mutating it. At runtime, the only safe way to inline a built workflow is to recreate a WorkflowBuilder copy of its topology, because all wiring APIs live on the builder. So Workflow.as_connection deep-clones the existing graph into a builder and returns a WorkflowConnection handle; there's no new surface beyond that copy step. It looks like "copying the builder" because that is exactly what we need to re-enter the composition APIs while keeping the original workflow immutable. The alternative would be to expose mutation on Workflow, which we're avoiding.
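A toy model of that clone step, with hypothetical minimal classes: `as_connection` deep-copies the topology into a fresh builder (applying a prefix to node ids), so mutating the clone never touches the original workflow:

```python
import copy


class WorkflowBuilder:
    """Mutable builder: the only place wiring APIs live."""

    def __init__(self):
        self._edges: dict[str, list[str]] = {}

    def add_edge(self, source: str, target: str) -> "WorkflowBuilder":
        self._edges.setdefault(source, []).append(target)
        return self


class Workflow:
    """Immutable by convention: composition must go back through a builder."""

    def __init__(self, edges: dict[str, list[str]]):
        self._edges = edges

    def as_connection(self, prefix: str = "sub") -> WorkflowBuilder:
        # Deep-clone the topology into a fresh builder so composition
        # re-enters the mutable builder APIs without touching this workflow.
        clone = WorkflowBuilder()
        clone._edges = {
            f"{prefix}/{src}": [f"{prefix}/{tgt}" for tgt in targets]
            for src, targets in copy.deepcopy(self._edges).items()
        }
        return clone
```

Extending the clone (say, adding an edge out of a prefixed node) leaves the source workflow's graph byte-for-byte unchanged, which is the whole point of the copy.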
What is the reason that we cannot reuse the builder but need to use the workflow instance?
Motivation and Context
Putting some thoughts and a POC together around workflow composability - being able to connect two workflows together via edges, the high-level orchestration APIs, etc.