feat: implement knowledge retrieval events in Agent (#2727)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled

* feat: implement knowledge retrieval events in Agent

This commit introduces a series of knowledge retrieval events in the Agent class, enhancing its ability to handle knowledge queries. New events include KnowledgeRetrievalStartedEvent, KnowledgeRetrievalCompletedEvent, KnowledgeQueryGeneratedEvent, KnowledgeQueryFailedEvent, and KnowledgeSearchQueryCompletedEvent. The Agent now emits these events during knowledge retrieval processes, allowing for better tracking and handling of knowledge queries. Additionally, the console formatter has been updated to handle these new events, providing visual feedback during knowledge retrieval operations.

* refactor: update knowledge query handling in Agent

This commit refines the knowledge query processing in the Agent class by renaming variables for clarity and optimizing the query rewriting logic. The system prompt has been updated in the translation file to enhance clarity and context for the query rewriting process. These changes aim to improve the overall readability and maintainability of the code.

* fix: add missing newline at end of en.json file

* fix broken tests

* refactor: rename knowledge query events and enhance retrieval handling

This commit renames the KnowledgeQueryGeneratedEvent to KnowledgeQueryStartedEvent to better reflect its purpose. It also updates the event handling in the EventListener and ConsoleFormatter classes to accommodate the new event structure. Additionally, the retrieval knowledge is now included in the KnowledgeRetrievalCompletedEvent, improving the overall knowledge retrieval process.

* docs for transparancy

* refactor: improve error handling in knowledge query processing

This commit refactors the knowledge query handling in the Agent class by changing the order of checks for LLM compatibility. It now logs a warning and emits a failure event if the LLM is not an instance of BaseLLM before attempting to call the LLM. Additionally, the task_prompt attribute has been removed from the KnowledgeQueryFailedEvent, simplifying the event structure.

* test: add unit test for knowledge search query and VCR cassette

This commit introduces a new test, `test_get_knowledge_search_query`, to verify that the `_get_knowledge_search_query` method in the Agent class correctly interacts with the LLM using the appropriate prompts. Additionally, a VCR cassette is added to record the interactions with the OpenAI API for this test, ensuring consistent and reliable test results.
This commit is contained in:
Lorenze Jay
2025-05-07 11:55:42 -07:00
committed by GitHub
parent e3887ae36e
commit 7ad51d9d05
13 changed files with 2776 additions and 465 deletions

View File

@@ -31,6 +31,14 @@ from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -122,6 +130,10 @@ class Agent(BaseAgent):
default=None,
description="Knowledge context for the crew.",
)
knowledge_search_query: Optional[str] = Field(
default=None,
description="Knowledge search query for the agent dynamically generated by the agent.",
)
@model_validator(mode="after")
def post_init_setup(self):
@@ -185,7 +197,7 @@ class Agent(BaseAgent):
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task with the agent.
@@ -245,27 +257,65 @@ class Agent(BaseAgent):
knowledge_config = (
self.knowledge_config.model_dump() if self.knowledge_config else {}
)
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query(
[task.prompt()], **knowledge_config
)
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge(
[task.prompt()], **knowledge_config
if self.knowledge:
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalStartedEvent(
agent=self,
),
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
try:
self.knowledge_search_query = self._get_knowledge_search_query(
task_prompt
)
if self.knowledge_search_query:
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalCompletedEvent(
query=self.knowledge_search_query,
agent=self,
retrieved_knowledge=(
(self.agent_knowledge_context or "")
+ (
"\n"
if self.agent_knowledge_context
and self.crew_knowledge_context
else ""
)
+ (self.crew_knowledge_context or "")
),
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=KnowledgeSearchQueryFailedEvent(
query=self.knowledge_search_query or "",
agent=self,
error=str(e),
),
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
@@ -288,12 +338,19 @@ class Agent(BaseAgent):
# Determine execution method based on timeout setting
if self.max_execution_time is not None:
if not isinstance(self.max_execution_time, int) or self.max_execution_time <= 0:
raise ValueError("Max Execution time must be a positive integer greater than zero")
result = self._execute_with_timeout(task_prompt, task, self.max_execution_time)
if (
not isinstance(self.max_execution_time, int)
or self.max_execution_time <= 0
):
raise ValueError(
"Max Execution time must be a positive integer greater than zero"
)
result = self._execute_with_timeout(
task_prompt, task, self.max_execution_time
)
else:
result = self._execute_without_timeout(task_prompt, task)
except TimeoutError as e:
# Propagate TimeoutError without retry
crewai_event_bus.emit(
@@ -345,54 +402,46 @@ class Agent(BaseAgent):
)
return result
def _execute_with_timeout(
self,
task_prompt: str,
task: Task,
timeout: int
) -> str:
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> str:
"""Execute a task with a timeout.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
timeout: Maximum execution time in seconds.
Returns:
The output of the agent.
Raises:
TimeoutError: If execution exceeds the timeout.
RuntimeError: If execution fails for other reasons.
"""
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
self._execute_without_timeout,
task_prompt=task_prompt,
task=task
self._execute_without_timeout, task_prompt=task_prompt, task=task
)
try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
future.cancel()
raise TimeoutError(f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task.")
raise TimeoutError(
f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
)
except Exception as e:
future.cancel()
raise RuntimeError(f"Task execution failed: {str(e)}")
def _execute_without_timeout(
self,
task_prompt: str,
task: Task
) -> str:
def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
"""Execute a task without a timeout.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
Returns:
The output of the agent.
"""
@@ -560,6 +609,61 @@ class Agent(BaseAgent):
def set_fingerprint(self, fingerprint: Fingerprint):
self.security_config.fingerprint = fingerprint
def _get_knowledge_search_query(self, task_prompt: str) -> str | None:
"""Generate a search query for the knowledge base based on the task description."""
crewai_event_bus.emit(
self,
event=KnowledgeQueryStartedEvent(
task_prompt=task_prompt,
agent=self,
),
)
query = self.i18n.slice("knowledge_search_query").format(
task_prompt=task_prompt
)
rewriter_prompt = self.i18n.slice("knowledge_search_query_system_prompt")
if not isinstance(self.llm, BaseLLM):
self._logger.log(
"warning",
f"Knowledge search query failed: LLM for agent '{self.role}' is not an instance of BaseLLM",
)
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
agent=self,
error="LLM is not compatible with knowledge search queries",
),
)
return None
try:
rewritten_query = self.llm.call(
[
{
"role": "system",
"content": rewriter_prompt,
},
{"role": "user", "content": query},
]
)
crewai_event_bus.emit(
self,
event=KnowledgeQueryCompletedEvent(
query=query,
agent=self,
),
)
return rewritten_query
except Exception as e:
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
agent=self,
error=str(e),
),
)
return None
def kickoff(
self,
messages: Union[str, List[Dict[str, str]]],

View File

@@ -27,7 +27,9 @@
"feedback_instructions": "User feedback: {feedback}\nInstructions: Use this feedback to enhance the next output iteration.\nNote: Do not respond or add commentary.",
"lite_agent_system_prompt_with_tools": "You are {role}. {backstory}\nYour personal goal is: {goal}\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"lite_agent_system_prompt_without_tools": "You are {role}. {backstory}\nYour personal goal is: {goal}\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"lite_agent_response_format": "\nIMPORTANT: Your final answer MUST contain all the information requested in the following format: {response_format}\n\nIMPORTANT: Ensure the final output does not include any code block markers like ```json or ```python."
"lite_agent_response_format": "\nIMPORTANT: Your final answer MUST contain all the information requested in the following format: {response_format}\n\nIMPORTANT: Ensure the final output does not include any code block markers like ```json or ```python.",
"knowledge_search_query": "The original query is: {task_prompt}.",
"knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions."
},
"errors": {
"force_final_answer_error": "You can't keep going, here is the best final answer you generated:\n\n {formatted_answer}",

View File

@@ -8,6 +8,14 @@ from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
@@ -57,6 +65,8 @@ class EventListener(BaseEventListener):
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
knowledge_retrieval_in_progress = False
knowledge_query_in_progress = False
def __new__(cls):
if cls._instance is None:
@@ -342,5 +352,59 @@ class EventListener(BaseEventListener):
def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
@crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
def on_knowledge_retrieval_started(
source, event: KnowledgeRetrievalStartedEvent
):
if self.knowledge_retrieval_in_progress:
return
self.knowledge_retrieval_in_progress = True
self.formatter.handle_knowledge_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed(
source, event: KnowledgeRetrievalCompletedEvent
):
if not self.knowledge_retrieval_in_progress:
return
self.knowledge_retrieval_in_progress = False
self.formatter.handle_knowledge_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.retrieved_knowledge,
)
@crewai_event_bus.on(KnowledgeQueryStartedEvent)
def on_knowledge_query_started(source, event: KnowledgeQueryStartedEvent):
pass
@crewai_event_bus.on(KnowledgeQueryFailedEvent)
def on_knowledge_query_failed(source, event: KnowledgeQueryFailedEvent):
self.formatter.handle_knowledge_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(KnowledgeQueryCompletedEvent)
def on_knowledge_query_completed(source, event: KnowledgeQueryCompletedEvent):
pass
@crewai_event_bus.on(KnowledgeSearchQueryFailedEvent)
def on_knowledge_search_query_failed(
source, event: KnowledgeSearchQueryFailedEvent
):
self.formatter.handle_knowledge_search_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
event_listener = EventListener()

View File

@@ -0,0 +1,56 @@
from typing import TYPE_CHECKING, Any
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.utilities.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class KnowledgeRetrievalStartedEvent(BaseEvent):
"""Event emitted when a knowledge retrieval is started."""
type: str = "knowledge_search_query_started"
agent: BaseAgent
class KnowledgeRetrievalCompletedEvent(BaseEvent):
"""Event emitted when a knowledge retrieval is completed."""
query: str
type: str = "knowledge_search_query_completed"
agent: BaseAgent
retrieved_knowledge: Any
class KnowledgeQueryStartedEvent(BaseEvent):
"""Event emitted when a knowledge query is started."""
task_prompt: str
type: str = "knowledge_query_started"
agent: BaseAgent
class KnowledgeQueryFailedEvent(BaseEvent):
"""Event emitted when a knowledge query fails."""
type: str = "knowledge_query_failed"
agent: BaseAgent
error: str
class KnowledgeQueryCompletedEvent(BaseEvent):
"""Event emitted when a knowledge query is completed."""
query: str
type: str = "knowledge_query_completed"
agent: BaseAgent
class KnowledgeSearchQueryFailedEvent(BaseEvent):
"""Event emitted when a knowledge search query fails."""
query: str
type: str = "knowledge_search_query_failed"
agent: BaseAgent
error: str

View File

@@ -783,3 +783,202 @@ class ConsoleFormatter:
self.update_lite_agent_status(
self.current_lite_agent_branch, lite_agent_role, status, **fields
)
def handle_knowledge_retrieval_started(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle knowledge retrieval started event."""
if not self.verbose:
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
# If we don't have a valid branch, use crew_tree as the branch if available
if crew_tree is not None:
branch_to_use = tree_to_use = crew_tree
else:
return None
knowledge_branch = branch_to_use.add("")
self.update_tree_label(
knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue"
)
self.print(tree_to_use)
self.print()
return knowledge_branch
def handle_knowledge_retrieval_completed(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
retrieved_knowledge: Any,
) -> None:
"""Handle knowledge retrieval completed event."""
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None and tree_to_use is not None:
branch_to_use = tree_to_use
if branch_to_use is None or tree_to_use is None:
if retrieved_knowledge:
knowledge_text = str(retrieved_knowledge)
if len(knowledge_text) > 500:
knowledge_text = knowledge_text[:497] + "..."
knowledge_panel = Panel(
Text(knowledge_text, style="white"),
title="📚 Retrieved Knowledge",
border_style="green",
padding=(1, 2),
)
self.print(knowledge_panel)
self.print()
return None
knowledge_branch_found = False
for child in branch_to_use.children:
if "Knowledge Retrieval Started" in str(child.label):
self.update_tree_label(
child, "", "Knowledge Retrieval Completed", "green"
)
knowledge_branch_found = True
break
if not knowledge_branch_found:
for child in branch_to_use.children:
if (
"Knowledge Retrieval" in str(child.label)
and "Started" not in str(child.label)
and "Completed" not in str(child.label)
):
self.update_tree_label(
child, "", "Knowledge Retrieval Completed", "green"
)
knowledge_branch_found = True
break
if not knowledge_branch_found:
knowledge_branch = branch_to_use.add("")
self.update_tree_label(
knowledge_branch, "", "Knowledge Retrieval Completed", "green"
)
self.print(tree_to_use)
if retrieved_knowledge:
knowledge_text = str(retrieved_knowledge)
if len(knowledge_text) > 500:
knowledge_text = knowledge_text[:497] + "..."
knowledge_panel = Panel(
Text(knowledge_text, style="white"),
title="📚 Retrieved Knowledge",
border_style="green",
padding=(1, 2),
)
self.print(knowledge_panel)
self.print()
def handle_knowledge_query_started(
self,
agent_branch: Optional[Tree],
task_prompt: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query generated event."""
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
query_branch = branch_to_use.add("")
self.update_tree_label(
query_branch, "🔎", f"Query: {task_prompt[:50]}...", "yellow"
)
self.print(tree_to_use)
self.print()
def handle_knowledge_query_failed(
self,
agent_branch: Optional[Tree],
error: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query failed event."""
if not self.verbose:
return
tree_to_use = self.current_lite_agent_branch or crew_tree
branch_to_use = self.current_lite_agent_branch or agent_branch
if branch_to_use and tree_to_use:
query_branch = branch_to_use.add("")
self.update_tree_label(query_branch, "", "Knowledge Query Failed", "red")
self.print(tree_to_use)
self.print()
# Show error panel
error_content = self.create_status_content(
"Knowledge Query Failed", "Query Error", "red", Error=error
)
self.print_panel(error_content, "Knowledge Error", "red")
def handle_knowledge_query_completed(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query completed event."""
if not self.verbose:
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
query_branch = branch_to_use.add("")
self.update_tree_label(query_branch, "", "Knowledge Query Completed", "green")
self.print(tree_to_use)
self.print()
def handle_knowledge_search_query_failed(
self,
agent_branch: Optional[Tree],
error: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge search query failed event."""
if not self.verbose:
return
tree_to_use = self.current_lite_agent_branch or crew_tree
branch_to_use = self.current_lite_agent_branch or agent_branch
if branch_to_use and tree_to_use:
query_branch = branch_to_use.add("")
self.update_tree_label(query_branch, "", "Knowledge Search Failed", "red")
self.print(tree_to_use)
self.print()
# Show error panel
error_content = self.create_status_content(
"Knowledge Search Failed", "Search Error", "red", Error=error
)
self.print_panel(error_content, "Search Error", "red")