diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py
index 48099fad9..37fee24e2 100644
--- a/src/crewai/agents/crew_agent_executor.py
+++ b/src/crewai/agents/crew_agent_executor.py
@@ -159,6 +159,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
                 messages=self.messages,
                 callbacks=self.callbacks,
                 printer=self._printer,
+                from_task=self.task
             )
             formatted_answer = process_llm_response(answer, self.use_stop_words)
diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index f30ed080f..56901ccd7 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -419,6 +419,7 @@ class LLM(BaseLLM):
         params: Dict[str, Any],
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
+        from_task: Optional[Any] = None,
     ) -> str:
         """Handle a streaming response from the LLM.

@@ -510,6 +511,7 @@ class LLM(BaseLLM):
                             tool_calls=tool_calls,
                             accumulated_tool_args=accumulated_tool_args,
                             available_functions=available_functions,
+                            from_task=from_task,
                         )
                         if result is not None:
                             chunk_content = result
@@ -527,7 +529,7 @@ class LLM(BaseLLM):
                     assert hasattr(crewai_event_bus, "emit")
                     crewai_event_bus.emit(
                         self,
-                        event=LLMStreamChunkEvent(chunk=chunk_content),
+                        event=LLMStreamChunkEvent(chunk=chunk_content, from_task=from_task),
                     )
             # --- 4) Fallback to non-streaming if no content received
             if not full_response.strip() and chunk_count == 0:
@@ -540,7 +542,7 @@ class LLM(BaseLLM):
                     "stream_options", None
                 )  # Remove stream_options for non-streaming call
                 return self._handle_non_streaming_response(
-                    non_streaming_params, callbacks, available_functions
+                    non_streaming_params, callbacks, available_functions, from_task
                 )

             # --- 5) Handle empty response with chunks
@@ -625,7 +627,7 @@ class LLM(BaseLLM):
                 # Log token usage if available in streaming mode
                 self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
                 # Emit completion event and return response
-                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
+                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
                 return full_response

             # --- 9) Handle tool calls if present
@@ -637,7 +639,7 @@ class LLM(BaseLLM):
             self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)

             # --- 11) Emit completion event and return response
-            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
+            self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
             return full_response

         except ContextWindowExceededError as e:
@@ -649,14 +651,14 @@ class LLM(BaseLLM):
             logging.error(f"Error in streaming response: {str(e)}")
             if full_response.strip():
                 logging.warning(f"Returning partial response despite error: {str(e)}")
-                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
+                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task)
                 return full_response

             # Emit failed event and re-raise the exception
             assert hasattr(crewai_event_bus, "emit")
             crewai_event_bus.emit(
                 self,
-                event=LLMCallFailedEvent(error=str(e)),
+                event=LLMCallFailedEvent(error=str(e), from_task=from_task),
             )
             raise Exception(f"Failed to get streaming response: {str(e)}")

@@ -665,6 +667,7 @@ class LLM(BaseLLM):
         tool_calls: List[ChatCompletionDeltaToolCall],
         accumulated_tool_args: DefaultDict[int, AccumulatedToolArgs],
         available_functions: Optional[Dict[str, Any]] = None,
+        from_task: Optional[Any] = None,
     ) -> None | str:
         for tool_call in tool_calls:
             current_tool_accumulator = accumulated_tool_args[tool_call.index]
@@ -682,6 +685,7 @@ class LLM(BaseLLM):
                     event=LLMStreamChunkEvent(
                         tool_call=tool_call.to_dict(),
                         chunk=tool_call.function.arguments,
+                        from_task=from_task,
                     ),
                 )

@@ -748,6 +752,7 @@ class LLM(BaseLLM):
         params: Dict[str, Any],
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
+        from_task: Optional[Any] = None,
     ) -> str:
         """Handle a non-streaming response from the LLM.

@@ -755,6 +760,7 @@ class LLM(BaseLLM):
             params: Parameters for the completion call
             callbacks: Optional list of callback functions
             available_functions: Dict of available functions
+            from_task: Optional Task that invoked the LLM

         Returns:
             str: The response text
@@ -795,7 +801,7 @@ class LLM(BaseLLM):

         # --- 5) If no tool calls or no available functions, return the text response directly
         if not tool_calls or not available_functions:
-            self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
+            self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task)
             return text_response

         # --- 6) Handle tool calls if present
@@ -804,7 +810,7 @@ class LLM(BaseLLM):
             return tool_result

         # --- 7) If tool call handling didn't return a result, emit completion event and return text response
-        self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
+        self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task)
         return text_response

     def _handle_tool_call(
@@ -889,6 +895,7 @@ class LLM(BaseLLM):
         tools: Optional[List[dict]] = None,
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
+        from_task: Optional[Any] = None,
     ) -> Union[str, Any]:
         """High-level LLM call method.

@@ -903,6 +910,7 @@ class LLM(BaseLLM):
                 during and after the LLM call.
             available_functions: Optional dict mapping function names to
                 callables that can be invoked by the LLM.
+            from_task: Optional Task that invoked the LLM

         Returns:
             Union[str, Any]: Either a text response from the LLM (str) or
@@ -922,6 +930,7 @@ class LLM(BaseLLM):
                 tools=tools,
                 callbacks=callbacks,
                 available_functions=available_functions,
+                from_task=from_task,
             ),
         )

@@ -950,11 +959,11 @@ class LLM(BaseLLM):
             # --- 7) Make the completion call and handle response
             if self.stream:
                 return self._handle_streaming_response(
-                    params, callbacks, available_functions
+                    params, callbacks, available_functions, from_task
                 )
             else:
                 return self._handle_non_streaming_response(
-                    params, callbacks, available_functions
+                    params, callbacks, available_functions, from_task
                 )

         except LLMContextLengthExceededException:
@@ -966,12 +975,12 @@ class LLM(BaseLLM):
             assert hasattr(crewai_event_bus, "emit")
             crewai_event_bus.emit(
                 self,
-                event=LLMCallFailedEvent(error=str(e)),
+                event=LLMCallFailedEvent(error=str(e), from_task=from_task),
             )
             logging.error(f"LiteLLM call failed: {str(e)}")
             raise

-    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType):
+    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None):
         """Handle the events for the LLM call.

         Args:
@@ -981,7 +990,7 @@ class LLM(BaseLLM):
         assert hasattr(crewai_event_bus, "emit")
         crewai_event_bus.emit(
             self,
-            event=LLMCallCompletedEvent(response=response, call_type=call_type),
+            event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task),
         )

     def _format_messages_for_provider(
diff --git a/src/crewai/llms/base_llm.py b/src/crewai/llms/base_llm.py
index c51e8847d..9e7fc4572 100644
--- a/src/crewai/llms/base_llm.py
+++ b/src/crewai/llms/base_llm.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union


 class BaseLLM(ABC):
@@ -47,6 +47,7 @@ class BaseLLM(ABC):
         tools: Optional[List[dict]] = None,
         callbacks: Optional[List[Any]] = None,
         available_functions: Optional[Dict[str, Any]] = None,
+        from_task: Optional[Any] = None,
     ) -> Union[str, Any]:
         """Call the LLM with the given messages.

@@ -61,6 +62,7 @@ class BaseLLM(ABC):
                 during and after the LLM call.
             available_functions: Optional dict mapping function names to
                 callables that can be invoked by the LLM.
+            from_task: Optional task caller to be used for the LLM call.

         Returns:
             Either a text response from the LLM (str) or
diff --git a/src/crewai/utilities/agent_utils.py b/src/crewai/utilities/agent_utils.py
index c3a38cc9a..086622b76 100644
--- a/src/crewai/utilities/agent_utils.py
+++ b/src/crewai/utilities/agent_utils.py
@@ -145,12 +145,14 @@ def get_llm_response(
     messages: List[Dict[str, str]],
     callbacks: List[Any],
     printer: Printer,
+    from_task: Optional[Any] = None,
 ) -> str:
     """Call the LLM and return the response, handling any invalid responses."""
     try:
         answer = llm.call(
             messages,
             callbacks=callbacks,
+            from_task=from_task,
         )
     except Exception as e:
         printer.print(
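Because `from_task` is now part of the `BaseLLM.call()` contract, third-party LLM implementations should accept the new keyword as well (the `tests/custom_llm_test.py` hunk further down does exactly this). A minimal sketch of an updated custom LLM follows; the class name and canned return value are illustrative only, and any other members `BaseLLM` requires are omitted for brevity.

```python
from typing import Any, Dict, List, Optional, Union

from crewai.llms.base_llm import BaseLLM


class EchoLLM(BaseLLM):
    """Hypothetical custom LLM; other BaseLLM members omitted for brevity."""

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
        from_task: Optional[Any] = None,  # new keyword threaded through by crewAI
    ) -> Union[str, Any]:
        # A real implementation would contact a provider; the executor now calls
        # this with from_task=<Task>, so the keyword must at least be accepted.
        return "hi"
```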
diff --git a/src/crewai/utilities/events/llm_events.py b/src/crewai/utilities/events/llm_events.py
index ca8d0367a..95d61babf 100644
--- a/src/crewai/utilities/events/llm_events.py
+++ b/src/crewai/utilities/events/llm_events.py
@@ -5,6 +5,27 @@ from pydantic import BaseModel

 from crewai.utilities.events.base_events import BaseEvent


+class LLMEventBase(BaseEvent):
+    task_name: Optional[str] = None
+    task_id: Optional[str] = None
+
+    agent_id: Optional[str] = None
+    agent_role: Optional[str] = None
+
+    def __init__(self, **data):
+        super().__init__(**data)
+        self._set_agent_params(data)
+        self._set_task_params(data)
+
+    def _set_agent_params(self, data: Dict[str, Any]):
+        if "from_task" in data and (agent := getattr(data["from_task"], "agent", None)):
+            self.agent_id = agent.id
+            self.agent_role = agent.role
+
+    def _set_task_params(self, data: Dict[str, Any]):
+        if "from_task" in data and (task := data["from_task"]):
+            self.task_id = task.id
+            self.task_name = task.name
+
+
 class LLMCallType(Enum):
     """Type of LLM call being made"""

@@ -13,7 +34,7 @@ class LLMCallType(Enum):
     LLM_CALL = "llm_call"


-class LLMCallStartedEvent(BaseEvent):
+class LLMCallStartedEvent(LLMEventBase):
     """Event emitted when a LLM call starts

     Attributes:
@@ -28,7 +49,7 @@ class LLMCallStartedEvent(BaseEvent):
     available_functions: Optional[Dict[str, Any]] = None


-class LLMCallCompletedEvent(BaseEvent):
+class LLMCallCompletedEvent(LLMEventBase):
     """Event emitted when a LLM call completes"""

     type: str = "llm_call_completed"
     response: Any
     call_type: LLMCallType


-class LLMCallFailedEvent(BaseEvent):
+class LLMCallFailedEvent(LLMEventBase):
     """Event emitted when a LLM call fails"""

     error: str
@@ -55,7 +76,7 @@ class ToolCall(BaseModel):
     index: int


-class LLMStreamChunkEvent(BaseEvent):
+class LLMStreamChunkEvent(LLMEventBase):
     """Event emitted when a streaming chunk is received"""

     type: str = "llm_stream_chunk"
diff --git a/tests/custom_llm_test.py b/tests/custom_llm_test.py
index 6bee5b31d..9e22d3add 100644
--- a/tests/custom_llm_test.py
+++ b/tests/custom_llm_test.py
@@ -31,6 +31,7 @@ class CustomLLM(BaseLLM):
         tools=None,
         callbacks=None,
         available_functions=None,
+        from_task=None,
     ):
         """
         Mock LLM call that returns a predefined response.
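With `LLMEventBase` in place, every LLM event can carry `task_id`/`task_name`/`agent_id`/`agent_role` whenever the call was made with `from_task=...`. A short sketch of a listener that uses the new fields, following the same `crewai_event_bus.on(...)` pattern the tests below use; the handler name and log format are illustrative.

```python
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import LLMCallCompletedEvent


@crewai_event_bus.on(LLMCallCompletedEvent)
def log_llm_completion(source, event):
    # The new fields stay None for calls made without from_task=...
    print(
        f"LLM call for task {event.task_name!r} ({event.task_id}) "
        f"by agent {event.agent_role!r} ({event.agent_id}) completed"
    )
```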
diff --git a/tests/utilities/cassettes/test_llm_emits_event_with_task_and_agent_info.yaml b/tests/utilities/cassettes/test_llm_emits_event_with_task_and_agent_info.yaml
new file mode 100644
index 000000000..76cf1248b
--- /dev/null
+++ b/tests/utilities/cassettes/test_llm_emits_event_with_task_and_agent_info.yaml
@@ -0,0 +1,118 @@
+interactions:
+- request:
+    body: '{"messages": [{"role": "system", "content": "You are base_agent. You are
+      a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
+      give my best complete final answer to the task respond using the exact following
+      format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
+      answer must be the great and the most complete as possible, it must be outcome
+      described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
+      "content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
+      your final answer: hi\nyou MUST return the actual complete content as the final
+      answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
+      available and give your best Final Answer, your job depends on it!\n\nThought:"}],
+      "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate, zstd
+      connection:
+      - keep-alive
+      content-length:
+      - '838'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.78.0
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 1.78.0
+      x-stainless-raw-response:
+      - 'true'
+      x-stainless-read-timeout:
+      - '600.0'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.11.12
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: !!binary |
+        H4sIAAAAAAAAA4xSTW/bMAy9+1cQOsdDnI9m861ZMGAbBuyyHbYWBiMxtjaZEiS5aVHkvw+y09jd
+        OqAXA+bje3qP5GMGILQSJQjZYJStM/nW7j6z+vb9R739+mm707vj7sv2gffq/v2VEbPEsPtfJOMT
+        6420rTMUteUBlp4wUlItNutiOd8s15seaK0ik2i1i/nK5q1mnS/mi1U+3+TF2zO7sVpSECX8zAAA
+        Hvtv8smK7kUJ89lTpaUQsCZRXpoAhLcmVQSGoENEjmI2gtJyJO6tfwS2R5DIUOs7AoQ62QbkcCQP
+        cMMfNKOB6/6/hEZPdTwduoApC3fGTABkthHTLPoEt2fkdPFsbO283Ye/qOKgWYem8oTBcvIXonWi
+        R08ZwG0/m+5ZXOG8bV2sov1N/XPFVTHoiXElE3RxBqONaCb1zXL2gl6lKKI2YTJdIVE2pEbquArs
+        lLYTIJuk/tfNS9pDcs31a+RHQEpykVTlPCktnyce2zyli/1f22XKvWERyN9pSVXU5NMmFB2wM8Md
+        ifAQIrXVQXNN3nk9HNPBVcsVrldI75ZSZKfsDwAAAP//AwBulbOoWgMAAA==
+    headers:
+      CF-RAY:
+      - 957fa6e91a22023d-GRU
+      Connection:
+      - keep-alive
+      Content-Encoding:
+      - gzip
+      Content-Type:
+      - application/json
+      Date:
+      - Mon, 30 Jun 2025 18:15:58 GMT
+      Server:
+      - cloudflare
+      Set-Cookie:
+      - __cf_bm=9WXNY0u6p0Nlyb1G36cXHDgtwb1538JzaUNoS4tgrpo-1751307358-1.0.1.1-BAvg6Fgqsv3ITFxrC3z3E42AqgSZcGq4Gr1Wrjx56TOsljYynqCePNzQ79oAncT9KXehFnUMxA6JSf2lAfQOeSLVREY3_P6GjPkbcwIsVXw;
+        path=/; expires=Mon, 30-Jun-25 18:45:58 GMT; domain=.api.openai.com; HttpOnly;
+        Secure; SameSite=None
+      - _cfuvid=N5kr6p8e26f8scPW5s2uVOatzh2RYjlQb13eQUBsrts-1751307358295-0.0.1.1-604800000;
+        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization:
+      - crewai-iuxna1
+      openai-processing-ms:
+      - '308'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-envoy-upstream-service-time:
+      - '310'
+      x-ratelimit-limit-requests:
+      - '30000'
+      x-ratelimit-limit-tokens:
+      - '150000000'
+      x-ratelimit-remaining-requests:
+      - '29999'
+      x-ratelimit-remaining-tokens:
+      - '149999823'
+      x-ratelimit-reset-requests:
+      - 2ms
+      x-ratelimit-reset-tokens:
+      - 0s
+      x-request-id:
+      - req_78bb0375ac6e0939c8e05f66869e0137
+    status:
+      code: 200
+      message: OK
+version: 1
diff --git a/tests/utilities/cassettes/test_stream_llm_emits_event_with_task_and_agent_info.yaml b/tests/utilities/cassettes/test_stream_llm_emits_event_with_task_and_agent_info.yaml
new file mode 100644
index 000000000..71cc2f39f
--- /dev/null
+++ b/tests/utilities/cassettes/test_stream_llm_emits_event_with_task_and_agent_info.yaml
@@ -0,0 +1,165 @@
+interactions:
+- request:
+    body: '{"messages": [{"role": "system", "content": "You are TestAgent. You are
+      a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
+      give my best complete final answer to the task respond using the exact following
+      format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
+      answer must be the great and the most complete as possible, it must be outcome
+      described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
+      "content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
+      your final answer: hi\nyou MUST return the actual complete content as the final
+      answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
+      available and give your best Final Answer, your job depends on it!\n\nThought:"}],
+      "model": "gpt-4o-mini", "stop": ["\nObservation:"], "stream": true, "stream_options":
+      {"include_usage": true}}'
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate, zstd
+      connection:
+      - keep-alive
+      content-length:
+      - '896'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.78.0
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 1.78.0
+      x-stainless-raw-response:
+      - 'true'
+      x-stainless-read-timeout:
+      - '600.0'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.11.12
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: 'data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":"I"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" now"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" can"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" give"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" a"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" great"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" answer"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" \n"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":"Final"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" Answer"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":":"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{"content":" hi"},"logprobs":null,"finish_reason":null}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null}
+
+        data: {"id":"chatcmpl-BoFTMrRBBaVsju5o285dR7JBeVVvS","object":"chat.completion.chunk","created":1751315576,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_34a54ae93c","choices":[],"usage":{"prompt_tokens":161,"completion_tokens":12,"total_tokens":173,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}}}
+
+        data: [DONE]
+
+        '
+    headers:
+      CF-RAY:
+      - 95806f8e3fc2f213-GRU
+      Connection:
+      - keep-alive
+      Content-Type:
+      - text/event-stream; charset=utf-8
+      Date:
+      - Mon, 30 Jun 2025 20:32:56 GMT
+      Server:
+      - cloudflare
+      Set-Cookie:
+      - __cf_bm=dxb.Rn1CsTQLjW9eU0KWonEuID9KkkVRJ1FaBBsW5gQ-1751315576-1.0.1.1-R7bLnCfrjJwtHYzbKEE9in7ilYfymelWgYg1OcPqSEllAFA9_R2ctsY0f7Mrv7i0dXaynAooDpLs9hGIzfgyBR9EgkRjqoaHbByPXjxy_5s;
+        path=/; expires=Mon, 30-Jun-25 21:02:56 GMT; domain=.api.openai.com; HttpOnly;
+        Secure; SameSite=None
+      - _cfuvid=MwdjLsfFXJDWzbJseVfA4MIpVAqLa7envAu7EAkdK4o-1751315576696-0.0.1.1-604800000;
+        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization:
+      - crewai-iuxna1
+      openai-processing-ms:
+      - '238'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-envoy-upstream-service-time:
+      - '241'
+      x-ratelimit-limit-requests:
+      - '30000'
+      x-ratelimit-limit-tokens:
+      - '150000000'
+      x-ratelimit-remaining-requests:
+      - '29999'
+      x-ratelimit-remaining-tokens:
+      - '149999824'
+      x-ratelimit-reset-requests:
+      - 2ms
+      x-ratelimit-reset-tokens:
+      - 0s
+      x-request-id:
+      - req_394b055696dfcdc221b5ecd0fba49e97
+    status:
+      code: 200
+      message: OK
+version: 1
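The two cassettes above are pytest-recording/VCR fixtures replayed by the `@pytest.mark.vcr(...)` decorators in the test module that follows: the non-streaming one stores the OpenAI response gzip-compressed as a YAML `!!binary` scalar, while the streaming one stores the raw SSE chunks (which is presumably why the streaming test below expects 12 chunk events). A sketch for inspecting the recorded non-streaming body locally, assuming PyYAML is available; it is not part of the PR.

```python
import gzip
import json

import yaml  # PyYAML, assumed available in the test environment

# Decode the gzip/base64 response body stored in the non-streaming cassette.
path = "tests/utilities/cassettes/test_llm_emits_event_with_task_and_agent_info.yaml"
with open(path) as f:
    cassette = yaml.safe_load(f)  # the !!binary scalar loads as bytes

raw = cassette["interactions"][0]["response"]["body"]["string"]
payload = json.loads(gzip.decompress(raw).decode("utf-8"))
print(payload["choices"][0]["message"]["content"])  # the recorded completion text
```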
diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py
index 2f6f11b61..8e9cd7958 100644
--- a/tests/utilities/test_events.py
+++ b/tests/utilities/test_events.py
@@ -57,23 +57,28 @@ def vcr_config(request) -> dict:
     }


-base_agent = Agent(
-    role="base_agent",
-    llm="gpt-4o-mini",
-    goal="Just say hi",
-    backstory="You are a helpful assistant that just says hi",
+@pytest.fixture(scope="module")
+def base_agent():
+    return Agent(
+        role="base_agent",
+        llm="gpt-4o-mini",
+        goal="Just say hi",
+        backstory="You are a helpful assistant that just says hi",
 )

-base_task = Task(
-    description="Just say hi",
-    expected_output="hi",
-    agent=base_agent,
-)
+@pytest.fixture(scope="module")
+def base_task(base_agent):
+    return Task(
+        description="Just say hi",
+        expected_output="hi",
+        agent=base_agent,
+    )
+

 event_listener = EventListener()


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_start_kickoff_event():
+def test_crew_emits_start_kickoff_event(base_agent, base_task):
     received_events = []

     mock_span = Mock()
@@ -101,7 +106,7 @@ def test_crew_emits_start_kickoff_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_end_kickoff_event():
+def test_crew_emits_end_kickoff_event(base_agent, base_task):
     received_events = []

     @crewai_event_bus.on(CrewKickoffCompletedEvent)
@@ -119,7 +124,7 @@ def test_crew_emits_end_kickoff_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_test_kickoff_type_event():
+def test_crew_emits_test_kickoff_type_event(base_agent, base_task):
     received_events = []

     mock_span = Mock()
@@ -165,7 +170,7 @@ def test_crew_emits_test_kickoff_type_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_kickoff_failed_event():
+def test_crew_emits_kickoff_failed_event(base_agent, base_task):
     received_events = []

     with crewai_event_bus.scoped_handlers():
@@ -190,7 +195,7 @@ def test_crew_emits_kickoff_failed_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_start_task_event():
+def test_crew_emits_start_task_event(base_agent, base_task):
     received_events = []

     @crewai_event_bus.on(TaskStartedEvent)
@@ -207,7 +212,7 @@ def test_crew_emits_start_task_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_crew_emits_end_task_event():
+def test_crew_emits_end_task_event(base_agent, base_task):
     received_events = []

     @crewai_event_bus.on(TaskCompletedEvent)
@@ -235,7 +240,7 @@ def test_crew_emits_end_task_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_task_emits_failed_event_on_execution_error():
+def test_task_emits_failed_event_on_execution_error(base_agent, base_task):
     received_events = []
     received_sources = []
@@ -272,7 +277,7 @@ def test_task_emits_failed_event_on_execution_error():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_agent_emits_execution_started_and_completed_events():
+def test_agent_emits_execution_started_and_completed_events(base_agent, base_task):
     received_events = []

     @crewai_event_bus.on(AgentExecutionStartedEvent)
@@ -301,7 +306,7 @@ def test_agent_emits_execution_started_and_completed_events():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_agent_emits_execution_error_event():
+def test_agent_emits_execution_error_event(base_agent, base_task):
     received_events = []

     @crewai_event_bus.on(AgentExecutionErrorEvent)
@@ -501,7 +506,7 @@ def test_flow_emits_method_execution_started_event():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_register_handler_adds_new_handler():
+def test_register_handler_adds_new_handler(base_agent, base_task):
     received_events = []

     def custom_handler(source, event):
@@ -519,7 +524,7 @@ def test_register_handler_adds_new_handler():


 @pytest.mark.vcr(filter_headers=["authorization"])
-def test_multiple_handlers_for_same_event():
+def test_multiple_handlers_for_same_event(base_agent, base_task):
     received_events_1 = []
     received_events_2 = []
@@ -613,6 +618,11 @@ def test_llm_emits_call_started_event():
     assert received_events[0].type == "llm_call_started"
     assert received_events[1].type == "llm_call_completed"

+    assert received_events[0].task_name is None
+    assert received_events[0].agent_role is None
+    assert received_events[0].agent_id is None
+    assert received_events[0].task_id is None
+

 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_llm_emits_call_failed_event():
@@ -632,6 +642,10 @@ def test_llm_emits_call_failed_event():
     assert len(received_events) == 1
     assert received_events[0].type == "llm_call_failed"
     assert received_events[0].error == error_message
+    assert received_events[0].task_name is None
+    assert received_events[0].agent_role is None
+    assert received_events[0].agent_id is None
+    assert received_events[0].task_id is None


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -742,7 +756,6 @@ def test_streaming_empty_response_handling():
     received_chunks = []

     with crewai_event_bus.scoped_handlers():
-
         @crewai_event_bus.on(LLMStreamChunkEvent)
         def handle_stream_chunk(source, event):
             received_chunks.append(event.chunk)
@@ -779,3 +792,114 @@ def test_streaming_empty_response_handling():
     finally:
         # Restore the original method
         llm.call = original_call
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_stream_llm_emits_event_with_task_and_agent_info():
+    completed_event = []
+    failed_event = []
+    started_event = []
+    stream_event = []
+
+    with crewai_event_bus.scoped_handlers():
+        @crewai_event_bus.on(LLMCallFailedEvent)
+        def handle_llm_failed(source, event):
+            failed_event.append(event)
+
+        @crewai_event_bus.on(LLMCallStartedEvent)
+        def handle_llm_started(source, event):
+            started_event.append(event)
+
+        @crewai_event_bus.on(LLMCallCompletedEvent)
+        def handle_llm_completed(source, event):
+            completed_event.append(event)
+
+        @crewai_event_bus.on(LLMStreamChunkEvent)
+        def handle_llm_stream_chunk(source, event):
+            stream_event.append(event)
+
+        agent = Agent(
+            role="TestAgent",
+            llm=LLM(model="gpt-4o-mini", stream=True),
+            goal="Just say hi",
+            backstory="You are a helpful assistant that just says hi",
+        )
+        task = Task(
+            description="Just say hi",
+            expected_output="hi",
+            llm=LLM(model="gpt-4o-mini", stream=True),
+            agent=agent
+        )
+
+        crew = Crew(agents=[agent], tasks=[task])
+        crew.kickoff()
+
+        assert len(completed_event) == 1
+        assert len(failed_event) == 0
+        assert len(started_event) == 1
+        assert len(stream_event) == 12
+
+        all_events = completed_event + failed_event + started_event + stream_event
+        all_agent_roles = [event.agent_role for event in all_events]
+        all_agent_id = [event.agent_id for event in all_events]
+        all_task_id = [event.task_id for event in all_events]
+        all_task_name = [event.task_name for event in all_events]
+
+        # ensure all events have the agent + task props set
+        assert len(all_agent_roles) == 14
+        assert len(all_agent_id) == 14
+        assert len(all_task_id) == 14
+        assert len(all_task_name) == 14
+
+        assert set(all_agent_roles) == {agent.role}
+        assert set(all_agent_id) == {agent.id}
+        assert set(all_task_id) == {task.id}
+        assert set(all_task_name) == {task.name}
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
+    completed_event = []
+    failed_event = []
+    started_event = []
+    stream_event = []
+
+    with crewai_event_bus.scoped_handlers():
+        @crewai_event_bus.on(LLMCallFailedEvent)
+        def handle_llm_failed(source, event):
+            failed_event.append(event)
+
+        @crewai_event_bus.on(LLMCallStartedEvent)
+        def handle_llm_started(source, event):
+            started_event.append(event)
+
+        @crewai_event_bus.on(LLMCallCompletedEvent)
+        def handle_llm_completed(source, event):
+            completed_event.append(event)
+
+        @crewai_event_bus.on(LLMStreamChunkEvent)
+        def handle_llm_stream_chunk(source, event):
+            stream_event.append(event)
+
+        crew = Crew(agents=[base_agent], tasks=[base_task])
+        crew.kickoff()
+
+        assert len(completed_event) == 1
+        assert len(failed_event) == 0
+        assert len(started_event) == 1
+        assert len(stream_event) == 0
+
+        all_events = completed_event + failed_event + started_event + stream_event
+        all_agent_roles = [event.agent_role for event in all_events]
+        all_agent_id = [event.agent_id for event in all_events]
+        all_task_id = [event.task_id for event in all_events]
+        all_task_name = [event.task_name for event in all_events]
+
+        # ensure all events have the agent + task props set
+        assert len(all_agent_roles) == 2
+        assert len(all_agent_id) == 2
+        assert len(all_task_id) == 2
+        assert len(all_task_name) == 2
+
+        assert set(all_agent_roles) == {base_agent.role}
+        assert set(all_agent_id) == {base_agent.id}
+        assert set(all_task_id) == {base_task.id}
+        assert set(all_task_name) == {base_task.name}
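Taken together, the changes mean a caller only has to pass `from_task=` for the emitted events to be attributable to a task and its agent. A rough usage sketch follows; the model name and roles are illustrative, and the import paths are assumed from the modules touched above.

```python
from crewai import Agent, Task
from crewai.llm import LLM
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import LLMStreamChunkEvent

agent = Agent(
    role="Demo agent",
    goal="Just say hi",
    backstory="You are a helpful assistant that just says hi",
)
task = Task(description="Just say hi", expected_output="hi", agent=agent)

with crewai_event_bus.scoped_handlers():

    @crewai_event_bus.on(LLMStreamChunkEvent)
    def on_chunk(source, event):
        # Each chunk now knows which task/agent produced it.
        print(event.task_name, event.agent_role, event.chunk)

    llm = LLM(model="gpt-4o-mini", stream=True)
    llm.call([{"role": "user", "content": "Just say hi"}], from_task=task)
```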