Spaces:
Running
Running
import os | |
from typing import Optional, Dict, Any, List, Type | |
from crewai_tools import BaseTool | |
import requests | |
import importlib.util | |
from pydantic.v1 import BaseModel, Field,root_validator, ValidationError | |
import docker | |
import base64 | |
class FixedCustomFileWriteToolInputSchema(BaseModel): | |
content: str = Field(..., description="The content to write or append to the file") | |
mode: str = Field(..., description="Mode to open the file in, either 'w' or 'a'") | |
class CustomFileWriteToolInputSchema(FixedCustomFileWriteToolInputSchema): | |
content: str = Field(..., description="The content to write or append to the file") | |
mode: str = Field(..., description="Mode to open the file in, either 'w' or 'a'") | |
filename: str = Field(..., description="The name of the file to write to or append") | |
class CustomFileWriteTool(BaseTool): | |
name: str = "Write File" | |
description: str = "Tool to write or append to files" | |
args_schema = CustomFileWriteToolInputSchema | |
filename: Optional[str] = None | |
def __init__(self, base_folder: str, filename: Optional[str] = None, **kwargs): | |
super().__init__(**kwargs) | |
if filename is not None and len(filename) > 0: | |
self.args_schema = FixedCustomFileWriteToolInputSchema | |
self._base_folder = base_folder | |
self.filename = filename or None | |
self._ensure_base_folder_exists() | |
self._generate_description() | |
def _ensure_base_folder_exists(self): | |
os.makedirs(self._base_folder, exist_ok=True) | |
def _get_full_path(self, filename: Optional[str]) -> str: | |
if filename is None and self.filename is None: | |
raise ValueError("No filename specified and no default file set.") | |
chosen_file = filename or self.filename | |
full_path = os.path.abspath(os.path.join(self._base_folder, chosen_file)) | |
if not full_path.startswith(os.path.abspath(self._base_folder)): | |
raise ValueError("Access outside the base directory is not allowed.") #TODO: add validations for path traversal | |
return full_path | |
def _run(self, content: str, mode: str, filename: Optional[str] = None) -> Dict[str, Any]: | |
full_path = self._get_full_path(filename) | |
try: | |
with open(full_path, 'a' if mode == 'a' else 'w') as file: | |
file.write(content) | |
return { | |
"status": "success", | |
"message": f"Content successfully {'appended to' if mode == 'a' else 'written to'} {full_path}" | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": str(e) | |
} | |
def run(self, input_data: CustomFileWriteToolInputSchema) -> Any: | |
response_data = self._run( | |
content=input_data.content, | |
mode=input_data.mode, | |
filename=input_data.filename | |
) | |
return response_data | |
class CustomApiToolInputSchema(BaseModel): | |
endpoint: str = Field(..., description="The specific endpoint for the API call") | |
method: str = Field(..., description="HTTP method to use (GET, POST, PUT, DELETE)") | |
headers: Optional[Dict[str, str]] = Field(None, description="HTTP headers to include in the request") | |
query_params: Optional[Dict[str, Any]] = Field(None, description="Query parameters for the request") | |
body: Optional[Dict[str, Any]] = Field(None, description="Body of the request for POST/PUT methods") | |
class CustomApiTool(BaseTool): | |
name: str = "Call Api" | |
description: str = "Tool to make API calls with customizable parameters" | |
args_schema = CustomApiToolInputSchema | |
base_url: Optional[str] = None | |
default_headers: Optional[Dict[str, str]] = None | |
default_query_params: Optional[Dict[str, Any]] = None | |
def __init__(self, base_url: Optional[str] = None, headers: Optional[Dict[str, str]] = None, query_params: Optional[Dict[str, Any]] = None, **kwargs): | |
super().__init__(**kwargs) | |
self.base_url = base_url | |
self.default_headers = headers or {} | |
self.default_query_params = query_params or {} | |
self._generate_description() | |
def _run(self, endpoint: str, method: str, headers: Optional[Dict[str, str]] = None, query_params: Optional[Dict[str, Any]] = None, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
url = f"{self.base_url}/{endpoint}".rstrip("/") | |
headers = {**self.default_headers, **(headers or {})} | |
query_params = {**self.default_query_params, **(query_params or {})} | |
try: | |
response = requests.request( | |
method=method.upper(), | |
url=url, | |
headers=headers, | |
params=query_params, | |
json=body, | |
verify=False #TODO: add option to disable SSL verification | |
) | |
return { | |
"status_code": response.status_code, | |
"response": response.json() if response.headers.get("Content-Type") == "application/json" else response.text | |
} | |
except Exception as e: | |
return { | |
"status_code": 500, | |
"response": str(e) | |
} | |
def run(self, input_data: CustomApiToolInputSchema) -> Any: | |
response_data = self._run( | |
endpoint=input_data.endpoint, | |
method=input_data.method, | |
headers=input_data.headers, | |
query_params=input_data.query_params, | |
body=input_data.body | |
) | |
return response_data | |
class CustomCodeInterpreterSchema(BaseModel): | |
"""Input for CustomCodeInterpreterTool.""" | |
code: Optional[str] = Field( | |
None, | |
description="Python3 code used to be interpreted in the Docker container. ALWAYS PRINT the final result and the output of the code", | |
) | |
run_script: Optional[str] = Field( | |
None, | |
description="Relative path to the script to run in the Docker container. The script should contain the code to be executed.", | |
) | |
libraries_used: str = Field( | |
..., | |
description="List of libraries used in the code with proper installing names separated by commas. Example: numpy,pandas,beautifulsoup4", | |
) | |
def check_code_or_run_script(cls, values): | |
code = values.get('code') | |
run_script = values.get('run_script') | |
if not code and not run_script: | |
raise ValueError('Either code or run_script must be provided') | |
if code and run_script: | |
raise ValueError('Only one of code or run_script should be provided') | |
return values | |
class CustomCodeInterpreterTool(BaseTool): | |
name: str = "Code Interpreter" | |
description: str = "Interprets Python3 code strings with a final print statement. Requires eighter code or run_script to be provided." | |
args_schema: Type[BaseModel] = CustomCodeInterpreterSchema | |
code: Optional[str] = None | |
run_script: Optional[str] = None | |
workspace_dir: Optional[str] = None | |
def __init__(self, workspace_dir: Optional[str] = None, **kwargs): | |
super().__init__(**kwargs) | |
if workspace_dir is not None and len(workspace_dir) > 0: | |
self.workspace_dir = os.path.abspath(workspace_dir) | |
os.makedirs(self.workspace_dir, exist_ok=True) | |
self._generate_description() | |
def _get_installed_package_path(): | |
spec = importlib.util.find_spec('crewai_tools') | |
return os.path.dirname(spec.origin) | |
def _verify_docker_image(self) -> None: | |
""" | |
Verify if the Docker image is available | |
""" | |
image_tag = "code-interpreter:latest" | |
client = docker.from_env() | |
try: | |
client.images.get(image_tag) | |
except docker.errors.ImageNotFound: | |
package_path = self._get_installed_package_path() | |
dockerfile_path = os.path.join(package_path, "tools/code_interpreter_tool") | |
if not os.path.exists(dockerfile_path): | |
raise FileNotFoundError(f"Dockerfile not found in {dockerfile_path}") | |
client.images.build( | |
path=dockerfile_path, | |
tag=image_tag, | |
rm=True, | |
) | |
def _install_libraries( | |
self, container: docker.models.containers.Container, libraries: str | |
) -> None: | |
""" | |
Install missing libraries in the Docker container | |
""" | |
if libraries and len(libraries) > 0: | |
for library in libraries.split(","): | |
print(f"Installing library: {library}") | |
install_result = container.exec_run(f"pip install {library}") | |
if install_result.exit_code != 0: | |
print(f"Something went wrong while installing the library: {library}") | |
print(install_result.output.decode("utf-8")) | |
def _get_existing_container(self, container_name: str) -> Optional[docker.models.containers.Container]: | |
client = docker.from_env() | |
try: | |
existing_container = client.containers.get(container_name) | |
if existing_container.status == 'running': | |
return existing_container | |
if existing_container.status == 'exited': | |
existing_container.remove() | |
except docker.errors.NotFound: | |
pass | |
return None | |
def _init_docker_container(self) -> docker.models.containers.Container: | |
client = docker.from_env() | |
volumes = {} | |
if self.workspace_dir: | |
volumes[self.workspace_dir] = {"bind": "/workspace", "mode": "rw"} | |
container_name = "custom-code-interpreter" | |
existing_container = self._get_existing_container(container_name) | |
if existing_container: | |
return existing_container | |
return client.containers.run( | |
"code-interpreter", detach=True, tty=True, working_dir="/workspace", name=container_name, volumes=volumes | |
) | |
def run_code_in_docker(self, code: str, libraries_used: str) -> str: | |
self._verify_docker_image() | |
container = self._init_docker_container() | |
self._install_libraries(container, libraries_used) | |
# Encode the code to base64 | |
encoded_code = base64.b64encode(code.encode('utf-8')).decode('utf-8') | |
# Create a command to decode the base64 string and run the Python code | |
cmd_to_run = f'python3 -c "import base64; exec(base64.b64decode(\'{encoded_code}\').decode(\'utf-8\'))"' | |
print(f"Running code in container: \n{code}") | |
exec_result = container.exec_run(cmd_to_run) | |
if exec_result.exit_code != 0: | |
print(f"Something went wrong while running the code: \n{exec_result.output.decode('utf-8')}") | |
return f"Something went wrong while running the code: \n{exec_result.output.decode('utf-8')}" | |
print(f"Code run output: \n{exec_result.output.decode('utf-8')}") | |
return exec_result.output.decode("utf-8") | |
def _run_script(self, run_script: str,libraries_used: str) -> str: | |
with open(f"{self.workspace_dir}/{run_script}", "r") as file: | |
code = file.read() | |
return self.run_code_in_docker(code, libraries_used) | |
def _run(self, **kwargs) -> str: | |
code = kwargs.get("code", self.code) | |
run_script = kwargs.get("run_script", self.run_script) | |
libraries_used = kwargs.get("libraries_used", []) | |
if run_script: | |
return self._run_script(run_script, libraries_used) | |
return self.run_code_in_docker(code, libraries_used) | |