Merge branch 'bugfix/langchain-tool-config-change' into feature/procedure_v2

This commit is contained in:
Brandon Hancock
2024-07-15 11:47:16 -04:00
6 changed files with 157 additions and 31 deletions

View File

@@ -0,0 +1,112 @@
---
title: crewAI Procedures
description: Understanding and utilizing procedures in the crewAI framework for sequential execution of multiple crews.
---
## What is a Procedure?
A procedure in crewAI represents a sequence of crews that are executed one after another. It allows for the chaining of multiple crews, where the output of one crew becomes the input for the next, enabling complex, multi-stage workflows.
## Procedure Attributes
| Attribute | Parameters | Description |
| :-------- | :--------- | :------------------------------------------ |
| **Crews** | `crews` | A list of crews to be executed in sequence. |
## Working with Procedures
The following example demonstrates how to create, execute, and work with Procedures:
```python
import asyncio
from crewai import Agent, Task, Crew, Procedure
from crewai.crews.crew_output import CrewOutput
# Define agents
researcher = Agent(
role='Senior Research Analyst',
goal='Discover innovative AI technologies',
backstory="You're a senior research analyst specializing in AI trends.",
)
writer = Agent(
role='Content Writer',
goal='Write engaging articles on AI discoveries',
backstory="You're a senior writer specializing in AI content.",
)
# Define tasks for each crew
research_task = Task(
description='Identify breakthrough AI technologies',
agent=researcher
)
write_task = Task(
description='Draft an article on the latest AI technologies',
agent=writer
)
# Create crews
research_crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
writing_crew = Crew(
agents=[writer],
tasks=[write_task],
verbose=True
)
# Create a procedure
procedure = research_crew >> writing_crew
# Alternative way to create a procedure
# procedure = Procedure(crews=[research_crew, writing_crew])
# Function to run the procedure
async def run_procedure():
inputs = [
{"topic": "AI in healthcare"},
{"topic": "AI in finance"}
]
results = await procedure.kickoff(inputs)
return results
# Execute the procedure and process results
async def main():
results = await run_procedure()
for i, result in enumerate(results):
print(f"\nResult {i + 1}:")
# Access raw output
print("Raw output:", result.raw)
# Access JSON output (if available)
if result.json_dict:
print("JSON output:", result.json_dict)
# Access Pydantic model output (if available)
if result.pydantic:
print("Pydantic output:", result.pydantic)
# Access individual task outputs
for j, task_output in enumerate(result.tasks_output):
print(f"Task {j + 1} output:", task_output.raw)
# Access token usage
print("Token usage:", result.token_usage)
# Convert result to dictionary
result_dict = result.to_dict()
print("Result as dictionary:", result_dict)
# String representation of the result
print("String representation:", str(result))
# Run the main function
if __name__ == "__main__":
asyncio.run(main())
```

View File

@@ -181,7 +181,6 @@ class Agent(BaseAgent):
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
# TODO: COMPARE WITH ARGS AND WITHOUT ARGS
self.agent_executor.tools_description = self._render_text_description_and_args(
parsed_tools
)
@@ -200,11 +199,13 @@ class Agent(BaseAgent):
"tools": self.agent_executor.tools_description,
}
)["output"]
print("Result when things went well:", result)
except Exception as e:
print("FAILED TO EXECUTE TASK", e)
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
self.execute_task(task, context, tools)
return self.execute_task(task, context, tools)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
@@ -216,6 +217,7 @@ class Agent(BaseAgent):
if tool_result.get("result_as_answer", False):
result = tool_result["result"]
print("RESULT TO RETURN", result)
return result
def format_log_to_str(

View File

@@ -56,7 +56,7 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Allowing human input given task setting
if self.task.human_input:
if self.task and self.task.human_input:
self.should_ask_for_human_input = True
# Let's start tracking the number of iterations and time elapsed

View File

@@ -24,18 +24,6 @@ class CrewOutput(BaseModel):
description="Processed token summary", default={}
)
# TODO: Joao - Adding this safety check breakes when people want to see
# The full output of a CrewOutput.
# @property
# def pydantic(self) -> Optional[BaseModel]:
# # Check if the final task output included a pydantic model
# if self.tasks_output[-1].output_format != OutputFormat.PYDANTIC:
# raise ValueError(
# "No pydantic model found in the final task. Please make sure to set the output_pydantic property in the final task in your crew."
# )
# return self._pydantic
@property
def json(self) -> Optional[str]:
if self.tasks_output[-1].output_format != OutputFormat.JSON:
@@ -46,6 +34,9 @@ class CrewOutput(BaseModel):
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
print("Crew Output RAW", self.raw)
print("Crew Output JSON", self.json_dict)
print("Crew Output Pydantic", self.pydantic)
if self.json_dict:
return self.json_dict
if self.pydantic:

View File

@@ -12,21 +12,46 @@ class Procedure(BaseModel):
..., description="List of crews to be executed in sequence"
)
async def kickoff(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
def kickoff(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
current_inputs = inputs
for crew in self.crews:
for index, crew in enumerate(self.crews):
# Process all inputs for the current crew
crew_outputs = await self._process_crew(crew, current_inputs)
print("Crew Outputs", crew_outputs)
crew_outputs = self._process_crew(crew, current_inputs)
# Prepare inputs for the next crew
current_inputs = [output.to_dict() for output in crew_outputs]
# If this is not the last crew, prepare inputs for the next crew
if index < len(self.crews) - 1:
current_inputs = [output.to_dict() for output in crew_outputs]
else:
# For the last crew, we don't need to convert the output to input
return crew_outputs
# Return the final outputs
return crew_outputs
async def _process_crew(
async def kickoff_async(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
current_inputs = inputs
for index, crew in enumerate(self.crews):
# Process all inputs for the current crew
crew_outputs = await self._process_crew(crew, current_inputs)
# If this is not the last crew, prepare inputs for the next crew
if index < len(self.crews) - 1:
current_inputs = [output.to_dict() for output in crew_outputs]
else:
# For the last crew, we don't need to convert the output to input
return crew_outputs
return crew_outputs
def _process_crew(
self, crew: Crew, inputs: List[Dict[str, Any]]
) -> List[CrewOutput]:
# Kickoff crew for each input
outputs = [crew.kickoff(inputs=input_data) for input_data in inputs]
return outputs
async def _process_crew_async(
self, crew: Crew, inputs: List[Dict[str, Any]]
) -> List[CrewOutput]:
# Kickoff crew asynchronously for each input

View File

@@ -151,16 +151,12 @@ class ToolUsage:
for k, v in calling.arguments.items()
if k in acceptable_args
}
result = tool._run(**arguments)
result = tool.invoke(input=arguments)
except Exception:
if tool.args_schema:
arguments = calling.arguments
result = tool._run(**arguments)
else:
arguments = calling.arguments.values() # type: ignore # Incompatible types in assignment (expression has type "dict_values[str, Any]", variable has type "dict[str, Any]")
result = tool._run(*arguments)
arguments = calling.arguments
result = tool.invoke(input=arguments)
else:
result = tool._run()
result = tool.invoke(input={})
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts: