diff --git a/.gitignore b/.gitignore index bf0e1ac..c7091ed 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,12 @@ *.pyc .molns/ molns_install.log +.ec2_creds_molns +.idea/ +*.tar.gz +*.tar +notes +qsubscript +/dockerfile_* +\#qsubscript\# +docker_test.py diff --git a/MolnsLib/DockerProvider.py b/MolnsLib/DockerProvider.py new file mode 100644 index 0000000..b2f7ccd --- /dev/null +++ b/MolnsLib/DockerProvider.py @@ -0,0 +1,245 @@ +import logging +import os +import tempfile +import time +import DockerProxy +import constants +import installSoftware +from collections import OrderedDict +from DockerSSH import DockerSSH +from constants import Constants +from molns_provider import ProviderBase, ProviderException + + +def docker_provider_default_key_name(): + user = os.environ.get('USER') or 'USER' + return "{0}_molns_docker_sshkey_{1}".format(user, hex(int(time.time())).replace('0x', '')) + + +class DockerBase(ProviderBase): + """ Base class for Docker providers. """ + + SSH_KEY_EXTENSION = ".pem" + PROVIDER_TYPE = 'Docker' + + def __init__(self, name, config=None, config_dir=None, **kwargs): + ProviderBase.__init__(self, name, config, config_dir, **kwargs) + self.docker = DockerProxy.DockerProxy() + self.ssh = DockerSSH(self.docker) + + def _get_container_status(self, container_id): + return self.docker.container_status(container_id) + + def start_instance(self, num=1): + """ Start the given number of containers (default 1). """ + started_containers = [] + for i in range(num): + container_id = self.docker.create_container(self.provider.config["molns_image_name"], name=self.name, + port_bindings={ + Constants.DEFAULT_PUBLIC_WEBSERVER_PORT: + ('127.0.0.1', self.config['web_server_port']), + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT: + ('127.0.0.1', self.config['notebook_port'])}, + working_directory=self.config["working_directory"]) + stored_container = self.datastore.get_instance(provider_instance_identifier=container_id, + ip_address=self.docker.get_container_ip_address(container_id), + provider_id=self.provider.id, controller_id=self.id, + provider_type=constants.Constants.DockerProvider) + started_containers.append(stored_container) + if num == 1: + return started_containers[0] + return started_containers + + def resume_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + else: + instance_ids.append(instances.provider_instance_identifier) + self.docker.start_containers(instance_ids) + + def stop_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + else: + instance_ids.append(instances.provider_instance_identifier) + self.docker.stop_containers(instance_ids) + + def terminate_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + self.datastore.delete_instance(instance) + else: + instance_ids.append(instances.provider_instance_identifier) + self.datastore.delete_instance(instances) + self.docker.terminate_containers(instance_ids) + + def exec_command(self, container_id, command): + return self.docker.execute_command(container_id, command)
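# Usage sketch (illustrative, not part of this patch): DockerBase routes every
# lifecycle operation through DockerProxy, and each *_instance method accepts
# either a single datastore Instance or a list of them. Given a datastore
# handle `ds` and a previously configured Docker controller named 'dc' (both
# names are hypothetical):
#
#     controller = ds.get_object('dc', kind='Controller')
#     inst = controller.start_instance()     # num=1 returns a single Instance
#     controller.stop_instance(inst)         # single Instance or list
#     controller.resume_instance([inst])
#     controller.terminate_instance(inst)    # also deletes the datastore record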
""" + + OBJ_NAME = 'DockerProvider' + + CONFIG_VARS = OrderedDict([ + ('ubuntu_image_name', + {'q': 'Base Ubuntu image to use', 'default': constants.Constants.DOCKER_DEFAULT_IMAGE, + 'ask': True}), + ('molns_image_name', + {'q': 'Local MOLNs image (Docker image ID or image tag) to use ', 'default': 'briandrawert/molns-default-image:latest', 'ask': True}), + ('key_name', + {'q': 'Docker Key Pair name', 'default': "docker-default", 'ask': False}), # Unused. + ('group_name', + {'q': 'Docker Security Group name', 'default': 'molns', 'ask': False}), # Unused. + ('login_username', + {'default': 'ubuntu', 'ask': False}), # Unused. + ('provider_type', + {'default': constants.Constants.DockerProvider, 'ask': False}) + ]) + + def get_config_credentials(self): + return None + + @staticmethod + def __get_new_dockerfile_name(): + import uuid + filename = constants.Constants.DOCKERFILE_NAME + str(uuid.uuid4()) + return filename + + def check_ssh_key(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_ssh_key(self): + """ Returns true. """ + ssh_key_dir = os.path.join(self.config_dir, self.name) + with open(ssh_key_dir, 'w') as fp: + fp.write("This is a dummy key.") + os.chmod(ssh_key_dir, 0o600) + + def check_security_group(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_seurity_group(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_molns_image(self): + """ Create a molns image, save it on localhost and return DockerImage ID of created image. """ + file_to_remove = None + try: + dockerfile, file_to_remove = self._create_dockerfile(installSoftware.InstallSW.get_command_list()) + image_id = self.docker.build_image(dockerfile) + return image_id + except Exception as e: + logging.exception(e) + raise ProviderException("Failed to create molns image: {0}".format(e)) + finally: + if file_to_remove is not None: + os.remove(file_to_remove) + + def check_molns_image(self): + """ Check if the molns image exists. """ + if 'molns_image_name' in self.config and self.config['molns_image_name'] is not None \ + and self.config['molns_image_name'] != '': + return self.docker.image_exists(self.config['molns_image_name']) + return False + + def _create_dockerfile(self, commands): + """ Create Dockerfile from given commands. 
""" + import Utils + + user_id = Utils.get_sudo_user_id() + dockerfile = '''FROM ubuntu:14.04\nRUN apt-get update\n\n# Add user ubuntu.\nRUN useradd -u {0} -ms /bin/bash ubuntu\n + # Set up base environment.\nRUN apt-get install -yy \ \n software-properties-common \ \n + python-software-properties \ \n wget \ \n curl \ \n git \ \n ipython \ \n sudo \ \n + screen \ \n iptables \nRUN echo "ubuntu ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers + \nWORKDIR /home/ubuntu\n\nUSER ubuntu\nENV HOME /home/ubuntu'''.format(user_id) + + flag = False + + for entry in commands: + if isinstance(entry, list): + dockerfile += '''\n\nRUN ''' + first = True + flag = False + for sub_entry in entry: + if first is True: + dockerfile += self._preprocess(sub_entry) + first = False + else: + dockerfile += ''' && \ \n ''' + self._preprocess(sub_entry) + else: + if flag is False: + dockerfile += '''\n\nRUN ''' + flag = True + dockerfile += self._preprocess(entry) + else: + dockerfile += ''' && \ \n ''' + self._preprocess(entry) + + dockerfile += '''\n\n\n''' + + dockerfile_file = DockerProvider.__get_new_dockerfile_name() + with open(dockerfile_file, 'w') as Dockerfile: + Dockerfile.write(dockerfile) + named_dockerfile = tempfile.NamedTemporaryFile() + named_dockerfile.write(dockerfile) + named_dockerfile.seek(0) + + return named_dockerfile, dockerfile_file + + @staticmethod + def _preprocess(command): + """ Prepends "shell only" commands with '/bin/bash -c'. """ + for shell_command in DockerProxy.DockerProxy.shell_commands: + if shell_command in command: + replace_string = "/bin/bash -c \"" + shell_command + command = command.replace(shell_command, replace_string) + command += "\"" + return command + + +def get_default_working_directory(config=None): + if config is None: + raise Exception("Config should not be None.") + return os.path.realpath(os.path.join(config.config_dir, "docker_controller_working_dirs", config.name)) + + +class DockerController(DockerBase): + """ Provider handle for a Docker controller. """ + + OBJ_NAME = 'DockerController' + CONFIG_VARS = OrderedDict([ + ('web_server_port', + {'q': 'Port to use for web server', 'default': "8080", + 'ask': True}), + ('notebook_port', + {'q': 'Port to use for jupyter notebook', 'default': "8081", + 'ask': True}), + ('working_directory', + {'q': 'Working directory for this controller', 'default': get_default_working_directory, 'ask': True}), + ('ssh_key_file', + {'q': 'SSH key to a qsub and docker enabled cluster', 'default': "None", 'ask': True}) + ]) + + def get_instance_status(self, instance): + return self.docker.container_status(instance.provider_instance_identifier) + + +class DockerWorkerGroup(DockerController): + """ Provider handle for Docker worker group. """ + + OBJ_NAME = 'DockerWorkerGroup' + + CONFIG_VARS = OrderedDict([ + ('num_vms', + {'q': 'Number of containers in group', 'default': '1', 'ask': True}), + ]) diff --git a/MolnsLib/DockerProxy.py b/MolnsLib/DockerProxy.py new file mode 100644 index 0000000..d8a7fdc --- /dev/null +++ b/MolnsLib/DockerProxy.py @@ -0,0 +1,335 @@ +import logging +import os +import re +import time +import constants +from molns_provider import ProviderBase +from constants import Constants +from docker import APIClient as Client +from docker.errors import NotFound, NullResource, APIError + + +class InvalidVolumeName(Exception): + pass + +class ExecuteCommandException(Exception): + pass + + +class DockerProxy: + + """ A wrapper over docker-py and some utility methods and classes. 
""" + + LOG_TAG = "Docker " + + shell_commands = ["source"] + + class ImageBuildException(Exception): + def __init__(self, message=None): + super("Something went wrong while building docker container image.\n{0}".format(message)) + + def __init__(self): + if os.environ.get('DOCKER_HOST') is not None: + self.client = Client(base_url=os.environ.get('DOCKER_HOST')) + else: + self.client = Client(base_url=Constants.DOCKER_BASE_URL) + self.build_count = 0 + logging.basicConfig(level=logging.DEBUG) + + @staticmethod + def get_container_volume_from_working_dir(working_directory): + import os + return os.path.join("/home/ubuntu/", os.path.basename(working_directory)) + + def create_container(self, image_str, working_directory=None, name=None, + port_bindings={Constants.DEFAULT_PUBLIC_WEBSERVER_PORT: ('127.0.0.1', 8080), + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT: ('127.0.0.1', 8081)}): + """Creates a new container with elevated privileges. Returns the container ID. Maps port 80 of container + to 8080 of locahost by default""" + + docker_image = DockerImage.from_string(image_str) + volume_dir = DockerProxy.get_container_volume_from_working_dir(working_directory) + + if name is None: + import uuid + random_str = str(uuid.uuid4()) + name = constants.Constants.MolnsDockerContainerNamePrefix + random_str[:8] + image = docker_image.image_id if docker_image.image_id is not Constants.DockerNonExistentTag \ + else docker_image.image_tag + + logging.info("Using image {0}".format(image)) + import os + if DockerProxy._verify_directory(working_directory) is False: + if working_directory is not None: + raise InvalidVolumeName("\n\nMOLNs uses certain reserved names for its configuration files in the " + "controller environment, and unfortunately the provided name for working " + "directory of the controller cannot be one of these. Please configure this " + "controller again with a different volume name and retry. " + "Here is the list of forbidden names: \n{0}" + .format(Constants.ForbiddenVolumeNames)) + + logging.warning(DockerProxy.LOG_TAG + "Unable to verify provided directory to use to as volume. 
"Volume will NOT be created.") + hc = self.client.create_host_config(privileged=True, port_bindings=port_bindings) + container = self.client.create_container(image=image, name=name, command="/bin/bash", tty=True, detach=True, + ports=[Constants.DEFAULT_PUBLIC_WEBSERVER_PORT, + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT], + host_config=hc, + environment={"PYTHONPATH": "/usr/local/"}) + + else: + container_mount_point = '/home/ubuntu/{0}'.format(os.path.basename(working_directory)) + hc = self.client.create_host_config(privileged=True, port_bindings=port_bindings, + binds={working_directory: {'bind': container_mount_point, + 'mode': 'rw'}}) + + container = self.client.create_container(image=image, name=name, command="/bin/bash", tty=True, detach=True, + ports=[Constants.DEFAULT_PUBLIC_WEBSERVER_PORT, + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT], + volumes=container_mount_point, host_config=hc, + working_dir=volume_dir, + environment={"PYTHONPATH": "/usr/local/"}) + + container_id = container.get("Id") + + return container_id + + # noinspection PyBroadException + @staticmethod + def _verify_directory(working_directory): + import os + if working_directory is None or os.path.basename(working_directory) in Constants.ForbiddenVolumeNames: + return False + try: + if not os.path.exists(working_directory): + os.makedirs(working_directory) + return True + except: + return False + + def stop_containers(self, container_ids): + """Stops the given containers.""" + for container_id in container_ids: + self.stop_container(container_id) + + def stop_container(self, container_id): + """Stops the container with the given ID.""" + self.client.stop(container_id) + + def container_status(self, container_id): + """Returns the status of the container with the given ID.""" + status = ProviderBase.STATUS_TERMINATED + try: + ret_val = str(self.client.inspect_container(container_id).get('State').get('Status')) + if ret_val.startswith("running"): + status = ProviderBase.STATUS_RUNNING + else: + status = ProviderBase.STATUS_STOPPED + except NotFound: + pass + return status + + def start_containers(self, container_ids): + """Starts each container in the given list of container IDs.""" + for container_id in container_ids: + self.start_container(container_id) + + def start_container(self, container_id): + """ Start the container with the given ID.""" + logging.info(DockerProxy.LOG_TAG + " Starting container " + container_id) + try: + self.client.start(container=container_id) + except (NotFound, NullResource) as e: + print(DockerProxy.LOG_TAG + "Something went wrong while starting container. {0}".format(e)) + return False + return True + + def execute_command(self, container_id, command): + """Executes the given command as a shell command in the given container. Raises ExecuteCommandException + if anything goes wrong.""" + run_command = "/bin/bash -c \"" + command + "\"" + # print("CONTAINER: {0} COMMAND: {1}".format(container_id, run_command)) + if self.start_container(container_id) is False: + raise ExecuteCommandException(DockerProxy.LOG_TAG + "Could not start container.") + try: + exec_instance = self.client.exec_create(container_id, run_command) + response = self.client.exec_start(exec_instance) + return [self.client.exec_inspect(exec_instance), response] + except (NotFound, APIError) as e: + raise ExecuteCommandException(DockerProxy.LOG_TAG + " Could not execute command. " + str(e)) + + def build_image(self, dockerfile): + """ Build an image from the given Dockerfile object and return the ID of the created image. 
""" + import uuid + logging.info("Building image...") + random_string = str(uuid.uuid4()) + image_tag = Constants.DOCKER_IMAGE_PREFIX + "{0}".format(random_string[:]) + last_line = "" + try: + for line in self.client.build(fileobj=dockerfile, rm=True, tag=image_tag): + print(DockerProxy._decorate(line)) + if "errorDetail" in line: + raise DockerProxy.ImageBuildException() + last_line = line + + # Return image ID. It's a hack around the fact that docker-py's build image command doesn't return an image + # id. + image_id = get_docker_image_id_from_string(str(last_line)) + logging.info("Image ID: {0}".format(image_id)) + return str(DockerImage(image_id, image_tag)) + + except (DockerProxy.ImageBuildException, IndexError) as e: + raise DockerProxy.ImageBuildException(e) + + @staticmethod + def _decorate(some_line): + return some_line[11:-4].rstrip() + + def image_exists(self, image_str): + """Checks if an image with the given ID/tag exists locally.""" + docker_image = DockerImage.from_string(image_str) + logging.debug("DockerProxy.image_exists(image={0})".format(image_str)) + logging.debug("DockerProxy.image_exists() docker_image.image_id={0}".format(docker_image.image_id)) + + if docker_image.image_id is Constants.DockerNonExistentTag \ + and docker_image.image_tag is Constants.DockerNonExistentTag: + logging.debug("DockerProxy.image_exists() raising 'raise InvalidDockerImageException'"); + raise InvalidDockerImageException("Neither image_id nor image_tag provided.") + + for image in self.client.images(): + some_id = image["Id"] + some_tags = image["RepoTags"] or [None] + logging.debug("DockerProxy.image_exists() some_id={0}".format(some_id)) + logging.debug("DockerProxy.image_exists() some_tags={0}".format(some_tags)) + if docker_image.image_id in \ + some_id[:(Constants.DOCKER_PY_IMAGE_ID_PREFIX_LENGTH + Constants.DOKCER_IMAGE_ID_LENGTH)]: + return True + if docker_image.image_tag in some_tags: + return True + return False + + def terminate_containers(self, container_ids): + """ Terminates containers with given container ids.""" + for container_id in container_ids: + try: + if self.container_status(container_id) == ProviderBase.STATUS_RUNNING: + self.stop_container(container_id) + self.terminate_container(container_id) + except NotFound: + pass + + def terminate_container(self, container_id): + self.client.remove_container(container_id) + + def get_mapped_ports(self, container_id): + container_ins = self.client.inspect_container(container_id) + mapped_ports = container_ins['HostConfig']['PortBindings'] + ret_val = [] + if mapped_ports is None: + logging.info("No mapped ports for {0}".format(container_id)) + return + for k, v in mapped_ports.iteritems(): + host_port = v[0]['HostPort'] + ret_val.append(host_port) + return ret_val + + def get_working_directory(self, container_id): + wd = self.client.inspect_container(container_id)["Config"]["WorkingDir"] + if len(wd)==0: + return '/' + return wd + + def get_home_directory(self, container_id): + try: + env_vars = self.client.inspect_container(container_id)["Config"]["Env"] + home = [i for i in env_vars if i.startswith("HOME")] + if len(home) == 0: + return self.get_working_directory(container_id) + logging.debug("DockerProxy.get_home_directory(container_id={0}) home={1}".format(container_id, home)) + return home[0].split("=")[1] + except IndexError as e: + logging.debug("DockerProxy.get_home_directory(container_id={0}): {1} ".format(container_id,e)) + return self.get_working_directory(container_id) + + def put_archive(self, container_id, 
tar_file_bytes, target_path_in_container): + """ Copies and unpacks a given tar file into the container at the specified location. + The location must exist in the container.""" + if self.start_container(container_id) is False: + raise Exception("Could not start container.") + + # Prepend file path with /home/ubuntu/. TODO Should be refined. + if not target_path_in_container.startswith("/home/ubuntu/"): + import os + target_path_in_container = os.path.join("/home/ubuntu/", target_path_in_container) + + logging.info("target path in container: {0}".format(target_path_in_container)) + if not self.client.put_archive(container_id, target_path_in_container, tar_file_bytes): + logging.error(DockerProxy.LOG_TAG + "Failed to copy.") + + def get_container_ip_address(self, container_id): + """ Returns the IP address of the given container.""" + self.start_container(container_id) + while True: + ins = self.client.inspect_container(container_id) + ip_address = str(ins.get("NetworkSettings").get("IPAddress")) + if ip_address.startswith("1"): + break + time.sleep(3) + return ip_address + + +def get_docker_image_id_from_string(some_string): + exp = r'[a-z0-9]{12}' + matches = re.findall(exp, some_string) + if len(matches) == 0: + return None + else: + return matches[0] + + +class InvalidDockerImageException(Exception): + def __init__(self, message): + super(InvalidDockerImageException, self).__init__(message) + + +class DockerImage: + def __init__(self, image_id=None, image_tag=None): + if image_id in [None, Constants.DockerNonExistentTag] and image_tag in [None, Constants.DockerNonExistentTag]: + raise InvalidDockerImageException("Both image_id and image_tag cannot be None.") + + self.image_id = image_id if image_id is not None else Constants.DockerNonExistentTag + self.image_tag = image_tag if image_tag is not None else Constants.DockerNonExistentTag + + def __str__(self): + if self.image_id == Constants.DockerNonExistentTag and self.image_tag == Constants.DockerNonExistentTag: + raise InvalidDockerImageException( + "Cannot serialize DockerImage object because both image_id and image_tag are None.") + + return "{0}{1}{2}".format(self.image_id, Constants.DockerImageDelimiter, self.image_tag) + + @staticmethod + def from_string(serialized_docker_image): + temp = serialized_docker_image.split(Constants.DockerImageDelimiter) + + if len(temp) == 2: + return DockerImage(image_id=temp[0], image_tag=temp[1]) + + if len(temp) > 2 or len(temp) == 0: + raise InvalidDockerImageException("Unexpected format, cannot deserialize to DockerImage.") + + temp = temp[0] + # Figure out if temp is an image_id or an image_tag. + if DockerImage.looks_like_image_id(temp): + return DockerImage(image_id=temp) + else: + return DockerImage(image_tag=temp) + + @staticmethod + def looks_like_image_id(some_string): + possible_image_id = get_docker_image_id_from_string(some_string) + return some_string == possible_image_id
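# Example (illustrative, not part of this patch): round-tripping the
# serialized image format defined above -- "<image_id>|||<image_tag>", with
# "**NA**" (Constants.DockerNonExistentTag) marking a missing half:
#
#     from MolnsLib.DockerProxy import DockerImage
#     img = DockerImage(image_id="0123456789ab", image_tag="molns-demo")
#     s = str(img)                          # "0123456789ab|||molns-demo"
#     assert DockerImage.from_string(s).image_id == "0123456789ab"
#     assert DockerImage.from_string("0123456789ab").image_tag == "**NA**"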
diff --git a/MolnsLib/DockerSSH.py b/MolnsLib/DockerSSH.py new file mode 100644 index 0000000..d4f6f23 --- /dev/null +++ b/MolnsLib/DockerSSH.py @@ -0,0 +1,98 @@ +import StringIO +import tarfile +import os +import re +import paramiko + + +# "unused" arguments to some methods are kept to maintain compatibility with existing upper-level APIs. + +class DockerSSH(object): + def __init__(self, docker): + self.docker = docker + self.container_id = None + + def exec_command(self, command, verbose=None): + cmd = re.sub("\"", "\\\"", command) # Escape all occurrences of ". + ret_val, response = self.docker.execute_command(self.container_id, cmd) + if 'ExitCode' in ret_val and ret_val['ExitCode'] > 0: + raise paramiko.SSHException("DockerSSH.exec_command({0}) exit_code={1}: {2}".format(command, ret_val['ExitCode'], response)) + return response + + def exec_multi_command(self, command, verbose=None): + return self.exec_command(command) + + def open_sftp(self): + return MockSFTP(self.docker, self.container_id) + + def connect(self, instance, endpoint, username=None, key_filename=None): + self.container_id = instance.provider_instance_identifier + + def connect_cluster_node(self, ip_address, port, username, keyfile): + raise DockerSSHException("This invocation means that an error has occurred.") + + def close(self): + self.container_id = None + + +class MockSFTPFileException(Exception): + pass + + +class DockerSSHException(Exception): + pass + + +class MockSFTP: + def __init__(self, docker, container_id): + self.docker = docker + self.container_id = container_id + + def file(self, filename, flag): + return MockSFTPFile(filename, flag, self.docker, self.container_id) + + def close(self): + pass + + +class MockSFTPFile: + def __init__(self, filename, flag, docker, container_id): + self.filename = filename # Absolute path of file. + self.file_contents = "" + self.docker = docker + self.container_id = container_id + if flag == 'w': + self.flag = flag + # else: + # print("WARNING Unrecognized file mode. Filename: {0}, Flag: {1}".format(filename, flag)) + + def write(self, write_this): + self.file_contents += write_this + + def close(self): + # Make tarfile. + import uuid + rand_str = str(uuid.uuid4()) + temp_tar = "transport-{0}.tar".format(rand_str[:8]) + try: + tar = tarfile.TarFile(temp_tar, "w") + string = StringIO.StringIO() + string.write(self.file_contents) + string.seek(0) + tar_file_info = tarfile.TarInfo(name=os.path.basename(self.filename)) + tar_file_info.size = len(string.getvalue()) + tar.addfile(tarinfo=tar_file_info, fileobj=string) + tar.close() + + path_to_file = os.path.dirname(self.filename) + + if not path_to_file.startswith("/home"): + path_to_file = os.path.join(self.docker.get_home_directory(self.container_id), path_to_file) + + with open(temp_tar, mode='rb') as f: + tar_file_bytes = f.read() + + # print("path to file: {0}".format(path_to_file)) + self.docker.put_archive(self.container_id, tar_file_bytes, path_to_file) + finally: + os.remove(temp_tar) # Remove temporary tar file.
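# Usage sketch (illustrative): DockerSSH lets the existing SSH-driven code
# drive a container unchanged -- exec_command() runs via `docker exec`, and
# open_sftp().file(...) buffers writes that close() ships into the container
# as a tar archive. `instance` below stands for a datastore Instance row:
#
#     ssh = DockerSSH(DockerProxy.DockerProxy())
#     ssh.connect(instance, endpoint=None)
#     print ssh.exec_command("echo hello")
#     fp = ssh.open_sftp().file("notes.txt", 'w')
#     fp.write("hi")
#     fp.close()            # lands in the container's home directory
#     ssh.close()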
""" OBJ_NAME = 'EC2WorkerGroup' @@ -464,7 +464,7 @@ def start_ec2_instances(self, image_id=None, key_name=None, group_name=None, num raise ProviderException("Could not find image_id={0}".format(image_id)) if img.state != "available": if img.state != "pending": - raise ProviderException("Image {0} is not available, it has state is {1}.".format(image_id, img.state)) + raise ProviderException("Image {0} is not available, it's state is {1}.".format(image_id, img.state)) while img.state == "pending": print "Image {0} has state {1}, waiting {2} seconds for it to become available.".format(image_id, img.state, self.PENDING_IMAGE_WAITTIME) time.sleep(self.PENDING_IMAGE_WAITTIME) diff --git a/MolnsLib/OpenStackProvider.py b/MolnsLib/OpenStackProvider.py index 26c8779..5a35c98 100644 --- a/MolnsLib/OpenStackProvider.py +++ b/MolnsLib/OpenStackProvider.py @@ -43,6 +43,8 @@ class OpenStackProvider(OpenStackBase): {'q':'OpenStack project_name', 'default':os.environ.get('OS_TENANT_NAME'), 'ask':True}), ('neutron_nic', {'q':'Network ID (leave empty if only one possible network)', 'default':None, 'ask':True}), + ('region_name', + {'q':'Specify the region (leave empty if only one region)', 'default':os.environ.get('OS_REGION_NAME'), 'ask':True}), ('floating_ip_pool', {'q':'Name of Floating IP Pool (leave empty if only one possible pool)', 'default':None, 'ask':True}), ('nova_version', @@ -192,6 +194,8 @@ def _connect(self): creds['api_key'] = self.config['nova_password'] creds['auth_url'] = self.config['nova_auth_url'] creds['project_id'] = self.config['nova_project_id'] + if 'region_name' in self.config and self.config['region_name'] is not None: + creds['region_name'] = self.config['region_name'] self.nova = novaclient.Client(self.config['nova_version'], **creds) self.connected = True diff --git a/MolnsLib/Utils.py b/MolnsLib/Utils.py new file mode 100644 index 0000000..a6b75f2 --- /dev/null +++ b/MolnsLib/Utils.py @@ -0,0 +1,45 @@ +def get_user_name(): + try: + import os + return os.environ['SUDO_USER'] + except KeyError: + import getpass + return getpass.getuser() + + +def get_sudo_user_id(): + import pwd + u_name = get_user_name() + return pwd.getpwnam(u_name).pw_uid + + +def get_sudo_group_id(): + import grp + u_name = get_user_name() + return grp.getgrnam(u_name).gr_gid + + +def ensure_sudo_mode(some_function): + import os + import sys + if sys.platform.startswith("linux") and os.getuid() != 0: + pass + raise NoPrivilegedMode("\n\nOn Linux platforms, 'docker' is a priviledged command. 
" + "To use 'docker' functionality, please run in sudo mode or as root user.") + return some_function + + +class Log: + verbose = True + + def __init__(self): + pass + + @staticmethod + def write_log(message): + if Log.verbose: + print message + + +class NoPrivilegedMode(Exception): + pass diff --git a/MolnsLib/constants.py b/MolnsLib/constants.py new file mode 100644 index 0000000..7ac7ffb --- /dev/null +++ b/MolnsLib/constants.py @@ -0,0 +1,28 @@ +import os + + +class Constants: + DockerWorkingDirectoryPrefix = "working_dir_" + LOGGING_DIRECTORY = "~/MOLNS_LOG" + DOCKER_BASE_URL = "unix://var/run/docker.sock" + DOCKER_DEFAULT_IMAGE = "ubuntu:latest" + DOCKER_DEFAULT_PORT = '9000' + DOCKER_CONTAINER_RUNNING = "running" + DOCKER_CONTAINER_EXITED = "exited" + DOCKERFILE_NAME = "dockerfile_" + DOKCER_IMAGE_ID_LENGTH = 12 + DOCKER_IMAGE_PREFIX = "molns-docker-provider-" + DOCKER_PY_IMAGE_ID_PREFIX_LENGTH = 7 + DockerProvider = "Docker" + DockerNonExistentTag = "**NA**" + DockerImageDelimiter = "|||" + MolnsDockerContainerNamePrefix = "Molns-" + MolnsExecHelper = "molns_exec_helper.py" + DEFAULT_PRIVATE_NOTEBOOK_PORT = 8081 + DEFAULT_PUBLIC_NOTEBOOK_PORT = 443 + DEFAULT_PRIVATE_WEBSERVER_PORT = 8001 + DEFAULT_PUBLIC_WEBSERVER_PORT = 80 + DEFAULT_QSUB_SSH_PORT = 22 + ForbiddenVolumeNames = [".ssh", ".ipython", ".molns", "ipython", "localarea", "shared"] + ConfigDir = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".molns/") + ClusterKeyFileNameOnController = "molns_cluster_secretkey" diff --git a/MolnsLib/installSoftware.py b/MolnsLib/installSoftware.py index 92cb2e5..fcd74e8 100644 --- a/MolnsLib/installSoftware.py +++ b/MolnsLib/installSoftware.py @@ -40,29 +40,40 @@ class InstallSW: "sudo pip install dill cloud pygments", "sudo pip install tornado Jinja2", - # Molnsutil + # Molnsutil develop [ "sudo pip install jsonschema jsonpointer", # EC2/S3 and OpenStack APIs "sudo pip install boto", "sudo apt-get -y install pandoc", - # This set of packages is needed for OpenStack, as molnsutil uses them for hybrid cloud deployment + # This set of packages is needed for OpenStack, as molns_util uses them for hybrid cloud deployment "sudo apt-get -y install libxml2-dev libxslt1-dev python-dev", "sudo pip install python-novaclient", - "sudo easy_install -U pip", "sudo pip install python-keystoneclient", "sudo pip install python-swiftclient", ], - [ - "sudo rm -rf /usr/local/molnsutil;sudo mkdir -p /usr/local/molnsutil;sudo chown ubuntu /usr/local/molnsutil", - "cd /usr/local/ && git clone https://github.com/Molns/molnsutil.git", - "cd /usr/local/molnsutil && sudo python setup.py install" + "sudo rm -rf /usr/local/molnsutil;sudo mkdir -p /usr/local/molnsutil;sudo chown ubuntu /usr/local/molnsutil", + #"cd /usr/local/ && git clone https://github.com/briandrawert/molnsutil.git && cd /usr/local/molnsutil && git checkout qsub_support" + "cd /usr/local/ && git clone https://github.com/briandrawert/molnsutil.git" + ], + + # Molns develop + [ + "sudo rm -rf /usr/local/molns;sudo mkdir -p /usr/local/molns;sudo chown ubuntu /usr/local/molns", + "cd /usr/local/ && git clone https://github.com/briandrawert/molns.git && cd /usr/local/molns" + ], + + # Cluster execution + [ + "sudo rm -rf /usr/local/cluster_execution;sudo mkdir -p /usr/local/cluster_execution;sudo chown ubuntu /usr/local/cluster_execution", + "cd /usr/local/ && git clone https://github.com/briandrawert/cluster_execution.git" ], # So the workers can mount the controller via SSHfs [ "sudo apt-get -y install sshfs", "sudo gpasswd -a ubuntu 
fuse", + "mkdir -p /home/ubuntu/.ssh/", "echo 'ServerAliveInterval 60' >> /home/ubuntu/.ssh/config", ], @@ -83,19 +94,20 @@ class InstallSW: "cd /usr/local/ && git clone https://github.com/StochSS/stochkit.git StochKit", "cd /usr/local/StochKit && ./install.sh", - "sudo rm -rf /usr/local/ode-1.0.2;sudo mkdir -p /usr/local/ode-1.0.2/;sudo chown ubuntu /usr/local/ode-1.0.2", - "wget https://github.com/StochSS/stochss/blob/master/ode-1.0.2.tgz?raw=true -q -O /tmp/ode.tgz", - "cd /usr/local/ && tar -xzf /tmp/ode.tgz", + #"wget https://github.com/StochSS/stochss/blob/master/ode-1.0.4.tgz?raw=true -q -O /tmp/ode.tgz", + "wget https://github.com/StochSS/StochKit_ode/archive/master.tar.gz?raw=true -q -O /tmp/ode.tgz", + "cd /tmp && tar -xzf /tmp/ode.tgz", + "sudo mv /tmp/StochKit_ode-master /usr/local/ode", "rm /tmp/ode.tgz", - "cd /usr/local/ode-1.0.2/cvodes/ && tar -xzf \"cvodes-2.7.0.tar.gz\"", - "cd /usr/local/ode-1.0.2/cvodes/cvodes-2.7.0/ && ./configure --prefix=\"/usr/local/ode-1.0.2/cvodes/cvodes-2.7.0/cvodes\" 1>stdout.log 2>stderr.log", - "cd /usr/local/ode-1.0.2/cvodes/cvodes-2.7.0/ && make 1>stdout.log 2>stderr.log", - "cd /usr/local/ode-1.0.2/cvodes/cvodes-2.7.0/ && make install 1>stdout.log 2>stderr.log", - "cd /usr/local/ode-1.0.2/ && STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE=/usr/local/ode-1.0.2/ make 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/cvodes/ && tar -xzf \"cvodes-2.7.0.tar.gz\"", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && ./configure --prefix=\"/usr/local/ode/cvodes/cvodes-2.7.0/cvodes\" 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && make 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && make install 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/ && STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE=/usr/local/ode/ make 1>stdout.log 2>stderr.log", "sudo rm -rf /usr/local/gillespy;sudo mkdir -p /usr/local/gillespy;sudo chown ubuntu /usr/local/gillespy", - "cd /usr/local/ && git clone https://github.com/MOLNs/gillespy.git", - "cd /usr/local/gillespy && sudo STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE_HOME=/usr/local/ode-1.0.2/ python setup.py install" + "cd /usr/local/ && git clone https://github.com/briandrawert/gillespy.git", + "cd /usr/local/gillespy && sudo STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE_HOME=/usr/local/ode/ python setup.py install" ], @@ -106,18 +118,20 @@ class InstallSW: # Gmsh for Finite Element meshes "sudo apt-get install -y gmsh", ], - + + ["sudo apt-get install docker", "sudo pip install docker", "sudo pip install sqlalchemy", + "sudo pip install boto", "sudo pip install python-novaclient", "sudo pip install paramiko"], # pyurdme - [ "sudo rm -rf /usr/local/pyurdme;sudo mkdir -p /usr/local/pyurdme;sudo chown ubuntu /usr/local/pyurdme", + [ "sudo rm -rf /usr/local/pyurdme && sudo mkdir -p /usr/local/pyurdme && sudo chown ubuntu /usr/local/pyurdme", "cd /usr/local/ && git clone https://github.com/MOLNs/pyurdme.git", #"cd /usr/local/pyurdme && git checkout develop", # for development only - "cp /usr/local/pyurdme/pyurdme/data/three.js_templates/js/* .ipython/profile_default/static/custom/", + "cp /usr/local/pyurdme/pyurdme/data/three.js_templates/js/* $HOME/.ipython/profile_default/static/custom/", "source /usr/local/pyurdme/pyurdme_init && python -c 'import pyurdme'", ], # example notebooks - [ "rm -rf MOLNS_notebooks;git clone https://github.com/Molns/MOLNS_notebooks.git", - "cp MOLNS_notebooks/*.ipynb .;rm -rf MOLNS_notebooks;", + [ "rm -rf MOLNS_notebooks && git clone 
https://github.com/Molns/MOLNS_notebooks.git", + "cp MOLNS_notebooks/*.ipynb . && rm -rf MOLNS_notebooks", "ls *.ipynb" ], @@ -125,10 +139,10 @@ class InstallSW: "sudo apt-get -y remove python-scipy", "sudo pip install scipy", - "sudo pip install jsonschema jsonpointer", #redo this install to be sure it has not been removed. - + "sudo pip install jsonschema jsonpointer", # redo this install to be sure it has not been removed. + "sudo pip install paramiko", - "sync", # This is critial for some infrastructures. + "sync", # This is critical for some infrastructures. ] # How many time do we try to install each package. @@ -268,7 +282,6 @@ def exec_command_list_switch(self, command_list): raise SystemExit("CRITICAL ERROR: could not complete command '{0}'. Exiting.".format(command)) print "Installation complete in {0}s".format(time.time() - tic) - def log_exec(self, msg): if self.log_file is not None: self.log_file.write(msg) @@ -332,6 +345,11 @@ def exec_multi_command(self, command, next_command): print "FAILED......\t{0}:{1}\t{2}\t{3}".format(self.hostname, self.ssh_endpoint, command, e) raise InstallSWException() + @staticmethod + def get_command_list(): + """Returns the whole list of dependency installation commands. """ + return InstallSW.command_list + if __name__ == "__main__": print "{0}".format(InstallSW.command_list) print "len={0}".format(len(InstallSW.command_list)) @@ -342,4 +360,3 @@ def exec_multi_command(self, command, next_command): else: cnt += 1 print "cnt={0}".format(cnt) - diff --git a/MolnsLib/molns_datastore.py b/MolnsLib/molns_datastore.py index 00093c5..dee8428 100644 --- a/MolnsLib/molns_datastore.py +++ b/MolnsLib/molns_datastore.py @@ -1,15 +1,20 @@ #!/usr/bin/env python from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base + Base = declarative_base() from sqlalchemy import Column, Integer, String, Sequence from sqlalchemy.orm import sessionmaker import os import logging import sys +import uuid +import datetime + ############################################################# -#VALID_PROVIDER_TYPES = ['OpenStack', 'EC2', 'Rackspace'] -VALID_PROVIDER_TYPES = ['OpenStack', 'EC2', 'Eucalyptus'] +VALID_PROVIDER_TYPES = ['OpenStack', 'EC2', 'Eucalyptus', 'Docker'] + + ############################################################# #### SCHEMA ################################################# ############################################################# @@ -18,12 +23,13 @@ class Provider(Base): """ DB object for an infrastructure service provider. """ __tablename__ = 'providers' id = Column(Integer, Sequence('provider_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) def __str__(self): return "Provider({0}): name={1} type={2}".format(self.id, self.name, self.type) + class ProviderData(Base): """ DB object to store the key/value pairs for a service provider. """ __tablename__ = 'provider_data' @@ -33,19 +39,22 @@ class ProviderData(Base): value = Column(String) def __str__(self): - return "ProviderData({0}): provider_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "ProviderData({0}): provider_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, + self.value) + class Controller(Base): """ DB object for a MOLNS controller. 
""" __tablename__ = 'controllers' id = Column(Integer, Sequence('controller_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) provider_id = Column(Integer) - + def __str__(self): return "Controller({0}): name={1} provider_id={2}".format(self.id, self.name, self.provider_id) + class ControllerData(Base): """ DB object to store the key/value pairs for a controller. """ __tablename__ = 'controller_data' @@ -55,19 +64,24 @@ class ControllerData(Base): value = Column(String) def __str__(self): - return "ControllerData({0}): controller_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "ControllerData({0}): controller_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, + self.value) + class WorkerGroup(Base): """ DB object for a MOLNS WorkerGroup. """ __tablename__ = 'worker_groups' id = Column(Integer, Sequence('worker_group_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) provider_id = Column(Integer) controller_id = Column(Integer) - + def __str__(self): - return "WorkerGroup({0}): name={1} provider_id={2} controller_id={3}".format(self.id, self.name, self.provider_id, self.controller_id) + return "WorkerGroup({0}): name={1} provider_id={2} controller_id={3}".format(self.id, self.name, + self.provider_id, + self.controller_id) + class WorkerGroupData(Base): """ DB object to store the key/value pairs for a worker groups. """ @@ -78,36 +92,54 @@ class WorkerGroupData(Base): value = Column(String) def __str__(self): - return "WorkerGrouprData({0}): worker_group_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "WorkerGrouprData({0}): worker_group_id={1} name={2} value={3}".format(self.id, self.parent_id, + self.name, self.value) class Instance(Base): """ DB object for a MOLNS VM instance. """ __tablename__ = 'instances' id = Column(Integer, Sequence('instance_id_seq'), primary_key=True) - type = Column(String) #'head-node' or 'worker' + type = Column(String) # 'head-node' or 'worker' controller_id = Column(Integer) worker_group_id = Column(Integer) provider_id = Column(Integer) ip_address = Column(String) provider_instance_identifier = Column(String) - + + def __str__(self): + return "Instance({0}): provider_instance_identifier={1} provider_id={2} controller_id={3} worker_group_id={4}".format( + self.id, self.provider_instance_identifier, self.provider_id, self.controller_id, self.worker_group_id) + + +class ExecJob(Base): + """ DB object for MOLNS exec jobs. 
""" + __tablename__ = 'jobs' + id = Column(Integer, Sequence('instance_id_seq'), primary_key=True) + controller_id = Column(Integer) + exec_str = Column(String) + jobID = Column(String) + date = Column(String) + def __str__(self): - return "Instance({0}): provider_instance_identifier={1} provider_id={2} controller_id={3} worker_group_id={4}".format(self.id, self.provider_instance_identifier, self.provider_id, self.controller_id, self.worker_group_id) + return "ExecJob({0}): jobID={1} controller_id={2}, exec_str={3}".format(self.id, self.jobID, self.controller_id, + self.exec_str) class DatastoreException(Exception): pass + ############################################################# HANDLE_MAPPING = { - 'Provider':(Provider,ProviderData), - 'Controller':(Controller,ControllerData), - 'WorkerGroup':(WorkerGroup,WorkerGroupData), + 'Provider': (Provider, ProviderData), + 'Controller': (Controller, ControllerData), + 'WorkerGroup': (WorkerGroup, WorkerGroupData), } -#from OpenStackProvider import OpenStackProvider, OpenStackController, OpenStackWorkerGroup -#from EC2Provider import EC2Provider, EC2Controller, EC2WorkerGroup + +# from OpenStackProvider import OpenStackProvider, OpenStackController, OpenStackWorkerGroup +# from EC2Provider import EC2Provider, EC2Controller, EC2WorkerGroup def dynamic_module_import(name): mod = __import__(name) @@ -116,26 +148,30 @@ def dynamic_module_import(name): mod = getattr(mod, comp) return mod + def get_provider_handle(kind, ptype): """ Return object of 'kind' (Provider, Controller or WokerGroup) for provider of type 'ptype'. Load the module if necessary. """ - #logging.debug("get_provider_handle(kind={0}, ptype={1})".format(kind, ptype)) + # logging.debug("get_provider_handle(kind={0}, ptype={1})".format(kind, ptype)) valid_handles = ['Provider', 'Controller', 'WorkerGroup'] if kind not in valid_handles: raise DatastoreException("Unknown kind {0}".format(kind)) if ptype not in VALID_PROVIDER_TYPES: - raise DatastoreException("Unknown {1} type {0}".format(ptype, kind)) + # raise DatastoreException("Unknown {1} type {0}".format(ptype, kind)) + return None cls_name = "{0}{1}".format(ptype, kind) pkg_name = "MolnsLib.{0}Provider".format(ptype) if pkg_name not in sys.modules: logging.debug("loading {0} from {1}".format(cls_name, pkg_name)) + # pkg = dynamic_module_import(pkg_name) pkg = dynamic_module_import(pkg_name) try: - #logging.debug("dir(pkg={0})={1}".format(pkg, dir(pkg))) + # logging.debug("dir(pkg={0})={1}".format(pkg, dir(pkg))) mod = getattr(pkg, cls_name) except AttributeError: raise DatastoreException("module {0} does not contain {1}".format(pkg_name, cls_name)) return mod + ############################################################# @@ -160,14 +196,13 @@ def __init__(self, db_file=None, config_dir=None): os.makedirs(self.MOLNS_CONFIG_DIR) self.engine = create_engine('sqlite:///{0}/{1}'.format(self.MOLNS_CONFIG_DIR, self.MOLNS_DATASTORE)) - Base.metadata.create_all(self.engine) # Create all the tables + Base.metadata.create_all(self.engine) # Create all the tables Session = sessionmaker(bind=self.engine) self.session = Session() def __del__(self): """ Destructor. """ self.session.commit() - def list_objects(self, kind): """ Get all the currently configured objects of kind (Provider, Controller, WorkerGroup). 
@@ -197,16 +232,16 @@ def create_object(self, ptype, name, kind, **kwargs): raise DatastoreException("{1} {0} already exists with type".format(name, kind, p.type)) p_handle = get_provider_handle(kind, ptype) - #logging.debug("create_object() {1}(name={0})".format(name, p_handle)) + # logging.debug("create_object() {1}(name={0})".format(name, p_handle)) p = p_handle(name=name, config_dir=self.config_dir) if 'provider_id' in kwargs: p.provider_id = kwargs['provider_id'] - #logging.debug("create_object() provider_id={0}".format(kwargs['provider_id'])) + # logging.debug("create_object() provider_id={0}".format(kwargs['provider_id'])) if 'controller_id' in kwargs: p.controller_id = kwargs['controller_id'] - #logging.debug("create_object() controller_id={0}".format(kwargs['controller_id'])) + # logging.debug("create_object() controller_id={0}".format(kwargs['controller_id'])) return p - + def delete_object(self, name, kind): """ Delete a objects of kind (Provider, Controller, WorkerGroup). @@ -225,7 +260,7 @@ def delete_object(self, name, kind): logging.debug("Deleting entry: {0}".format(p)) self.session.delete(p) self.session.commit() - + def get_object(self, name, kind): """ Get a config object of of kind (Provider, Controller, WorkerGroup). @@ -271,19 +306,21 @@ def _get_object_data(self, d_handle, kind, ptype, p): data[d.name] = d.value p_handle = get_provider_handle(kind, ptype) - #logging.debug("{2}(name={0}, data={1})".format(name,data,p_handle)) + # logging.debug("{2}(name={0}, data={1})".format(name,data,p_handle)) + if p_handle is None: + return None ret = p_handle(name=p.name, config=data, config_dir=self.config_dir) ret.id = p.id ret.datastore = self if 'provider_id' in p.__dict__: - #logging.debug("_get_object_data(): provider_id={0}".format(p.provider_id)) + # logging.debug("_get_object_data(): provider_id={0}".format(p.provider_id)) try: ret.provider = self.get_object_by_id(id=p.provider_id, kind='Provider') except DatastoreException as e: logging.debug('Error: provider {0} not found'.format(p.provider_id)) ret.provider = None if 'controller_id' in p.__dict__: - #logging.debug("_get_object_data(): controller_id={0}".format(p.controller_id)) + # logging.debug("_get_object_data(): controller_id={0}".format(p.controller_id)) try: ret.controller = self.get_object_by_id(id=p.controller_id, kind='Controller') except DatastoreException as e: @@ -291,8 +328,6 @@ def _get_object_data(self, d_handle, kind, ptype, p): ret.controller = None return ret - - def save_object(self, config, kind): """ Save the configuration of a provider object. @@ -308,15 +343,16 @@ def save_object(self, config, kind): # Add new entry. 
p = handle(name=config.name, type=config.type) self.session.add(p) - #logging.debug("Created new DB entry: {0}".format(p)) - #print "save_object() config.__dict__={0}".format(config.__dict__) + # logging.debug("Created new DB entry: {0}".format(p)) + # print "save_object() config.__dict__={0}".format(config.__dict__) if 'provider_id' in config.__dict__: - logging.debug("provider_id is in config.__dict__ {0} {1}".format(config.provider_id, type(config.provider_id))) + logging.debug( + "provider_id is in config.__dict__ {0} {1}".format(config.provider_id, type(config.provider_id))) p.provider_id = config.provider_id if 'controller_id' in config.__dict__: logging.debug("controller_id is in config.__dict__ {0}".format(config.controller_id)) p.controller_id = config.controller_id - #logging.debug("Updated DB entry: {0}".format(p)) + # logging.debug("Updated DB entry: {0}".format(p)) self.session.commit() data = config.config.copy() @@ -326,33 +362,34 @@ def save_object(self, config, kind): d.value = data[d.name] del data[d.name] else: - #logging.debug("Deleting entry: {0}".format(d)) + # logging.debug("Deleting entry: {0}".format(d)) self.session.delete(d) for d in data.keys(): dd = d_handle(parent_id=p.id, name=d, value=data[d]) - #logging.debug("Created new entry: {0}".format(dd)) + # logging.debug("Created new entry: {0}".format(dd)) self.session.add(dd) self.session.commit() - def get_instance_by_id(self, id): """ Create or get the value for an instance. """ return self.session.query(Instance).filter_by(id=id).first() - - def get_instance(self, provider_instance_identifier, ip_address, provider_id=None, controller_id=None, worker_group_id=None): + + def get_instance(self, provider_instance_identifier, ip_address, provider_id=None, controller_id=None, + worker_group_id=None, provider_type=None): """ Create or get the value for an instance. 
""" p = self.session.query(Instance).filter_by(provider_instance_identifier=provider_instance_identifier).first() if p is None: - p = Instance(provider_instance_identifier=provider_instance_identifier, ip_address=ip_address, provider_id=provider_id, controller_id=controller_id, worker_group_id=worker_group_id) + p = Instance(provider_instance_identifier=provider_instance_identifier, ip_address=ip_address, + provider_id=provider_id, controller_id=controller_id, worker_group_id=worker_group_id) self.session.add(p) self.session.commit() - #logging.debug("Creating instance: {0}".format(p)) + # logging.debug("Creating instance: {0}".format(p)) else: - #logging.debug("Fetching instance: {0}".format(p)) + # logging.debug("Fetching instance: {0}".format(p)) pass return p - def get_controller_instances(self,controller_id=None): + def get_controller_instances(self, controller_id=None): logging.debug("get_controller_instances by controller_id={0}".format(controller_id)) ret = self.session.query(Instance).filter_by(controller_id=controller_id, worker_group_id=None).all() if ret is None: @@ -360,24 +397,24 @@ def get_controller_instances(self,controller_id=None): else: return ret - def get_worker_instances(self,controller_id=None): - #logging.debug("get_worker_instances by controller_id={0}".format(controller_id)) - ret = self.session.query(Instance).filter_by(controller_id=controller_id).filter(Instance.worker_group_id!=None).all() + def get_worker_instances(self, controller_id=None): + # logging.debug("get_worker_instances by controller_id={0}".format(controller_id)) + ret = self.session.query(Instance).filter_by(controller_id=controller_id).filter( + Instance.worker_group_id != None).all() if ret is None: return [] else: return ret - def get_all_instances(self, provider_id=None, controller_id=None, worker_group_id=None): if provider_id is not None: - #logging.debug("get_all_instances by provider_id={0}".format(provider_id)) + # logging.debug("get_all_instances by provider_id={0}".format(provider_id)) ret = self.session.query(Instance).filter_by(provider_id=provider_id).all() elif controller_id is not None: - #logging.debug("get_all_instances by controller_id={0}".format(controller_id)) + # logging.debug("get_all_instances by controller_id={0}".format(controller_id)) ret = self.session.query(Instance).filter_by(controller_id=controller_id).all() elif worker_group_id is not None: - #logging.debug("get_all_instances by worker_group_id={0}".format(worker_group_id)) + # logging.debug("get_all_instances by worker_group_id={0}".format(worker_group_id)) ret = self.session.query(Instance).filter_by(worker_group_id=worker_group_id).all() else: ret = self.session.query(Instance).all() @@ -388,9 +425,43 @@ def get_all_instances(self, provider_id=None, controller_id=None, worker_group_i def delete_instance(self, instance): """ Delete an instance. """ - #logging.debug("Deleting instance: {0}".format(instance)) + # logging.debug("Deleting instance: {0}".format(instance)) self.session.delete(instance) self.session.commit() + def get_all_jobs(self, controller_id=None): + if controller_id is not None: + # logging.debug("get_all_instances by controller_id={0}".format(controller_id)) + ret = self.session.query(ExecJob).filter_by(controller_id=controller_id).all() + else: + ret = self.session.query(ExecJob).all() + if ret is None: + return [] + else: + return ret + def get_job(self, jobID): + """ Get the objet for a job. 
""" + # logging.debug("get_job(jobID={0})".format(jobID)) + try: + id = int(jobID) + j = self.session.query(ExecJob).filter_by(id=id).first() + except Exception: + j = self.session.query(ExecJob).filter_by(jobID=jobID).first() + if j is None: + raise DatastoreException("Job {0} not found".format(jobID)) + return j + + def start_job(self, controller_id=None, exec_str=None): + """ Create the objet for a job. """ + date_str = str(datetime.datetime.now()) + jobID = str(uuid.uuid4()) + j = ExecJob(jobID=jobID, controller_id=controller_id, exec_str=exec_str, date=date_str) + self.session.add(j) + self.session.commit() + logging.debug("Creating ExecJob: {0}".format(j)) + return j + def delete_job(self, job): + self.session.delete(job) + self.session.commit() diff --git a/MolnsLib/molns_exec_helper.py b/MolnsLib/molns_exec_helper.py new file mode 100644 index 0000000..6f3a5b2 --- /dev/null +++ b/MolnsLib/molns_exec_helper.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import os +import subprocess +import shlex +import json +import traceback +import sys + + +def run_job(exec_str, stdout_file): + with open(stdout_file, 'w') as stdout_fh: + try: + p = subprocess.Popen( + shlex.split(exec_str), + stdout=stdout_fh, + stderr=stdout_fh, + ) + pid = p.pid + # create pid file + pid_file = ".molns/pid" + return_code_file = ".molns/return_value" + with open(pid_file, 'w+') as fd: + fd.write(str(pid)) + # Wait on program execution... + return_code = p.wait() + print "Return code:", return_code + if return_code_file is not None: + with open(return_code_file, 'w+') as fd: + fd.write(str(return_code)) + except Exception as e: + stdout_fh.write('Error: {}'.format(str(e))) + stdout_fh.write(traceback.format_exc()) + raise sys.exc_info()[1], None, sys.exc_info()[2] + + +if __name__ == "__main__": + with open(".molns/cmd",'r') as fd: + exec_str = fd.read() + print "exec_str", exec_str + run_job(exec_str, ".molns/stdout") diff --git a/MolnsLib/molns_landing_page.py b/MolnsLib/molns_landing_page.py new file mode 100644 index 0000000..0238f26 --- /dev/null +++ b/MolnsLib/molns_landing_page.py @@ -0,0 +1,93 @@ +from pipes import quote + +class MolnsLandingPage: + def __init__(self, port): + self.molns_landing_page = quote(""" + + +
diff --git a/MolnsLib/molns_landing_page.py b/MolnsLib/molns_landing_page.py new file mode 100644 index 0000000..0238f26 --- /dev/null +++ b/MolnsLib/molns_landing_page.py @@ -0,0 +1,93 @@ +from pipes import quote + +class MolnsLandingPage: + def __init__(self, port): + self.molns_landing_page = quote(""" + + + + +Please note that due to the self-signed certificate, you will see a warning before you can view the page. Please accept the warning and proceed.
++
+Click here to Register MOLNs
+
+ MOLNs is open-source software, developed as part of the StochSS project, and we are relying on continued funding for sustained development. Please consider registering to show your support.
+
+Advanced analysis with Python scientific libraries
+Large scale computational experiments made easy
+
+
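# End-to-end sketch (illustrative; assumes a running Docker daemon and this
# branch installed). The verbs mirror molns' existing provider/controller
# setup flow; 'dprov' and 'dctrl' are placeholder names:
#
#     sudo molns provider setup dprov    # choose type Docker, build the image
#     sudo molns controller setup dctrl  # pick provider dprov, ports, workdir
#     sudo molns start dctrl             # starts the container and notebook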