feat: add InvokeCrewAIAutomationTool for external crew API integration (#430)

* feat: add InvokeCrewAIAutomationTool for external crew API integration

* feat: add InvokeCrewAIAutomationTool class for executing CrewAI tasks programmatically
This commit is contained in:
Mike Plachta
2025-08-27 10:42:19 -07:00
committed by GitHub
parent 992cd726c4
commit 6562587cba
4 changed files with 337 additions and 0 deletions

View File

@@ -36,6 +36,7 @@ from .tools import (
FirecrawlSearchTool, FirecrawlSearchTool,
GithubSearchTool, GithubSearchTool,
HyperbrowserLoadTool, HyperbrowserLoadTool,
InvokeCrewAIAutomationTool,
JSONSearchTool, JSONSearchTool,
LinkupSearchTool, LinkupSearchTool,
LlamaIndexTool, LlamaIndexTool,

View File

@@ -27,6 +27,7 @@ from .firecrawl_scrape_website_tool.firecrawl_scrape_website_tool import (
from .firecrawl_search_tool.firecrawl_search_tool import FirecrawlSearchTool from .firecrawl_search_tool.firecrawl_search_tool import FirecrawlSearchTool
from .github_search_tool.github_search_tool import GithubSearchTool from .github_search_tool.github_search_tool import GithubSearchTool
from .hyperbrowser_load_tool.hyperbrowser_load_tool import HyperbrowserLoadTool from .hyperbrowser_load_tool.hyperbrowser_load_tool import HyperbrowserLoadTool
from .invoke_crewai_automation_tool.invoke_crewai_automation_tool import InvokeCrewAIAutomationTool
from .json_search_tool.json_search_tool import JSONSearchTool from .json_search_tool.json_search_tool import JSONSearchTool
from .linkup.linkup_search_tool import LinkupSearchTool from .linkup.linkup_search_tool import LinkupSearchTool
from .llamaindex_tool.llamaindex_tool import LlamaIndexTool from .llamaindex_tool.llamaindex_tool import LlamaIndexTool

View File

@@ -0,0 +1,159 @@
# InvokeCrewAIAutomationTool
## Description
The InvokeCrewAIAutomationTool provides CrewAI Platform API integration with external crew services. This tool allows you to invoke and interact with CrewAI Platform automations from within your CrewAI agents, enabling seamless integration between different crew workflows.
## Features
- **Dynamic Input Schema**: Configure custom input parameters for different crew automations
- **Automatic Polling**: Automatically polls for task completion with configurable timeout
- **Bearer Token Authentication**: Secure API authentication using bearer tokens
- **Comprehensive Error Handling**: Robust error handling for API failures and timeouts
- **Flexible Configuration**: Support for both simple and complex crew automation workflows
## Installation
Install the required dependencies:
```shell
pip install 'crewai[tools]'
```
## Example
### Basic Usage
```python
from crewai_tools import InvokeCrewAIAutomationTool
# Basic crew automation tool
tool = InvokeCrewAIAutomationTool(
crew_api_url="https://data-analysis-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="Data Analysis Crew",
crew_description="Analyzes data and generates insights"
)
# Use the tool
result = tool.run()
```
### Advanced Usage with Custom Inputs
```python
from crewai_tools import InvokeCrewAIAutomationTool
from pydantic import Field
# Define custom input schema
custom_inputs = {
"year": Field(..., description="Year to retrieve the report for (integer)"),
"region": Field(default="global", description="Geographic region for analysis"),
"format": Field(default="summary", description="Report format (summary, detailed, raw)")
}
# Create tool with custom inputs
tool = InvokeCrewAIAutomationTool(
crew_api_url="https://state-of-ai-report-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="State of AI Report",
crew_description="Retrieves a comprehensive report on state of AI for a given year and region",
crew_inputs=custom_inputs,
max_polling_time=15 * 60 # 15 minutes timeout
)
# Use with custom parameters
result = tool.run(year=2024, region="north-america", format="detailed")
```
### Integration with CrewAI Agents
```python
from crewai import Agent, Task, Crew
from crewai_tools import InvokeCrewAIAutomationTool
# Create the automation tool
market_research_tool = InvokeCrewAIAutomationTool(
crew_api_url="https://market-research-automation-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="Market Research Automation",
crew_description="Conducts comprehensive market research analysis",
inputs={
"year": Field(..., description="Year to use for the market research"),
}
)
# Create an agent with the tool
research_agent = Agent(
role="Research Coordinator",
goal="Coordinate and execute market research tasks",
backstory="You are an expert at coordinating research tasks and leveraging automation tools.",
tools=[market_research_tool],
verbose=True
)
# Create and execute a task
research_task = Task(
description="Conduct market research on AI tools market for 2024",
agent=research_agent,
expected_output="Comprehensive market research report"
)
crew = Crew(
agents=[research_agent],
tasks=[research_task]
)
result = crew.kickoff()
```
## Arguments
### Required Parameters
- `crew_api_url` (str): Base URL of the CrewAI Platform automation API
- `crew_bearer_token` (str): Bearer token for API authentication
- `crew_name` (str): Name of the crew automation
- `crew_description` (str): Description of what the crew automation does
### Optional Parameters
- `max_polling_time` (int): Maximum time in seconds to wait for task completion (default: 600 seconds = 10 minutes)
- `crew_inputs` (dict): Dictionary defining custom input schema fields using Pydantic Field objects
## Custom Input Schema
When defining `crew_inputs`, use Pydantic Field objects to specify the input parameters. These have to be compatible with the crew automation you are invoking:
```python
from pydantic import Field
crew_inputs = {
"required_param": Field(..., description="This parameter is required"),
"optional_param": Field(default="default_value", description="This parameter is optional"),
"typed_param": Field(..., description="Integer parameter", ge=1, le=100) # With validation
}
```
## Error Handling
The tool provides comprehensive error handling for common scenarios:
- **API Connection Errors**: Network connectivity issues
- **Authentication Errors**: Invalid or expired bearer tokens
- **Timeout Errors**: Tasks that exceed the maximum polling time
- **Task Failures**: Crew automations that fail during execution
## API Endpoints
The tool interacts with two main API endpoints:
- `POST {crew_api_url}/kickoff`: Starts a new crew automation task
- `GET {crew_api_url}/status/{crew_id}`: Checks the status of a running task
## Notes
- The tool automatically polls the status endpoint every second until completion or timeout
- Successful tasks return the result directly, while failed tasks return error information
- The bearer token should be kept secure and not hardcoded in production environments
- Consider using environment variables for sensitive configuration like bearer tokens

View File

@@ -0,0 +1,176 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field, create_model
from typing import Any, Type
import requests
import time
class InvokeCrewAIAutomationInput(BaseModel):
"""Input schema for InvokeCrewAIAutomationTool."""
prompt: str = Field(..., description="The prompt or query to send to the crew")
class InvokeCrewAIAutomationTool(BaseTool):
"""
A CrewAI tool for invoking external crew/flows APIs.
This tool provides CrewAI Platform API integration with external crew services, supporting:
- Dynamic input schema configuration
- Automatic polling for task completion
- Bearer token authentication
- Comprehensive error handling
Example:
Basic usage:
>>> tool = InvokeCrewAIAutomationTool(
... crew_api_url="https://api.example.com",
... crew_bearer_token="your_token",
... crew_name="My Crew",
... crew_description="Description of what the crew does"
... )
With custom inputs:
>>> custom_inputs = {
... "param1": Field(..., description="Description of param1"),
... "param2": Field(default="default_value", description="Description of param2")
... }
>>> tool = InvokeCrewAIAutomationTool(
... crew_api_url="https://api.example.com",
... crew_bearer_token="your_token",
... crew_name="My Crew",
... crew_description="Description of what the crew does",
... crew_inputs=custom_inputs
... )
Example:
>>> tools=[
... InvokeCrewAIAutomationTool(
... crew_api_url="https://canary-crew-[...].crewai.com",
... crew_bearer_token="[Your token: abcdef012345]",
... crew_name="State of AI Report",
... crew_description="Retrieves a report on state of AI for a given year.",
... crew_inputs={
... "year": Field(..., description="Year to retrieve the report for (integer)")
... }
... )
... ]
"""
name: str = "invoke_amp_automation"
description: str = "Invokes an CrewAI Platform Automation using API"
args_schema: Type[BaseModel] = InvokeCrewAIAutomationInput
crew_api_url: str
crew_bearer_token: str
max_polling_time: int = 10 * 60 # 10 minutes
def __init__(
self,
crew_api_url: str,
crew_bearer_token: str,
crew_name: str,
crew_description: str,
max_polling_time: int = 10 * 60,
crew_inputs: dict[str, Any] = None):
"""
Initialize the InvokeCrewAIAutomationTool.
Args:
crew_api_url: Base URL of the crew API service
crew_bearer_token: Bearer token for API authentication
crew_name: Name of the crew to invoke
crew_description: Description of the crew to invoke
max_polling_time: Maximum time in seconds to wait for task completion (default: 600 seconds = 10 minutes)
crew_inputs: Optional dictionary defining custom input schema fields
"""
# Create dynamic args_schema if custom inputs provided
if crew_inputs:
# Start with the base prompt field
fields = {}
# Add custom fields
for field_name, field_def in crew_inputs.items():
if isinstance(field_def, tuple):
fields[field_name] = field_def
else:
# Assume it's a Field object, extract type from annotation if available
fields[field_name] = (str, field_def)
# Create dynamic model
args_schema = create_model('DynamicInvokeCrewAIAutomationInput', **fields)
else:
args_schema = InvokeCrewAIAutomationInput
# Initialize the parent class with proper field values
super().__init__(
name=crew_name,
description=crew_description,
args_schema=args_schema,
crew_api_url=crew_api_url,
crew_bearer_token=crew_bearer_token,
max_polling_time=max_polling_time
)
def _kickoff_crew(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Start a new crew task
Args:
inputs: Dictionary containing the query and other input parameters
Returns:
Dictionary containing the crew task response. The response will contain the crew id which needs to be returned to check the status of the crew.
"""
response = requests.post(
f"{self.crew_api_url}/kickoff",
headers={
"Authorization": f"Bearer {self.crew_bearer_token}",
"Content-Type": "application/json",
},
json={"inputs": inputs},
)
response_json = response.json()
return response_json
def _get_crew_status(self, crew_id: str) -> dict[str, Any]:
"""Get the status of a crew task
Args:
crew_id: The ID of the crew task to check
Returns:
Dictionary containing the crew task status
"""
response = requests.get(
f"{self.crew_api_url}/status/{crew_id}",
headers={
"Authorization": f"Bearer {self.crew_bearer_token}",
"Content-Type": "application/json",
},
)
return response.json()
def _run(self, **kwargs) -> str:
"""Execute the crew invocation tool."""
if kwargs is None:
kwargs = {}
# Start the crew
response = self._kickoff_crew(inputs=kwargs)
if response.get("kickoff_id") is None:
return f"Error: Failed to kickoff crew. Response: {response}"
kickoff_id = response.get("kickoff_id")
# Poll for completion
for i in range(self.max_polling_time):
try:
status_response = self._get_crew_status(crew_id=kickoff_id)
if status_response.get("state", "").lower() == "success":
return status_response.get("result", "No result returned")
elif status_response.get("state", "").lower() == "failed":
return f"Error: Crew task failed. Response: {status_response}"
except Exception as e:
if i == self.max_polling_time - 1: # Last attempt
return f"Error: Failed to get crew status after {self.max_polling_time} attempts. Last error: {e}"
time.sleep(1)
return f"Error: Crew did not complete within {self.max_polling_time} seconds"