Files
crewAI/tests/utilities/test_file_handler.py
Greyson LaLonde 641c156c17
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
fix: address flaky tests (#3363)
fix: resolve flaky tests and race conditions in test suite

- Fix telemetry/event tests by patching class methods instead of instances
- Use unique temp files/directories to prevent CI race conditions
- Reset singleton state between tests
- Mock embedchain.Client.setup() to prevent JSON corruption
- Rename test files to test_*.py convention
- Move agent tests to tests/agents directory
- Fix repeated tool usage detection
- Remove database-dependent tools causing initialization errors
2025-08-20 13:34:09 -04:00

51 lines
1.5 KiB
Python

import os
import pickle
import unittest
import uuid

import pytest

from crewai.utilities.file_handler import PickleHandler
class TestPickleHandler(unittest.TestCase):
    """Exercise PickleHandler's initialize/save/load cycle against a real file.

    Every test works on its own uniquely named ``.pkl`` file located in the
    current working directory; ``tearDown`` removes that file again.
    """

    def setUp(self):
        """Create a handler bound to a file name no other test shares."""
        # Use a unique file name for each test to avoid race conditions in
        # parallel test execution.
        unique_id = str(uuid.uuid4())
        self.file_name = f"test_data_{unique_id}.pkl"
        # The tests look for the file at this path, i.e. they assume the
        # handler resolves its file name against the current working directory.
        self.file_path = os.path.join(os.getcwd(), self.file_name)
        self.handler = PickleHandler(self.file_name)

    def tearDown(self):
        """Delete the test's pickle file if the test left one behind."""
        # EAFP instead of exists()+remove(): not every test creates the file,
        # and a separate existence check leaves a gap before the removal.
        try:
            os.remove(self.file_path)
        except FileNotFoundError:
            pass

    def test_initialize_file(self):
        """initialize_file() creates the file when it does not exist yet."""
        assert not os.path.exists(self.file_path)

        self.handler.initialize_file()

        assert os.path.exists(self.file_path)
        # Smoke check only: getsize() never returns a negative number, so this
        # merely proves the freshly created file can be stat'ed.
        assert os.path.getsize(self.file_path) >= 0

    def test_save_and_load(self):
        """Data passed to save() comes back unchanged from load()."""
        data = {"key": "value"}

        self.handler.save(data)
        loaded_data = self.handler.load()

        assert loaded_data == data

    def test_load_empty_file(self):
        """load() returns an empty dict when nothing has been saved.

        Despite the test's name, nothing in this test or in ``setUp`` writes
        the file before ``load()`` is called.
        """
        loaded_data = self.handler.load()

        assert loaded_data == {}

    def test_load_corrupted_file(self):
        """load() raises UnpicklingError for bytes that are not a pickle."""
        with open(self.file_path, "wb") as file:
            file.write(b"corrupted data")
            file.flush()
            os.fsync(file.fileno())  # Ensure data is written to disk

        # Catch the specific exception class rather than bare Exception so an
        # unrelated error is reported as such instead of being captured here.
        with pytest.raises(pickle.UnpicklingError) as exc:
            self.handler.load()

        # Checked after the ``with`` block: statements following the raising
        # call inside it would never execute.
        assert str(exc.value) == "pickle data was truncated"
        # Identity check on the class instead of comparing its repr string,
        # which hard-codes the private ``_pickle`` module name.
        assert exc.type is pickle.UnpicklingError