mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
feat: add function to clear and add task traces
This commit is contained in:
@@ -1,9 +1,8 @@
|
|||||||
import inspect
|
import inspect
|
||||||
import os
|
import os
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, datetime
|
||||||
from enum import Enum
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional
|
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from crewai.traces.context import TraceContext
|
from crewai.traces.context import TraceContext
|
||||||
@@ -34,7 +33,7 @@ class UnifiedTraceController:
|
|||||||
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
|
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
|
||||||
),
|
),
|
||||||
parent_trace_id: Optional[str] = None,
|
parent_trace_id: Optional[str] = None,
|
||||||
agent_role: Optional[str] = None,
|
agent_role: Optional[str] = "unknown",
|
||||||
task_name: Optional[str] = None,
|
task_name: Optional[str] = None,
|
||||||
task_description: Optional[str] = None,
|
task_description: Optional[str] = None,
|
||||||
task_id: Optional[str] = None,
|
task_id: Optional[str] = None,
|
||||||
@@ -79,6 +78,30 @@ class UnifiedTraceController:
|
|||||||
self.flow_step = flow_step
|
self.flow_step = flow_step
|
||||||
self.status: str = "running"
|
self.status: str = "running"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
|
||||||
|
"""Get all traces for a specific task.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: The ID of the task to get traces for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of traces associated with the task
|
||||||
|
"""
|
||||||
|
if not hasattr(cls, "_task_traces"):
|
||||||
|
cls._task_traces = {}
|
||||||
|
return cls._task_traces.get(task_id, [])
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def clear_task_traces(cls, task_id: str) -> None:
|
||||||
|
"""Clear traces for a specific task.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: The ID of the task to clear traces for
|
||||||
|
"""
|
||||||
|
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
|
||||||
|
del cls._task_traces[task_id]
|
||||||
|
|
||||||
def _get_current_trace(self) -> "UnifiedTraceController":
|
def _get_current_trace(self) -> "UnifiedTraceController":
|
||||||
return TraceContext.get_current()
|
return TraceContext.get_current()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user