add tests

This commit is contained in:
Brandon Hancock
2025-03-03 13:21:43 -05:00
parent 143832bd8b
commit 26e6106fe2
5 changed files with 758 additions and 0 deletions


@@ -38,6 +38,7 @@ from crewai.utilities.events.llm_events import (
    LLMCallFailedEvent,
    LLMCallStartedEvent,
    LLMCallType,
    LLMStreamChunkEvent,
)
from crewai.utilities.events.task_events import (
    TaskCompletedEvent,
@@ -615,3 +616,233 @@ def test_llm_emits_call_failed_event():
    assert len(received_events) == 1
    assert received_events[0].type == "llm_call_failed"
    assert received_events[0].error == error_message


@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_stream_chunk_events():
    """Test that LLM emits stream chunk events when streaming is enabled."""
    received_chunks = []

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(LLMStreamChunkEvent)
        def handle_stream_chunk(source, event):
            received_chunks.append(event.chunk)

        # Create an LLM with streaming enabled
        llm = LLM(model="gpt-3.5-turbo", stream=True)

        # Call the LLM with a simple message
        response = llm.call("Tell me a short joke")

        # Verify that we received chunks
        assert len(received_chunks) > 0

        # Verify that concatenating all chunks equals the final response
        assert "".join(received_chunks) == response


@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_no_stream_chunks_when_streaming_disabled():
    """Test that LLM doesn't emit stream chunk events when streaming is disabled."""
    received_chunks = []

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(LLMStreamChunkEvent)
        def handle_stream_chunk(source, event):
            received_chunks.append(event.chunk)

        # Create an LLM with streaming disabled
        llm = LLM(model="gpt-3.5-turbo", stream=False)

        # Call the LLM with a simple message
        response = llm.call("Tell me a short joke")

        # Verify that we didn't receive any chunks
        assert len(received_chunks) == 0

        # Verify we got a response
        assert response and isinstance(response, str)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_tool_calling_with_streaming():
    """Test that tool calling works correctly with streaming enabled."""
    received_chunks = []
    tool_called = False

    def sample_tool(text: str) -> str:
        nonlocal tool_called
        tool_called = True
        return f"Tool processed: {text}"

    available_functions = {"sample_tool": sample_tool}
    tools = [
        {
            "type": "function",
            "function": {
                "name": "sample_tool",
                "description": "A sample tool that processes text",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "The text to process"}
                    },
                    "required": ["text"],
                },
            },
        }
    ]

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(LLMStreamChunkEvent)
        def handle_stream_chunk(source, event):
            received_chunks.append(event.chunk)

        # Create an LLM with streaming enabled
        llm = LLM(model="gpt-4", stream=True)

        # Store original methods
        original_call = llm.call
        original_handle_tool_call = llm._handle_tool_call

        # Create a mock call method that simulates streaming and tool calling
        def mock_call(messages, tools=None, callbacks=None, available_functions=None):
            # Emit some chunks first
            crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="I'll process "))
            crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="that text "))
            crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="for you."))

            # Call the tool
            if available_functions and "sample_tool" in available_functions:
                result = available_functions["sample_tool"]("Hello, world!")
                return result

            return "No tool was called"

        # Replace the methods with our mocks
        llm.call = mock_call

        try:
            # Call the LLM with a message that should trigger tool use
            response = llm.call(
                "Process this text with the sample tool: 'Hello, world!'",
                tools=tools,
                available_functions=available_functions,
            )

            # Verify that we received chunks
            assert len(received_chunks) == 3
            assert "".join(received_chunks) == "I'll process that text for you."

            # Verify that the tool was called
            assert tool_called

            # Verify the response contains the tool's output
            assert response == "Tool processed: Hello, world!"
        finally:
            # Restore the original methods
            llm.call = original_call
            llm._handle_tool_call = original_handle_tool_call


@pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_fallback_to_non_streaming():
    """Test that streaming falls back to non-streaming when there's an error."""
    received_chunks = []
    fallback_called = False

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(LLMStreamChunkEvent)
        def handle_stream_chunk(source, event):
            received_chunks.append(event.chunk)

        # Create an LLM with streaming enabled
        llm = LLM(model="gpt-3.5-turbo", stream=True)

        # Store original method
        original_call = llm.call

        # Create a mock call method that handles the streaming error
        def mock_call(messages, tools=None, callbacks=None, available_functions=None):
            nonlocal fallback_called

            # Emit a couple of chunks to simulate partial streaming
            crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1"))
            crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2"))

            # Mark that fallback would be called
            fallback_called = True

            # Return a response as if fallback succeeded
            return "Fallback response after streaming error"

        # Replace the call method with our mock
        llm.call = mock_call

        try:
            # Call the LLM
            response = llm.call("Tell me a short joke")

            # Verify that we received some chunks
            assert len(received_chunks) == 2
            assert received_chunks[0] == "Test chunk 1"
            assert received_chunks[1] == "Test chunk 2"

            # Verify fallback was triggered
            assert fallback_called

            # Verify we got the fallback response
            assert response == "Fallback response after streaming error"
        finally:
            # Restore the original method
            llm.call = original_call


@pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_empty_response_handling():
    """Test that streaming handles empty responses correctly."""
    received_chunks = []

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(LLMStreamChunkEvent)
        def handle_stream_chunk(source, event):
            received_chunks.append(event.chunk)

        # Create an LLM with streaming enabled
        llm = LLM(model="gpt-3.5-turbo", stream=True)

        # Store original method
        original_call = llm.call

        # Create a mock call method that simulates empty chunks
        def mock_call(messages, tools=None, callbacks=None, available_functions=None):
            # Emit a few empty chunks
            for _ in range(3):
                crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk=""))

            # Return the default message for empty responses
            return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."

        # Replace the call method with our mock
        llm.call = mock_call

        try:
            # Call the LLM - this should handle empty response
            response = llm.call("Tell me a short joke")

            # Verify that we received empty chunks
            assert len(received_chunks) == 3
            assert all(chunk == "" for chunk in received_chunks)

            # Verify the response is the default message for empty responses
            assert "I apologize" in response and "couldn't generate" in response
        finally:
            # Restore the original method
            llm.call = original_call
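
For context, a minimal usage sketch of the streaming-event pattern these tests exercise (not part of the commit): a handler subscribed to LLMStreamChunkEvent receives each chunk while a streaming llm.call() runs. The import paths for crewai_event_bus and LLM are assumed here, since this hunk only shows the llm_events import, and the print-based handler is purely illustrative.

# Illustrative sketch only; import paths for LLM and crewai_event_bus are assumed.
from crewai import LLM
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.llm_events import LLMStreamChunkEvent


@crewai_event_bus.on(LLMStreamChunkEvent)
def print_chunk(source, event):
    # Print each streamed fragment as it arrives
    print(event.chunk, end="", flush=True)


llm = LLM(model="gpt-3.5-turbo", stream=True)
response = llm.call("Tell me a short joke")  # chunks are emitted (and printed) during the call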