[ENG-227] Record task execution timestamps (#1844)

This commit is contained in:
Gui Vieira
2025-01-03 15:12:13 -03:00
committed by GitHub
parent d1e2430aac
commit 30bd79390a
4 changed files with 252 additions and 50 deletions

View File

@@ -127,38 +127,41 @@ class Task(BaseModel):
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Callable[[TaskOutput], Tuple[bool, Any]]] = Field(
default=None,
description="Function to validate task output before proceeding to next task"
description="Function to validate task output before proceeding to next task",
)
max_retries: int = Field(
default=3,
description="Maximum number of retries when guardrail fails"
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(
default=0,
description="Current number of retries"
retry_count: int = Field(default=0, description="Current number of retries")
start_time: Optional[datetime.datetime] = Field(
default=None, description="Start time of the task execution"
)
end_time: Optional[datetime.datetime] = Field(
default=None, description="End time of the task execution"
)
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(cls, v: Optional[Callable]) -> Optional[Callable]:
"""Validate that the guardrail function has the correct signature and behavior.
While type hints provide static checking, this validator ensures runtime safety by:
1. Verifying the function accepts exactly one parameter (the TaskOutput)
2. Checking return type annotations match Tuple[bool, Any] if present
3. Providing clear, immediate error messages for debugging
This runtime validation is crucial because:
- Type hints are optional and can be ignored at runtime
- Function signatures need immediate validation before task execution
- Clear error messages help users debug guardrail implementation issues
Args:
v: The guardrail function to validate
Returns:
The validated guardrail function
Raises:
ValueError: If the function signature is invalid or return annotation
doesn't match Tuple[bool, Any]
@@ -171,8 +174,13 @@ class Task(BaseModel):
# Check return annotation if present, but don't require it
return_annotation = sig.return_annotation
if return_annotation != inspect.Signature.empty:
if not (return_annotation == Tuple[bool, Any] or str(return_annotation) == 'Tuple[bool, Any]'):
raise ValueError("If return type is annotated, it must be Tuple[bool, Any]")
if not (
return_annotation == Tuple[bool, Any]
or str(return_annotation) == "Tuple[bool, Any]"
):
raise ValueError(
"If return type is annotated, it must be Tuple[bool, Any]"
)
return v
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
@@ -181,7 +189,6 @@ class Task(BaseModel):
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
_execution_time: Optional[float] = PrivateAttr(default=None)
@model_validator(mode="before")
@classmethod
@@ -206,25 +213,19 @@ class Task(BaseModel):
"may_not_set_field", "This field is not to be set by the user.", {}
)
def _set_start_execution_time(self) -> float:
return datetime.datetime.now().timestamp()
def _set_end_execution_time(self, start_time: float) -> None:
self._execution_time = datetime.datetime.now().timestamp() - start_time
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
"""Validate the output file path.
Args:
value: The output file path to validate. Can be None or a string.
If the path contains template variables (e.g. {var}), leading slashes are preserved.
For regular paths, leading slashes are stripped.
Returns:
The validated and potentially modified path, or None if no path was provided.
Raises:
ValueError: If the path contains invalid characters, path traversal attempts,
or other security concerns.
@@ -234,18 +235,24 @@ class Task(BaseModel):
# Basic security checks
if ".." in value:
raise ValueError("Path traversal attempts are not allowed in output_file paths")
raise ValueError(
"Path traversal attempts are not allowed in output_file paths"
)
# Check for shell expansion first
if value.startswith('~') or value.startswith('$'):
raise ValueError("Shell expansion characters are not allowed in output_file paths")
if value.startswith("~") or value.startswith("$"):
raise ValueError(
"Shell expansion characters are not allowed in output_file paths"
)
# Then check other shell special characters
if any(char in value for char in ['|', '>', '<', '&', ';']):
raise ValueError("Shell special characters are not allowed in output_file paths")
if any(char in value for char in ["|", ">", "<", "&", ";"]):
raise ValueError(
"Shell special characters are not allowed in output_file paths"
)
# Don't strip leading slash if it's a template path with variables
if "{" in value or "}" in value:
if "{" in value or "}" in value:
# Validate template variable format
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
@@ -302,6 +309,12 @@ class Task(BaseModel):
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def execution_duration(self) -> float | None:
if not self.start_time or not self.end_time:
return None
return (self.end_time - self.start_time).total_seconds()
def execute_async(
self,
agent: BaseAgent | None = None,
@@ -342,7 +355,7 @@ class Task(BaseModel):
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
start_time = self._set_start_execution_time()
self.start_time = datetime.datetime.now()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
self.prompt_context = context
@@ -392,15 +405,17 @@ class Task(BaseModel):
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(guardrail_result.result)
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self.end_time = datetime.datetime.now()
self._set_end_execution_time(start_time)
if self.callback:
self.callback(self.output)
@@ -412,7 +427,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
@@ -434,11 +451,11 @@ class Task(BaseModel):
def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Args:
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
Raises:
ValueError: If a required template variable is missing from inputs.
"""
@@ -455,7 +472,9 @@ class Task(BaseModel):
try:
self.description = self._original_description.format(**inputs)
except KeyError as e:
raise ValueError(f"Missing required template variable '{e.args[0]}' in description") from e
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
) from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
@@ -472,22 +491,26 @@ class Task(BaseModel):
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating output_file path: {str(e)}") from e
raise ValueError(
f"Error interpolating output_file path: {str(e)}"
) from e
def interpolate_only(self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]) -> str:
def interpolate_only(
self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
If input_string is empty or has no placeholders, inputs can be empty.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a required template variable is missing from inputs.
KeyError: If a template variable is not found in the inputs dictionary.
@@ -497,13 +520,17 @@ class Task(BaseModel):
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError("Inputs dictionary cannot be empty when interpolating variables")
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
try:
# Validate input types
for key, value in inputs.items():
if not isinstance(value, (str, int, float)):
raise ValueError(f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}")
raise ValueError(
f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}"
)
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
@@ -512,7 +539,9 @@ class Task(BaseModel):
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(f"Template variable '{e.args[0]}' not found in inputs dictionary") from e
raise KeyError(
f"Template variable '{e.args[0]}' not found in inputs dictionary"
) from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
@@ -597,10 +626,10 @@ class Task(BaseModel):
def _save_file(self, result: Any) -> None:
"""Save task output to a file.
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file
@@ -618,6 +647,7 @@ class Task(BaseModel):
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):
import json
json.dump(result, file, ensure_ascii=False, indent=2)
else:
file.write(str(result))

View File

@@ -180,12 +180,12 @@ class CrewEvaluator:
self._test_result_span = self._telemetry.individual_test_result_span(
self.crew,
evaluation_result.pydantic.quality,
current_task._execution_time,
current_task.execution_duration,
self.openai_model_name,
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(
current_task._execution_time
current_task.execution_duration
)
else:
raise ValueError("Evaluation result is not in the expected format")