diff --git a/README.md b/README.md index 8a4c0ee..ce6f5a8 100644 --- a/README.md +++ b/README.md @@ -5,17 +5,46 @@ This is the Github repo corresponding to our [NAACL '24 Industry Track Paper](ht **Paper**: https://arxiv.org/abs/2406.06435 -## Setup -This repository is based off of the ALIGN system [codebase](https://github.com/ITM-Kitware/align-system). Instructions for how to set up your system can be found there (install using either `pip` or `poetry`). It is generally recommended to use a virtual Python environment to manage dependencies. -### Things to Note: -(1) This code requires Python version >=3.10 (virtual/conda env recommended). \ -(2) This repo was tested on a version of the ALIGN system corresponding to this [commit-id](https://github.com/ITM-Kitware/align-system/commit/7b67c76bf11313e31af43af53588fe70803943e7). To use this version, please run the following before running the code: +## Setup (updated 06/30/2024 to evaluate with the CodeAct agent) +Set up the conda environment (Python 3.10) first: +```bash +conda create -n align_system python=3.10 +conda activate align_system +``` +After setting up the conda env, install `align-system` from the forked repo as follows (the fork contains the `CodeActAgent` class; it will be merged into the official `align-system` repository later): +```bash +git clone https://github.com/wjdghks950/align-system.git +cd align-system +pip install -e . ``` -pip install -e git+https://github.com/ITM-Kitware/align-system.git@7b67c76bf11313e31af43af53588fe70803943e7#egg=align_system + +Other dependencies: +```bash +pip install vllm ``` +Start CodeAct: + +```bash +# start the model server on port 8080 +export CUDA_VISIBLE_DEVICES=0,1 +./scripts/start_vllm.sh /shared/nas2/shared/llms/CodeActAgent-Mistral-7b-v0.1/ + +# start the code execution server on port 8081 +./scripts/code_execution/start_jupyter_server.sh 8081 + +# then try the interactive demo to make sure everything works +./scripts/run_codeact_demo.sh +``` + +## TODO List +- [ ] Implement the `__call__` method for [CodeActAgentADM](https://github.com/wjdghks950/align-system/blob/3446b221867c4e35e349dac8e03e2640b5ad1245/align_system/algorithms/codeact_agent_adm.py#L127). +- [ ] `CodeActAgentADM` is called by the [generate_outputs(...)](https://github.com/wjdghks950/align-system/blob/3446b221867c4e35e349dac8e03e2640b5ad1245/align_system/evaluation/adm_evaluator.py#L3) method in [run_evaluator.py](https://github.com/wjdghks950/llm-alignable-dm/blob/996e9be8c45b58305b4b0f187c7306dde0b667da/scripts/run_evaluator.py#L247). +- [ ] For the `CodeActAgentADM` configs, refer to [configs/codeact-agent](https://github.com/wjdghks950/llm-alignable-dm/tree/main/configs/codeact-agent). + + ## Overview To run a particular LLM-based decision-maker, use the `run_evaluator.py` file in the `scripts/` directory.
This script takes as input a particular config file (found in the `configs/` directory) and a GPU ID: diff --git a/configs/codeact-agent/align/high.yml b/configs/codeact-agent/align/high.yml new file mode 100644 index 0000000..313937b --- /dev/null +++ b/configs/codeact-agent/align/high.yml @@ -0,0 +1,19 @@ +name: paper-dataset-1-12/codeact-agent/align/high +dataset: data/paper-dataset-1-12.json +llama_2_single_kdma_adm: + baseline: false + chat_template: falcon-instruct.jinja + device: cuda:0 + hf_model: tiiuae/codeact-agent + n_negative_samples: 0 + n_positive_samples: 1 + precision: half + shuffle: false + temperature: 0.7 +target_kdma_values: + continuation_of_care: 10 + fairness: 10 + moral_deservingness: 10 + protocol_focus: 10 + risk_aversion: 10 + utilitarianism: 10 diff --git a/configs/codeact-agent/align/low.yml b/configs/codeact-agent/align/low.yml new file mode 100644 index 0000000..7c306d5 --- /dev/null +++ b/configs/codeact-agent/align/low.yml @@ -0,0 +1,19 @@ +name: paper-dataset-1-12/codeact-agent/align/low +dataset: data/paper-dataset-1-12.json +llama_2_single_kdma_adm: + baseline: false + chat_template: falcon-instruct.jinja + device: cuda:0 + hf_model: tiiuae/codeact-agent + n_negative_samples: 0 + n_positive_samples: 1 + precision: half + shuffle: false + temperature: 0.7 +target_kdma_values: + continuation_of_care: 0 + fairness: 0 + moral_deservingness: 0 + protocol_focus: 0 + risk_aversion: 0 + utilitarianism: 0 diff --git a/configs/codeact-agent/align_self-consistency/high.yml b/configs/codeact-agent/align_self-consistency/high.yml new file mode 100644 index 0000000..3954d94 --- /dev/null +++ b/configs/codeact-agent/align_self-consistency/high.yml @@ -0,0 +1,19 @@ +name: paper-dataset-1-12/codeact-agent/align_self-consistency/high +dataset: data/paper-dataset-1-12.json +llama_2_single_kdma_adm: + baseline: false + chat_template: falcon-instruct.jinja + device: cuda:0 + hf_model: tiiuae/codeact-agent + n_negative_samples: 5 + n_positive_samples: 5 + precision: half + shuffle: true + temperature: 0.7 +target_kdma_values: + continuation_of_care: 10 + fairness: 10 + moral_deservingness: 10 + protocol_focus: 10 + risk_aversion: 10 + utilitarianism: 10 diff --git a/configs/codeact-agent/align_self-consistency/low.yml b/configs/codeact-agent/align_self-consistency/low.yml new file mode 100644 index 0000000..f90f938 --- /dev/null +++ b/configs/codeact-agent/align_self-consistency/low.yml @@ -0,0 +1,19 @@ +name: paper-dataset-1-12/codeact-agent/align_self-consistency/low +dataset: data/paper-dataset-1-12.json +llama_2_single_kdma_adm: + baseline: false + chat_template: falcon-instruct.jinja + device: cuda:0 + hf_model: tiiuae/codeact-agent + n_negative_samples: 5 + n_positive_samples: 5 + precision: half + shuffle: true + temperature: 0.7 +target_kdma_values: + continuation_of_care: 0 + fairness: 0 + moral_deservingness: 0 + protocol_focus: 0 + risk_aversion: 0 + utilitarianism: 0 diff --git a/configs/codeact-agent/baseline/baseline.yml b/configs/codeact-agent/baseline/baseline.yml new file mode 100644 index 0000000..b3cf0dd --- /dev/null +++ b/configs/codeact-agent/baseline/baseline.yml @@ -0,0 +1,13 @@ +name: paper-dataset-1-12/codeact-agent/baseline +dataset: data/paper-dataset-1-12.json +llama_2_single_kdma_adm: + baseline: true + chat_template: falcon-instruct.jinja + device: cuda:0 + hf_model: tiiuae/codeact-agent + n_negative_samples: 0 + n_positive_samples: 1 + precision: half + shuffle: false + temperature: 0.7 +target_kdma_values: null diff --git 
a/requirements-codeact.txt b/requirements-codeact.txt new file mode 100644 index 0000000..4aea64f --- /dev/null +++ b/requirements-codeact.txt @@ -0,0 +1,62 @@ +torch==2.0.1 -i https://download.pytorch.org/whl/cu118 +torchvision==0.15.2 -i https://download.pytorch.org/whl/cu118 +torchaudio==2.0.2 -i https://download.pytorch.org/whl/cu118 +pre-commit +openai==0.28 +datasets +wikipedia +langchain +streamlit +backoff +charset-normalizer==3.1.0 +numpy +pandas +pylatexenc +google-api-python-client +arxiv +# Alfworld +opencv-python +networkx +h5py +tqdm +vocab +revtok +Click +transformers +tokenizers +scipy==1.10.1 +ipython +matplotlib +cython +nltk +pipreqs +pyyaml +pytz +visdom +sympy +pycocotools +# We use MINT's docker for ALFWorld, so no need to install these +# gym==0.15.4 +# ai2thor==2.1.0 +# fast-downward @ https://github.com/MarcCote/downward/archive/faster_replan.zip +# textworld @ https://github.com/MarcCote/TextWorld/archive/handcoded_expert_integration.zip +# alfworld @ git+https://github.com/xingyaoww/alfworld.git +seaborn +google-generativeai +python-dateutil +statsmodels +# APPs evaluation +pyext +grpcio +vllm +accelerate +jsonlines +gym==0.26.2 +pandarallel +thefuzz +flask +gunicorn +apscheduler +docker +tornado +termcolor diff --git a/scripts/code_execution/api.py b/scripts/code_execution/api.py new file mode 100644 index 0000000..28591e3 --- /dev/null +++ b/scripts/code_execution/api.py @@ -0,0 +1,130 @@ +import os +import time +import json +import signal +import logging +import argparse +import tornado.ioloop +import tornado.web +import tornado.httpserver +from collections import namedtuple +from jupyter import JupyterKernel, JupyterGatewayDocker, JupyterGatewayKubernetes + +logging.basicConfig(level=logging.INFO) + +if os.environ.get("USE_KUBERNETES", "0").lower() == "1": + JupyterKernelWrapper = JupyterGatewayKubernetes + logging.info("Using Kubernetes as the backend for JupyterGateway") +else: + JupyterKernelWrapper = JupyterGatewayDocker + logging.info("Using Docker as the backend for JupyterGateway") + +# Global data structure to map convid to (JupyterKernelWrapper, JupyterKernel) +JupyterKernelType = namedtuple("JupyterKernelType", [ + "kernel_wrapper", + "kernel", + "last_access_time" +]) + +def cleanup_kernels(app, force=False): + """Cleanup kernels and gateway dockers that have timed out.""" + KERNEL_TIMEOUT = 10 * 60 # 10 minutes + current_time = time.time() + to_delete = [] + conv_id_to_kernel = app.conv_id_to_kernel + # Find all kernels that have timed out + for convid in conv_id_to_kernel.keys(): + last_access = conv_id_to_kernel[convid].last_access_time + if current_time - last_access > KERNEL_TIMEOUT: + to_delete.append(convid) + + if force: + to_delete = list(conv_id_to_kernel.keys()) + logging.info(f"Force cleanup all {len(to_delete)} kernels") + + for convid in to_delete: + # Close the kernel + # kernel: JupyterKernel = conv_id_to_kernel[convid].kernel + # kernel.shutdown() # Close the JupyterKernel + # Close the JupyterKernelWrapper by close its context manager + kernel_wrapper = conv_id_to_kernel[convid].kernel_wrapper + kernel_wrapper.__exit__(None, None, None) # Close the JupyterKernelWrapper + # Delete the entry from the global data structure + del conv_id_to_kernel[convid] + logging.info(f"Kernel closed for conversation {convid}") + +class ExecuteHandler(tornado.web.RequestHandler): + async def post(self): + data = json.loads(self.request.body) + convid = data.get("convid") + code = data.get("code") + + # Create a new kernel if not exist + new_kernel = 
False + + conv_id_to_kernel = self.application.conv_id_to_kernel + if convid not in conv_id_to_kernel: + kernel_wrapper = JupyterKernelWrapper( + name=f"conv-{convid}", + ) + url_suffix = kernel_wrapper.__enter__() + if os.environ.get("DEBUG", False): + logging.info(f"Kernel URL: {url_suffix}") + kernel = JupyterKernel(url_suffix, convid) + await kernel.initialize() + conv_id_to_kernel[convid] = JupyterKernelType( + kernel_wrapper, + kernel, + None + ) + new_kernel = True + logging.info(f"Kernel created for conversation {convid}") + + # Update last access time + kernel_access_time = time.time() + conv_id_to_kernel[convid] = conv_id_to_kernel[convid]._replace( + last_access_time=kernel_access_time + ) + + # Execute the code + kernel: JupyterKernel = conv_id_to_kernel[convid].kernel + result = await kernel.execute(code) + + self.write(json.dumps({ + "result": result, + "new_kernel_created": new_kernel + })) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--port", type=int, default=8000) + args = parser.parse_args() + + app = tornado.web.Application([ + (r"/execute", ExecuteHandler), + # Add other routes here + ]) + app.conv_id_to_kernel = {} + + # Wrap cleanup_kernels to pass the app object + periodic_cleanup = tornado.ioloop.PeriodicCallback( + lambda: cleanup_kernels(app), + int(os.environ.get("CLEANUP_TIMEOUT_MS", 60000)) + ) + periodic_cleanup.start() + + # Setup signal handler + def signal_handler(signum, frame, app): + logging.info("Received SIGINT, cleaning up...") + cleanup_kernels(app, force=True) + tornado.ioloop.IOLoop.current().stop() + logging.info("Cleanup complete, shutting down.") + + signal.signal( + signal.SIGINT, + lambda signum, frame: signal_handler(signum, frame, app) + ) + server = tornado.httpserver.HTTPServer(app) + server.listen(args.port) + tornado.ioloop.IOLoop.current().start() diff --git a/scripts/code_execution/jupyter.py b/scripts/code_execution/jupyter.py new file mode 100644 index 0000000..8213799 --- /dev/null +++ b/scripts/code_execution/jupyter.py @@ -0,0 +1,481 @@ +import os +import re +import time +import docker +import asyncio +import tornado +import logging + +from tornado.escape import json_encode, json_decode, url_escape +from tornado.websocket import websocket_connect, WebSocketHandler +from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.httpclient import AsyncHTTPClient, HTTPRequest +from uuid import uuid4 + +logging.basicConfig(level=logging.INFO) +if os.environ.get("USE_KUBERNETES", "0").lower() == "1": + from kubernetes import client, config + config.load_incluster_config() + + +def strip_ansi(o: str) -> str: + """ + Removes ANSI escape sequences from `o`, as defined by ECMA-048 in + http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-048.pdf + + # https://github.com/ewen-lbh/python-strip-ansi/blob/master/strip_ansi/__init__.py + + >>> strip_ansi("\\033[33mLorem ipsum\\033[0m") + 'Lorem ipsum' + + >>> strip_ansi("Lorem \\033[38;25mIpsum\\033[0m sit\\namet.") + 'Lorem Ipsum sit\\namet.' 
+ + >>> strip_ansi("") + '' + + >>> strip_ansi("\\x1b[0m") + '' + + >>> strip_ansi("Lorem") + 'Lorem' + + >>> strip_ansi('\\x1b[38;5;32mLorem ipsum\\x1b[0m') + 'Lorem ipsum' + + >>> strip_ansi('\\x1b[1m\\x1b[46m\\x1b[31mLorem dolor sit ipsum\\x1b[0m') + 'Lorem dolor sit ipsum' + """ + + # pattern = re.compile(r'/(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]/') + pattern = re.compile(r'\x1B\[\d+(;\d+){0,2}m') + stripped = pattern.sub('', o) + return stripped + +class JupyterKernel: + def __init__( + self, + url_suffix, + convid, + lang="python" + ): + self.base_url = f"http://{url_suffix}" + self.base_ws_url = f"ws://{url_suffix}" + self.lang = lang + self.kernel_id = None + self.ws = None + self.convid = convid + logging.info(f"Jupyter kernel created for conversation {convid} at {url_suffix}") + + self.heartbeat_interval = 10000 # 10 seconds + self.heartbeat_callback = None + + async def initialize(self): + await self.execute(r"%colors nocolor") + # pre-defined tools + self.tools_to_run = [ + # TODO: You can add code for your pre-defined tools here + ] + for tool in self.tools_to_run: + # logging.info(f"Tool initialized:\n{tool}") + await self.execute(tool) + + async def _send_heartbeat(self): + if not self.ws: + return + try: + self.ws.ping() + # logging.info("Heartbeat sent...") + except tornado.iostream.StreamClosedError: + # logging.info("Heartbeat failed, reconnecting...") + try: + await self._connect() + except ConnectionRefusedError: + logging.info("ConnectionRefusedError: Failed to reconnect to kernel websocket - Is the kernel still running?") + + async def _connect(self): + if self.ws: + self.ws.close() + self.ws = None + + client = AsyncHTTPClient() + if not self.kernel_id: + n_tries = 5 + while n_tries > 0: + try: + response = await client.fetch( + "{}/api/kernels".format(self.base_url), + method="POST", + body=json_encode({"name": self.lang}), + ) + kernel = json_decode(response.body) + self.kernel_id = kernel["id"] + break + except Exception as e: + # kernels are not ready yet + n_tries -= 1 + await asyncio.sleep(1) + + if n_tries == 0: + raise ConnectionRefusedError("Failed to connect to kernel") + + ws_req = HTTPRequest( + url="{}/api/kernels/{}/channels".format( + self.base_ws_url, url_escape(self.kernel_id) + ) + ) + self.ws = await websocket_connect(ws_req) + logging.info("Connected to kernel websocket") + + # Setup heartbeat + if self.heartbeat_callback: + self.heartbeat_callback.stop() + self.heartbeat_callback = PeriodicCallback(self._send_heartbeat, self.heartbeat_interval) + self.heartbeat_callback.start() + + async def execute(self, code, timeout=60): + if not self.ws: + await self._connect() + + msg_id = uuid4().hex + self.ws.write_message( + json_encode( + { + "header": { + "username": "", + "version": "5.0", + "session": "", + "msg_id": msg_id, + "msg_type": "execute_request", + }, + "parent_header": {}, + "channel": "shell", + "content": { + "code": code, + "silent": False, + "store_history": False, + "user_expressions": {}, + "allow_stdin": False, + }, + "metadata": {}, + "buffers": {}, + } + ) + ) + + outputs = [] + + + async def wait_for_messages(): + execution_done = False + while not execution_done: + msg = await self.ws.read_message() + msg = json_decode(msg) + msg_type = msg['msg_type'] + parent_msg_id = msg['parent_header'].get('msg_id', None) + + if parent_msg_id != msg_id: + continue + + if os.environ.get("DEBUG", False): + logging.info(f"MSG TYPE: {msg_type.upper()} DONE:{execution_done}\nCONTENT: {msg['content']}") + + if msg_type == 'error': + traceback = 
"\n".join(msg["content"]["traceback"]) + outputs.append(traceback) + execution_done = True + elif msg_type == 'stream': + outputs.append(msg['content']['text']) + elif msg_type in ['execute_result', 'display_data']: + outputs.append(msg['content']['data']['text/plain']) + if 'image/png' in msg['content']['data']: + # use markdone to display image (in case of large image) + # outputs.append(f"\n\n") + outputs.append(f"![image](data:image/png;base64,{msg['content']['data']['image/png']})") + + elif msg_type == 'execute_reply': + execution_done = True + return execution_done + + async def interrupt_kernel(): + client = AsyncHTTPClient() + interrupt_response = await client.fetch( + f"{self.base_url}/api/kernels/{self.kernel_id}/interrupt", + method="POST", + body=json_encode({"kernel_id": self.kernel_id}), + ) + logging.info(f"Kernel interrupted: {interrupt_response}") + + try: + execution_done = await asyncio.wait_for(wait_for_messages(), timeout) + except asyncio.TimeoutError: + await interrupt_kernel() + return f"[Execution timed out ({timeout} seconds).]" + + if not outputs and execution_done: + ret = "[Code executed successfully with no output]" + else: + ret = ''.join(outputs) + + # Remove ANSI + ret = strip_ansi(ret) + + if os.environ.get("DEBUG", False): + logging.info(f"OUTPUT:\n{ret}") + return ret + + async def shutdown_async(self): + if self.kernel_id: + client = AsyncHTTPClient() + await client.fetch( + "{}/api/kernels/{}".format(self.base_url, self.kernel_id), + method="DELETE", + ) + self.kernel_id = None + if self.ws: + self.ws.close() + self.ws = None + +class JupyterGatewayDocker: + DOCKER_IMAGE = "docker.io/xingyaoww/codeact-executor" + RESOURCE_CONSTRAINTS = { + 'mem_limit': '8g', # Limit to 8 GB of memory + 'nano_cpus': 2 * 10 ** 9 # Limit to 2 CPU cores + # Add other constraints as needed + } + + def __init__(self, name: str): + self.name = name + self.client = docker.from_env() + self.container = None + self.url = None + + def _get_free_port(self): + """Get a free port from the OS.""" + import socket + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + def _wait_for_container(self, container): + # Wait for a specific log message or condition indicating the app inside is ready + # Note: This is just an example, you should adjust the condition to match your use case + timeout = 60 # seconds + start_time = time.time() + while True: + # Get the latest logs + logs = container.logs().decode('utf-8') + logging.info(logs) + # Check for a specific message or condition in the logs + if "Jupyter Kernel Gateway" in logs and "is available at" in logs: + break + # Exit if timeout reached + if time.time() - start_time > timeout: + logging.info("Timeout reached while waiting for container to be ready.") + break + time.sleep(1) + + def __enter__(self): + # Check if the image exists locally, if not, pull it + try: + self.client.images.get(self.DOCKER_IMAGE) + except docker.errors.ImageNotFound: + self.client.images.pull(self.DOCKER_IMAGE) + + port = self._get_free_port() + # Run the container and expose the port + self.container = self.client.containers.run( + self.DOCKER_IMAGE, + name=self.name, + detach=True, + ports={"8888/tcp": port}, + remove=True, # Removes container when it's stopped + **self.RESOURCE_CONSTRAINTS, + ) + self._wait_for_container(self.container) + + url_suffix = f"localhost:{port}" + return url_suffix + + def __exit__(self, exc_type, exc_val, 
exc_tb): + if self.container: + self.container.stop() + + +class JupyterGatewayKubernetes: + IMAGE = "docker.io/xingyaoww/codeact-executor" + NAMESPACE = os.environ.get("KUBERNETES_NAMESPACE", "codeact-chat-ui") + RESOURCE_CONSTRAINTS = { + 'limits': {'memory': "512Mi", 'cpu': "1"}, + 'requests': {'memory': "256Mi", 'cpu': "1"} + } + + def __init__(self, name): + self.pod_name = f"executor-{name}-{uuid4().hex[:6]}" # Generate a unique pod name + self.api_instance = client.CoreV1Api() + self.port = 8888 # Default port for Jupyter Kernel Gateway + + def _create_pod(self): + # Define container + container = client.V1Container( + name=self.pod_name, + image=self.IMAGE, + ports=[client.V1ContainerPort(container_port=self.port)], + resources=client.V1ResourceRequirements( + **self.RESOURCE_CONSTRAINTS + ), + ) + + # Define pod spec + metadata = client.V1ObjectMeta( + name=self.pod_name, + labels={"app": self.pod_name} # Add this line to include labels + ) + spec = client.V1PodSpec( + containers=[container], + restart_policy="Never" + ) + pod = client.V1Pod( + api_version="v1", + kind="Pod", + metadata=metadata, + spec=spec, + ) + + # Create pod + self.api_instance.create_namespaced_pod( + namespace=self.NAMESPACE, + body=pod + ) + logging.info(f"Pod {self.pod_name} is being created...") + + def _create_service(self): + service = client.V1Service( + api_version="v1", + kind="Service", + metadata=client.V1ObjectMeta( + name=self.pod_name + ), + spec=client.V1ServiceSpec( + selector={"app": self.pod_name}, + ports=[client.V1ServicePort(port=self.port, target_port=self.port)], + type="ClusterIP" # Default and suitable for internal communication + ) + ) + self.api_instance.create_namespaced_service(namespace=self.NAMESPACE, body=service) + logging.info(f"Service {self.pod_name} created.") + + + def _wait_for_pod_to_be_ready(self): + ready = False + while not ready: + pod_status = self.api_instance.read_namespaced_pod_status( + self.pod_name, self.NAMESPACE + ) + if pod_status.status.phase == "Running": + ready = True + # self.pod_ip = pod_status.status.pod_ip + logging.info(f"Pod Ready. 
IP: {pod_status.status.pod_ip}") + else: + time.sleep(1) # Wait and check again + + def __enter__(self): + config.load_incluster_config() # Make sure your kubeconfig is correctly set up + self._create_pod() + self._wait_for_pod_to_be_ready() + self._create_service() # Ensure the service is created after the pod is ready + service = self.api_instance.read_namespaced_service( + name=self.pod_name, namespace=self.NAMESPACE + ) + service_ip = service.spec.cluster_ip + return f"{service_ip}:{self.port}" + + def __exit__(self, exc_type, exc_val, exc_tb): + self.api_instance.delete_namespaced_service(name=self.pod_name, namespace=self.NAMESPACE) + logging.info(f"Service {self.pod_name} deleted.") + self.api_instance.delete_namespaced_pod(self.pod_name, self.NAMESPACE) + logging.info(f"Pod {self.pod_name} has been deleted.") + + +if __name__ == "__main__": + + TO_EXEC = """ +from langchain.utilities import GoogleSearchAPIWrapper + +search = GoogleSearchAPIWrapper() +results = search.results("python", num_results=1) + +# Obtain the link from the first search result +search_link = results[0]['link'] +logging.info("Search Result Link:", search_link) +""" + TO_EXEC_1 = """ +from boilerpy3 import extractors +extractor = extractors.ArticleExtractor() + +# From a URL +content = extractor.get_content_from_url(search_link) +# If you want HTML (can be messy) +# content = extractor.get_marked_html_from_url(search_link) + +# Display the first line of the extracted content +first_line = content.splitlines()[0] +logging.info("First Line from Search Result:", first_line) +""" + + TO_EXEC_2 = """ +from langchain.utilities import GoogleSearchAPIWrapper + +search_elon = GoogleSearchAPIWrapper() +results_elon = search_elon.results("Elon Musk birth year", num_results=1) +logging.info(results_elon) + +search_taylor = GoogleSearchAPIWrapper() +results_taylor = search_taylor.results("Taylor Swift birth year", num_results=1) +logging.info(results_taylor) + +search_messi = GoogleSearchAPIWrapper() +results_messi = search_messi.results("Lionel Messi birth year", num_results=1) +logging.info(results_messi) + +def calculate_age(birth_year, current_year=2023): + return current_year - birth_year +ages = { + "Elon Musk": calculate_age(1971), + "Taylor Swift": calculate_age(1989), + "Lionel Messi": calculate_age(1987) +} +ages +""" + with JupyterGatewayDocker() as url_suffix: + j = JupyterKernel(url_suffix) + logging.info("EXECUTE:") + logging.info(TO_EXEC) + logging.info("OUTPUT:") + logging.info(j.execute(TO_EXEC)) + + logging.info("EXECUTE:") + logging.info(TO_EXEC_1) + logging.info("OUTPUT:") + logging.info(j.execute(TO_EXEC_1)) + + logging.info("EXECUTE:") + logging.info(TO_EXEC_2) + logging.info("OUTPUT:") + logging.info(j.execute(TO_EXEC_2)) + + # logging.info(j.execute(( + # "prime_numbers = [11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]\n" + # "mean_prime = sum(prime_numbers) / len(prime_numbers)\n" + # "mean_prime\n" + # ))) + # logging.info(j.execute("logging.info('Hello from Jupyter!')")) + # logging.info(j.execute("logging.info(a)")) + # logging.info(j.execute("a = 1 * 9")) + # logging.info(j.execute("logging.info(a)")) + # logging.info(j.execute("import transformers")) + # logging.info(j.execute("!pwd")) + # logging.info(j.execute("!pip3 install transformers")) + del j diff --git a/scripts/code_execution/start_jupyter_server.sh b/scripts/code_execution/start_jupyter_server.sh new file mode 100755 index 0000000..e7bcfd5 --- /dev/null +++ b/scripts/code_execution/start_jupyter_server.sh @@ -0,0 +1,11 @@ +#!/bin/bash 
+JUPYTER_API_PORT=$1 +echo "JUPYTER_API_PORT=$JUPYTER_API_PORT" + +pushd scripts/code_execution +export PYTHONPATH=`pwd`:$PYTHONPATH +# gunicorn -w 1 api:app --bind localhost:$JUPYTER_API_PORT +# TODO: fix the issue of sharing data across worker before enabling multiple workers +python3 api.py --port $JUPYTER_API_PORT + +popd diff --git a/scripts/codeact_demo.py b/scripts/codeact_demo.py new file mode 100644 index 0000000..7d4850a --- /dev/null +++ b/scripts/codeact_demo.py @@ -0,0 +1,199 @@ +import re +import json +import openai +import pathlib +import requests +import argparse +from termcolor import colored +from transformers import AutoTokenizer +from typing import List, Dict +from datetime import datetime + +if openai.__version__ >= "0.28.1": + raise RuntimeError( + "Please use the compatible version of openai (<=0.28.1) to use this script." + ) + +class ClientJupyterKernel: + def __init__(self, url, conv_id): + self.url = url + self.conv_id = conv_id + print(f"ClientJupyterKernel initialized with url={url} and conv_id={conv_id}") + + def execute(self, code): + payload = {"convid": self.conv_id, "code": code} + response = requests.post(self.url, data=json.dumps(payload)) + response_data = response.json() + if response_data["new_kernel_created"]: + print(f"New kernel created for conversation {self.conv_id}") + return response_data["result"] + + +class Generator: + def __init__(self, model_name: str, openai_base_url: str): + self.model_name = model_name + self.openai_base_url = openai_base_url + print( + f"Generator initialized with model_name={model_name} and openai_base_url={openai_base_url}" + ) + + def generate( + self, + messages: List[Dict[str, str]], + do_sample: bool = True, + max_new_tokens: int = 512, + stop_sequences: List[str] = ["<|im_start|>", "<|im_end|>"], + temperature: float = 0.1, + top_p: float = 0.95, + ) -> str: + completion = openai.ChatCompletion.create( + model=self.model_name, + messages=messages, + temperature=temperature if do_sample else 0.0, + max_tokens=max_new_tokens, + top_p=top_p if do_sample else 1.0, + stop=stop_sequences, + api_base=self.openai_base_url, + ) + content = completion.choices[0].message.content + if "<execute>" in content and "</execute>" not in content: + content += "</execute>" + return content + + +SYSTEM_MESSAGE = """A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polite answers to the user's questions. +The assistant can interact with an interactive Python (Jupyter Notebook) environment and receive the corresponding output when needed. The code should be enclosed using "<execute>" tag, for example: <execute> print("Hello World!") </execute>. +The assistant should attempt fewer things at a time instead of putting too much code in one block. The assistant can install packages through PIP by !pip install [package needed] and should always import packages and define variables before starting to use them. +The assistant should stop and provide an answer when they have already obtained the answer from the execution result. Whenever possible, execute the code for the user using <execute> instead of providing it. +The assistant's response should be concise, but do express their thoughts. +""" + +# NOTE: You may also include the following information in the system message if you have pre-defined tools for the assistant to execute. +# Tool function available (already imported in environment): +# [1] google_search(query: str, num_results: int = 1) -> dict +# Search google for the given query.
You should rely on this to get most up-to-date information. Do not make things up. +# For example: \"<execute>\"google_search(\"Hello world\") \"</execute>\" +# [2] get_url_content(url: str) -> str +# Get content from URL. You can use this when you want to access more information from a URL. +# [3] get_url_html(url: str) -> str +# Get HTML from URL (could be messy). + + + +class Agent: + COLOR_MAP = { + "user": "green", + "execution_output": "yellow", + "assistant": "blue", + "system": "red", + } + + def __init__( + self, + generator: Generator, + code_executor: ClientJupyterKernel, + system_message: str = SYSTEM_MESSAGE, + conv_id: str = None, + **kwargs, + ): + self.messages = [ + {"role": "system", "content": system_message}, + ] + self.kwargs = { + "stop_sequences": ["<|im_end|>", "</execute>", "<|im_start|>"], + "do_sample": False, + "max_new_tokens": 512, + **kwargs, + } + self.generator = generator + self.code_executor: ClientJupyterKernel = code_executor + self.conv_id = conv_id + # print the messages + for message in self.messages: + self.print_message(message) + + def print_message(self, message): + # bold print the role + print("-" * 20) + print( + colored( + message["role"].upper(), self.COLOR_MAP[message["role"]], attrs=["bold"] + ) + ) + print(colored(message["content"], self.COLOR_MAP[message["role"]])) + + def handle_execution(self, completion: str, code_executor: ClientJupyterKernel): + # use regex to capture the code + code = re.search(r"<execute>(.*)</execute>", completion, re.DOTALL) + # check if the code is valid + if code is not None: + code = code.group(1) + # execute the code + result = code_executor.execute(code) + # return the result + return result + return None + + def handle_user_message(self, message, n_max_executions=3): + # append the message + self.messages.append({"role": "user", "content": message}) + self.print_message(self.messages[-1]) + + execution_count = 0 + while ( + self.messages[-1]["role"] == "user" and execution_count < n_max_executions + ): + response = self.generator.generate(self.messages, **self.kwargs) + self.messages.append({"role": "assistant", "content": response}) + self.print_message(self.messages[-1]) + + execution_output = self.handle_execution(response, self.code_executor) + if execution_output is not None: + execution_count += 1 + self.messages.append( + { + "role": "user", + "content": f"Execution Output:\n" + execution_output, + } ) + self.print_message( + {"role": "execution_output", "content": execution_output} + ) + + if execution_count == n_max_executions: + assert self.messages[-1]["role"] == "user" + self.messages.append( + { + "role": "assistant", + "content": f"I have reached the maximum number of executions ({n_max_executions=}).
Can you assist me or ask me another question?", + } + ) + self.print_message(self.messages[-1]) + + def run(self): + while True: + message = input("User Input> ") + if message == "exit": + self.save() + break + self.handle_user_message(message) + + def save(self): + pathlib.Path("conv_data").mkdir(exist_ok=True) + path = f"conv_data/{self.conv_id}.json" + with open(path, "w") as f: + json.dump(self.messages, f, indent=2) + + +parser = argparse.ArgumentParser() +parser.add_argument("--model-name", type=str, required=True, default="xingyaoww/CodeActAgent-Mistral-7b-v0.1") +parser.add_argument("--openai-base-url", type=str, required=True, default="http://localhost:8080/v1") +parser.add_argument("--jupyter-kernel-url", type=str, required=True, default="http://localhost:8081/execute") +args = parser.parse_args() + +CONV_ID = "demo-" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + +code_executor = ClientJupyterKernel(args.jupyter_kernel_url, CONV_ID) +generator = Generator(args.model_name, args.openai_base_url) +agent = Agent(generator, code_executor, conv_id=CONV_ID) +agent.run() diff --git a/scripts/run_codeact_demo.sh b/scripts/run_codeact_demo.sh new file mode 100755 index 0000000..73a0ff9 --- /dev/null +++ b/scripts/run_codeact_demo.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +YOUR_API_HOST=localhost +YOUR_API_PORT=8080 +YOUR_CODE_EXEC_ENGINE_HOST=localhost +YOUR_CODE_EXEC_ENGINE_PORT=8081 + +# Make sure you started model server (vLLM or llama.cpp) and code execution engine before running this! +set -x +python3 scripts/codeact_demo.py \ + --model-name CodeActAgent-Mistral-7b-v0.1 \ + --openai-base-url "http://$YOUR_API_HOST:$YOUR_API_PORT/v1" \ + --jupyter-kernel-url "http://$YOUR_CODE_EXEC_ENGINE_HOST:$YOUR_CODE_EXEC_ENGINE_PORT/execute" diff --git a/scripts/run_evaluator.py b/scripts/run_evaluator.py index 0748804..1258127 100644 --- a/scripts/run_evaluator.py +++ b/scripts/run_evaluator.py @@ -134,6 +134,14 @@ def kaleido_adm(config): return algorithm, config +def codeact_agent_adm(config): + from align_system.algorithms.codeact_agent_adm import CodeActAgentADM + + algorithm = CodeActAgentADM(**config) + algorithm.load_model() + return algorithm, config + + def dummy_adm(config): return DummyADM(**config), config @@ -162,6 +170,7 @@ def multi_comparison_adm(config): eval_fns = [ + codeact_agent_adm, chat_kdma_predicting_adm, llama_2_single_kdma_adm, llama_2_single_kdma_adm_with_rag, @@ -184,10 +193,14 @@ def main(config_file, cuda_idx=None): with open(config_file, 'r') as f: config = yaml.safe_load(f) + print("<< config >>\n", config) + with open(config['dataset'], 'r') as f: dataset = json.load(f) dataset = dataset + print(f"<< dataset >> (Dataset (length): {len(dataset)}\n\n{dataset[0]})") + if 'cache_dir' in config: cache_dir = config['cache_dir'] with open(os.path.join(cache_dir, 'input_output_labels.json')) as f: @@ -204,6 +217,7 @@ def main(config_file, cuda_idx=None): experiment_name = config['name'] if eval_fn.__name__ in config: + print("eval_fn.__name__: ", eval_fn.__name__) if cuda_idx is not None: config[eval_fn.__name__]['device'] = f'cuda:{cuda_idx}' diff --git a/scripts/start_vllm.sh b/scripts/start_vllm.sh new file mode 100755 index 0000000..462a595 --- /dev/null +++ b/scripts/start_vllm.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# Require install docker and nvidia-docker2 +# https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/1.8.0/install-guide.html + +MODEL_PATH=$1 +MODEL_DIR=$(dirname $MODEL_PATH) +MODEL_NAME=$(basename $MODEL_PATH) # CodeActAgent-Mistral-7b-v0.1 
+PORT=$2 # 8080 +if [ -z "$PORT" ]; then + PORT=8080 +fi +echo "PORT=$PORT" +echo "MODEL_PATH=$MODEL_PATH" +echo "MODEL_DIR=$MODEL_DIR" +echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" + +# get NUM_GPU from CUDA_VISIBLE_DEVICES +NUM_GPU=$(echo $CUDA_VISIBLE_DEVICES | tr ',' '\n' | wc -l) +echo "NUM_GPU=$NUM_GPU" + +docker run \ + -e CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES \ + --runtime nvidia --gpus all \ + -v $MODEL_DIR:/data \ + -p $PORT:8000 \ + --ipc=host \ + vllm/vllm-openai:latest \ + --host 0.0.0.0 \ + --model /data/$MODEL_NAME \ + --served-model-name $MODEL_NAME \ + --dtype=half \ + --tensor-parallel-size=$NUM_GPU
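
As a quick way to verify the setup described in the README above, the following is a minimal sketch (not part of the patch itself) for sanity-checking the code execution server once `./scripts/code_execution/start_jupyter_server.sh 8081` is running. The `/execute` route, the `convid`/`code` payload keys, and the `result`/`new_kernel_created` response keys follow `scripts/code_execution/api.py`; the conversation id and the Python snippet are arbitrary examples.

```bash
# Post a small snippet to the code execution server (assumes port 8081 as in the README).
# The first request for a new convid spins up a Docker executor container, so it may take a while.
curl -s -X POST http://localhost:8081/execute \
  -d '{"convid": "smoke-test", "code": "print(1 + 1)"}'
# Expected shape of the response: {"result": "2\n", "new_kernel_created": true}
```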