mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
* WIP. Procedure appears to be working well. Working on mocking properly for tests * All tests are passing now * rshift working * Add back in Gui's tool_usage fix * WIP * Going to start refactoring for pipeline_output * Update terminology * new pipeline flow with traces and usage metrics working. need to add more tests and make sure PipelineOutput behaves likew CrewOutput * Fix pipelineoutput to look more like crewoutput and taskoutput * Implemented additional tests for pipeline. One test is failing. Need team support * Update docs for pipeline * Update pipeline to properly process input and ouput dictionary * Update Pipeline docs * Add back in commentary at top of pipeline file * Starting to work on router * Drop router for now. will add in separately * In the middle of fixing router. A ton of circular dependencies. Moving over to a new design. * WIP. * Fix circular dependencies and updated PipelineRouter * Add in Eduardo feedback. Still need to add in more commentary describing the design decisions for pipeline * Add developer notes to explain what is going on in pipelines. * Add doc strings * Fix missing rag datatype * WIP. Converting usage metrics from a dict to an object * Fix tests that were checking usage metrics * Drop todo * Fix 1 type error in pipeline * Update pipeline to use UsageMetric * Add missing doc string * WIP. * Change names * Rename variables based on joaos feedback * Fix critical circular dependency issues. Now needing to fix trace issue. * Tests working now! * Add more tests which showed underlying issue with traces * Fix tests * Remove overly complicated test * Add router example to docs * Clean up end of docs * Clean up docs * Working on creating Crew templates and pipeline templates * WIP. * WIP * Fix poetry install from templates * WIP * Restructure * changes for lorenze * more todos * WIP: create pipelines cli working * wrapped up router * ignore mypy src on templates * ignored signature of copy * fix all verbose * rm print statements * brought back correct folders * fixes missing folders and then rm print statements * fixed tests * fixed broken test * fixed type checker * fixed type ignore * ignore types for templates * needed * revert * exclude only required * rm type errors on templates * rm excluding type checks for template files on github action * fixed missing quotes --------- Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
190 lines
7.0 KiB
Python
190 lines
7.0 KiB
Python
import inspect
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict
|
|
|
|
import yaml
|
|
from dotenv import load_dotenv
|
|
from pydantic import ConfigDict
|
|
|
|
load_dotenv()
|
|
|
|
|
|
def CrewBase(cls):
|
|
class WrappedClass(cls):
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
is_crew_class: bool = True # type: ignore
|
|
|
|
base_directory = None
|
|
for frame_info in inspect.stack():
|
|
if "site-packages" not in frame_info.filename:
|
|
base_directory = Path(frame_info.filename).parent.resolve()
|
|
break
|
|
|
|
original_agents_config_path = getattr(
|
|
cls, "agents_config", "config/agents.yaml"
|
|
)
|
|
|
|
original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml")
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
if self.base_directory is None:
|
|
raise Exception(
|
|
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
|
|
)
|
|
|
|
self.agents_config = self.load_yaml(
|
|
os.path.join(self.base_directory, self.original_agents_config_path)
|
|
)
|
|
|
|
self.tasks_config = self.load_yaml(
|
|
os.path.join(self.base_directory, self.original_tasks_config_path)
|
|
)
|
|
|
|
self.map_all_agent_variables()
|
|
self.map_all_task_variables()
|
|
|
|
@staticmethod
|
|
def load_yaml(config_path: str):
|
|
with open(config_path, "r") as file:
|
|
# parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser"
|
|
return yaml.safe_load(file)
|
|
|
|
def _get_all_functions(self):
|
|
return {
|
|
name: getattr(self, name)
|
|
for name in dir(self)
|
|
if callable(getattr(self, name))
|
|
}
|
|
|
|
def _filter_functions(
|
|
self, functions: Dict[str, Callable], attribute: str
|
|
) -> Dict[str, Callable]:
|
|
return {
|
|
name: func
|
|
for name, func in functions.items()
|
|
if hasattr(func, attribute)
|
|
}
|
|
|
|
def map_all_agent_variables(self) -> None:
|
|
all_functions = self._get_all_functions()
|
|
llms = self._filter_functions(all_functions, "is_llm")
|
|
tool_functions = self._filter_functions(all_functions, "is_tool")
|
|
cache_handler_functions = self._filter_functions(
|
|
all_functions, "is_cache_handler"
|
|
)
|
|
callbacks = self._filter_functions(all_functions, "is_callback")
|
|
agents = self._filter_functions(all_functions, "is_agent")
|
|
|
|
for agent_name, agent_info in self.agents_config.items():
|
|
self._map_agent_variables(
|
|
agent_name,
|
|
agent_info,
|
|
agents,
|
|
llms,
|
|
tool_functions,
|
|
cache_handler_functions,
|
|
callbacks,
|
|
)
|
|
|
|
def _map_agent_variables(
|
|
self,
|
|
agent_name: str,
|
|
agent_info: Dict[str, Any],
|
|
agents: Dict[str, Callable],
|
|
llms: Dict[str, Callable],
|
|
tool_functions: Dict[str, Callable],
|
|
cache_handler_functions: Dict[str, Callable],
|
|
callbacks: Dict[str, Callable],
|
|
) -> None:
|
|
if llm := agent_info.get("llm"):
|
|
self.agents_config[agent_name]["llm"] = llms[llm]()
|
|
|
|
if tools := agent_info.get("tools"):
|
|
self.agents_config[agent_name]["tools"] = [
|
|
tool_functions[tool]() for tool in tools
|
|
]
|
|
|
|
if function_calling_llm := agent_info.get("function_calling_llm"):
|
|
self.agents_config[agent_name]["function_calling_llm"] = agents[
|
|
function_calling_llm
|
|
]()
|
|
|
|
if step_callback := agent_info.get("step_callback"):
|
|
self.agents_config[agent_name]["step_callback"] = callbacks[
|
|
step_callback
|
|
]()
|
|
|
|
if cache_handler := agent_info.get("cache_handler"):
|
|
self.agents_config[agent_name]["cache_handler"] = (
|
|
cache_handler_functions[cache_handler]()
|
|
)
|
|
|
|
def map_all_task_variables(self) -> None:
|
|
all_functions = self._get_all_functions()
|
|
agents = self._filter_functions(all_functions, "is_agent")
|
|
tasks = self._filter_functions(all_functions, "is_task")
|
|
output_json_functions = self._filter_functions(
|
|
all_functions, "is_output_json"
|
|
)
|
|
tool_functions = self._filter_functions(all_functions, "is_tool")
|
|
callback_functions = self._filter_functions(all_functions, "is_callback")
|
|
output_pydantic_functions = self._filter_functions(
|
|
all_functions, "is_output_pydantic"
|
|
)
|
|
|
|
for task_name, task_info in self.tasks_config.items():
|
|
self._map_task_variables(
|
|
task_name,
|
|
task_info,
|
|
agents,
|
|
tasks,
|
|
output_json_functions,
|
|
tool_functions,
|
|
callback_functions,
|
|
output_pydantic_functions,
|
|
)
|
|
|
|
def _map_task_variables(
|
|
self,
|
|
task_name: str,
|
|
task_info: Dict[str, Any],
|
|
agents: Dict[str, Callable],
|
|
tasks: Dict[str, Callable],
|
|
output_json_functions: Dict[str, Callable],
|
|
tool_functions: Dict[str, Callable],
|
|
callback_functions: Dict[str, Callable],
|
|
output_pydantic_functions: Dict[str, Callable],
|
|
) -> None:
|
|
if context_list := task_info.get("context"):
|
|
self.tasks_config[task_name]["context"] = [
|
|
tasks[context_task_name]() for context_task_name in context_list
|
|
]
|
|
|
|
if tools := task_info.get("tools"):
|
|
self.tasks_config[task_name]["tools"] = [
|
|
tool_functions[tool]() for tool in tools
|
|
]
|
|
|
|
if agent_name := task_info.get("agent"):
|
|
self.tasks_config[task_name]["agent"] = agents[agent_name]()
|
|
|
|
if output_json := task_info.get("output_json"):
|
|
self.tasks_config[task_name]["output_json"] = output_json_functions[
|
|
output_json
|
|
]
|
|
|
|
if output_pydantic := task_info.get("output_pydantic"):
|
|
self.tasks_config[task_name]["output_pydantic"] = (
|
|
output_pydantic_functions[output_pydantic]
|
|
)
|
|
|
|
if callbacks := task_info.get("callbacks"):
|
|
self.tasks_config[task_name]["callbacks"] = [
|
|
callback_functions[callback]() for callback in callbacks
|
|
]
|
|
|
|
return WrappedClass
|