Lorenze/ensure hooks work with lite agents flows (#3981)

* liteagent support hooks

* wip llm.call hooks work - needs tests for this

* fix tests

* fixed more

* more tool hooks test cassettes
This commit is contained in:
Lorenze Jay
2025-12-04 09:38:39 -08:00
committed by GitHub
parent 633e279b51
commit c456e5c5fa
17 changed files with 1640 additions and 53 deletions

View File

@@ -246,6 +246,11 @@ class GeminiCompletion(BaseLLM):
messages
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, response_model
)
@@ -559,7 +564,9 @@ class GeminiCompletion(BaseLLM):
messages=messages_for_event,
)
return content
return self._invoke_after_llm_call_hooks(
messages_for_event, content, from_agent
)
def _handle_streaming_completion(
self,
@@ -639,7 +646,9 @@ class GeminiCompletion(BaseLLM):
messages=messages_for_event,
)
return full_response
return self._invoke_after_llm_call_hooks(
messages_for_event, full_response, from_agent
)
async def _ahandle_completion(
self,
@@ -787,7 +796,159 @@ class GeminiCompletion(BaseLLM):
messages=messages_for_event,
)
return full_response
return self._invoke_after_llm_call_hooks(
messages_for_event, full_response, from_agent
)
async def _ahandle_completion(
self,
contents: list[types.Content],
system_instruction: str | None,
config: types.GenerateContentConfig,
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle async non-streaming content generation."""
try:
# The API accepts list[Content] but mypy is overly strict about variance
contents_for_api: Any = contents
response = await self.client.aio.models.generate_content(
model=self.model,
contents=contents_for_api,
config=config,
)
usage = self._extract_token_usage(response)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
raise e from e
self._track_token_usage_internal(usage)
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, "function_call") and part.function_call:
function_name = part.function_call.name
if function_name is None:
continue
function_args = (
dict(part.function_call.args)
if part.function_call.args
else {}
)
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions or {},
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
return result
content = response.text or ""
content = self._apply_stop_words(content)
messages_for_event = self._convert_contents_to_dict(contents)
self._emit_call_completed_event(
response=content,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages_for_event,
)
return content
async def _ahandle_streaming_completion(
self,
contents: list[types.Content],
config: types.GenerateContentConfig,
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
"""Handle async streaming content generation."""
full_response = ""
function_calls: dict[str, dict[str, Any]] = {}
# The API accepts list[Content] but mypy is overly strict about variance
contents_for_api: Any = contents
stream = await self.client.aio.models.generate_content_stream(
model=self.model,
contents=contents_for_api,
config=config,
)
async for chunk in stream:
if chunk.text:
full_response += chunk.text
self._emit_stream_chunk_event(
chunk=chunk.text,
from_task=from_task,
from_agent=from_agent,
)
if chunk.candidates:
candidate = chunk.candidates[0]
if candidate.content and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, "function_call") and part.function_call:
call_id = part.function_call.name or "default"
if call_id not in function_calls:
function_calls[call_id] = {
"name": part.function_call.name,
"args": dict(part.function_call.args)
if part.function_call.args
else {},
}
if function_calls and available_functions:
for call_data in function_calls.values():
function_name = call_data["name"]
function_args = call_data["args"]
# Skip if function_name is None
if not isinstance(function_name, str):
continue
# Ensure function_args is a dict
if not isinstance(function_args, dict):
function_args = {}
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
return result
messages_for_event = self._convert_contents_to_dict(contents)
self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages_for_event,
)
return self._invoke_after_llm_call_hooks(
messages_for_event, full_response, from_agent
)
def supports_function_calling(self) -> bool:
"""Check if the model supports function calling."""
@@ -851,7 +1012,7 @@ class GeminiCompletion(BaseLLM):
def _convert_contents_to_dict(
self,
contents: list[types.Content],
) -> list[dict[str, str]]:
) -> list[LLMMessage]:
"""Convert contents to dict format."""
result: list[dict[str, str]] = []
for content_obj in contents: