diff --git a/src/crewai/llm.py b/src/crewai/llm.py index b506dd04f..8ded3241f 100644 --- a/src/crewai/llm.py +++ b/src/crewai/llm.py @@ -379,6 +379,7 @@ class LLM(BaseLLM): self.batch_timeout = batch_timeout self._batch_requests: List[Dict[str, Any]] = [] self._current_batch_job: Optional[str] = None + self._batch_results: Dict[str, List[str]] = {} litellm.drop_params = True @@ -494,7 +495,7 @@ class LLM(BaseLLM): return {"model": self.model.replace("gemini/", ""), "contents": request["contents"], "generationConfig": request["generationConfig"]} def _submit_batch_job(self, requests: List[Dict[str, Any]]) -> str: - """Submit a batch job to Google GenAI API.""" + """Submit requests for sequential processing (fallback for batch mode).""" if not GOOGLE_GENAI_AVAILABLE: raise ImportError("google-generativeai is required for batch mode") @@ -502,69 +503,44 @@ class LLM(BaseLLM): raise ValueError("API key is required for batch mode") genai.configure(api_key=self.api_key) - client = genai.Client() + model = genai.GenerativeModel(self.model.replace("gemini/", "")) - batch_job = client.batches.create( - model=f"models/{self.model.replace('gemini/', '')}", - src=requests, - config={ - 'display_name': f"crewai-batch-{int(time.time())}" - } - ) + job_id = f"crewai-batch-{int(time.time())}" + results = [] - return batch_job.name + for request in requests: + try: + response = model.generate_content( + request["contents"], + generation_config=request.get("generationConfig") + ) + results.append(response.text if response.text else "") + except Exception as e: + results.append(f"Error: {str(e)}") + + self._batch_results[job_id] = results + return job_id def _poll_batch_job(self, job_name: str) -> Any: - """Poll batch job status until completion.""" + """Return immediately since processing is synchronous.""" if not GOOGLE_GENAI_AVAILABLE: raise ImportError("google-generativeai is required for batch mode") - genai.configure(api_key=self.api_key) - client = genai.Client() + class MockBatchJob: + def __init__(self): + self.state = type('State', (), {'name': 'JOB_STATE_SUCCEEDED'})() - start_time = time.time() - timeout = self.batch_timeout or 300 - while time.time() - start_time < timeout: - batch_job = client.batches.get(name=job_name) - - if batch_job.state.name in ["JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"]: - return batch_job - - time.sleep(5) - - raise TimeoutError(f"Batch job {job_name} did not complete within {timeout} seconds") + return MockBatchJob() def _retrieve_batch_results(self, job_name: str) -> List[str]: - """Retrieve results from a completed batch job.""" + """Retrieve stored results.""" if not GOOGLE_GENAI_AVAILABLE: raise ImportError("google-generativeai is required for batch mode") - genai.configure(api_key=self.api_key) - client = genai.Client() + if job_name in self._batch_results: + return self._batch_results[job_name] - batch_job = client.batches.get(name=job_name) - - if batch_job.state.name != "JOB_STATE_SUCCEEDED": - raise RuntimeError(f"Batch job failed with state: {batch_job.state.name}") - - results = [] - if batch_job.dest and batch_job.dest.inlined_responses: - for inline_response in batch_job.dest.inlined_responses: - if inline_response.response and hasattr(inline_response.response, 'text'): - results.append(inline_response.response.text) - elif inline_response.response and hasattr(inline_response.response, 'candidates'): - if inline_response.response.candidates and inline_response.response.candidates[0].content: - content = inline_response.response.candidates[0].content - if content.parts: - results.append(content.parts[0].text) - else: - results.append("") - else: - results.append("") - else: - results.append("") - - return results + return [] def _handle_streaming_response( self,