diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 46443d0a5..490ac34bc 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -7,7 +7,7 @@ import warnings from concurrent.futures import Future from copy import copy as shallow_copy from hashlib import md5 -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union from pydantic import ( UUID4, @@ -709,22 +709,21 @@ class Crew(BaseModel): self._task_output_handler.reset() return results - def kickoff_for_each_parallel(self, inputs: List[Dict[str, Any]], max_workers: Optional[int] = None) -> List[CrewOutput]: + def kickoff_for_each_parallel(self, inputs: Sequence[Dict[str, Any]], max_workers: Optional[int] = None) -> List[CrewOutput]: """Executes the Crew's workflow for each input in the list in parallel using ThreadPoolExecutor. Args: - inputs: List of input dictionaries to be passed to each crew execution. + inputs: Sequence of input dictionaries to be passed to each crew execution. max_workers: Maximum number of worker threads to use. If None, uses the default ThreadPoolExecutor behavior (typically min(32, os.cpu_count() + 4)). Returns: List of CrewOutput objects, one for each input. """ - import concurrent.futures - from concurrent.futures import ThreadPoolExecutor + from concurrent.futures import ThreadPoolExecutor, as_completed - if not isinstance(inputs, list): - raise TypeError("Inputs must be a list of dictionaries.") + if not isinstance(inputs, (list, tuple)): + raise TypeError(f"Inputs must be a list of dictionaries. Received {type(inputs).__name__} instead.") if not inputs: return [] @@ -738,26 +737,30 @@ class Crew(BaseModel): crew_copies = [self.copy() for _ in inputs] # Execute each crew in parallel - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all tasks to the executor - future_to_crew = { - executor.submit(crew_copies[i].kickoff, inputs[i]): i - for i in range(len(inputs)) - } - - # Process results as they complete - for future in concurrent.futures.as_completed(future_to_crew): - crew_index = future_to_crew[future] - try: - output = future.result() - results.append(output) - - # Aggregate usage metrics - if crew_copies[crew_index].usage_metrics: - total_usage_metrics.add_usage_metrics(crew_copies[crew_index].usage_metrics) - except Exception as exc: - # Re-raise the exception to maintain consistent behavior with kickoff_for_each - raise exc + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all tasks to the executor + future_to_crew = { + executor.submit(crew_copies[i].kickoff, inputs[i]): i + for i in range(len(inputs)) + } + + # Process results as they complete + for future in as_completed(future_to_crew): + crew_index = future_to_crew[future] + try: + output = future.result() + results.append(output) + + # Aggregate usage metrics + if crew_copies[crew_index].usage_metrics: + total_usage_metrics.add_usage_metrics(crew_copies[crew_index].usage_metrics) + except Exception as exc: + # Re-raise the exception to maintain consistent behavior with kickoff_for_each + raise exc + finally: + # Clean up to assist garbage collection + crew_copies.clear() # Set the aggregated metrics on the parent crew self.usage_metrics = total_usage_metrics