diff --git a/src/crewai/flow/decorators.py b/src/crewai/flow/decorators.py new file mode 100644 index 000000000..e151d4aaf --- /dev/null +++ b/src/crewai/flow/decorators.py @@ -0,0 +1 @@ +# TODO: diff --git a/src/crewai/flow/examples/context.py b/src/crewai/flow/examples/context.py new file mode 100644 index 000000000..2d3b5be7d --- /dev/null +++ b/src/crewai/flow/examples/context.py @@ -0,0 +1,13 @@ +from crewai.flows import Flow, end_job, start_job # type: ignore + + +class SimpleFlow(Flow): + + @start_job() + async def research(self, topic: str) -> str: + print(f"Researching {topic}...") + return f"Full report on {topic}..." + + @end_job("research") + async def write_post(self, report: str) -> str: + return f"Post written: {report}" diff --git a/src/crewai/flow/examples/longer.py b/src/crewai/flow/examples/longer.py new file mode 100644 index 000000000..ce378a142 --- /dev/null +++ b/src/crewai/flow/examples/longer.py @@ -0,0 +1,17 @@ +from crewai.flows import Flow, end_job, job, start_job # type: ignore + + +class LongerFlow(Flow): + + @start_job() + async def research(self, topic: str) -> str: + print(f"Researching {topic}...") + return f"Full report on {topic}..." + + @job("research") + async def edit_report(self, report: str) -> str: + return f"Edited report: {report}" + + @end_job("edit_report") + async def write_post(self, report: str) -> str: + return f"Post written: {report}" diff --git a/src/crewai/flow/examples/router.py b/src/crewai/flow/examples/router.py new file mode 100644 index 000000000..d68208fb0 --- /dev/null +++ b/src/crewai/flow/examples/router.py @@ -0,0 +1,22 @@ +from typing import Tuple + +from crewai.flows import Flow, end_job, router, start_job # type: ignore + + +class RouterFlow(Flow): + + @start_job() + @router() + async def classify_email(self, report: str) -> Tuple[str, str]: + if "urgent" in report: + return "urgent", report + + return "normal", report + + @end_job("urgent") + async def write_urgent_email(self, report: str) -> str: + return f"Urgent Email Response: {report}" + + @end_job("normal") + async def write_normal_email(self, report: str) -> str: + return f"Normal Email Response: {report}" diff --git a/src/crewai/flow/examples/simple.py b/src/crewai/flow/examples/simple.py new file mode 100644 index 000000000..2d3b5be7d --- /dev/null +++ b/src/crewai/flow/examples/simple.py @@ -0,0 +1,13 @@ +from crewai.flows import Flow, end_job, start_job # type: ignore + + +class SimpleFlow(Flow): + + @start_job() + async def research(self, topic: str) -> str: + print(f"Researching {topic}...") + return f"Full report on {topic}..." + + @end_job("research") + async def write_post(self, report: str) -> str: + return f"Post written: {report}" diff --git a/src/crewai/flow/examples/simple_with_crew.py b/src/crewai/flow/examples/simple_with_crew.py new file mode 100644 index 000000000..b4a8487be --- /dev/null +++ b/src/crewai/flow/examples/simple_with_crew.py @@ -0,0 +1,19 @@ +from crewai.flows import Flow, end_job, job, start_job # type: ignore + + +class SimpleFlow(Flow): + + @start_job() + async def research_crew(self, topic: str) -> str: + result = research_crew.kickoff(inputs={topic: topic}) + return result.raw + + @job("research_crew") + async def create_x_post(self, research: str) -> str: + result = x_post_crew.kickoff(inputs={research: research}) + return result.raw + + @end_job("research") + async def post_to_x(self, post: str) -> None: + # TODO: Post to X + return None diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py new file mode 100644 index 000000000..6cac9a7b7 --- /dev/null +++ b/src/crewai/flow/flow.py @@ -0,0 +1,100 @@ +from typing import Any, Callable, Dict, Generic, List, TypeVar, Union, get_args + +from pydantic import BaseModel + +TState = TypeVar("TState", bound=Union[BaseModel, Dict[str, Any]]) + + +class FlowMeta(type): + def __new__(mcs, name, bases, dct): + cls = super().__new__(mcs, name, bases, dct) + + start_methods = [] + listeners = {} + + for attr_name, attr_value in dct.items(): + if hasattr(attr_value, "__is_start_method__"): + start_methods.append(attr_name) + if hasattr(attr_value, "__trigger_methods__"): + for trigger in attr_value.__trigger_methods__: + trigger_name = trigger.__name__ if callable(trigger) else trigger + if trigger_name not in listeners: + listeners[trigger_name] = [] + listeners[trigger_name].append(attr_name) + + setattr(cls, "_start_methods", start_methods) + setattr(cls, "_listeners", listeners) + + return cls + + +class Flow(Generic[TState], metaclass=FlowMeta): + _start_methods: List[str] = [] + _listeners: Dict[str, List[str]] = {} + state: TState + + def __init__(self): + self._methods: Dict[str, Callable] = {} + self.state = self._create_default_state() + + for method_name in dir(self): + if callable(getattr(self, method_name)) and not method_name.startswith( + "__" + ): + self._methods[method_name] = getattr(self, method_name) + + def _create_default_state(self) -> TState: + state_type = self._get_state_type() + if state_type and issubclass(state_type, BaseModel): + return state_type() + return DictWrapper() # type: ignore + + def _get_state_type(self) -> type[TState] | None: + for base in self.__class__.__bases__: + if hasattr(base, "__origin__") and base.__origin__ is Flow: + args = get_args(base) + if args: + return args[0] + return None + + def run(self): + if not self._start_methods: + raise ValueError("No start method defined") + + for start_method in self._start_methods: + result = self._methods[start_method]() + self._execute_listeners(start_method, result) + + def _execute_listeners(self, trigger_method: str, result: Any): + if trigger_method in self._listeners: + for listener in self._listeners[trigger_method]: + try: + listener_result = self._methods[listener](result) + self._execute_listeners(listener, listener_result) + except Exception as e: + print(f"Error in method {listener}: {str(e)}") + return + + +class DictWrapper(Dict[str, Any]): + def __getattr__(self, name: str) -> Any: + return self.get(name) + + def __setattr__(self, name: str, value: Any) -> None: + self[name] = value + + +def start(): + def decorator(func): + func.__is_start_method__ = True + return func + + return decorator + + +def listen(*trigger_methods): + def decorator(func): + func.__trigger_methods__ = trigger_methods + return func + + return decorator diff --git a/src/crewai/flow/structured_test_flow.py b/src/crewai/flow/structured_test_flow.py new file mode 100644 index 000000000..dfd92972c --- /dev/null +++ b/src/crewai/flow/structured_test_flow.py @@ -0,0 +1,27 @@ +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel + + +class ExampleState(BaseModel): + counter: int = 0 + message: str = "" + + +class StructuredExampleFlow(Flow[ExampleState]): + @start() + def start_method(self): + print("Starting the structured flow") + self.state.message = "Hello from structured flow" + return "Start result" + + @listen(start_method) + def second_method(self, result): + print(f"Second method, received: {result}") + self.state.counter += 1 + self.state.message = "Hello from structured flow" + return "Second result" + + +# Run the flow +structured_flow = StructuredExampleFlow() +structured_flow.run() diff --git a/src/crewai/flow/unstructured_test_flow.py b/src/crewai/flow/unstructured_test_flow.py new file mode 100644 index 000000000..d3a7c5f8b --- /dev/null +++ b/src/crewai/flow/unstructured_test_flow.py @@ -0,0 +1,28 @@ +from crewai.flow.flow import Flow, listen, start + + +class FlexibleExampleFlow(Flow): + @start() + def start_method(self): + print("Starting the flexible flow") + self.state.counter = 1 + return "Start result" + + @listen(start_method) + def second_method(self, result): + print(f"Second method, received: {result}") + self.state.counter += 1 + self.state.message = "Hello from flexible flow" + return "Second result" + + @listen(second_method) + def third_method(self, result): + print(f"Third method, received: {result}") + print(f"Final counter value: {self.state.counter}") + print(f"Final message: {self.state.message}") + return "Third result" + + +# Run the flows +flexible_flow = FlexibleExampleFlow() +flexible_flow.run()