Compare commits

..

3 Commits

Author SHA1 Message Date
Devin AI
a4b22e92ad fix: save flow plot HTML to current working directory instead of temp dir
Fixes #4991

Previously,  saved generated HTML/CSS/JS files to a
hidden system temp directory (via ), making the output
inaccessible from the user's project folder. The CLI also printed a
misleading message suggesting the file was saved locally.

Changes:
-  now defaults to saving files in the current
  working directory () instead of a temp directory.
- Added an  parameter to both  and
   so users can optionally specify a custom output directory.
- Added 5 new tests covering CWD output, explicit output_dir, and
  absolute path guarantees.

Co-Authored-By: João <joao@crewai.com>
2026-03-20 19:55:22 +00:00
Lucas Gomide
8e427164ca docs: adding a lot of missing event listeners (#4990)

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-03-20 15:30:11 -04:00
Greyson LaLonde
6495aff528 refactor: replace Any-typed callback and model fields with serializable types
2026-03-20 15:18:50 -04:00
19 changed files with 874 additions and 1017 deletions

View File

@@ -196,12 +196,19 @@ CrewAI provides a wide range of events that you can listen for:
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
- **CrewTestResultEvent**: Emitted when a Crew test result is available. Contains the quality score, execution duration, and model used.
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
- **LiteAgentExecutionStartedEvent**: Emitted when a LiteAgent starts executing. Contains the agent info, tools, and messages.
- **LiteAgentExecutionCompletedEvent**: Emitted when a LiteAgent completes execution. Contains the agent info and output.
- **LiteAgentExecutionErrorEvent**: Emitted when a LiteAgent encounters an error during execution. Contains the agent info and error message.
- **AgentEvaluationStartedEvent**: Emitted when an agent evaluation starts. Contains the agent ID, agent role, optional task ID, and iteration number.
- **AgentEvaluationCompletedEvent**: Emitted when an agent evaluation completes. Contains the agent ID, agent role, optional task ID, iteration number, metric category, and score.
- **AgentEvaluationFailedEvent**: Emitted when an agent evaluation fails. Contains the agent ID, agent role, optional task ID, iteration number, and error message.
### Task Events
@@ -242,16 +249,26 @@ CrewAI provides a wide range of events that you can listen for:
- **LLMGuardrailStartedEvent**: Emitted when a guardrail validation starts. Contains details about the guardrail being applied and retry count.
- **LLMGuardrailCompletedEvent**: Emitted when a guardrail validation completes. Contains details about validation success/failure, results, and error messages if any.
- **LLMGuardrailFailedEvent**: Emitted when a guardrail validation fails. Contains the error message and retry count.
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPausedEvent**: Emitted when a Flow is paused waiting for human feedback. Contains the flow name, flow ID, method name, current state, message shown when requesting feedback, and optional list of possible outcomes for routing.
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
- **MethodExecutionPausedEvent**: Emitted when a Flow method is paused waiting for human feedback. Contains the flow name, method name, current state, flow ID, message shown when requesting feedback, and optional list of possible outcomes for routing.
### Human In The Loop Events
- **FlowInputRequestedEvent**: Emitted when a Flow requests user input via `Flow.ask()`. Contains the flow name, method name, the question or prompt being shown to the user, and optional metadata (e.g., user ID, channel, session context).
- **FlowInputReceivedEvent**: Emitted when user input is received after `Flow.ask()`. Contains the flow name, method name, the original question, the user's response (or `None` if timed out), optional request metadata, and optional response metadata from the provider (e.g., who responded, thread ID, timestamps).
- **HumanFeedbackRequestedEvent**: Emitted when a `@human_feedback` decorated method requires input from a human reviewer. Contains the flow name, method name, the method output shown to the human for review, the message displayed when requesting feedback, and optional list of possible outcomes for routing.
- **HumanFeedbackReceivedEvent**: Emitted when a human provides feedback in response to a `@human_feedback` decorated method. Contains the flow name, method name, the raw text feedback provided by the human, and the collapsed outcome string (if emit was specified).
### LLM Events
@@ -259,6 +276,7 @@ CrewAI provides a wide range of events that you can listen for:
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
- **LLMStreamChunkEvent**: Emitted for each chunk received during streaming LLM responses
- **LLMThinkingChunkEvent**: Emitted when a thinking/reasoning chunk is received from a thinking model. Contains the chunk text and optional response ID.
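A handler for the streaming events above usually just accumulates chunk text into the full response. A minimal sketch (the event class and its `chunk` field name are assumptions for illustration, not CrewAI's actual event schema):

```python
from dataclasses import dataclass


@dataclass
class LLMStreamChunkEvent:
    # Field name is an assumption; the real event carries the text
    # of one chunk received during a streaming LLM response.
    chunk: str


class StreamAccumulator:
    """Collects streamed chunks into the full response text."""

    def __init__(self) -> None:
        self.parts: list[str] = []

    def on_chunk(self, event: LLMStreamChunkEvent) -> None:
        # Called once per emitted chunk event, in arrival order.
        self.parts.append(event.chunk)

    @property
    def text(self) -> str:
        return "".join(self.parts)
```

The same shape works for `LLMThinkingChunkEvent`: keep a second accumulator keyed by the response ID if you need to separate reasoning text from the answer.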
### Memory Events
@@ -270,6 +288,79 @@ CrewAI provides a wide range of events that you can listen for:
- **MemorySaveFailedEvent**: Emitted when a memory save operation fails. Contains the value, metadata, agent role, and error message.
- **MemoryRetrievalStartedEvent**: Emitted when memory retrieval for a task prompt starts. Contains the optional task ID.
- **MemoryRetrievalCompletedEvent**: Emitted when memory retrieval for a task prompt completes successfully. Contains the task ID, memory content, and retrieval execution time.
- **MemoryRetrievalFailedEvent**: Emitted when memory retrieval for a task prompt fails. Contains the optional task ID and error message.
### Reasoning Events
- **AgentReasoningStartedEvent**: Emitted when an agent starts reasoning about a task. Contains the agent role, task ID, and attempt number.
- **AgentReasoningCompletedEvent**: Emitted when an agent finishes its reasoning process. Contains the agent role, task ID, the plan produced, and whether the agent is ready to proceed.
- **AgentReasoningFailedEvent**: Emitted when the reasoning process fails. Contains the agent role, task ID, and error message.
### Observation Events
- **StepObservationStartedEvent**: Emitted when the Planner begins observing a step's result. Fires after every step execution, before the observation LLM call. Contains the agent role, step number, and step description.
- **StepObservationCompletedEvent**: Emitted when the Planner finishes observing a step's result. Contains whether the step completed successfully, key information learned, whether the remaining plan is still valid, whether a full replan is needed, and suggested refinements.
- **StepObservationFailedEvent**: Emitted when the observation LLM call itself fails. The system defaults to continuing the plan. Contains the error message.
- **PlanRefinementEvent**: Emitted when the Planner refines upcoming step descriptions without a full replan. Contains the number of refined steps and the refinements applied.
- **PlanReplanTriggeredEvent**: Emitted when the Planner triggers a full replan because the remaining plan was deemed fundamentally wrong. Contains the replan reason, replan count, and number of completed steps preserved.
- **GoalAchievedEarlyEvent**: Emitted when the Planner detects the goal was achieved early and remaining steps will be skipped. Contains the number of steps remaining and steps completed.
### A2A (Agent-to-Agent) Events
#### Delegation Events
- **A2ADelegationStartedEvent**: Emitted when A2A delegation starts. Contains the endpoint URL, task description, agent ID, context ID, whether it's multiturn, turn number, agent card metadata, protocol version, provider info, and optional skill ID.
- **A2ADelegationCompletedEvent**: Emitted when A2A delegation completes. Contains the completion status (`completed`, `input_required`, `failed`, etc.), result, error message, context ID, and agent card metadata.
- **A2AParallelDelegationStartedEvent**: Emitted when parallel delegation to multiple A2A agents begins. Contains the list of endpoints and the task description.
- **A2AParallelDelegationCompletedEvent**: Emitted when parallel delegation to multiple A2A agents completes. Contains the list of endpoints, success count, failure count, and results summary.
#### Conversation Events
- **A2AConversationStartedEvent**: Emitted once at the beginning of a multiturn A2A conversation, before the first message exchange. Contains the agent ID, endpoint, context ID, agent card metadata, protocol version, and provider info.
- **A2AMessageSentEvent**: Emitted when a message is sent to the A2A agent. Contains the message content, turn number, context ID, message ID, and whether it's multiturn.
- **A2AResponseReceivedEvent**: Emitted when a response is received from the A2A agent. Contains the response content, turn number, context ID, message ID, status, and whether it's the final response.
- **A2AConversationCompletedEvent**: Emitted once at the end of a multiturn A2A conversation. Contains the final status (`completed` or `failed`), final result, error message, context ID, and total number of turns.
#### Streaming Events
- **A2AStreamingStartedEvent**: Emitted when streaming mode begins for A2A delegation. Contains the task ID, context ID, endpoint, turn number, and whether it's multiturn.
- **A2AStreamingChunkEvent**: Emitted when a streaming chunk is received. Contains the chunk text, chunk index, whether it's the final chunk, task ID, context ID, and turn number.
#### Polling & Push Notification Events
- **A2APollingStartedEvent**: Emitted when polling mode begins for A2A delegation. Contains the task ID, context ID, polling interval in seconds, and endpoint.
- **A2APollingStatusEvent**: Emitted on each polling iteration. Contains the task ID, context ID, current task state, elapsed seconds, and poll count.
- **A2APushNotificationRegisteredEvent**: Emitted when a push notification callback is registered. Contains the task ID, context ID, callback URL, and endpoint.
- **A2APushNotificationReceivedEvent**: Emitted when a push notification is received from the remote A2A agent. Contains the task ID, context ID, and current state.
- **A2APushNotificationSentEvent**: Emitted when a push notification is sent to a callback URL. Contains the task ID, context ID, callback URL, state, whether delivery succeeded, and optional error message.
- **A2APushNotificationTimeoutEvent**: Emitted when push notification wait times out. Contains the task ID, context ID, and timeout duration in seconds.
#### Connection & Authentication Events
- **A2AAgentCardFetchedEvent**: Emitted when an agent card is successfully fetched. Contains the endpoint, agent name, agent card metadata, protocol version, provider info, whether it was cached, and fetch time in milliseconds.
- **A2AAuthenticationFailedEvent**: Emitted when authentication to an A2A agent fails. Contains the endpoint, auth type attempted (e.g., `bearer`, `oauth2`, `api_key`), error message, and HTTP status code.
- **A2AConnectionErrorEvent**: Emitted when a connection error occurs during A2A communication. Contains the endpoint, error message, error type (e.g., `timeout`, `connection_refused`, `dns_error`), HTTP status code, and the operation being attempted.
- **A2ATransportNegotiatedEvent**: Emitted when transport protocol is negotiated with an A2A agent. Contains the negotiated transport, negotiated URL, selection source (`client_preferred`, `server_preferred`, `fallback`), and client/server supported transports.
- **A2AContentTypeNegotiatedEvent**: Emitted when content types are negotiated with an A2A agent. Contains the client/server input/output modes, negotiated input/output modes, and whether negotiation succeeded.
#### Artifact Events
- **A2AArtifactReceivedEvent**: Emitted when an artifact is received from a remote A2A agent. Contains the task ID, artifact ID, artifact name, description, MIME type, size in bytes, and whether content should be appended.
#### Server Task Events
- **A2AServerTaskStartedEvent**: Emitted when an A2A server task execution starts. Contains the task ID and context ID.
- **A2AServerTaskCompletedEvent**: Emitted when an A2A server task execution completes. Contains the task ID, context ID, and result.
- **A2AServerTaskCanceledEvent**: Emitted when an A2A server task execution is canceled. Contains the task ID and context ID.
- **A2AServerTaskFailedEvent**: Emitted when an A2A server task execution fails. Contains the task ID, context ID, and error message.
#### Context Lifecycle Events
- **A2AContextCreatedEvent**: Emitted when an A2A context is created. Contexts group related tasks in a conversation or workflow. Contains the context ID and creation timestamp.
- **A2AContextExpiredEvent**: Emitted when an A2A context expires due to TTL. Contains the context ID, creation timestamp, age in seconds, and task count.
- **A2AContextIdleEvent**: Emitted when an A2A context becomes idle (no activity for the configured threshold). Contains the context ID, idle time in seconds, and task count.
- **A2AContextCompletedEvent**: Emitted when all tasks in an A2A context complete. Contains the context ID, total tasks, and duration in seconds.
- **A2AContextPrunedEvent**: Emitted when an A2A context is pruned (deleted). Contains the context ID, task count, and age in seconds.
## Event Handler Structure
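All of the events cataloged above plug into the same (event type → handler) shape. The sketch below is not CrewAI's actual bus API (its registration class and import paths are not shown in this diff); it is a generic illustration of how a bus dispatches each emitted event to the handlers registered for that event's type:

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FlowStartedEvent:
    flow_name: str


class EventBus:
    """Minimal illustrative bus: handlers register per event type and
    are invoked, in registration order, for every emitted event of
    exactly that type."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def on(self, event_type):
        # Decorator: @bus.on(SomeEvent) attaches the function as a handler.
        def register(handler):
            self._handlers[event_type].append(handler)
            return handler
        return register

    def emit(self, source, event) -> None:
        for handler in self._handlers[type(event)]:
            handler(source, event)
```

Usage mirrors the listener pattern: decorate a function with the event type you care about, then every `emit` of that type reaches it along with the emitting source.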

View File

@@ -195,12 +195,19 @@ CrewAI provides a wide range of events that you can listen for:
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
- **CrewTestResultEvent**: Emitted when a Crew test result is available. Contains the quality score, execution duration, and model used.
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
- **LiteAgentExecutionStartedEvent**: Emitted when a LiteAgent starts executing. Contains the agent info, tools, and messages.
- **LiteAgentExecutionCompletedEvent**: Emitted when a LiteAgent completes execution. Contains the agent info and output.
- **LiteAgentExecutionErrorEvent**: Emitted when a LiteAgent encounters an error during execution. Contains the agent info and error message.
- **AgentEvaluationStartedEvent**: Emitted when an agent evaluation starts. Contains the agent ID, agent role, optional task ID, and iteration number.
- **AgentEvaluationCompletedEvent**: Emitted when an agent evaluation completes. Contains the agent ID, agent role, optional task ID, iteration number, metric category, and score.
- **AgentEvaluationFailedEvent**: Emitted when an agent evaluation fails. Contains the agent ID, agent role, optional task ID, iteration number, and error message.
### Task Events
@@ -218,6 +225,16 @@ CrewAI provides a wide range of events that you can listen for:
- **ToolExecutionErrorEvent**: Emitted when an error occurs during tool execution
- **ToolSelectionErrorEvent**: Emitted when an error occurs during tool selection
### MCP Events
- **MCPConnectionStartedEvent**: Emitted when a connection to an MCP server starts. Contains the server name, URL, transport type, connection timeout, and whether this is a reconnection attempt.
- **MCPConnectionCompletedEvent**: Emitted when a connection to an MCP server succeeds. Contains the server name, connection duration in milliseconds, and whether it was a reconnection.
- **MCPConnectionFailedEvent**: Emitted when a connection to an MCP server fails. Contains the server name, error message, and error type (`timeout`, `authentication`, `network`, etc.).
- **MCPToolExecutionStartedEvent**: Emitted when an MCP tool execution starts. Contains the server name, tool name, and tool arguments.
- **MCPToolExecutionCompletedEvent**: Emitted when an MCP tool execution completes successfully. Contains the server name, tool name, result, and execution duration in milliseconds.
- **MCPToolExecutionFailedEvent**: Emitted when an MCP tool execution fails. Contains the server name, tool name, error message, and error type (`timeout`, `validation`, `server_error`, etc.).
- **MCPConfigFetchFailedEvent**: Emitted when fetching an MCP server configuration fails (e.g., MCP is not connected on the account, an API error occurs, or the connection fails after the configuration is fetched). Contains the slug, error message, and error type (`not_connected`, `api_error`, `connection_failed`).
### Knowledge Events
- **KnowledgeRetrievalStartedEvent**: Emitted when knowledge retrieval starts
@@ -231,16 +248,26 @@ CrewAI provides a wide range of events that you can listen for:
- **LLMGuardrailStartedEvent**: Emitted when a guardrail validation starts. Contains details about the guardrail being applied and retry count.
- **LLMGuardrailCompletedEvent**: Emitted when a guardrail validation completes. Contains details about validation success/failure, results, and error messages if any.
- **LLMGuardrailFailedEvent**: Emitted when a guardrail validation fails. Contains the error message and retry count.
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPausedEvent**: Emitted when a Flow is paused waiting for human feedback. Contains the flow name, flow ID, method name, current state, message shown when requesting feedback, and optional list of possible outcomes for routing.
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
- **MethodExecutionPausedEvent**: Emitted when a Flow method is paused waiting for human feedback. Contains the flow name, method name, current state, flow ID, message shown when requesting feedback, and optional list of possible outcomes for routing.
### Human In The Loop Events
- **FlowInputRequestedEvent**: Emitted when a Flow requests user input via `Flow.ask()`. Contains the flow name, method name, the question or prompt being shown to the user, and optional metadata (e.g., user ID, channel, session context).
- **FlowInputReceivedEvent**: Emitted when user input is received after `Flow.ask()`. Contains the flow name, method name, the original question, the user's response (or `None` if timed out), optional request metadata, and optional response metadata from the provider (e.g., who responded, thread ID, timestamps).
- **HumanFeedbackRequestedEvent**: Emitted when a `@human_feedback` decorated method requires input from a human reviewer. Contains the flow name, method name, the method output shown to the human for review, the message displayed when requesting feedback, and optional list of possible outcomes for routing.
- **HumanFeedbackReceivedEvent**: Emitted when a human provides feedback in response to a `@human_feedback` decorated method. Contains the flow name, method name, the raw text feedback provided by the human, and the collapsed outcome string (if emit was specified).
### LLM Events
@@ -248,6 +275,7 @@ CrewAI provides a wide range of events that you can listen for:
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
- **LLMStreamChunkEvent**: Emitted for each chunk received during streaming LLM responses
- **LLMThinkingChunkEvent**: Emitted when a thinking/reasoning chunk is received from a thinking model. Contains the chunk text and optional response ID.
### Memory Events
@@ -259,6 +287,79 @@ CrewAI provides a wide range of events that you can listen for:
- **MemorySaveFailedEvent**: Emitted when a memory save operation fails. Contains the value, metadata, agent role, and error message.
- **MemoryRetrievalStartedEvent**: Emitted when memory retrieval for a task prompt starts. Contains the optional task ID.
- **MemoryRetrievalCompletedEvent**: Emitted when memory retrieval for a task prompt completes successfully. Contains the task ID, memory content, and retrieval execution time.
- **MemoryRetrievalFailedEvent**: Emitted when memory retrieval for a task prompt fails. Contains the optional task ID and error message.
### Reasoning Events
- **AgentReasoningStartedEvent**: Emitted when an agent starts reasoning about a task. Contains the agent role, task ID, and attempt number.
- **AgentReasoningCompletedEvent**: Emitted when an agent finishes its reasoning process. Contains the agent role, task ID, the plan produced, and whether the agent is ready to proceed.
- **AgentReasoningFailedEvent**: Emitted when the reasoning process fails. Contains the agent role, task ID, and error message.
### Observation Events
- **StepObservationStartedEvent**: Emitted when the Planner begins observing a step's result. Fires after every step execution, before the observation LLM call. Contains the agent role, step number, and step description.
- **StepObservationCompletedEvent**: Emitted when the Planner finishes observing a step's result. Contains whether the step completed successfully, key information learned, whether the remaining plan is still valid, whether a full replan is needed, and suggested refinements.
- **StepObservationFailedEvent**: Emitted when the observation LLM call itself fails. The system defaults to continuing the plan. Contains the error message.
- **PlanRefinementEvent**: Emitted when the Planner refines upcoming step descriptions without a full replan. Contains the number of refined steps and the refinements applied.
- **PlanReplanTriggeredEvent**: Emitted when the Planner triggers a full replan because the remaining plan was deemed fundamentally wrong. Contains the replan reason, replan count, and number of completed steps preserved.
- **GoalAchievedEarlyEvent**: Emitted when the Planner detects the goal was achieved early and remaining steps will be skipped. Contains the number of steps remaining and steps completed.
### A2A (Agent-to-Agent) Events
#### Delegation Events
- **A2ADelegationStartedEvent**: Emitted when A2A delegation starts. Contains the endpoint URL, task description, agent ID, context ID, whether it's multiturn, turn number, agent card metadata, protocol version, provider info, and optional skill ID.
- **A2ADelegationCompletedEvent**: Emitted when A2A delegation completes. Contains the completion status (`completed`, `input_required`, `failed`, etc.), result, error message, context ID, and agent card metadata.
- **A2AParallelDelegationStartedEvent**: Emitted when parallel delegation to multiple A2A agents begins. Contains the list of endpoints and the task description.
- **A2AParallelDelegationCompletedEvent**: Emitted when parallel delegation to multiple A2A agents completes. Contains the list of endpoints, success count, failure count, and results summary.
#### Conversation Events
- **A2AConversationStartedEvent**: Emitted once at the beginning of a multiturn A2A conversation, before the first message exchange. Contains the agent ID, endpoint, context ID, agent card metadata, protocol version, and provider info.
- **A2AMessageSentEvent**: Emitted when a message is sent to the A2A agent. Contains the message content, turn number, context ID, message ID, and whether it's multiturn.
- **A2AResponseReceivedEvent**: Emitted when a response is received from the A2A agent. Contains the response content, turn number, context ID, message ID, status, and whether it's the final response.
- **A2AConversationCompletedEvent**: Emitted once at the end of a multiturn A2A conversation. Contains the final status (`completed` or `failed`), final result, error message, context ID, and total number of turns.
#### Streaming Events
- **A2AStreamingStartedEvent**: Emitted when streaming mode begins for A2A delegation. Contains the task ID, context ID, endpoint, turn number, and whether it's multiturn.
- **A2AStreamingChunkEvent**: Emitted when a streaming chunk is received. Contains the chunk text, chunk index, whether it's the final chunk, task ID, context ID, and turn number.
#### Polling & Push Notification Events
- **A2APollingStartedEvent**: Emitted when polling mode begins for A2A delegation. Contains the task ID, context ID, polling interval in seconds, and endpoint.
- **A2APollingStatusEvent**: Emitted on each polling iteration. Contains the task ID, context ID, current task state, elapsed seconds, and poll count.
- **A2APushNotificationRegisteredEvent**: Emitted when a push notification callback is registered. Contains the task ID, context ID, callback URL, and endpoint.
- **A2APushNotificationReceivedEvent**: Emitted when a push notification is received from the remote A2A agent. Contains the task ID, context ID, and current state.
- **A2APushNotificationSentEvent**: Emitted when a push notification is sent to a callback URL. Contains the task ID, context ID, callback URL, state, whether delivery succeeded, and optional error message.
- **A2APushNotificationTimeoutEvent**: Emitted when push notification wait times out. Contains the task ID, context ID, and timeout duration in seconds.
#### Connection & Authentication Events
- **A2AAgentCardFetchedEvent**: Emitted when an agent card is successfully fetched. Contains the endpoint, agent name, agent card metadata, protocol version, provider info, whether it was cached, and fetch time in milliseconds.
- **A2AAuthenticationFailedEvent**: Emitted when authentication to an A2A agent fails. Contains the endpoint, auth type attempted (e.g., `bearer`, `oauth2`, `api_key`), error message, and HTTP status code.
- **A2AConnectionErrorEvent**: Emitted when a connection error occurs during A2A communication. Contains the endpoint, error message, error type (e.g., `timeout`, `connection_refused`, `dns_error`), HTTP status code, and the operation being attempted.
- **A2ATransportNegotiatedEvent**: Emitted when transport protocol is negotiated with an A2A agent. Contains the negotiated transport, negotiated URL, selection source (`client_preferred`, `server_preferred`, `fallback`), and client/server supported transports.
- **A2AContentTypeNegotiatedEvent**: Emitted when content types are negotiated with an A2A agent. Contains the client/server input/output modes, negotiated input/output modes, and whether negotiation succeeded.
#### Artifact Events
- **A2AArtifactReceivedEvent**: Emitted when an artifact is received from a remote A2A agent. Contains the task ID, artifact ID, artifact name, description, MIME type, size in bytes, and whether content should be appended.
#### Server Task Events
- **A2AServerTaskStartedEvent**: Emitted when an A2A server task execution starts. Contains the task ID and context ID.
- **A2AServerTaskCompletedEvent**: Emitted when an A2A server task execution completes. Contains the task ID, context ID, and result.
- **A2AServerTaskCanceledEvent**: Emitted when an A2A server task execution is canceled. Contains the task ID and context ID.
- **A2AServerTaskFailedEvent**: Emitted when an A2A server task execution fails. Contains the task ID, context ID, and error message.
#### Context Lifecycle Events
- **A2AContextCreatedEvent**: Emitted when an A2A context is created. Contexts group related tasks in a conversation or workflow. Contains the context ID and creation timestamp.
- **A2AContextExpiredEvent**: Emitted when an A2A context expires due to TTL. Contains the context ID, creation timestamp, age in seconds, and task count.
- **A2AContextIdleEvent**: Emitted when an A2A context becomes idle (no activity for the configured threshold). Contains the context ID, idle time in seconds, and task count.
- **A2AContextCompletedEvent**: Emitted when all tasks in an A2A context complete. Contains the context ID, total tasks, and duration in seconds.
- **A2AContextPrunedEvent**: Emitted when an A2A context is pruned (deleted). Contains the context ID, task count, and age in seconds.
## Event Handler Structure

View File

@@ -196,12 +196,19 @@ O CrewAI fornece uma ampla variedade de eventos para escuta:
- **CrewTrainStartedEvent**: Emitido ao iniciar o treinamento de um Crew
- **CrewTrainCompletedEvent**: Emitido ao concluir o treinamento de um Crew
- **CrewTrainFailedEvent**: Emitido ao falhar no treinamento de um Crew
- **CrewTestResultEvent**: Emitido quando um resultado de teste de Crew está disponível. Contém a pontuação de qualidade, duração da execução e modelo utilizado.
### Eventos de Agent
- **AgentExecutionStartedEvent**: Emitido quando um Agent inicia a execução de uma tarefa
- **AgentExecutionCompletedEvent**: Emitido quando um Agent conclui a execução de uma tarefa
- **AgentExecutionErrorEvent**: Emitido quando um Agent encontra um erro durante a execução
- **LiteAgentExecutionStartedEvent**: Emitido quando um LiteAgent inicia a execução. Contém as informações do agente, ferramentas e mensagens.
- **LiteAgentExecutionCompletedEvent**: Emitido quando um LiteAgent conclui a execução. Contém as informações do agente e a saída.
- **LiteAgentExecutionErrorEvent**: Emitido quando um LiteAgent encontra um erro durante a execução. Contém as informações do agente e a mensagem de erro.
- **AgentEvaluationStartedEvent**: Emitido quando uma avaliação de agente é iniciada. Contém o ID do agente, papel do agente, ID da tarefa opcional e número da iteração.
- **AgentEvaluationCompletedEvent**: Emitido quando uma avaliação de agente é concluída. Contém o ID do agente, papel do agente, ID da tarefa opcional, número da iteração, categoria da métrica e pontuação.
- **AgentEvaluationFailedEvent**: Emitido quando uma avaliação de agente falha. Contém o ID do agente, papel do agente, ID da tarefa opcional, número da iteração e mensagem de erro.
### Eventos de Task
@@ -219,6 +226,16 @@ O CrewAI fornece uma ampla variedade de eventos para escuta:
- **ToolExecutionErrorEvent**: Emitido quando ocorre erro na execução de uma ferramenta
- **ToolSelectionErrorEvent**: Emitido ao ocorrer erro na seleção de uma ferramenta
### Eventos de MCP
- **MCPConnectionStartedEvent**: Emitido ao iniciar a conexão com um servidor MCP. Contém o nome do servidor, URL, tipo de transporte, timeout de conexão e se é uma tentativa de reconexão.
- **MCPConnectionCompletedEvent**: Emitido ao conectar com sucesso a um servidor MCP. Contém o nome do servidor, duração da conexão em milissegundos e se foi uma reconexão.
- **MCPConnectionFailedEvent**: Emitido quando a conexão com um servidor MCP falha. Contém o nome do servidor, mensagem de erro e tipo de erro (`timeout`, `authentication`, `network`, etc.).
- **MCPToolExecutionStartedEvent**: Emitido ao iniciar a execução de uma ferramenta MCP. Contém o nome do servidor, nome da ferramenta e argumentos da ferramenta.
- **MCPToolExecutionCompletedEvent**: Emitido quando a execução de uma ferramenta MCP é concluída com sucesso. Contém o nome do servidor, nome da ferramenta, resultado e duração da execução em milissegundos.
- **MCPToolExecutionFailedEvent**: Emitido quando a execução de uma ferramenta MCP falha. Contém o nome do servidor, nome da ferramenta, mensagem de erro e tipo de erro (`timeout`, `validation`, `server_error`, etc.).
- **MCPConfigFetchFailedEvent**: Emitido quando a obtenção da configuração de um servidor MCP falha (ex.: o MCP não está conectado na sua conta, erro de API ou falha de conexão após a configuração ser obtida). Contém o slug, mensagem de erro e tipo de erro (`not_connected`, `api_error`, `connection_failed`).
### Eventos de Knowledge
- **KnowledgeRetrievalStartedEvent**: Emitido ao iniciar recuperação de conhecimento
@@ -232,16 +249,26 @@ O CrewAI fornece uma ampla variedade de eventos para escuta:
- **LLMGuardrailStartedEvent**: Emitido ao iniciar validação dos guardrails. Contém detalhes do guardrail aplicado e tentativas.
- **LLMGuardrailCompletedEvent**: Emitido ao concluir validação dos guardrails. Contém detalhes sobre sucesso/falha na validação, resultados e mensagens de erro, se houver.
- **LLMGuardrailFailedEvent**: Emitido quando a validação do guardrail falha. Contém a mensagem de erro e o número de tentativas.
### Eventos de Flow
- **FlowCreatedEvent**: Emitido ao criar um Flow
- **FlowStartedEvent**: Emitido ao iniciar a execução de um Flow
- **FlowFinishedEvent**: Emitido ao concluir a execução de um Flow
- **FlowPausedEvent**: Emitido quando um Flow é pausado aguardando feedback humano. Contém o nome do flow, ID do flow, nome do método, estado atual, mensagem exibida ao solicitar feedback e lista opcional de resultados possíveis para roteamento.
- **FlowPlotEvent**: Emitido ao plotar um Flow
- **MethodExecutionStartedEvent**: Emitido ao iniciar a execução de um método do Flow
- **MethodExecutionFinishedEvent**: Emitido ao concluir a execução de um método do Flow
- **MethodExecutionFailedEvent**: Emitido ao falhar na execução de um método do Flow
- **MethodExecutionPausedEvent**: Emitido quando um método do Flow é pausado aguardando feedback humano. Contém o nome do flow, nome do método, estado atual, ID do flow, mensagem exibida ao solicitar feedback e lista opcional de resultados possíveis para roteamento.
### Eventos de Human In The Loop
- **FlowInputRequestedEvent**: Emitido quando um Flow solicita entrada do usuário via `Flow.ask()`. Contém o nome do flow, nome do método, a pergunta ou prompt exibido ao usuário e metadados opcionais (ex.: ID do usuário, canal, contexto da sessão).
- **FlowInputReceivedEvent**: Emitido quando a entrada do usuário é recebida após `Flow.ask()`. Contém o nome do flow, nome do método, a pergunta original, a resposta do usuário (ou `None` se expirou), metadados opcionais da solicitação e metadados opcionais da resposta do provedor (ex.: quem respondeu, ID do thread, timestamps).
- **HumanFeedbackRequestedEvent**: Emitido quando um método decorado com `@human_feedback` requer entrada de um revisor humano. Contém o nome do flow, nome do método, a saída do método exibida ao humano para revisão, a mensagem exibida ao solicitar feedback e lista opcional de resultados possíveis para roteamento.
- **HumanFeedbackReceivedEvent**: Emitido quando um humano fornece feedback em resposta a um método decorado com `@human_feedback`. Contém o nome do flow, nome do método, o texto bruto do feedback fornecido pelo humano e a string de resultado consolidada (se emit foi especificado).
### Eventos de LLM
@@ -249,6 +276,91 @@ O CrewAI fornece uma ampla variedade de eventos para escuta:
- **LLMCallCompletedEvent**: Emitido ao concluir uma chamada LLM
- **LLMCallFailedEvent**: Emitido ao falhar uma chamada LLM
- **LLMStreamChunkEvent**: Emitido para cada chunk recebido durante respostas em streaming do LLM
- **LLMThinkingChunkEvent**: Emitido quando um chunk de pensamento/raciocínio é recebido de um modelo de pensamento. Contém o texto do chunk e ID de resposta opcional.
### Eventos de Memória
- **MemoryQueryStartedEvent**: Emitido quando uma consulta de memória é iniciada. Contém a consulta, limite e threshold de pontuação opcional.
- **MemoryQueryCompletedEvent**: Emitido quando uma consulta de memória é concluída com sucesso. Contém a consulta, resultados, limite, threshold de pontuação e tempo de execução da consulta.
- **MemoryQueryFailedEvent**: Emitido quando uma consulta de memória falha. Contém a consulta, limite, threshold de pontuação e mensagem de erro.
- **MemorySaveStartedEvent**: Emitido quando uma operação de salvamento de memória é iniciada. Contém o valor a ser salvo, metadados e papel do agente opcional.
- **MemorySaveCompletedEvent**: Emitido quando uma operação de salvamento de memória é concluída com sucesso. Contém o valor salvo, metadados, papel do agente e tempo de salvamento.
- **MemorySaveFailedEvent**: Emitido quando uma operação de salvamento de memória falha. Contém o valor, metadados, papel do agente e mensagem de erro.
- **MemoryRetrievalStartedEvent**: Emitido quando a recuperação de memória para um prompt de tarefa é iniciada. Contém o ID da tarefa opcional.
- **MemoryRetrievalCompletedEvent**: Emitido quando a recuperação de memória para um prompt de tarefa é concluída com sucesso. Contém o ID da tarefa, conteúdo da memória e tempo de execução da recuperação.
- **MemoryRetrievalFailedEvent**: Emitido quando a recuperação de memória para um prompt de tarefa falha. Contém o ID da tarefa opcional e mensagem de erro.

### Reasoning Events

- **AgentReasoningStartedEvent**: Emitted when an agent starts reasoning about a task. Contains the agent role, task ID, and attempt number.
- **AgentReasoningCompletedEvent**: Emitted when an agent finishes its reasoning process. Contains the agent role, task ID, the plan produced, and whether the agent is ready to proceed.
- **AgentReasoningFailedEvent**: Emitted when the reasoning process fails. Contains the agent role, task ID, and error message.

### Observation Events

- **StepObservationStartedEvent**: Emitted when the Planner starts observing a step result. Fired after each step execution, before the observation LLM call. Contains the agent role, step number, and step description.
- **StepObservationCompletedEvent**: Emitted when the Planner finishes observing a step result. Contains whether the step completed successfully, key information learned, whether the remaining plan is still valid, whether a full replan is needed, and suggested refinements.
- **StepObservationFailedEvent**: Emitted when the observation LLM call fails. The system continues with the plan by default. Contains the error message.
- **PlanRefinementEvent**: Emitted when the Planner refines future step descriptions without a full replan. Contains the number of refined steps and the refinements applied.
- **PlanReplanTriggeredEvent**: Emitted when the Planner triggers a full replan because the remaining plan was judged fundamentally incorrect. Contains the replan reason, replan count, and number of completed steps preserved.
- **GoalAchievedEarlyEvent**: Emitted when the Planner detects the goal was achieved early and the remaining steps will be skipped. Contains the number of remaining steps and completed steps.

### A2A (Agent-to-Agent) Events

#### Delegation Events

- **A2ADelegationStartedEvent**: Emitted when A2A delegation starts. Contains the endpoint URL, task description, agent ID, context ID, whether it is multiturn, turn number, agent card metadata, protocol version, provider information, and optional skill ID.
- **A2ADelegationCompletedEvent**: Emitted when A2A delegation completes. Contains the completion status (`completed`, `input_required`, `failed`, etc.), result, error message, context ID, and agent card metadata.
- **A2AParallelDelegationStartedEvent**: Emitted when parallel delegation to multiple A2A agents starts. Contains the list of endpoints and the task description.
- **A2AParallelDelegationCompletedEvent**: Emitted when parallel delegation to multiple A2A agents completes. Contains the list of endpoints, success count, failure count, and a summary of results.

#### Conversation Events

- **A2AConversationStartedEvent**: Emitted once at the start of a multiturn A2A conversation, before the first message exchange. Contains the agent ID, endpoint, context ID, agent card metadata, protocol version, and provider information.
- **A2AMessageSentEvent**: Emitted when a message is sent to the A2A agent. Contains the message content, turn number, context ID, message ID, and whether it is multiturn.
- **A2AResponseReceivedEvent**: Emitted when a response is received from the A2A agent. Contains the response content, turn number, context ID, message ID, status, and whether it is the final response.
- **A2AConversationCompletedEvent**: Emitted once at the end of a multiturn A2A conversation. Contains the final status (`completed` or `failed`), final result, error message, context ID, and total turn count.

#### Streaming Events

- **A2AStreamingStartedEvent**: Emitted when streaming mode starts for A2A delegation. Contains the task ID, context ID, endpoint, turn number, and whether it is multiturn.
- **A2AStreamingChunkEvent**: Emitted when a streaming chunk is received. Contains the chunk text, chunk index, whether it is the final chunk, task ID, context ID, and turn number.

#### Polling and Push Notification Events

- **A2APollingStartedEvent**: Emitted when polling mode starts for A2A delegation. Contains the task ID, context ID, polling interval in seconds, and endpoint.
- **A2APollingStatusEvent**: Emitted on each polling iteration. Contains the task ID, context ID, current task state, elapsed seconds, and poll count.
- **A2APushNotificationRegisteredEvent**: Emitted when a push notification callback is registered. Contains the task ID, context ID, callback URL, and endpoint.
- **A2APushNotificationReceivedEvent**: Emitted when a push notification is received from the remote A2A agent. Contains the task ID, context ID, and current state.
- **A2APushNotificationSentEvent**: Emitted when a push notification is sent to a callback URL. Contains the task ID, context ID, callback URL, state, whether delivery succeeded, and an optional error message.
- **A2APushNotificationTimeoutEvent**: Emitted when waiting for a push notification times out. Contains the task ID, context ID, and timeout duration in seconds.

#### Connection and Authentication Events

- **A2AAgentCardFetchedEvent**: Emitted when an agent card is fetched successfully. Contains the endpoint, agent name, agent card metadata, protocol version, provider information, whether it came from the cache, and fetch time in milliseconds.
- **A2AAuthenticationFailedEvent**: Emitted when authentication with an A2A agent fails. Contains the endpoint, attempted authentication type (e.g. `bearer`, `oauth2`, `api_key`), error message, and HTTP status code.
- **A2AConnectionErrorEvent**: Emitted when a connection error occurs during A2A communication. Contains the endpoint, error message, error type (e.g. `timeout`, `connection_refused`, `dns_error`), HTTP status code, and the operation being attempted.
- **A2ATransportNegotiatedEvent**: Emitted when the transport protocol is negotiated with an A2A agent. Contains the negotiated transport, negotiated URL, selection source (`client_preferred`, `server_preferred`, `fallback`), and the transports supported by the client/server.
- **A2AContentTypeNegotiatedEvent**: Emitted when content types are negotiated with an A2A agent. Contains the client/server input/output modes, the negotiated input/output modes, and whether negotiation succeeded.

#### Artifact Events

- **A2AArtifactReceivedEvent**: Emitted when an artifact is received from a remote A2A agent. Contains the task ID, artifact ID, artifact name, description, MIME type, size in bytes, and whether the content should be appended.

#### Server Task Events

- **A2AServerTaskStartedEvent**: Emitted when an A2A server task execution starts. Contains the task ID and context ID.
- **A2AServerTaskCompletedEvent**: Emitted when an A2A server task execution completes. Contains the task ID, context ID, and result.
- **A2AServerTaskCanceledEvent**: Emitted when an A2A server task execution is canceled. Contains the task ID and context ID.
- **A2AServerTaskFailedEvent**: Emitted when an A2A server task execution fails. Contains the task ID, context ID, and error message.

#### Context Lifecycle Events

- **A2AContextCreatedEvent**: Emitted when an A2A context is created. Contexts group related tasks within a conversation or workflow. Contains the context ID and creation timestamp.
- **A2AContextExpiredEvent**: Emitted when an A2A context expires due to its TTL. Contains the context ID, creation timestamp, age in seconds, and task count.
- **A2AContextIdleEvent**: Emitted when an A2A context becomes idle (no activity for the configured threshold). Contains the context ID, idle time in seconds, and task count.
- **A2AContextCompletedEvent**: Emitted when all tasks in an A2A context complete. Contains the context ID, total task count, and duration in seconds.
- **A2AContextPrunedEvent**: Emitted when an A2A context is pruned (deleted). Contains the context ID, task count, and age in seconds.
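All of the events above follow the same subscribe-and-emit pattern: a handler is registered for an event class and invoked whenever an instance of that class is emitted. A minimal sketch of that pattern with a toy bus — the class and method names here are illustrative stand-ins, not CrewAI's actual event API:

```python
from collections import defaultdict
from dataclasses import dataclass


# Toy stand-in for one of the event classes listed above (illustrative only).
@dataclass
class MemoryQueryCompletedEvent:
    query: str
    results: list
    query_time_ms: float


class EventBus:
    """Minimal publish/subscribe bus mirroring the listener pattern."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type):
        """Decorator that registers a handler for an event class."""
        def decorator(handler):
            self._handlers[event_type].append(handler)
            return handler
        return decorator

    def emit(self, event):
        """Invoke every handler registered for the event's class."""
        for handler in self._handlers[type(event)]:
            handler(event)


bus = EventBus()
seen = []


@bus.on(MemoryQueryCompletedEvent)
def log_memory_query(event):
    seen.append(f"query={event.query!r} hits={len(event.results)}")


bus.emit(
    MemoryQueryCompletedEvent(query="user prefs", results=["a", "b"], query_time_ms=12.5)
)
print(seen[0])  # query='user prefs' hits=2
```

Handlers only fire for the exact event class they registered for, which is why the catalog above distinguishes started/completed/failed variants per operation.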
## Event Handler Structure

View File

@@ -66,6 +66,7 @@ from crewai.mcp.tool_resolver import MCPToolResolver
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.fingerprint import Fingerprint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
@@ -143,7 +144,7 @@ class Agent(BaseAgent):
default=None,
description="Maximum execution time for an agent to execute a task",
)
step_callback: Any | None = Field(
step_callback: SerializableCallable | None = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
)
@@ -151,10 +152,10 @@ class Agent(BaseAgent):
default=True,
description="Use system prompt for the agent.",
)
llm: str | InstanceOf[BaseLLM] | Any = Field(
llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.", default=None
)
function_calling_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
function_calling_llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.", default=None
)
system_template: str | None = Field(
@@ -340,7 +341,7 @@ class Agent(BaseAgent):
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.llm.supports_function_calling() # type: ignore[union-attr]
and len(tools) > 0
)

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from copy import copy as shallow_copy
from hashlib import md5
import re
@@ -12,6 +11,7 @@ from pydantic import (
UUID4,
BaseModel,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
@@ -26,10 +26,14 @@ from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.knowledge_config import KnowledgeConfig
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.mcp.config import MCPServerConfig
from crewai.memory.memory_scope import MemoryScope, MemorySlice
from crewai.memory.unified_memory import Memory
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.security_config import SecurityConfig
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.callback import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.logger import Logger
@@ -179,7 +183,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
default=None,
description="Knowledge sources for the agent.",
)
knowledge_storage: Any | None = Field(
knowledge_storage: InstanceOf[BaseKnowledgeStorage] | None = Field(
default=None,
description="Custom knowledge storage for the agent.",
)
@@ -187,7 +191,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
callbacks: list[Callable[[Any], Any]] = Field(
callbacks: list[SerializableCallable] = Field(
default_factory=list, description="Callbacks to be used for the agent"
)
adapted_agent: bool = Field(
@@ -205,7 +209,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
default=None,
description="List of MCP server references. Supports 'https://server.com/path' for external servers and bare slugs like 'notion' for connected MCP integrations. Use '#tool_name' suffix for specific tools.",
)
memory: Any = Field(
memory: bool | Memory | MemoryScope | MemorySlice | None = Field(
default=None,
description=(
"Enable agent memory. Pass True for default Memory(), "

View File

@@ -35,6 +35,7 @@ from typing_extensions import Self
if TYPE_CHECKING:
from crewai_files import FileInput
from opentelemetry.trace import Span
try:
from crewai_files import get_supported_content_types
@@ -83,6 +84,8 @@ from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.memory.memory_scope import MemoryScope, MemorySlice
from crewai.memory.unified_memory import Memory
from crewai.process import Process
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.rag.types import SearchResult
@@ -94,6 +97,7 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
from crewai.tools.base_tool import BaseTool
from crewai.types.callback import SerializableCallable
from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
@@ -112,7 +116,6 @@ from crewai.utilities.llm_utils import create_llm
from crewai.utilities.logger import Logger
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.printer import PrinterColor
from crewai.utilities.replanning_evaluator import ReplanningEvaluator
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.streaming import (
create_async_chunk_generator,
@@ -167,12 +170,12 @@ class Crew(FlowTrackable, BaseModel):
"""
__hash__ = object.__hash__
_execution_span: Any = PrivateAttr()
_execution_span: Span | None = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default_factory=CacheHandler)
_memory: Any = PrivateAttr(default=None) # Unified Memory | MemoryScope
_memory: Memory | MemoryScope | MemorySlice | None = PrivateAttr(default=None)
_train: bool | None = PrivateAttr(default=False)
_train_iteration: int | None = PrivateAttr()
_inputs: dict[str, Any] | None = PrivateAttr(default=None)
@@ -183,8 +186,6 @@ class Crew(FlowTrackable, BaseModel):
default_factory=TaskOutputStorageHandler
)
_kickoff_event_id: str | None = PrivateAttr(default=None)
_replan_count: int = PrivateAttr(default=0)
_original_task_descriptions: list[str] = PrivateAttr(default_factory=list)
name: str | None = Field(default="crew")
cache: bool = Field(default=True)
@@ -192,7 +193,7 @@ class Crew(FlowTrackable, BaseModel):
agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: bool = Field(default=False)
memory: bool | Any = Field(
memory: bool | Memory | MemoryScope | MemorySlice | None = Field(
default=False,
description=(
"Enable crew memory. Pass True for default Memory(), "
@@ -207,36 +208,34 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
manager_llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: BaseAgent | None = Field(
description="Custom agent that will be used as manager.", default=None
)
function_calling_llm: str | InstanceOf[LLM] | Any | None = Field(
function_calling_llm: str | InstanceOf[LLM] | None = Field(
description="Language model that will run the agent.", default=None
)
config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: bool | None = Field(default=False)
step_callback: Any | None = Field(
step_callback: SerializableCallable | None = Field(
default=None,
description="Callback to be executed after each step for all agents execution.",
)
task_callback: Any | None = Field(
task_callback: SerializableCallable | None = Field(
default=None,
description="Callback to be executed after each task for all agents execution.",
)
before_kickoff_callbacks: list[
Callable[[dict[str, Any] | None], dict[str, Any] | None]
] = Field(
before_kickoff_callbacks: list[SerializableCallable] = Field(
default_factory=list,
description=(
"List of callbacks to be executed before crew kickoff. "
"It may be used to adjust inputs before the crew is executed."
),
)
after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field(
after_kickoff_callbacks: list[SerializableCallable] = Field(
default_factory=list,
description=(
"List of callbacks to be executed after crew kickoff. "
@@ -272,21 +271,6 @@ class Crew(FlowTrackable, BaseModel):
"Language model that will run the AgentPlanner if planning is True."
),
)
replan_on_failure: bool = Field(
default=False,
description=(
"When True and planning is enabled, evaluate each task result against "
"the plan and trigger replanning if results deviate significantly."
),
)
max_replans: int = Field(
default=3,
description=(
"Maximum number of replans allowed during a single crew execution. "
"Prevents infinite replanning loops."
),
ge=0,
)
task_execution_output_json_files: list[str] | None = Field(
default=None,
description="list of file paths for task execution JSON files.",
@@ -367,7 +351,7 @@ class Crew(FlowTrackable, BaseModel):
self._file_handler = FileHandler(self.output_log_file)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self.function_calling_llm = create_llm(self.function_calling_llm) # type: ignore[assignment]
return self
@@ -381,7 +365,7 @@ class Crew(FlowTrackable, BaseModel):
if self.embedder is not None:
from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder)
embedder = build_embedder(self.embedder) # type: ignore[arg-type]
self._memory = Memory(embedder=embedder)
elif self.memory:
# User passed a Memory / MemoryScope / MemorySlice instance
@@ -1059,7 +1043,6 @@ class Crew(FlowTrackable, BaseModel):
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
@@ -1106,11 +1089,6 @@ class Crew(FlowTrackable, BaseModel):
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()
# Store original descriptions before appending plans so replanning
# can strip the old plan and apply a fresh one.
self._original_task_descriptions = [task.description for task in self.tasks]
self._replan_count = 0
plan_map: dict[int, str] = {}
for step_plan in result.list_of_plans_per_task:
if step_plan.task_number in plan_map:
@@ -1132,95 +1110,6 @@ class Crew(FlowTrackable, BaseModel):
f"No plan found for Task Number {task_number}",
)
def _maybe_replan(
self,
task: Task,
task_output: TaskOutput,
task_index: int,
tasks: list[Task],
all_task_outputs: list[TaskOutput],
) -> None:
"""Evaluate a completed task and replan remaining tasks if needed.
This is called after each synchronous task completes when both
``planning`` and ``replan_on_failure`` are enabled. It uses a
lightweight LLM call to decide whether the result deviates from
the plan, and if so generates revised plans for remaining tasks.
Args:
task: The task that just completed.
task_output: The output produced by the completed task.
task_index: Index of the completed task in the tasks list.
tasks: The full list of tasks being executed.
all_task_outputs: All task outputs collected so far.
"""
if (
not self.planning
or not self.replan_on_failure
or self._replan_count >= self.max_replans
):
return
remaining_tasks = tasks[task_index + 1 :]
if not remaining_tasks:
return
# Extract the plan portion appended to this task's description
original_desc = (
self._original_task_descriptions[task_index]
if task_index < len(self._original_task_descriptions)
else task.description
)
plan_text = task.description[len(original_desc) :]
if not plan_text.strip():
return
evaluator = ReplanningEvaluator(llm=self.planning_llm)
decision = evaluator.evaluate(
completed_task=task,
task_output=task_output,
original_plan=plan_text,
remaining_tasks=remaining_tasks,
)
if not decision.should_replan:
return
self._replan_count += 1
self._logger.log(
"info",
f"Replanning triggered (replan {self._replan_count}/{self.max_replans}): "
f"{decision.reason}",
)
completed_tasks = tasks[: task_index + 1]
planner = CrewPlanner(tasks=self.tasks, planning_agent_llm=self.planning_llm)
replan_result = planner._handle_crew_replanning(
completed_tasks=completed_tasks,
completed_outputs=all_task_outputs,
remaining_tasks=remaining_tasks,
deviation_reason=decision.reason,
)
# Build a map of new plans keyed by task_number
new_plan_map: dict[int, str] = {}
for step_plan in replan_result.list_of_plans_per_task:
if step_plan.task_number not in new_plan_map:
new_plan_map[step_plan.task_number] = step_plan.plan
# Apply revised plans to remaining tasks, restoring original
# descriptions first so old plans don't accumulate.
for plan_idx, remaining_task in enumerate(remaining_tasks):
global_idx = task_index + 1 + plan_idx
if global_idx < len(self._original_task_descriptions):
remaining_task.description = self._original_task_descriptions[
global_idx
]
plan_number = plan_idx + 1
if plan_number in new_plan_map:
remaining_task.description += new_plan_map[plan_number]
def _store_execution_log(
self,
task: Task,
@@ -1353,7 +1242,6 @@ class Crew(FlowTrackable, BaseModel):
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
self._maybe_replan(task, task_output, task_index, tasks, task_outputs)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)

View File

@@ -81,6 +81,7 @@ from crewai.flow.flow_wrappers import (
SimpleFlowCondition,
StartMethod,
)
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import (
FlowExecutionData,
@@ -99,6 +100,8 @@ from crewai.flow.utils import (
is_flow_method_name,
is_simple_flow_condition,
)
from crewai.memory.memory_scope import MemoryScope, MemorySlice
from crewai.memory.unified_memory import Memory
if TYPE_CHECKING:
@@ -501,7 +504,7 @@ class LockedListProxy(list, Generic[T]): # type: ignore[type-arg]
def index(
self, value: T, start: SupportsIndex = 0, stop: SupportsIndex | None = None
) -> int: # type: ignore[override]
) -> int:
if stop is None:
return self._list.index(value, start)
return self._list.index(value, start, stop)
@@ -520,13 +523,13 @@ class LockedListProxy(list, Generic[T]): # type: ignore[type-arg]
def copy(self) -> list[T]:
return self._list.copy()
def __add__(self, other: list[T]) -> list[T]:
def __add__(self, other: list[T]) -> list[T]: # type: ignore[override]
return self._list + other
def __radd__(self, other: list[T]) -> list[T]:
return other + self._list
def __iadd__(self, other: Iterable[T]) -> LockedListProxy[T]:
def __iadd__(self, other: Iterable[T]) -> LockedListProxy[T]: # type: ignore[override]
with self._lock:
self._list += list(other)
return self
@@ -630,13 +633,13 @@ class LockedDictProxy(dict, Generic[T]): # type: ignore[type-arg]
def copy(self) -> dict[str, T]:
return self._dict.copy()
def __or__(self, other: dict[str, T]) -> dict[str, T]:
def __or__(self, other: dict[str, T]) -> dict[str, T]: # type: ignore[override]
return self._dict | other
def __ror__(self, other: dict[str, T]) -> dict[str, T]:
def __ror__(self, other: dict[str, T]) -> dict[str, T]: # type: ignore[override]
return other | self._dict
def __ior__(self, other: dict[str, T]) -> LockedDictProxy[T]:
def __ior__(self, other: dict[str, T]) -> LockedDictProxy[T]: # type: ignore[override]
with self._lock:
self._dict |= other
return self
@@ -822,10 +825,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
name: str | None = None
tracing: bool | None = None
stream: bool = False
memory: Any = (
None # Memory | MemoryScope | MemorySlice | None; auto-created if not set
)
input_provider: Any = None # InputProvider | None; per-flow override for self.ask()
memory: Memory | MemoryScope | MemorySlice | None = None
input_provider: InputProvider | None = None
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]:
class _FlowGeneric(cls): # type: ignore
@@ -904,8 +905,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory
# to avoid creating a wasteful standalone Memory instance.
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.unified_memory import Memory
self.memory = Memory()
# Register all flow-related methods
@@ -951,10 +950,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no memory is configured for this flow.
TypeError: If batch remember is attempted on a MemoryScope or MemorySlice.
"""
if self.memory is None:
raise ValueError("No memory configured for this flow")
if isinstance(content, list):
if not isinstance(self.memory, Memory):
raise TypeError(
"Batch remember requires a Memory instance, "
f"got {type(self.memory).__name__}"
)
return self.memory.remember_many(content, **kwargs)
return self.memory.remember(content, **kwargs)
@@ -2725,7 +2730,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# ── User Input (self.ask) ────────────────────────────────────────
def _resolve_input_provider(self) -> Any:
def _resolve_input_provider(self) -> InputProvider:
"""Resolve the input provider using the priority chain.
Resolution order:
@@ -3148,12 +3153,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
logger.warning(message)
def plot(self, filename: str = "crewai_flow.html", show: bool = True) -> str:
def plot(
self,
filename: str = "crewai_flow.html",
show: bool = True,
output_dir: str | None = None,
) -> str:
"""Create interactive HTML visualization of Flow structure.
Args:
filename: Output HTML filename (default: "crewai_flow.html").
show: Whether to open in browser (default: True).
output_dir: Directory to save generated files. Defaults to the
current working directory.
Returns:
Absolute path to generated HTML file.
@@ -3166,7 +3178,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
),
)
structure = build_flow_structure(self)
return render_interactive(structure, filename=filename, show=show)
return render_interactive(
structure, filename=filename, show=show, output_dir=output_dir
)
@staticmethod
def _show_tracing_disabled_message() -> None:

View File

@@ -6,7 +6,7 @@ customize Flow behavior at runtime.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -32,17 +32,17 @@ class FlowConfig:
self._input_provider: InputProvider | None = None
@property
def hitl_provider(self) -> Any:
def hitl_provider(self) -> HumanFeedbackProvider | None:
"""Get the configured HITL provider."""
return self._hitl_provider
@hitl_provider.setter
def hitl_provider(self, provider: Any) -> None:
def hitl_provider(self, provider: HumanFeedbackProvider | None) -> None:
"""Set the HITL provider."""
self._hitl_provider = provider
@property
def input_provider(self) -> Any:
def input_provider(self) -> InputProvider | None:
"""Get the configured input provider for ``Flow.ask()``.
Returns:
@@ -52,7 +52,7 @@ class FlowConfig:
return self._input_provider
@input_provider.setter
def input_provider(self, provider: Any) -> None:
def input_provider(self, provider: InputProvider | None) -> None:
"""Set the input provider for ``Flow.ask()``.
Args:

View File

@@ -2,7 +2,6 @@
import json
from pathlib import Path
import tempfile
from typing import Any, ClassVar
import webbrowser
@@ -205,20 +204,24 @@ def render_interactive(
dag: FlowStructure,
filename: str = "flow_dag.html",
show: bool = True,
output_dir: str | None = None,
) -> str:
"""Create interactive HTML visualization of Flow structure.
Generates three output files in a temporary directory: HTML template,
CSS stylesheet, and JavaScript. Optionally opens the visualization in
default browser.
Generates three output files: HTML template, CSS stylesheet, and
JavaScript. Files are saved to the specified output directory, or the
current working directory when *output_dir* is ``None``. Optionally
opens the visualization in the default browser.
Args:
dag: FlowStructure to visualize.
filename: Output HTML filename (basename only, no path).
show: Whether to open in browser.
output_dir: Directory to save generated files. Defaults to the
current working directory (``os.getcwd()``).
Returns:
Absolute path to generated HTML file in temporary directory.
Absolute path to generated HTML file.
"""
node_positions = calculate_node_positions(dag)
@@ -403,12 +406,13 @@ def render_interactive(
extensions=[CSSExtension, JSExtension],
)
temp_dir = Path(tempfile.mkdtemp(prefix="crewai_flow_"))
output_path = temp_dir / Path(filename).name
dest_dir = Path(output_dir) if output_dir else Path.cwd()
dest_dir.mkdir(parents=True, exist_ok=True)
output_path = dest_dir / Path(filename).name
css_filename = output_path.stem + "_style.css"
css_output_path = temp_dir / css_filename
css_output_path = dest_dir / css_filename
js_filename = output_path.stem + "_script.js"
js_output_path = temp_dir / js_filename
js_output_path = dest_dir / js_filename
css_file = template_dir / "style.css"
css_content = css_file.read_text(encoding="utf-8")

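The destination logic in this hunk can be exercised in isolation. A sketch of how the three sibling output paths are derived from the HTML filename and optional `output_dir` — the helper name is mine, not part of the diff:

```python
from __future__ import annotations

from pathlib import Path


def derive_output_paths(
    filename: str, output_dir: str | None = None
) -> tuple[Path, Path, Path]:
    """Mirror the path derivation above: HTML, CSS, and JS files land
    side by side in output_dir, defaulting to the current working directory."""
    dest_dir = Path(output_dir) if output_dir else Path.cwd()
    # Path(filename).name keeps the basename only, stripping any path component.
    html_path = dest_dir / Path(filename).name
    css_path = dest_dir / (html_path.stem + "_style.css")
    js_path = dest_dir / (html_path.stem + "_script.js")
    return html_path, css_path, js_path


html, css, js = derive_output_paths("crewai_flow.html", output_dir="out")
print(css.name, js.name)  # crewai_flow_style.css crewai_flow_script.js
```

Taking only the basename of `filename` means any directory the caller embeds in it is ignored; the destination is controlled solely by `output_dir`.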
View File

@@ -22,7 +22,6 @@ from crewai.events.types.memory_events import (
)
from crewai.llms.base_llm import BaseLLM
from crewai.memory.analyze import extract_memories_from_content
from crewai.memory.recall_flow import RecallFlow
from crewai.memory.storage.backend import StorageBackend
from crewai.memory.types import (
MemoryConfig,
@@ -620,6 +619,8 @@ class Memory(BaseModel):
)
results.sort(key=lambda m: m.score, reverse=True)
else:
from crewai.memory.recall_flow import RecallFlow
flow = RecallFlow(
storage=self._storage,
llm=self._llm,

View File

@@ -67,6 +67,7 @@ except ImportError:
return []
from crewai.types.callback import SerializableCallable
from crewai.utilities.guardrail import (
process_guardrail,
)
@@ -124,7 +125,7 @@ class Task(BaseModel):
description="Configuration for the agent",
default=None,
)
callback: Any | None = Field(
callback: SerializableCallable | None = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: BaseAgent | None = Field(

View File

@@ -0,0 +1,152 @@
"""Serializable callback type for Pydantic models.
Provides a ``SerializableCallable`` type alias that enables full JSON
round-tripping of callback fields, e.g. ``"builtins.print"`` ↔ ``print``.
Lambdas and closures serialize to a dotted path but cannot be deserialized
back — use module-level named functions for checkpointable callbacks.
"""
from __future__ import annotations
from collections.abc import Callable
import importlib
import inspect
import os
from typing import Annotated, Any
import warnings
from pydantic import BeforeValidator, WithJsonSchema
from pydantic.functional_serializers import PlainSerializer
def _is_non_roundtrippable(fn: object) -> bool:
"""Return ``True`` if *fn* cannot survive a serialize/deserialize round-trip.
Built-in functions, plain module-level functions, and classes produce
dotted paths that :func:`_resolve_dotted_path` can reliably resolve.
Bound methods, ``functools.partial`` objects, callable class instances,
lambdas, and closures all fail or silently change semantics during
round-tripping.
Args:
fn: The object to check.
Returns:
``True`` if *fn* would not round-trip through JSON serialization.
"""
if inspect.isbuiltin(fn) or inspect.isclass(fn):
return False
if inspect.isfunction(fn):
qualname = getattr(fn, "__qualname__", "")
return qualname.endswith("<lambda>") or "<locals>" in qualname
return True
def string_to_callable(value: Any) -> Callable[..., Any]:
"""Convert a dotted path string to the callable it references.
If *value* is already callable it is returned as-is, with a warning if
it cannot survive JSON round-tripping. Otherwise, it is treated as
``"module.qualname"`` and resolved via :func:`_resolve_dotted_path`.
Args:
value: A callable or a dotted-path string e.g. ``"builtins.print"``.
Returns:
The resolved callable.
Raises:
ValueError: If *value* is not callable or a resolvable dotted-path string.
"""
if callable(value):
if _is_non_roundtrippable(value):
warnings.warn(
f"{type(value).__name__} callbacks cannot be serialized "
"and will prevent checkpointing. "
"Use a module-level named function instead.",
UserWarning,
stacklevel=2,
)
return value # type: ignore[no-any-return]
if not isinstance(value, str):
raise ValueError(
f"Expected a callable or dotted-path string, got {type(value).__name__}"
)
if "." not in value:
raise ValueError(
f"Invalid callback path {value!r}: expected 'module.name' format"
)
if not os.environ.get("CREWAI_DESERIALIZE_CALLBACKS"):
raise ValueError(
f"Refusing to resolve callback path {value!r}: "
"set CREWAI_DESERIALIZE_CALLBACKS=1 to allow. "
"Only enable this for trusted checkpoint data."
)
return _resolve_dotted_path(value)
def _resolve_dotted_path(path: str) -> Callable[..., Any]:
"""Import a module and walk attribute lookups to resolve a dotted path.
Handles multi-level qualified names like ``"module.ClassName.method"``
by trying progressively shorter module paths and resolving the remainder
as chained attribute lookups.
Args:
path: A dotted string e.g. ``"builtins.print"`` or
``"mymodule.MyClass.my_method"``.
Returns:
The resolved callable.
Raises:
ValueError: If no valid module can be imported from the path.
"""
parts = path.split(".")
# Try importing progressively shorter prefixes as the module.
for i in range(len(parts), 0, -1):
module_path = ".".join(parts[:i])
try:
obj: Any = importlib.import_module(module_path)
except (ImportError, TypeError, ValueError):
continue
# Walk the remaining attribute chain.
try:
for attr in parts[i:]:
obj = getattr(obj, attr)
except AttributeError:
continue
if callable(obj):
return obj # type: ignore[no-any-return]
raise ValueError(f"Cannot resolve callback {path!r}")
def callable_to_string(fn: Callable[..., Any]) -> str:
"""Serialize a callable to its dotted-path string representation.
Uses ``fn.__module__`` and ``fn.__qualname__`` to produce a string such
as ``"builtins.print"``. Lambdas and closures produce paths that contain
``<lambda>`` or ``<locals>`` and cannot be round-tripped via
:func:`string_to_callable`.
Args:
fn: The callable to serialize.
Returns:
A dotted string of the form ``"module.qualname"``.
"""
module = getattr(fn, "__module__", None)
qualname = getattr(fn, "__qualname__", None)
if module is None or qualname is None:
raise ValueError(
f"Cannot serialize {fn!r}: missing __module__ or __qualname__. "
"Use a module-level named function for checkpointable callbacks."
)
return f"{module}.{qualname}"
SerializableCallable = Annotated[
Callable[..., Any],
BeforeValidator(string_to_callable),
PlainSerializer(callable_to_string, return_type=str, when_used="json"),
WithJsonSchema({"type": "string"}),
]
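The new ``crewai.types.callback`` module boils down to a dotted-path round-trip. A stdlib-only sketch of that core, omitting the lambda warning and the ``CREWAI_DESERIALIZE_CALLBACKS`` gate that the real module layers on top (names here mirror the diff but are re-implemented for illustration):

```python
import importlib
from collections.abc import Callable
from typing import Any


def callable_to_string(fn: Callable[..., Any]) -> str:
    # Same shape the diff's serializer emits: "module.qualname".
    return f"{fn.__module__}.{fn.__qualname__}"


def resolve_dotted_path(path: str) -> Callable[..., Any]:
    # Try progressively shorter prefixes as the module, then walk the
    # remaining attribute chain (mirrors _resolve_dotted_path above).
    parts = path.split(".")
    for i in range(len(parts), 0, -1):
        try:
            obj: Any = importlib.import_module(".".join(parts[:i]))
        except (ImportError, ValueError):
            continue
        try:
            for attr in parts[i:]:
                obj = getattr(obj, attr)
        except AttributeError:
            continue
        if callable(obj):
            return obj
    raise ValueError(f"Cannot resolve callback {path!r}")


# Round trip: print -> "builtins.print" -> print
assert resolve_dotted_path(callable_to_string(print)) is print
```

Unlike this sketch, the real ``string_to_callable`` refuses bare strings unless ``CREWAI_DESERIALIZE_CALLBACKS=1`` is set, since resolving attacker-controlled dotted paths amounts to arbitrary import execution.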


@@ -1,7 +1,5 @@
"""Handles planning and coordination of crew tasks."""
from __future__ import annotations
import logging
from pydantic import BaseModel, Field
@@ -9,7 +7,6 @@ from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
logger = logging.getLogger(__name__)
@@ -137,115 +134,6 @@ class CrewPlanner:
logger.warning("Error accessing agent knowledge sources")
return []
def _handle_crew_replanning(
self,
completed_tasks: list[Task],
completed_outputs: list[TaskOutput],
remaining_tasks: list[Task],
deviation_reason: str,
) -> PlannerTaskPydanticOutput:
"""Generate revised plans for remaining tasks after a deviation is detected.
This method is called when a ``ReplanningEvaluator`` determines that a
completed task's result deviates significantly from the original plan.
It creates a new plan that accounts for the actual results so far.
Args:
completed_tasks: Tasks that have already been executed.
completed_outputs: Outputs produced by the completed tasks.
remaining_tasks: Tasks that still need to be executed.
deviation_reason: Explanation of why replanning was triggered.
Returns:
A PlannerTaskPydanticOutput with revised plans for the remaining tasks.
Raises:
ValueError: If the replanning output cannot be obtained.
"""
planning_agent = self._create_planning_agent()
completed_summary = self._create_completed_tasks_summary(
completed_tasks, completed_outputs
)
remaining_summary = self._create_tasks_summary_for(remaining_tasks)
replan_task = Task(
description=(
"The crew's execution plan needs to be revised because a task result "
"deviated from the original plan's assumptions.\n\n"
f"## Reason for Replanning\n{deviation_reason}\n\n"
f"## Completed Tasks and Their Results\n{completed_summary}\n\n"
f"## Remaining Tasks That Need New Plans\n{remaining_summary}\n\n"
"Create revised step-by-step plans for the remaining tasks ONLY, "
"taking into account what has actually been accomplished so far "
"and the deviation from the original plan. The plans should adapt "
"to the real situation rather than following the now-outdated assumptions."
),
expected_output=(
"Step by step revised plan for each remaining task, "
"adapted to the actual results so far."
),
agent=planning_agent,
output_pydantic=PlannerTaskPydanticOutput,
)
result = replan_task.execute_sync()
if isinstance(result.pydantic, PlannerTaskPydanticOutput):
return result.pydantic
raise ValueError("Failed to get the Replanning output")
@staticmethod
def _create_completed_tasks_summary(
tasks: list[Task], outputs: list[TaskOutput]
) -> str:
"""Create a summary of completed tasks and their actual outputs.
Args:
tasks: The completed tasks.
outputs: The outputs from those tasks.
Returns:
A formatted string summarizing completed tasks and results.
"""
summaries = []
for idx, (task, output) in enumerate(
zip(tasks, outputs, strict=False), start=1
):
agent_role = task.agent.role if task.agent else "None"
summaries.append(
f"Task {idx} (Agent: {agent_role}):\n"
f" Description: {task.description}\n"
f" Expected Output: {task.expected_output}\n"
f" Actual Result: {output.raw}"
)
return "\n\n".join(summaries) if summaries else "No completed tasks."
@staticmethod
def _create_tasks_summary_for(tasks: list[Task]) -> str:
"""Create a summary of a subset of tasks (used for remaining tasks).
Args:
tasks: The tasks to summarize.
Returns:
A formatted string summarizing the tasks.
"""
summaries = []
for idx, task in enumerate(tasks, start=1):
agent_role = task.agent.role if task.agent else "None"
agent_goal = task.agent.goal if task.agent else "None"
summaries.append(
f"Task Number {idx}:\n"
f' "task_description": {task.description}\n'
f' "task_expected_output": {task.expected_output}\n'
f' "agent": {agent_role}\n'
f' "agent_goal": {agent_goal}\n'
f' "task_tools": {task.tools}\n'
f' "agent_tools": {task.agent.tools if task.agent else "None"}'
)
return "\n\n".join(summaries) if summaries else "No remaining tasks."
def _create_tasks_summary(self) -> str:
"""Creates a summary of all tasks.


@@ -1,162 +0,0 @@
"""Evaluates whether task results deviate from the original plan and triggers replanning."""
from __future__ import annotations
import logging
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
logger = logging.getLogger(__name__)
class ReplanDecision(BaseModel):
"""Structured decision on whether replanning is needed.
Attributes:
should_replan: Whether the crew should generate a new plan for remaining tasks.
reason: Explanation of why replanning is or is not needed.
affected_task_numbers: 1-indexed task numbers of remaining tasks most affected
by the deviation. Empty if should_replan is False.
"""
should_replan: bool = Field(
description="Whether the task result deviates significantly from the plan, requiring replanning.",
)
reason: str = Field(
description="A concise explanation of why replanning is or is not needed.",
)
affected_task_numbers: list[int] = Field(
default_factory=list,
description="1-indexed task numbers of remaining tasks most affected by the deviation.",
)
class ReplanningEvaluator:
"""Evaluates task outputs to decide if the crew's plan needs to be revised.
After each task completes, this evaluator makes a lightweight LLM call to
determine whether the result deviates significantly from what the plan
assumed. If so, it signals that replanning should occur.
Example usage::
evaluator = ReplanningEvaluator(llm="gpt-4o-mini")
decision = evaluator.evaluate(
completed_task=task,
task_output=output,
original_plan="Step 1: ...",
remaining_tasks=remaining,
)
if decision.should_replan:
# trigger replanning for remaining tasks
...
Args:
llm: The language model to use for evaluation. Accepts a string model
name or a BaseLLM instance. Defaults to ``"gpt-4o-mini"``.
"""
def __init__(self, llm: str | BaseLLM | None = None) -> None:
self.llm = llm or "gpt-4o-mini"
def evaluate(
self,
completed_task: Task,
task_output: TaskOutput,
original_plan: str,
remaining_tasks: list[Task],
) -> ReplanDecision:
"""Evaluate whether a task result deviates from the plan.
Args:
completed_task: The task that just finished executing.
task_output: The output produced by the completed task.
original_plan: The plan text that was appended to the task description.
remaining_tasks: Tasks that have not yet been executed.
Returns:
A ReplanDecision indicating whether replanning is needed.
"""
evaluation_agent = Agent(
role="Replanning Evaluator",
goal=(
"Evaluate whether the result of a completed task deviates "
"significantly from what the plan assumed, and determine if "
"the remaining tasks need a revised plan."
),
backstory=(
"You are an expert at evaluating execution plans. You compare "
"actual task results against planned expectations and identify "
"deviations that would make the remaining plan ineffective."
),
llm=self.llm,
)
remaining_summary = self._summarize_remaining_tasks(remaining_tasks)
evaluation_task = Task(
description=(
"Evaluate whether the following task result deviates significantly "
"from what the original plan assumed.\n\n"
f"## Completed Task\n"
f"Description: {completed_task.description}\n"
f"Expected Output: {completed_task.expected_output}\n\n"
f"## Plan for this Task\n{original_plan}\n\n"
f"## Actual Result\n{task_output.raw}\n\n"
f"## Remaining Tasks\n{remaining_summary}\n\n"
"Based on the above, decide if the actual result deviates enough "
"from the plan's assumptions that the remaining tasks need replanning. "
"Minor differences or format changes do NOT require replanning. "
"Only significant deviations (missing data, errors, completely "
"different approach needed, infeasible assumptions) should trigger replanning."
),
expected_output=(
"A structured decision indicating whether replanning is needed, "
"with a reason and the affected task numbers."
),
agent=evaluation_agent,
output_pydantic=ReplanDecision,
)
result = evaluation_task.execute_sync()
if isinstance(result.pydantic, ReplanDecision):
return result.pydantic
logger.warning(
"Failed to get structured ReplanDecision, defaulting to no replan"
)
return ReplanDecision(
should_replan=False,
reason="Failed to evaluate task output against plan.",
affected_task_numbers=[],
)
@staticmethod
def _summarize_remaining_tasks(remaining_tasks: list[Task]) -> str:
"""Create a summary of remaining tasks for evaluation context.
Args:
remaining_tasks: Tasks that have not yet been executed.
Returns:
A formatted string summarizing the remaining tasks.
"""
if not remaining_tasks:
return "No remaining tasks."
summaries = []
for idx, task in enumerate(remaining_tasks, start=1):
agent_role = task.agent.role if task.agent else "Unassigned"
summaries.append(
f"Task {idx}: {task.description}\n"
f" Expected Output: {task.expected_output}\n"
f" Agent: {agent_role}"
)
return "\n".join(summaries)


@@ -1690,7 +1690,10 @@ def test_agent_with_knowledge_sources_works_with_copy():
with patch(
"crewai.knowledge.storage.knowledge_storage.KnowledgeStorage"
) as mock_knowledge_storage:
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
mock_knowledge_storage_instance = mock_knowledge_storage.return_value
mock_knowledge_storage_instance.__class__ = BaseKnowledgeStorage
agent.knowledge_storage = mock_knowledge_storage_instance
agent_copy = agent.copy()


@@ -0,0 +1,237 @@
"""Tests for crewai.types.callback — SerializableCallable round-tripping."""
from __future__ import annotations
import functools
import os
from typing import Any
import pytest
from pydantic import BaseModel, ValidationError
from crewai.types.callback import (
SerializableCallable,
_is_non_roundtrippable,
_resolve_dotted_path,
callable_to_string,
string_to_callable,
)
# ── Helpers ──────────────────────────────────────────────────────────
def module_level_function() -> str:
"""Plain module-level function that should round-trip."""
return "hello"
class _CallableInstance:
"""Callable class instance — non-roundtrippable."""
def __call__(self) -> str:
return "instance"
class _HasMethod:
def method(self) -> str:
return "method"
class _Model(BaseModel):
cb: SerializableCallable | None = None
# ── _is_non_roundtrippable ───────────────────────────────────────────
class TestIsNonRoundtrippable:
def test_builtin_is_roundtrippable(self) -> None:
assert _is_non_roundtrippable(print) is False
assert _is_non_roundtrippable(len) is False
def test_class_is_roundtrippable(self) -> None:
assert _is_non_roundtrippable(dict) is False
assert _is_non_roundtrippable(_CallableInstance) is False
def test_module_level_function_is_roundtrippable(self) -> None:
assert _is_non_roundtrippable(module_level_function) is False
def test_lambda_is_non_roundtrippable(self) -> None:
assert _is_non_roundtrippable(lambda: None) is True
def test_closure_is_non_roundtrippable(self) -> None:
x = 1
def closure() -> int:
return x
assert _is_non_roundtrippable(closure) is True
def test_bound_method_is_non_roundtrippable(self) -> None:
assert _is_non_roundtrippable(_HasMethod().method) is True
def test_partial_is_non_roundtrippable(self) -> None:
assert _is_non_roundtrippable(functools.partial(print, "hi")) is True
def test_callable_instance_is_non_roundtrippable(self) -> None:
assert _is_non_roundtrippable(_CallableInstance()) is True
# ── callable_to_string ───────────────────────────────────────────────
class TestCallableToString:
def test_module_level_function(self) -> None:
result = callable_to_string(module_level_function)
assert result == f"{__name__}.module_level_function"
def test_class(self) -> None:
result = callable_to_string(dict)
assert result == "builtins.dict"
def test_builtin(self) -> None:
result = callable_to_string(print)
assert result == "builtins.print"
def test_lambda_produces_locals_path(self) -> None:
fn = lambda: None # noqa: E731
result = callable_to_string(fn)
assert "<lambda>" in result
def test_missing_qualname_raises(self) -> None:
obj = type("NoQual", (), {"__module__": "test"})()
obj.__qualname__ = None # type: ignore[assignment]
with pytest.raises(ValueError, match="missing __module__ or __qualname__"):
callable_to_string(obj)
def test_missing_module_raises(self) -> None:
# Create an object where getattr(obj, "__module__", None) returns None
ns: dict[str, Any] = {"__qualname__": "x", "__module__": None}
obj = type("NoMod", (), ns)()
with pytest.raises(ValueError, match="missing __module__"):
callable_to_string(obj)
# ── string_to_callable ───────────────────────────────────────────────
class TestStringToCallable:
def test_callable_passthrough(self) -> None:
assert string_to_callable(print) is print
def test_roundtrippable_callable_no_warning(self, recwarn: pytest.WarningsChecker) -> None:
string_to_callable(module_level_function)
our_warnings = [
w for w in recwarn if "cannot be serialized" in str(w.message)
]
assert our_warnings == []
def test_non_roundtrippable_warns(self) -> None:
with pytest.warns(UserWarning, match="cannot be serialized"):
string_to_callable(functools.partial(print))
def test_non_callable_non_string_raises(self) -> None:
with pytest.raises(ValueError, match="Expected a callable"):
string_to_callable(42)
def test_string_without_dot_raises(self) -> None:
with pytest.raises(ValueError, match="expected 'module.name' format"):
string_to_callable("nodots")
def test_string_refused_without_env_var(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("CREWAI_DESERIALIZE_CALLBACKS", raising=False)
with pytest.raises(ValueError, match="Refusing to resolve"):
string_to_callable("builtins.print")
def test_string_resolves_with_env_var(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_DESERIALIZE_CALLBACKS", "1")
result = string_to_callable("builtins.print")
assert result is print
def test_string_resolves_multi_level_path(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_DESERIALIZE_CALLBACKS", "1")
result = string_to_callable("os.path.join")
assert result is os.path.join
def test_unresolvable_path_raises(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_DESERIALIZE_CALLBACKS", "1")
with pytest.raises(ValueError, match="Cannot resolve"):
string_to_callable("nonexistent.module.func")
# ── _resolve_dotted_path ─────────────────────────────────────────────
class TestResolveDottedPath:
def test_builtin(self) -> None:
assert _resolve_dotted_path("builtins.print") is print
def test_nested_module_attribute(self) -> None:
assert _resolve_dotted_path("os.path.join") is os.path.join
def test_class_on_module(self) -> None:
from collections import OrderedDict
assert _resolve_dotted_path("collections.OrderedDict") is OrderedDict
def test_nonexistent_raises(self) -> None:
with pytest.raises(ValueError, match="Cannot resolve"):
_resolve_dotted_path("no.such.module.func")
def test_non_callable_attribute_skipped(self) -> None:
# os.sep is a string, not callable — should not resolve
with pytest.raises(ValueError, match="Cannot resolve"):
_resolve_dotted_path("os.sep")
# ── Pydantic integration round-trip ──────────────────────────────────
class TestSerializableCallableRoundTrip:
def test_json_serialize_module_function(self) -> None:
m = _Model(cb=module_level_function)
data = m.model_dump(mode="json")
assert data["cb"] == f"{__name__}.module_level_function"
def test_json_round_trip(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_DESERIALIZE_CALLBACKS", "1")
m = _Model(cb=print)
json_str = m.model_dump_json()
restored = _Model.model_validate_json(json_str)
assert restored.cb is print
def test_json_round_trip_class(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_DESERIALIZE_CALLBACKS", "1")
m = _Model(cb=dict)
json_str = m.model_dump_json()
restored = _Model.model_validate_json(json_str)
assert restored.cb is dict
def test_python_mode_preserves_callable(self) -> None:
m = _Model(cb=module_level_function)
data = m.model_dump(mode="python")
assert data["cb"] is module_level_function
def test_none_field(self) -> None:
m = _Model(cb=None)
assert m.cb is None
data = m.model_dump(mode="json")
assert data["cb"] is None
def test_validation_error_for_int(self) -> None:
with pytest.raises(ValidationError):
_Model(cb=42) # type: ignore[arg-type]
def test_deserialization_refused_without_env(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("CREWAI_DESERIALIZE_CALLBACKS", raising=False)
with pytest.raises(ValidationError, match="Refusing to resolve"):
_Model.model_validate({"cb": "builtins.print"})
def test_json_schema_is_string(self) -> None:
schema = _Model.model_json_schema()
cb_schema = schema["properties"]["cb"]
# anyOf for Optional: one string, one null
types = {item.get("type") for item in cb_schema.get("anyOf", [cb_schema])}
assert "string" in types


@@ -333,9 +333,9 @@ def test_visualization_plot_method():
"""Test that flow.plot() method works."""
flow = SimpleFlow()
html_file = flow.plot("test_plot.html", show=False)
assert os.path.exists(html_file)
with tempfile.TemporaryDirectory() as tmp_dir:
html_file = flow.plot("test_plot.html", show=False, output_dir=tmp_dir)
assert os.path.exists(html_file)
def test_router_paths_to_string_conditions():
@@ -667,4 +667,94 @@ def test_no_warning_for_properly_typed_router(caplog):
# No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
def test_plot_saves_to_current_working_directory():
"""Test that plot() saves the HTML file to the current working directory by default.
Regression test for https://github.com/crewAIInc/crewAI/issues/4991
"""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as tmp_dir:
original_cwd = os.getcwd()
try:
os.chdir(tmp_dir)
html_file = flow.plot("test_cwd_plot.html", show=False)
# The returned path must live inside the CWD, not a hidden temp dir
assert Path(html_file).parent == Path(tmp_dir)
assert os.path.exists(html_file)
assert html_file == str(Path(tmp_dir) / "test_cwd_plot.html")
finally:
os.chdir(original_cwd)
def test_plot_saves_to_explicit_output_dir():
"""Test that plot() saves files to a user-specified output directory."""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as output_dir:
html_file = flow.plot(
"custom_output.html", show=False, output_dir=output_dir
)
assert Path(html_file).parent == Path(output_dir)
assert os.path.exists(html_file)
# CSS and JS companion files should also be in the same directory
html_path = Path(html_file)
css_file = html_path.parent / f"{html_path.stem}_style.css"
js_file = html_path.parent / f"{html_path.stem}_script.js"
assert css_file.exists()
assert js_file.exists()
def test_render_interactive_saves_to_cwd_by_default():
"""Test that render_interactive() writes to CWD when output_dir is None.
Regression test for https://github.com/crewAIInc/crewAI/issues/4991
"""
flow = SimpleFlow()
structure = build_flow_structure(flow)
with tempfile.TemporaryDirectory() as tmp_dir:
original_cwd = os.getcwd()
try:
os.chdir(tmp_dir)
html_file = visualize_flow_structure(
structure, "cwd_test.html", show=False
)
assert Path(html_file).parent == Path(tmp_dir)
assert os.path.exists(html_file)
finally:
os.chdir(original_cwd)
def test_render_interactive_saves_to_specified_output_dir():
"""Test that render_interactive() writes to the specified output_dir."""
flow = SimpleFlow()
structure = build_flow_structure(flow)
with tempfile.TemporaryDirectory() as output_dir:
html_file = visualize_flow_structure(
structure, "output_dir_test.html", show=False, output_dir=output_dir
)
assert Path(html_file).parent == Path(output_dir)
assert os.path.exists(html_file)
with open(html_file, "r", encoding="utf-8") as f:
html_content = f.read()
assert "<!DOCTYPE html>" in html_content
def test_plot_returned_path_is_absolute():
"""Test that the path returned by plot() is always absolute."""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as tmp_dir:
html_file = flow.plot("abs_path_test.html", show=False, output_dir=tmp_dir)
assert os.path.isabs(html_file)


@@ -6,6 +6,7 @@ from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.project import (
CrewBase,
after_kickoff,
@@ -371,9 +372,12 @@ def test_internal_crew_with_mcp():
mock_adapter = Mock()
mock_adapter.tools = ToolCollection([simple_tool, another_simple_tool])
mock_llm = Mock()
mock_llm.__class__ = BaseLLM
with (
patch("crewai_tools.MCPServerAdapter", return_value=mock_adapter) as adapter_mock,
patch("crewai.llm.LLM.__new__", return_value=Mock()),
patch("crewai.llm.LLM.__new__", return_value=mock_llm),
):
crew = InternalCrewWithMCP()
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
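Both this hunk and the knowledge-storage test earlier lean on the same trick: ``unittest.mock.Mock`` allows assigning ``__class__`` so the mock passes ``isinstance`` checks in the code under test. A minimal, self-contained illustration with a hypothetical stand-in class:

```python
from unittest.mock import Mock


class BaseLLM:  # hypothetical stand-in for crewai.llms.base_llm.BaseLLM
    pass


mock_llm = Mock()
assert not isinstance(mock_llm, BaseLLM)

# Mock exposes __class__ as an assignable attribute precisely so that
# a mock can satisfy isinstance() checks.
mock_llm.__class__ = BaseLLM
assert isinstance(mock_llm, BaseLLM)

# It still behaves like a Mock: attribute access auto-creates children.
mock_llm.call("prompt")
mock_llm.call.assert_called_once_with("prompt")
```

This is why ``mock_llm.__class__ = BaseLLM`` is enough to get past a validator that rejects plain ``Mock`` instances, without building a real LLM.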


@@ -1,572 +0,0 @@
"""Tests for the adaptive replanning feature (issue #4983).
Covers:
- ReplanningEvaluator: evaluation of task results against plans
- CrewPlanner._handle_crew_replanning: generating revised plans
- Crew integration: replan_on_failure / max_replans fields and the
_maybe_replan hook in both sync and async execution paths
- Backwards compatibility: existing crews are unaffected by default
"""
from unittest.mock import MagicMock, patch, call
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.planning_handler import (
CrewPlanner,
PlannerTaskPydanticOutput,
PlanPerTask,
)
from crewai.utilities.replanning_evaluator import (
ReplanDecision,
ReplanningEvaluator,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_agents(n: int = 3) -> list[Agent]:
return [
Agent(role=f"Agent {i}", goal=f"Goal {i}", backstory=f"Backstory {i}")
for i in range(1, n + 1)
]
def _make_tasks(agents: list[Agent]) -> list[Task]:
return [
Task(
description=f"Task {i} description",
expected_output=f"Output {i}",
agent=agents[i - 1],
)
for i in range(1, len(agents) + 1)
]
def _task_output(raw: str = "result", agent: str = "agent") -> TaskOutput:
return TaskOutput(description="desc", agent=agent, raw=raw)
# ---------------------------------------------------------------------------
# ReplanDecision model tests
# ---------------------------------------------------------------------------
class TestReplanDecision:
def test_replan_decision_defaults(self):
decision = ReplanDecision(
should_replan=False,
reason="All good",
)
assert decision.should_replan is False
assert decision.reason == "All good"
assert decision.affected_task_numbers == []
def test_replan_decision_with_affected_tasks(self):
decision = ReplanDecision(
should_replan=True,
reason="Data missing",
affected_task_numbers=[2, 3],
)
assert decision.should_replan is True
assert decision.affected_task_numbers == [2, 3]
# ---------------------------------------------------------------------------
# ReplanningEvaluator tests
# ---------------------------------------------------------------------------
class TestReplanningEvaluator:
def test_default_llm(self):
evaluator = ReplanningEvaluator()
assert evaluator.llm == "gpt-4o-mini"
def test_custom_llm(self):
evaluator = ReplanningEvaluator(llm="gpt-4o")
assert evaluator.llm == "gpt-4o"
def test_evaluate_returns_replan_decision_when_deviation_detected(self):
"""When the LLM says replanning is needed, evaluate() returns that."""
evaluator = ReplanningEvaluator()
agents = _make_agents(3)
tasks = _make_tasks(agents)
output = _task_output("completely unexpected result")
expected_decision = ReplanDecision(
should_replan=True,
reason="Result deviates from plan assumptions",
affected_task_numbers=[2, 3],
)
with patch.object(Task, "execute_sync") as mock_exec:
mock_exec.return_value = TaskOutput(
description="eval",
agent="evaluator",
pydantic=expected_decision,
)
decision = evaluator.evaluate(
completed_task=tasks[0],
task_output=output,
original_plan="Step 1: do X\nStep 2: do Y",
remaining_tasks=tasks[1:],
)
assert decision.should_replan is True
assert decision.reason == "Result deviates from plan assumptions"
mock_exec.assert_called_once()
def test_evaluate_returns_no_replan_when_result_matches(self):
"""When the result matches the plan, no replanning is needed."""
evaluator = ReplanningEvaluator()
agents = _make_agents(2)
tasks = _make_tasks(agents)
output = _task_output("expected result matching plan")
expected_decision = ReplanDecision(
should_replan=False,
reason="Result aligns with plan",
affected_task_numbers=[],
)
with patch.object(Task, "execute_sync") as mock_exec:
mock_exec.return_value = TaskOutput(
description="eval",
agent="evaluator",
pydantic=expected_decision,
)
decision = evaluator.evaluate(
completed_task=tasks[0],
task_output=output,
original_plan="Step 1: gather data",
remaining_tasks=tasks[1:],
)
assert decision.should_replan is False
def test_evaluate_fallback_on_bad_output(self):
"""When the LLM returns non-structured output, fallback to no-replan."""
evaluator = ReplanningEvaluator()
agents = _make_agents(2)
tasks = _make_tasks(agents)
output = _task_output("some result")
with patch.object(Task, "execute_sync") as mock_exec:
mock_exec.return_value = TaskOutput(
description="eval",
agent="evaluator",
pydantic=None, # no structured output
)
decision = evaluator.evaluate(
completed_task=tasks[0],
task_output=output,
original_plan="Step 1: do stuff",
remaining_tasks=tasks[1:],
)
assert decision.should_replan is False
assert "Failed to evaluate" in decision.reason
def test_summarize_remaining_tasks_empty(self):
result = ReplanningEvaluator._summarize_remaining_tasks([])
assert result == "No remaining tasks."
def test_summarize_remaining_tasks_with_tasks(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
result = ReplanningEvaluator._summarize_remaining_tasks(tasks)
assert "Task 1" in result
assert "Task 2" in result
assert "Agent 1" in result
# ---------------------------------------------------------------------------
# CrewPlanner._handle_crew_replanning tests
# ---------------------------------------------------------------------------
class TestCrewPlannerReplanning:
@pytest.fixture
def planner(self):
agents = _make_agents(3)
tasks = _make_tasks(agents)
return CrewPlanner(tasks=tasks, planning_agent_llm=None)
def test_handle_crew_replanning_returns_revised_plans(self, planner):
agents = _make_agents(3)
tasks = _make_tasks(agents)
outputs = [_task_output("result 1")]
revised_plans = PlannerTaskPydanticOutput(
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="Task 2", plan="Revised plan for task 2"),
PlanPerTask(task_number=2, task="Task 3", plan="Revised plan for task 3"),
]
)
with patch.object(Task, "execute_sync") as mock_exec:
mock_exec.return_value = TaskOutput(
description="replan",
agent="planner",
pydantic=revised_plans,
)
result = planner._handle_crew_replanning(
completed_tasks=[tasks[0]],
completed_outputs=outputs,
remaining_tasks=tasks[1:],
deviation_reason="Task 1 returned unexpected data",
)
assert isinstance(result, PlannerTaskPydanticOutput)
assert len(result.list_of_plans_per_task) == 2
mock_exec.assert_called_once()
def test_handle_crew_replanning_raises_on_bad_output(self, planner):
agents = _make_agents(3)
tasks = _make_tasks(agents)
outputs = [_task_output("result 1")]
with patch.object(Task, "execute_sync") as mock_exec:
mock_exec.return_value = TaskOutput(
description="replan",
agent="planner",
pydantic=None,
)
with pytest.raises(ValueError, match="Failed to get the Replanning output"):
planner._handle_crew_replanning(
completed_tasks=[tasks[0]],
completed_outputs=outputs,
remaining_tasks=tasks[1:],
deviation_reason="deviation",
)
def test_completed_tasks_summary(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
outputs = [_task_output("result A"), _task_output("result B")]
summary = CrewPlanner._create_completed_tasks_summary(tasks, outputs)
assert "result A" in summary
assert "result B" in summary
assert "Agent 1" in summary
def test_completed_tasks_summary_empty(self):
summary = CrewPlanner._create_completed_tasks_summary([], [])
assert summary == "No completed tasks."
def test_tasks_summary_for_remaining(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
summary = CrewPlanner._create_tasks_summary_for(tasks)
assert "Task Number 1" in summary
assert "Task Number 2" in summary
def test_tasks_summary_for_empty(self):
summary = CrewPlanner._create_tasks_summary_for([])
assert summary == "No remaining tasks."
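# ---------------------------------------------------------------------------
# Illustrative sketch (hypothetical, not the CrewAI implementation): a summary
# builder consistent with the assertions above. Each completed entry carries
# the agent role and raw result, and empty input yields the sentinel string
# asserted in test_completed_tasks_summary_empty. The signature is simplified
# to (role, result) pairs for illustration only.
def _sketch_completed_summary(completed):
    if not completed:
        return "No completed tasks."
    return "\n".join(f"{role}: {result}" for role, result in completed)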
# ---------------------------------------------------------------------------
# Crew field tests (backwards compatibility)
# ---------------------------------------------------------------------------
class TestCrewReplanningFields:
def test_replan_on_failure_defaults_to_false(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks)
assert crew.replan_on_failure is False
def test_max_replans_defaults_to_3(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks)
assert crew.max_replans == 3
def test_replan_on_failure_can_be_set(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, replan_on_failure=True)
assert crew.replan_on_failure is True
def test_max_replans_can_be_set(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, max_replans=5)
assert crew.max_replans == 5
def test_max_replans_cannot_be_negative(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
with pytest.raises(ValueError):
Crew(agents=agents, tasks=tasks, max_replans=-1)
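# ---------------------------------------------------------------------------
# Illustrative sketch (hypothetical mirror of the two Crew fields exercised
# above; the real declaration in CrewAI may differ). Pydantic's Field(ge=0)
# reproduces the behavior these tests assert: defaults of False and 3, and a
# ValueError (ValidationError subclasses it) for a negative max_replans.
from pydantic import BaseModel, Field


class _ReplanConfigSketch(BaseModel):
    replan_on_failure: bool = False
    max_replans: int = Field(default=3, ge=0)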
# ---------------------------------------------------------------------------
# Crew._maybe_replan integration tests
# ---------------------------------------------------------------------------
class TestCrewMaybeReplan:
def _setup_crew_with_planning(self, n_agents: int = 3) -> tuple[Crew, list[Agent], list[Task]]:
agents = _make_agents(n_agents)
tasks = _make_tasks(agents)
crew = Crew(
agents=agents,
tasks=tasks,
planning=True,
replan_on_failure=True,
max_replans=3,
)
# Simulate planning having been called
crew._original_task_descriptions = [t.description for t in tasks]
crew._replan_count = 0
# Append a fake plan to each task
for task in tasks:
task.description += " [PLAN]"
return crew, agents, tasks
def test_maybe_replan_skips_when_planning_disabled(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, planning=False, replan_on_failure=True)
crew._original_task_descriptions = [t.description for t in tasks]
# Should not call evaluator at all
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
mock_eval.assert_not_called()
def test_maybe_replan_skips_when_replan_on_failure_disabled(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, planning=True, replan_on_failure=False)
crew._original_task_descriptions = [t.description for t in tasks]
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
mock_eval.assert_not_called()
def test_maybe_replan_skips_on_last_task(self):
crew, agents, tasks = self._setup_crew_with_planning(2)
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()])
mock_eval.assert_not_called()
def test_maybe_replan_skips_when_max_replans_reached(self):
crew, agents, tasks = self._setup_crew_with_planning(3)
crew._replan_count = 3 # already at max
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
mock_eval.assert_not_called()
def test_maybe_replan_skips_when_no_plan_text(self):
agents = _make_agents(3)
tasks = _make_tasks(agents)
crew = Crew(
agents=agents, tasks=tasks,
planning=True, replan_on_failure=True,
)
crew._original_task_descriptions = [t.description for t in tasks]
crew._replan_count = 0
# No plan appended — descriptions are unchanged
with patch.object(ReplanningEvaluator, "evaluate") as mock_eval:
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
mock_eval.assert_not_called()
def test_maybe_replan_no_replan_when_evaluator_says_no(self):
crew, agents, tasks = self._setup_crew_with_planning(3)
original_desc_1 = tasks[1].description
original_desc_2 = tasks[2].description
no_replan = ReplanDecision(
should_replan=False,
reason="Result is fine",
affected_task_numbers=[],
)
with patch.object(ReplanningEvaluator, "evaluate", return_value=no_replan):
with patch.object(CrewPlanner, "_handle_crew_replanning") as mock_handler:
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
mock_handler.assert_not_called()
assert crew._replan_count == 0
# Task descriptions should be unchanged
assert tasks[1].description == original_desc_1
assert tasks[2].description == original_desc_2
def test_maybe_replan_triggers_replanning_and_updates_tasks(self):
crew, agents, tasks = self._setup_crew_with_planning(3)
deviation_decision = ReplanDecision(
should_replan=True,
reason="Task 1 returned error data",
affected_task_numbers=[2, 3],
)
revised_plans = PlannerTaskPydanticOutput(
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="Task 2", plan=" [REVISED PLAN 2]"),
PlanPerTask(task_number=2, task="Task 3", plan=" [REVISED PLAN 3]"),
]
)
with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation_decision):
with patch.object(
CrewPlanner, "_handle_crew_replanning", return_value=revised_plans
):
crew._maybe_replan(
tasks[0], _task_output("bad result"), 0, tasks, [_task_output("bad result")]
)
assert crew._replan_count == 1
# Remaining tasks should have the revised plans
assert "[REVISED PLAN 2]" in tasks[1].description
assert "[REVISED PLAN 3]" in tasks[2].description
# Old plan should be gone (original desc restored + new plan)
assert tasks[1].description.count("[PLAN]") == 0
assert tasks[2].description.count("[PLAN]") == 0
def test_maybe_replan_increments_replan_count_each_time(self):
crew, agents, tasks = self._setup_crew_with_planning(3)
deviation = ReplanDecision(
should_replan=True,
reason="deviation",
affected_task_numbers=[2],
)
revised = PlannerTaskPydanticOutput(
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="T2", plan=" [NEW PLAN]"),
]
)
with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation):
with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
assert crew._replan_count == 1
# Simulate second task completing with deviation
crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
assert crew._replan_count == 2
def test_maybe_replan_stops_at_max_replans(self):
crew, agents, tasks = self._setup_crew_with_planning(3)
crew.max_replans = 1
deviation = ReplanDecision(
should_replan=True,
reason="deviation",
affected_task_numbers=[2],
)
revised = PlannerTaskPydanticOutput(
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="T2", plan=" [NEW]"),
]
)
with patch.object(ReplanningEvaluator, "evaluate", return_value=deviation) as mock_eval:
with patch.object(CrewPlanner, "_handle_crew_replanning", return_value=revised):
crew._maybe_replan(tasks[0], _task_output(), 0, tasks, [_task_output()])
assert crew._replan_count == 1
# Second call should be skipped because max_replans=1
crew._maybe_replan(tasks[1], _task_output(), 1, tasks, [_task_output()] * 2)
assert crew._replan_count == 1 # unchanged
# evaluate was only called once (the second time was short-circuited)
assert mock_eval.call_count == 1
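# ---------------------------------------------------------------------------
# Illustrative sketch (hypothetical, not CrewAI API): the short-circuit guard
# conditions the skip tests above exercise, modelled as a pure function.
# Replanning is only evaluated when planning is on, replan_on_failure is set,
# at least one task remains, the replan budget is not exhausted, and a plan
# was actually appended to the task description.
def _should_consider_replan(
    planning: bool,
    replan_on_failure: bool,
    task_index: int,
    total_tasks: int,
    replan_count: int,
    max_replans: int,
    has_plan_text: bool,
) -> bool:
    return (
        planning
        and replan_on_failure
        and task_index < total_tasks - 1
        and replan_count < max_replans
        and has_plan_text
    )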
# ---------------------------------------------------------------------------
# Crew._handle_crew_planning stores original descriptions
# ---------------------------------------------------------------------------
class TestCrewPlanningStoresOriginals:
def test_handle_crew_planning_stores_original_descriptions(self):
agents = _make_agents(2)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, planning=True)
original_descs = [t.description for t in tasks]
plans = [
PlanPerTask(task_number=1, task="T1", plan=" [PLAN 1]"),
PlanPerTask(task_number=2, task="T2", plan=" [PLAN 2]"),
]
mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
crew._handle_crew_planning()
assert crew._original_task_descriptions == original_descs
assert crew._replan_count == 0
def test_handle_crew_planning_resets_replan_count(self):
agents = _make_agents(1)
tasks = _make_tasks(agents)
crew = Crew(agents=agents, tasks=tasks, planning=True)
crew._replan_count = 5 # leftover from previous execution
plans = [PlanPerTask(task_number=1, task="T1", plan=" [PLAN]")]
mock_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)
with patch.object(CrewPlanner, "_handle_crew_planning", return_value=mock_result):
crew._handle_crew_planning()
assert crew._replan_count == 0
# ---------------------------------------------------------------------------
# Sync execution integration test
# ---------------------------------------------------------------------------
class TestExecuteTasksWithReplanning:
def test_execute_tasks_calls_maybe_replan_for_sync_tasks(self):
"""Verify that _maybe_replan is called after each sync task execution."""
agents = _make_agents(2)
tasks = _make_tasks(agents)
crew = Crew(
agents=agents,
tasks=tasks,
planning=True,
replan_on_failure=True,
)
crew._original_task_descriptions = [t.description for t in tasks]
crew._replan_count = 0
output = _task_output("result")
with patch.object(Task, "execute_sync", return_value=output):
with patch.object(Crew, "_maybe_replan") as mock_replan:
crew._execute_tasks(tasks)
# Should be called once for each sync task
assert mock_replan.call_count == 2
def test_execute_tasks_without_replanning_is_unaffected(self):
"""Verify existing behaviour when replan_on_failure is False."""
agents = _make_agents(2)
tasks = _make_tasks(agents)
crew = Crew(
agents=agents,
tasks=tasks,
planning=False,
replan_on_failure=False,
)
output = _task_output("result")
with patch.object(Task, "execute_sync", return_value=output):
with patch.object(Crew, "_maybe_replan") as mock_replan:
result = crew._execute_tasks(tasks)
# _execute_tasks still invokes _maybe_replan for each task; the real
# implementation no-ops when planning/replan_on_failure are disabled
assert mock_replan.call_count == 2
assert result is not None