Almost working!

This commit is contained in:
Brandon Hancock
2024-09-11 10:29:54 -04:00
parent 322780a5f3
commit d67c12a5a3
9 changed files with 240 additions and 0 deletions

View File

@@ -0,0 +1 @@
# TODO:

View File

@@ -0,0 +1,13 @@
from crewai.flows import Flow, end_job, start_job # type: ignore
class SimpleFlow(Flow):
@start_job()
async def research(self, topic: str) -> str:
print(f"Researching {topic}...")
return f"Full report on {topic}..."
@end_job("research")
async def write_post(self, report: str) -> str:
return f"Post written: {report}"

View File

@@ -0,0 +1,17 @@
from crewai.flows import Flow, end_job, job, start_job # type: ignore
class LongerFlow(Flow):
@start_job()
async def research(self, topic: str) -> str:
print(f"Researching {topic}...")
return f"Full report on {topic}..."
@job("research")
async def edit_report(self, report: str) -> str:
return f"Edited report: {report}"
@end_job("edit_report")
async def write_post(self, report: str) -> str:
return f"Post written: {report}"

View File

@@ -0,0 +1,22 @@
from typing import Tuple
from crewai.flows import Flow, end_job, router, start_job # type: ignore
class RouterFlow(Flow):
@start_job()
@router()
async def classify_email(self, report: str) -> Tuple[str, str]:
if "urgent" in report:
return "urgent", report
return "normal", report
@end_job("urgent")
async def write_urgent_email(self, report: str) -> str:
return f"Urgent Email Response: {report}"
@end_job("normal")
async def write_normal_email(self, report: str) -> str:
return f"Normal Email Response: {report}"

View File

@@ -0,0 +1,13 @@
from crewai.flows import Flow, end_job, start_job # type: ignore
class SimpleFlow(Flow):
@start_job()
async def research(self, topic: str) -> str:
print(f"Researching {topic}...")
return f"Full report on {topic}..."
@end_job("research")
async def write_post(self, report: str) -> str:
return f"Post written: {report}"

View File

@@ -0,0 +1,19 @@
from crewai.flows import Flow, end_job, job, start_job # type: ignore
class SimpleFlow(Flow):
@start_job()
async def research_crew(self, topic: str) -> str:
result = research_crew.kickoff(inputs={topic: topic})
return result.raw
@job("research_crew")
async def create_x_post(self, research: str) -> str:
result = x_post_crew.kickoff(inputs={research: research})
return result.raw
@end_job("research")
async def post_to_x(self, post: str) -> None:
# TODO: Post to X
return None

100
src/crewai/flow/flow.py Normal file
View File

@@ -0,0 +1,100 @@
from typing import Any, Callable, Dict, Generic, List, TypeVar, Union, get_args
from pydantic import BaseModel
TState = TypeVar("TState", bound=Union[BaseModel, Dict[str, Any]])
class FlowMeta(type):
def __new__(mcs, name, bases, dct):
cls = super().__new__(mcs, name, bases, dct)
start_methods = []
listeners = {}
for attr_name, attr_value in dct.items():
if hasattr(attr_value, "__is_start_method__"):
start_methods.append(attr_name)
if hasattr(attr_value, "__trigger_methods__"):
for trigger in attr_value.__trigger_methods__:
trigger_name = trigger.__name__ if callable(trigger) else trigger
if trigger_name not in listeners:
listeners[trigger_name] = []
listeners[trigger_name].append(attr_name)
setattr(cls, "_start_methods", start_methods)
setattr(cls, "_listeners", listeners)
return cls
class Flow(Generic[TState], metaclass=FlowMeta):
_start_methods: List[str] = []
_listeners: Dict[str, List[str]] = {}
state: TState
def __init__(self):
self._methods: Dict[str, Callable] = {}
self.state = self._create_default_state()
for method_name in dir(self):
if callable(getattr(self, method_name)) and not method_name.startswith(
"__"
):
self._methods[method_name] = getattr(self, method_name)
def _create_default_state(self) -> TState:
state_type = self._get_state_type()
if state_type and issubclass(state_type, BaseModel):
return state_type()
return DictWrapper() # type: ignore
def _get_state_type(self) -> type[TState] | None:
for base in self.__class__.__bases__:
if hasattr(base, "__origin__") and base.__origin__ is Flow:
args = get_args(base)
if args:
return args[0]
return None
def run(self):
if not self._start_methods:
raise ValueError("No start method defined")
for start_method in self._start_methods:
result = self._methods[start_method]()
self._execute_listeners(start_method, result)
def _execute_listeners(self, trigger_method: str, result: Any):
if trigger_method in self._listeners:
for listener in self._listeners[trigger_method]:
try:
listener_result = self._methods[listener](result)
self._execute_listeners(listener, listener_result)
except Exception as e:
print(f"Error in method {listener}: {str(e)}")
return
class DictWrapper(Dict[str, Any]):
def __getattr__(self, name: str) -> Any:
return self.get(name)
def __setattr__(self, name: str, value: Any) -> None:
self[name] = value
def start():
def decorator(func):
func.__is_start_method__ = True
return func
return decorator
def listen(*trigger_methods):
def decorator(func):
func.__trigger_methods__ = trigger_methods
return func
return decorator

View File

@@ -0,0 +1,27 @@
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
self.state.message = "Hello from structured flow"
return "Start result"
@listen(start_method)
def second_method(self, result):
print(f"Second method, received: {result}")
self.state.counter += 1
self.state.message = "Hello from structured flow"
return "Second result"
# Run the flow
structured_flow = StructuredExampleFlow()
structured_flow.run()

View File

@@ -0,0 +1,28 @@
from crewai.flow.flow import Flow, listen, start
class FlexibleExampleFlow(Flow):
@start()
def start_method(self):
print("Starting the flexible flow")
self.state.counter = 1
return "Start result"
@listen(start_method)
def second_method(self, result):
print(f"Second method, received: {result}")
self.state.counter += 1
self.state.message = "Hello from flexible flow"
return "Second result"
@listen(second_method)
def third_method(self, result):
print(f"Third method, received: {result}")
print(f"Final counter value: {self.state.counter}")
print(f"Final message: {self.state.message}")
return "Third result"
# Run the flows
flexible_flow = FlexibleExampleFlow()
flexible_flow.run()