Mirror of https://github.com/crewAIInc/crewAI.git, synced 2026-05-02 15:52:34 +00:00
feat: support filtering LLM events from the Lite Agent
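This commit threads a from_agent reference through the streaming and non-streaming code paths, so LLMStreamChunkEvent, LLMCallCompletedEvent, and LLMCallFailedEvent all carry the agent that triggered the LLM call. Below is a minimal sketch of how a listener might use the new field to scope handling to a single Lite Agent; the import paths, the on(...) decorator, and the (source, event) handler signature follow crewAI's event utilities, while stream_only_from is a hypothetical helper written purely for illustration:

```python
# Sketch only: filter stream chunks by the agent that produced them.
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.llm_events import LLMStreamChunkEvent


def stream_only_from(agent) -> None:
    """Register a listener that prints chunks emitted by `agent`, ignoring the rest."""

    @crewai_event_bus.on(LLMStreamChunkEvent)
    def on_chunk(source, event):
        # from_agent is the field this commit adds at every emit site below;
        # chunks from other agents (or with no agent attached) are skipped.
        if event.from_agent is agent:
            print(event.chunk, end="", flush=True)
```

The same check works for LLMCallCompletedEvent and LLMCallFailedEvent, since the diff adds from_agent to all three event types.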
@@ -420,6 +420,7 @@ class LLM(BaseLLM):
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
         from_task: Optional[Any] = None,
+        from_agent: Optional[Any] = None,
     ) -> str:
         """Handle a streaming response from the LLM.
 
@@ -427,6 +428,8 @@ class LLM(BaseLLM):
             params: Parameters for the completion call
             callbacks: Optional list of callback functions
             available_functions: Dict of available functions
+            from_task: Optional task object
+            from_agent: Optional agent object
 
         Returns:
             str: The complete response text
@@ -512,6 +515,7 @@ class LLM(BaseLLM):
                         accumulated_tool_args=accumulated_tool_args,
                         available_functions=available_functions,
                         from_task=from_task,
+                        from_agent=from_agent,
                     )
                     if result is not None:
                         chunk_content = result
@@ -529,7 +533,7 @@ class LLM(BaseLLM):
                     assert hasattr(crewai_event_bus, "emit")
                     crewai_event_bus.emit(
                         self,
-                        event=LLMStreamChunkEvent(chunk=chunk_content, from_task=from_task),
+                        event=LLMStreamChunkEvent(chunk=chunk_content, from_task=from_task, from_agent=from_agent),
                     )
             # --- 4) Fallback to non-streaming if no content received
             if not full_response.strip() and chunk_count == 0:
@@ -542,7 +546,7 @@ class LLM(BaseLLM):
                     "stream_options", None
                 ) # Remove stream_options for non-streaming call
                 return self._handle_non_streaming_response(
-                    non_streaming_params, callbacks, available_functions, from_task
+                    non_streaming_params, callbacks, available_functions, from_task, from_agent
                 )
 
             # --- 5) Handle empty response with chunks
@@ -627,7 +631,7 @@ class LLM(BaseLLM):
             # Log token usage if available in streaming mode
             self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
             # Emit completion event and return response
-            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
+            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
             return full_response
 
         # --- 9) Handle tool calls if present
@@ -639,7 +643,7 @@ class LLM(BaseLLM):
             self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
 
             # --- 11) Emit completion event and return response
-            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
+            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
             return full_response
 
         except ContextWindowExceededError as e:
@@ -651,14 +655,14 @@ class LLM(BaseLLM):
             logging.error(f"Error in streaming response: {str(e)}")
             if full_response.strip():
                 logging.warning(f"Returning partial response despite error: {str(e)}")
-                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
+                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
                 return full_response
 
             # Emit failed event and re-raise the exception
             assert hasattr(crewai_event_bus, "emit")
             crewai_event_bus.emit(
                 self,
-                event=LLMCallFailedEvent(error=str(e), from_task=from_task),
+                event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
             )
             raise Exception(f"Failed to get streaming response: {str(e)}")
 
@@ -668,6 +672,7 @@ class LLM(BaseLLM):
         accumulated_tool_args: DefaultDict[int, AccumulatedToolArgs],
         available_functions: Optional[Dict[str, Any]] = None,
         from_task: Optional[Any] = None,
+        from_agent: Optional[Any] = None,
     ) -> None | str:
         for tool_call in tool_calls:
             current_tool_accumulator = accumulated_tool_args[tool_call.index]
@@ -686,6 +691,7 @@ class LLM(BaseLLM):
                         tool_call=tool_call.to_dict(),
                         chunk=tool_call.function.arguments,
                         from_task=from_task,
+                        from_agent=from_agent,
                     ),
                 )
 
@@ -753,6 +759,7 @@ class LLM(BaseLLM):
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
         from_task: Optional[Any] = None,
+        from_agent: Optional[Any] = None,
     ) -> str:
         """Handle a non-streaming response from the LLM.
 
@@ -761,6 +768,7 @@ class LLM(BaseLLM):
             callbacks: Optional list of callback functions
             available_functions: Dict of available functions
             from_task: Optional Task that invoked the LLM
+            from_agent: Optional Agent that invoked the LLM
 
         Returns:
             str: The response text
@@ -801,7 +809,7 @@ class LLM(BaseLLM):
 
         # --- 5) If no tool calls or no available functions, return the text response directly
         if not tool_calls or not available_functions:
-            self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task)
+            self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
             return text_response
 
         # --- 6) Handle tool calls if present
@@ -810,7 +818,7 @@ class LLM(BaseLLM):
             return tool_result
 
         # --- 7) If tool call handling didn't return a result, emit completion event and return text response
-        self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task)
+        self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
         return text_response
 
     def _handle_tool_call(
@@ -896,6 +904,7 @@ class LLM(BaseLLM):
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
         from_task: Optional[Any] = None,
+        from_agent: Optional[Any] = None,
     ) -> Union[str, Any]:
         """High-level LLM call method.
 
@@ -911,6 +920,7 @@ class LLM(BaseLLM):
             available_functions: Optional dict mapping function names to callables
                 that can be invoked by the LLM.
             from_task: Optional Task that invoked the LLM
+            from_agent: Optional Agent that invoked the LLM
 
         Returns:
             Union[str, Any]: Either a text response from the LLM (str) or
@@ -931,6 +941,7 @@ class LLM(BaseLLM):
                 callbacks=callbacks,
                 available_functions=available_functions,
                 from_task=from_task,
+                from_agent=from_agent,
             ),
         )
 
@@ -959,11 +970,11 @@ class LLM(BaseLLM):
             # --- 7) Make the completion call and handle response
             if self.stream:
                 return self._handle_streaming_response(
-                    params, callbacks, available_functions, from_task
+                    params, callbacks, available_functions, from_task, from_agent
                 )
             else:
                 return self._handle_non_streaming_response(
-                    params, callbacks, available_functions, from_task
+                    params, callbacks, available_functions, from_task, from_agent
                 )
 
         except LLMContextLengthExceededException:
@@ -975,12 +986,12 @@ class LLM(BaseLLM):
             assert hasattr(crewai_event_bus, "emit")
             crewai_event_bus.emit(
                 self,
-                event=LLMCallFailedEvent(error=str(e), from_task=from_task),
+                event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
             )
             logging.error(f"LiteLLM call failed: {str(e)}")
             raise
 
-    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None):
+    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
         """Handle the events for the LLM call.
 
         Args:
@@ -990,7 +1001,7 @@ class LLM(BaseLLM):
         assert hasattr(crewai_event_bus, "emit")
         crewai_event_bus.emit(
             self,
-            event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task),
+            event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
         )
 
     def _format_messages_for_provider(