Update docs and add more examples

This commit is contained in:
Brandon Hancock
2024-09-24 15:41:11 -04:00
parent bfaba72da2
commit 128872a482
11 changed files with 298 additions and 14 deletions

View File

@@ -2,6 +2,7 @@ import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
import graphviz
from pydantic import BaseModel
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
@@ -250,3 +251,103 @@ class Flow(Generic[T], metaclass=FlowMeta):
import traceback
traceback.print_exc()
def visualize(self, filename="flow_graph"):
dot = graphviz.Digraph(comment="Flow Graph", engine="dot")
dot.attr(
rankdir="TB", size="20,20", splines="curved"
) # Changed to curved splines
# Color scheme (using company colors)
colors = {
"bg": "#FFFFFF",
"start": "#FF5A50",
"method": "#333333",
"router_outline": "#FF5A50",
"edge": "#333333",
"text": "#FFFFFF",
}
dot.attr(bgcolor=colors["bg"])
# Add nodes for each relevant method
for method_name, method in self._methods.items():
if (
hasattr(method, "__is_start_method__")
or method_name in self._listeners
or method_name in self._routers.values()
):
shape = "rectangle"
style = "filled,rounded"
fillcolor = colors["method"]
if hasattr(method, "__is_start_method__"):
fillcolor = colors["start"]
dot.node(
method_name,
method_name,
shape=shape,
style=style,
fillcolor=fillcolor,
fontcolor=colors["text"],
penwidth="2",
)
# Add edges and enhanced router representation
for method_name, method in self._methods.items():
if method_name in self._listeners:
condition_type, trigger_methods = self._listeners[method_name]
for trigger in trigger_methods:
if condition_type == "AND":
dot.edge(
trigger,
method_name,
color=colors["edge"],
style="dashed",
penwidth="2",
)
else: # OR condition
dot.edge(
trigger, method_name, color=colors["edge"], penwidth="2"
)
if method_name in self._routers.values():
for trigger, router in self._routers.items():
if router == method_name:
# Create a subgraph for the router and its outputs
subgraph_name = f"cluster_{method_name}"
subgraph = graphviz.Digraph(name=subgraph_name)
subgraph.attr(
label="",
style="filled,rounded",
color=colors["router_outline"],
fillcolor=colors["method"],
penwidth="3",
) # Changed to solid line and increased penwidth
# Router label (method name) and outputs
label = f"{method_name}\\n\\nPossible outcomes:\\n• Success\\n• Failure"
subgraph.node(
method_name,
label,
shape="plaintext",
fontcolor=colors["text"],
)
# Add the subgraph to the main graph
dot.subgraph(subgraph)
# Connect trigger to router (to the border of the subgraph)
dot.edge(
trigger,
method_name,
color=colors["edge"],
style="solid",
penwidth="2",
lhead=subgraph_name,
)
# Render and save the graph
dot.render(filename, format="png", cleanup=True, view=True)
print(f"Graph saved as {filename}.png")

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -0,0 +1,119 @@
import asyncio
from crewai.flow.flow import Flow, listen, start
class BinaryBranchingFlow(Flow):
@start()
def start_method(self):
print("Starting the binary branching flow")
self.state["counter"] = 1
return "Start result"
# Level 1
@listen(start_method)
def first_branch_left(self):
print("First branch (left)")
self.state["counter"] += 1
return "First branch left result"
@listen(start_method)
def first_branch_right(self):
print("First branch (right)")
self.state["counter"] += 1
return "First branch right result"
# Level 2 - Left Branch
@listen(first_branch_left)
def second_branch_left_left(self):
print("Second branch from first left (left)")
self.state["counter"] += 1
return "Second branch left left result"
@listen(first_branch_left)
def second_branch_left_right(self):
print("Second branch from first left (right)")
self.state["counter"] += 1
return "Second branch left right result"
# Level 2 - Right Branch
@listen(first_branch_right)
def second_branch_right_left(self):
print("Second branch from first right (left)")
self.state["counter"] += 1
return "Second branch right left result"
@listen(first_branch_right)
def second_branch_right_right(self):
print("Second branch from first right (right)")
self.state["counter"] += 1
return "Second branch right right result"
# Level 3 - Left Left Branch
@listen(second_branch_left_left)
def third_branch_left_left_left(self):
print("Third branch from second left left (left)")
self.state["counter"] += 1
return "Third branch left left left result"
@listen(second_branch_left_left)
def third_branch_left_left_right(self):
print("Third branch from second left left (right)")
self.state["counter"] += 1
return "Third branch left left right result"
# Level 3 - Left Right Branch
@listen(second_branch_left_right)
def third_branch_left_right_left(self):
print("Third branch from second left right (left)")
self.state["counter"] += 1
return "Third branch left right left result"
@listen(second_branch_left_right)
def third_branch_left_right_right(self):
print("Third branch from second left right (right)")
self.state["counter"] += 1
return "Third branch left right right result"
# Level 3 - Right Left Branch
@listen(second_branch_right_left)
def third_branch_right_left_left(self):
print("Third branch from second right left (left)")
self.state["counter"] += 1
return "Third branch right left left result"
@listen(second_branch_right_left)
def third_branch_right_left_right(self):
print("Third branch from second right left (right)")
self.state["counter"] += 1
return "Third branch right left right result"
# Level 3 - Right Right Branch
@listen(second_branch_right_right)
def third_branch_right_right_left(self):
print("Third branch from second right right (left)")
self.state["counter"] += 1
return "Third branch right right left result"
@listen(second_branch_right_right)
def third_branch_right_right_right(self):
print("Third branch from second right right (right)")
self.state["counter"] += 1
return "Third branch right right right result"
# Final method for visualization
@listen(third_branch_left_left_left) # This is the deepest branch in the tree
def final_method(self):
print("Final method reached!")
print(f"Final counter value: {self.state['counter']}")
return "Final result"
async def main():
flow = BinaryBranchingFlow()
# Uncomment this if you want to run the flow with kickoff
# await flow.kickoff()
flow.visualize()
asyncio.run(main())

View File

@@ -21,7 +21,7 @@ class AndExampleFlow(Flow):
async def main():
flow = AndExampleFlow()
await flow.kickoff()
flow.visualize()
asyncio.run(main())

View File

@@ -20,7 +20,8 @@ class OrExampleFlow(Flow):
async def main():
flow = OrExampleFlow()
await flow.kickoff()
flow.visualize()
# await flow.kickoff()
asyncio.run(main())

View File

@@ -35,7 +35,7 @@ class RouterFlow(Flow[ExampleState]):
async def main():
flow = RouterFlow()
await flow.kickoff()
flow.visualize()
asyncio.run(main())

View File

@@ -27,7 +27,8 @@ class FlexibleExampleFlow(Flow):
async def main():
flow = FlexibleExampleFlow()
await flow.kickoff()
# await flow.kickoff()
flow.visualize()
asyncio.run(main())