feat: expose messages to TaskOutput and LiteAgentOutputs (#3880)

* feat: add messages to task and agent outputs

- Introduced a new `messages` field on `TaskOutput` and `LiteAgentOutput` to capture the messages from the last task execution (see the usage sketch after this list).
- Updated the agent's executor to store the last messages and provide a property for easy access.
- Enhanced the `Task` and `LiteAgent` classes to include messages in their outputs.
- Added tests to ensure that messages are correctly included in task outputs and agent outputs during execution.
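
A minimal usage sketch, not taken from this diff: the fixture values are illustrative, and the message shape (`role`/`content` dicts) follows the `LLMMessage` entries used in the new replay test below.

```python
from crewai import Agent, Crew, Task

# Hedged sketch of reading the new `messages` field after a run; the Agent/Task/Crew
# setup mirrors the test fixtures in this diff.
agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(description="Say hello", expected_output="A greeting", agent=agent)
crew = Crew(agents=[agent], tasks=[task])

result = crew.kickoff()
for task_output in result.tasks_output:
    # Each entry is an LLM-style dict such as {"role": "user", "content": "Say hello"}.
    for message in task_output.messages:
        print(message["role"], "->", message["content"])
```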

* using typing_extensions for 3.10 compatibility

* feat: add last_messages attribute to agent for improved task tracking

- Introduced a new `last_messages` attribute in the agent class to store messages from the last task execution (sketched after this list).
- Updated the `Crew` class to handle the new messages attribute in task outputs.
- Enhanced existing tests to ensure that the `last_messages` attribute is correctly initialized and utilized across various guardrail scenarios.
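
A short sketch of the `last_messages` attribute described above. The commit only states that the agent stores the messages from its last task execution; the dict-shaped entries are an assumption carried over from the `LLMMessage` usage in the new replay test.

```python
from crewai import Agent, Crew, Task

agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(description="Say hello", expected_output="A greeting", agent=agent)
Crew(agents=[agent], tasks=[task]).kickoff()

# After execution the agent retains the messages from its most recent task
# (assumed here to be LLM-style role/content dicts).
for message in agent.last_messages:
    print(message["role"], message["content"])
```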

* fix: add messages field to TaskOutput in tests for consistency

- Updated multiple test cases to include the new `messages` field in the `TaskOutput` instances.
- Ensured that all relevant tests reflect the latest changes in the TaskOutput structure, maintaining consistency across the test suite.
- This change aligns with the recent addition of the `last_messages` attribute in the agent class for improved task tracking.

* feat: preserve messages in task outputs during replay

- Added functionality to the Crew class to store and retrieve messages in task outputs.
- Enhanced the replay mechanism to ensure that messages from stored task outputs are preserved and accessible (a replay sketch follows this list).
- Introduced a new test case to verify that messages are correctly stored and replayed, ensuring consistency in task execution and output handling.
- This change improves the overall tracking and context retention of task interactions within the CrewAI framework.
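
A hedged sketch of the replay flow exercised by `test_replay_preserves_messages` below; `replay()`, `tasks_output`, and the `messages` field are used as in that test, while the fixture values are illustrative.

```python
from crewai import Agent, Crew, Process, Task

agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(description="Say hello", expected_output="A greeting", agent=agent)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)
crew.kickoff()

# Replaying the task surfaces the messages captured during the original run.
replayed = crew.replay(str(task.id))
for message in replayed.tasks_output[0].messages:
    print(message["role"], "->", message["content"])
```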

* fix: restore the original test (the previous version was left over from debugging)
Lorenze Jay
2025-11-10 17:38:30 -08:00
committed by GitHub
parent 629f7f34ce
commit 6b52587c67
14 changed files with 889 additions and 14 deletions

@@ -340,7 +340,7 @@ def test_sync_task_execution(researcher, writer):
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -412,7 +412,7 @@ def test_manager_agent_delegating_to_assigned_task_agent(researcher, writer):
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -513,7 +513,7 @@ def test_manager_agent_delegates_with_varied_role_cases():
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
task.output = mock_task_output
@@ -611,7 +611,7 @@ def test_crew_with_delegating_agents_should_not_override_task_tools(ceo, writer)
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -669,7 +669,7 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools(ceo, writer
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -788,7 +788,7 @@ def test_task_tools_override_agent_tools_with_allow_delegation(researcher, write
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# We mock execute_sync to verify which tools get used at runtime
@@ -1225,7 +1225,7 @@ async def test_async_task_execution_call_count(researcher, writer):
# Create a valid TaskOutput instance to mock the return value
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Create a MagicMock Future instance
@@ -1784,7 +1784,7 @@ def test_hierarchical_kickoff_usage_metrics_include_manager(researcher):
Task,
"execute_sync",
return_value=TaskOutput(
description="dummy", raw="Hello", agent=researcher.role
description="dummy", raw="Hello", agent=researcher.role, messages=[]
),
):
crew.kickoff()
@@ -1828,7 +1828,7 @@ def test_hierarchical_crew_creation_tasks_with_agents(researcher, writer):
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -1881,7 +1881,7 @@ def test_hierarchical_crew_creation_tasks_with_async_execution(researcher, write
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Create a mock Future that returns our TaskOutput
@@ -2246,11 +2246,13 @@ def test_conditional_task_uses_last_output(researcher, writer):
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
messages=[],
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
messages=[],
)
# Set up mocks for task execution and conditional logic
@@ -2318,11 +2320,13 @@ def test_conditional_tasks_result_collection(researcher, writer):
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
messages=[],
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
messages=[],
)
# Set up mocks for task execution and conditional logic
@@ -2399,6 +2403,7 @@ def test_multiple_conditional_tasks(researcher, writer):
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
messages=[],
)
# Set up mocks for task execution
@@ -2806,7 +2811,7 @@ def test_manager_agent(researcher, writer):
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Because we are mocking execute_sync, we never hit the underlying _execute_core
@@ -3001,6 +3006,7 @@ def test_replay_feature(researcher, writer):
output_format=OutputFormat.RAW,
pydantic=None,
summary="Mocked output for list of ideas",
messages=[],
)
crew.kickoff()
@@ -3052,6 +3058,7 @@ def test_crew_task_db_init():
output_format=OutputFormat.RAW,
pydantic=None,
summary="Write about AI in healthcare...",
messages=[],
)
crew.kickoff()
@@ -3114,6 +3121,7 @@ def test_replay_task_with_context():
output_format=OutputFormat.RAW,
pydantic=None,
summary="Detailed report on AI advancements...",
messages=[],
)
mock_task_output2 = TaskOutput(
description="Summarize the AI advancements report.",
@@ -3123,6 +3131,7 @@ def test_replay_task_with_context():
output_format=OutputFormat.RAW,
pydantic=None,
summary="Summary of the AI advancements report...",
messages=[],
)
mock_task_output3 = TaskOutput(
description="Write an article based on the AI advancements summary.",
@@ -3132,6 +3141,7 @@ def test_replay_task_with_context():
output_format=OutputFormat.RAW,
pydantic=None,
summary="Article on AI advancements...",
messages=[],
)
mock_task_output4 = TaskOutput(
description="Create a presentation based on the AI advancements article.",
@@ -3141,6 +3151,7 @@ def test_replay_task_with_context():
output_format=OutputFormat.RAW,
pydantic=None,
summary="Presentation on AI advancements...",
messages=[],
)
with patch.object(Task, "execute_sync") as mock_execute_task:
@@ -3164,6 +3175,70 @@ def test_replay_task_with_context():
db_handler.reset()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_preserves_messages():
"""Test that replay preserves messages from stored task outputs."""
from crewai.utilities.types import LLMMessage
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
allow_delegation=False,
)
task = Task(
description="Say hello",
expected_output="A greeting",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)
mock_messages: list[LLMMessage] = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Say hello"},
{"role": "assistant", "content": "Hello!"},
]
mock_task_output = TaskOutput(
description="Say hello",
raw="Hello!",
agent="Test Agent",
messages=mock_messages,
)
with patch.object(Task, "execute_sync", return_value=mock_task_output):
crew.kickoff()
# Verify the task output was stored with messages
db_handler = TaskOutputStorageHandler()
stored_outputs = db_handler.load()
assert stored_outputs is not None
assert len(stored_outputs) > 0
# Verify messages are in the stored output
stored_output = stored_outputs[0]["output"]
assert "messages" in stored_output
assert len(stored_output["messages"]) == 3
assert stored_output["messages"][0]["role"] == "system"
assert stored_output["messages"][1]["role"] == "user"
assert stored_output["messages"][2]["role"] == "assistant"
# Replay the task and verify messages are preserved
with patch.object(Task, "execute_sync", return_value=mock_task_output):
replayed_output = crew.replay(str(task.id))
# Verify the replayed task output has messages
assert len(replayed_output.tasks_output) > 0
replayed_task_output = replayed_output.tasks_output[0]
assert hasattr(replayed_task_output, "messages")
assert isinstance(replayed_task_output.messages, list)
assert len(replayed_task_output.messages) == 3
db_handler.reset()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_with_context():
agent = Agent(role="test_agent", backstory="Test Description", goal="Test Goal")
@@ -3181,6 +3256,7 @@ def test_replay_with_context():
pydantic=None,
json_dict={},
output_format=OutputFormat.RAW,
messages=[],
)
task1.output = context_output
@@ -3241,6 +3317,7 @@ def test_replay_with_context_set_to_nullable():
description="Test Task Output",
raw="test raw output",
agent="test_agent",
messages=[],
)
crew.kickoff()
@@ -3264,6 +3341,7 @@ def test_replay_with_invalid_task_id():
pydantic=None,
json_dict={},
output_format=OutputFormat.RAW,
messages=[],
)
task1.output = context_output
@@ -3328,6 +3406,7 @@ def test_replay_interpolates_inputs_properly(mock_interpolate_inputs):
pydantic=None,
json_dict={},
output_format=OutputFormat.RAW,
messages=[],
)
task1.output = context_output
@@ -3386,6 +3465,7 @@ def test_replay_setup_context():
pydantic=None,
json_dict={},
output_format=OutputFormat.RAW,
messages=[],
)
task1.output = context_output
crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
@@ -3619,6 +3699,7 @@ def test_conditional_should_skip(researcher, writer):
description="Task 1 description",
raw="Task 1 output",
agent="Researcher",
messages=[],
)
result = crew_met.kickoff()
@@ -3653,6 +3734,7 @@ def test_conditional_should_execute(researcher, writer):
description="Task 1 description",
raw="Task 1 output",
agent="Researcher",
messages=[],
)
crew_met.kickoff()
@@ -3824,7 +3906,7 @@ def test_task_tools_preserve_code_execution_tools():
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
with patch.object(
@@ -3878,7 +3960,7 @@ def test_multimodal_flag_adds_multimodal_tools():
crew = Crew(agents=[multimodal_agent], tasks=[task], process=Process.sequential)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent"
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
# Mock execute_sync to verify the tools passed at runtime
@@ -3942,6 +4024,7 @@ def test_multimodal_agent_image_tool_handling():
description="Mock description",
raw="A detailed analysis of the image",
agent="Image Analyst",
messages=[],
)
with patch.object(Task, "execute_sync") as mock_execute_sync: