diff --git a/.gitignore b/.gitignore index bf0e1ac..c7091ed 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,12 @@ *.pyc .molns/ molns_install.log +.ec2_creds_molns +.idea/ +*.tar.gz +*.tar +notes +qsubscript +/dockerfile_* +\#qsubscript\# +docker_test.py diff --git a/MolnsLib/DockerProvider.py b/MolnsLib/DockerProvider.py new file mode 100644 index 0000000..cc04fe1 --- /dev/null +++ b/MolnsLib/DockerProvider.py @@ -0,0 +1,243 @@ +import logging +import os +import tempfile +import time +import DockerProxy +import constants +import installSoftware +from collections import OrderedDict +from DockerSSH import DockerSSH +from constants import Constants +from molns_provider import ProviderBase, ProviderException + + +def docker_provider_default_key_name(): + user = os.environ.get('USER') or 'USER' + return "{0}_molns_docker_sshkey_{1}".format(user, hex(int(time.time())).replace('0x', '')) + + +class DockerBase(ProviderBase): + """ Base class for Docker. """ + + SSH_KEY_EXTENSION = ".pem" + PROVIDER_TYPE = 'Docker' + + def __init__(self, name, config=None, config_dir=None, **kwargs): + ProviderBase.__init__(self, name, config, config_dir, **kwargs) + self.docker = DockerProxy.DockerProxy() + self.ssh = DockerSSH(self.docker) + + def _get_container_status(self, container_id): + self.docker.container_status(container_id) + + def start_instance(self, num=1): + """ Start given number of (or 1) containers. 
""" + started_containers = [] + for i in range(num): + container_id = self.docker.create_container(self.provider.config["molns_image_name"], name=self.name, + port_bindings={ + Constants.DEFAULT_PUBLIC_WEBSERVER_PORT: + ('127.0.0.1', self.config['web_server_port']), + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT: + ('127.0.0.1', self.config['notebook_port'])}, + working_directory=self.config["working_directory"]) + stored_container = self.datastore.get_instance(provider_instance_identifier=container_id, + ip_address=self.docker.get_container_ip_address(container_id) + , provider_id=self.provider.id, controller_id=self.id, + provider_type=constants.Constants.DockerProvider) + started_containers.append(stored_container) + if num == 1: + return started_containers[0] + return started_containers + + def resume_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + else: + instance_ids.append(instances.provider_instance_identifier) + self.docker.start_containers(instance_ids) + + def stop_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + else: + instance_ids.append(instances.provider_instance_identifier) + self.docker.stop_containers(instance_ids) + + def terminate_instance(self, instances): + instance_ids = [] + if isinstance(instances, list): + for instance in instances: + instance_ids.append(instance.provider_instance_identifier) + else: + instance_ids.append(instances.provider_instance_identifier) + self.docker.terminate_containers(instance_ids) + + def exec_command(self, container_id, command): + self.docker.execute_command(container_id, command) + + +class DockerProvider(DockerBase): + """ Provider handle for local Docker based service. 
""" + + OBJ_NAME = 'DockerProvider' + + CONFIG_VARS = OrderedDict([ + ('ubuntu_image_name', + {'q': 'Base Ubuntu image to use', 'default': constants.Constants.DOCKER_DEFAULT_IMAGE, + 'ask': True}), + ('molns_image_name', + {'q': 'Local MOLNs image (Docker image ID or image tag) to use ', 'default': None, 'ask': True}), + ('key_name', + {'q': 'Docker Key Pair name', 'default': "docker-default", 'ask': False}), # Unused. + ('group_name', + {'q': 'Docker Security Group name', 'default': 'molns', 'ask': False}), # Unused. + ('login_username', + {'default': 'ubuntu', 'ask': False}), # Unused. + ('provider_type', + {'default': constants.Constants.DockerProvider, 'ask': False}) + ]) + + def get_config_credentials(self): + return None + + @staticmethod + def __get_new_dockerfile_name(): + import uuid + filename = constants.Constants.DOCKERFILE_NAME + str(uuid.uuid4()) + return filename + + def check_ssh_key(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_ssh_key(self): + """ Returns true. """ + ssh_key_dir = os.path.join(self.config_dir, self.name) + with open(ssh_key_dir, 'w') as fp: + fp.write("This is a dummy key.") + os.chmod(ssh_key_dir, 0o600) + + def check_security_group(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_seurity_group(self): + """ Returns true. (Implementation does not use SSH.) """ + return True + + def create_molns_image(self): + """ Create a molns image, save it on localhost and return DockerImage ID of created image. 
""" + file_to_remove = None + try: + dockerfile, file_to_remove = self._create_dockerfile(installSoftware.InstallSW.get_command_list()) + image_id = self.docker.build_image(dockerfile) + return image_id + except Exception as e: + logging.exception(e) + raise ProviderException("Failed to create molns image: {0}".format(e)) + finally: + if file_to_remove is not None: + os.remove(file_to_remove) + + def check_molns_image(self): + """ Check if the molns image exists. """ + if 'molns_image_name' in self.config and self.config['molns_image_name'] is not None \ + and self.config['molns_image_name'] != '': + return self.docker.image_exists(self.config['molns_image_name']) + return False + + def _create_dockerfile(self, commands): + """ Create Dockerfile from given commands. """ + import Utils + + user_id = Utils.get_sudo_user_id() + dockerfile = '''FROM ubuntu:14.04\nRUN apt-get update\n\n# Add user ubuntu.\nRUN useradd -u {0} -ms /bin/bash ubuntu\n + # Set up base environment.\nRUN apt-get install -yy \ \n software-properties-common \ \n + python-software-properties \ \n wget \ \n curl \ \n git \ \n ipython \ \n sudo \ \n + screen \ \n iptables \nRUN echo "ubuntu ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers + \nWORKDIR /home/ubuntu\n\nUSER ubuntu\nENV HOME /home/ubuntu'''.format(user_id) + + flag = False + + for entry in commands: + if isinstance(entry, list): + dockerfile += '''\n\nRUN ''' + first = True + flag = False + for sub_entry in entry: + if first is True: + dockerfile += self._preprocess(sub_entry) + first = False + else: + dockerfile += ''' && \ \n ''' + self._preprocess(sub_entry) + else: + if flag is False: + dockerfile += '''\n\nRUN ''' + flag = True + dockerfile += self._preprocess(entry) + else: + dockerfile += ''' && \ \n ''' + self._preprocess(entry) + + dockerfile += '''\n\n\n''' + + dockerfile_file = DockerProvider.__get_new_dockerfile_name() + with open(dockerfile_file, 'w') as Dockerfile: + Dockerfile.write(dockerfile) + named_dockerfile = 
tempfile.NamedTemporaryFile() + named_dockerfile.write(dockerfile) + named_dockerfile.seek(0) + + return named_dockerfile, dockerfile_file + + @staticmethod + def _preprocess(command): + """ Prepends "shell only" commands with '/bin/bash -c'. """ + for shell_command in DockerProxy.DockerProxy.shell_commands: + if shell_command in command: + replace_string = "/bin/bash -c \"" + shell_command + command = command.replace(shell_command, replace_string) + command += "\"" + return command + + +def get_default_working_directory(config=None): + if config is None: + raise Exception("Config should not be None.") + return os.path.realpath(os.path.join(config.config_dir, "docker_controller_working_dirs", config.name)) + + +class DockerController(DockerBase): + """ Provider handle for a Docker controller. """ + + OBJ_NAME = 'DockerController' + CONFIG_VARS = OrderedDict([ + ('web_server_port', + {'q': 'Port to use for web server', 'default': "8080", + 'ask': True}), + ('notebook_port', + {'q': 'Port to use for jupyter notebook', 'default': "8081", + 'ask': True}), + ('working_directory', + {'q': 'Working directory for this controller', 'default': get_default_working_directory, 'ask': True}), + ('ssh_key_file', + {'q': 'SSH key to a qsub and docker enabled cluster', 'default': "None", 'ask': True}) + ]) + + def get_instance_status(self, instance): + return self.docker.container_status(instance.provider_instance_identifier) + + +class DockerWorkerGroup(DockerController): + """ Provider handle for Docker worker group. 
""" + + OBJ_NAME = 'DockerWorkerGroup' + + CONFIG_VARS = OrderedDict([ + ('num_vms', + {'q': 'Number of containers in group', 'default': '1', 'ask': True}), + ]) diff --git a/MolnsLib/DockerProxy.py b/MolnsLib/DockerProxy.py new file mode 100644 index 0000000..5d0d6b6 --- /dev/null +++ b/MolnsLib/DockerProxy.py @@ -0,0 +1,314 @@ +import logging +import re +import time +import constants +from molns_provider import ProviderBase +from constants import Constants +from docker import APIClient as Client +from docker.errors import NotFound, NullResource, APIError + + +class InvalidVolumeName(Exception): + pass + + +class DockerProxy: + + """ A wrapper over docker-py and some utility methods and classes. """ + + LOG_TAG = "Docker " + + shell_commands = ["source"] + + class ImageBuildException(Exception): + def __init__(self, message=None): + super("Something went wrong while building docker container image.\n{0}".format(message)) + + def __init__(self): + self.client = Client(base_url=Constants.DOCKER_BASE_URL) + self.build_count = 0 + logging.basicConfig(level=logging.DEBUG) + + @staticmethod + def get_container_volume_from_working_dir(working_directory): + import os + return os.path.join("/home/ubuntu/", os.path.basename(working_directory)) + + def create_container(self, image_str, working_directory=None, name=None, + port_bindings={Constants.DEFAULT_PUBLIC_WEBSERVER_PORT: ('127.0.0.1', 8080), + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT: ('127.0.0.1', 8081)}): + """Creates a new container with elevated privileges. Returns the container ID. 
Maps port 80 of container + to 8080 of locahost by default""" + + docker_image = DockerImage.from_string(image_str) + volume_dir = DockerProxy.get_container_volume_from_working_dir(working_directory) + + if name is None: + import uuid + random_str = str(uuid.uuid4()) + name = constants.Constants.MolnsDockerContainerNamePrefix + random_str[:8] + image = docker_image.image_id if docker_image.image_id is not Constants.DockerNonExistentTag \ + else docker_image.image_tag + + logging.info("Using image {0}".format(image)) + import os + if DockerProxy._verify_directory(working_directory) is False: + if working_directory is not None: + raise InvalidVolumeName("\n\nMOLNs uses certain reserved names for its configuration files in the " + "controller environment, and unfortunately the provided name for working " + "directory of the controller cannot be one of these. Please configure this " + "controller again with a different volume name and retry. " + "Here is the list of forbidden names: \n{0}" + .format(Constants.ForbiddenVolumeNames)) + + logging.warning(DockerProxy.LOG_TAG + "Unable to verify provided directory to use to as volume. 
Volume will NOT " + "be created.") + hc = self.client.create_host_config(privileged=True, port_bindings=port_bindings) + container = self.client.create_container(image=image, name=name, command="/bin/bash", tty=True, detach=True, + ports=[Constants.DEFAULT_PUBLIC_WEBSERVER_PORT, + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT], + host_config=hc, + environment={"PYTHONPATH": "/usr/local/"}) + + else: + container_mount_point = '/home/ubuntu/{0}'.format(os.path.basename(working_directory)) + hc = self.client.create_host_config(privileged=True, port_bindings=port_bindings, + binds={working_directory: {'bind': container_mount_point, + 'mode': 'rw'}}) + + container = self.client.create_container(image=image, name=name, command="/bin/bash", tty=True, detach=True, + ports=[Constants.DEFAULT_PUBLIC_WEBSERVER_PORT, + Constants.DEFAULT_PRIVATE_NOTEBOOK_PORT], + volumes=container_mount_point, host_config=hc, + working_dir=volume_dir, + environment={"PYTHONPATH": "/usr/local/"}) + + container_id = container.get("Id") + + return container_id + + # noinspection PyBroadException + @staticmethod + def _verify_directory(working_directory): + import os + if working_directory is None or os.path.basename(working_directory) in Constants.ForbiddenVolumeNames: + return False + try: + if not os.path.exists(working_directory): + os.makedirs(working_directory) + return True + except: + return False + + def stop_containers(self, container_ids): + """Stops given containers.""" + for container_id in container_ids: + self.stop_container(container_id) + + def stop_container(self, container_id): + """Stops the container with given ID.""" + self.client.stop(container_id) + + def container_status(self, container_id): + """Checks if container with given ID running.""" + status = ProviderBase.STATUS_TERMINATED + try: + ret_val = str(self.client.inspect_container(container_id).get('State').get('Status')) + if ret_val.startswith("running"): + status = ProviderBase.STATUS_RUNNING + else: + status = 
ProviderBase.STATUS_STOPPED + except NotFound: + pass + return status + + def start_containers(self, container_ids): + """Starts each container in given list of container IDs.""" + for container_id in container_ids: + self.start_container(container_id) + + def start_container(self, container_id): + """ Start the container with given ID.""" + logging.info(DockerProxy.LOG_TAG + " Starting container " + container_id) + try: + self.client.start(container=container_id) + except (NotFound, NullResource) as e: + print (DockerProxy.LOG_TAG + "Something went wrong while starting container.", e) + return False + return True + + def execute_command(self, container_id, command): + """Executes given command as a shell command in the given container. Returns None is anything goes wrong.""" + run_command = "/bin/bash -c \"" + command + "\"" + # print("CONTAINER: {0} COMMAND: {1}".format(container_id, run_command)) + if self.start_container(container_id) is False: + print (DockerProxy.LOG_TAG + "Could not start container.") + return None + try: + exec_instance = self.client.exec_create(container_id, run_command) + response = self.client.exec_start(exec_instance) + return [self.client.exec_inspect(exec_instance), response] + except (NotFound, APIError) as e: + print (DockerProxy.LOG_TAG + " Could not execute command.", e) + return None + + def build_image(self, dockerfile): + """ Build image from given Dockerfile object and return ID of the image created. """ + import uuid + logging.info("Building image...") + random_string = str(uuid.uuid4()) + image_tag = Constants.DOCKER_IMAGE_PREFIX + "{0}".format(random_string[:]) + last_line = "" + try: + for line in self.client.build(fileobj=dockerfile, rm=True, tag=image_tag): + print(DockerProxy._decorate(line)) + if "errorDetail" in line: + raise DockerProxy.ImageBuildException() + last_line = line + + # Return image ID. It's a hack around the fact that docker-py's build image command doesn't return an image + # id. 
+ image_id = get_docker_image_id_from_string(str(last_line)) + logging.info("Image ID: {0}".format(image_id)) + return str(DockerImage(image_id, image_tag)) + + except (DockerProxy.ImageBuildException, IndexError) as e: + raise DockerProxy.ImageBuildException(e) + + @staticmethod + def _decorate(some_line): + return some_line[11:-4].rstrip() + + def image_exists(self, image_str): + """Checks if an image with the given ID/tag exists locally.""" + docker_image = DockerImage.from_string(image_str) + + if docker_image.image_id is Constants.DockerNonExistentTag \ + and docker_image.image_tag is Constants.DockerNonExistentTag: + raise InvalidDockerImageException("Neither image_id nor image_tag provided.") + + for image in self.client.images(): + some_id = image["Id"] + some_tags = image["RepoTags"] or [None] + if docker_image.image_id in \ + some_id[:(Constants.DOCKER_PY_IMAGE_ID_PREFIX_LENGTH + Constants.DOKCER_IMAGE_ID_LENGTH)]: + return True + if docker_image.image_tag in some_tags: + return True + return False + + def terminate_containers(self, container_ids): + """ Terminates containers with given container ids.""" + for container_id in container_ids: + try: + if self.container_status(container_id) == ProviderBase.STATUS_RUNNING: + self.stop_container(container_id) + self.terminate_container(container_id) + except NotFound: + pass + + def terminate_container(self, container_id): + self.client.remove_container(container_id) + + def get_mapped_ports(self, container_id): + container_ins = self.client.inspect_container(container_id) + mapped_ports = container_ins['HostConfig']['PortBindings'] + ret_val = [] + if mapped_ports is None: + logging.info("No mapped ports for {0}".format(container_id)) + return + for k, v in mapped_ports.iteritems(): + host_port = v[0]['HostPort'] + ret_val.append(host_port) + return ret_val + + def get_working_directory(self, container_id): + return self.client.inspect_container(container_id)["Config"]["WorkingDir"] + + def 
get_home_directory(self, container_id): + env_vars = self.client.inspect_container(container_id)["Config"]["Env"] + home = [i for i in env_vars if i.startswith("HOME")] + return home[0].split("=")[1] + + def put_archive(self, container_id, tar_file_bytes, target_path_in_container): + """ Copies and unpacks a given tarfile in the container at specified location. + Location must exist in container.""" + if self.start_container(container_id) is False: + raise Exception("Could not start container.") + + # Prepend file path with /home/ubuntu/. TODO Should be refined. + if not target_path_in_container.startswith("/home/ubuntu/"): + import os + target_path_in_container = os.path.join("/home/ubuntu/", target_path_in_container) + + logging.info("target path in container: {0}".format(target_path_in_container)) + if not self.client.put_archive(container_id, target_path_in_container, tar_file_bytes): + logging.error(DockerProxy.LOG_TAG + "Failed to copy.") + + def get_container_ip_address(self, container_id): + """ Returns the IP Address of given container.""" + self.start_container(container_id) + ins = self.client.inspect_container(container_id) + ip_address = str(ins.get("NetworkSettings").get("IPAddress")) + while True: + ip_address = str(ins.get("NetworkSettings").get("IPAddress")) + if ip_address == "": + time.sleep(3) + if ip_address.startswith("1") is True: + break + return ip_address + + +def get_docker_image_id_from_string(some_string): + exp = r'[a-z0-9]{12}' + matches = re.findall(exp, some_string) + if len(matches) is 0: + return None + else: + return matches[0] + + +class InvalidDockerImageException(Exception): + def __init__(self, message): + super(message) + + +class DockerImage: + def __init__(self, image_id=None, image_tag=None): + if image_id in [None, Constants.DockerNonExistentTag] and image_tag in [None, Constants.DockerNonExistentTag]: + raise InvalidDockerImageException("Both image_id and image_tag cannot be None.") + + self.image_id = image_id if 
image_id is not None else Constants.DockerNonExistentTag + self.image_tag = image_tag if image_tag is not None else Constants.DockerNonExistentTag + + def __str__(self): + if self.image_id is Constants.DockerNonExistentTag and self.image_tag is Constants.DockerNonExistentTag: + raise InvalidDockerImageException( + "Cannot serialize DockerImage object because both image_id and image_tag are None.") + + return "{0}{1}{2}".format(self.image_id, Constants.DockerImageDelimiter, self.image_tag) + + @staticmethod + def from_string(serialized_docker_image): + temp = serialized_docker_image.split(Constants.DockerImageDelimiter) + + if len(temp) is 2: + return DockerImage(image_id=temp[0], image_tag=temp[1]) + + if len(temp) > 2 or len(temp) is 0: + raise InvalidDockerImageException("Unexpected format, cannot serialize to DockerImage.") + + temp = temp[0] + # Figure out if temp is image_id or image_name. + if DockerImage.looks_like_image_id(temp): + return DockerImage(image_id=temp) + else: + return DockerImage(image_tag=temp) + + @staticmethod + def looks_like_image_id(some_string): + possible_image_id = get_docker_image_id_from_string(some_string) + if some_string is possible_image_id: + return True + else: + return False diff --git a/MolnsLib/DockerSSH.py b/MolnsLib/DockerSSH.py new file mode 100644 index 0000000..6f2032a --- /dev/null +++ b/MolnsLib/DockerSSH.py @@ -0,0 +1,95 @@ +import StringIO +import tarfile +import os +import re + + +# "unused" arguments to some methods are added to maintain compatibility with existing upper level APIs. + +class DockerSSH(object): + def __init__(self, docker): + self.docker = docker + self.container_id = None + + def exec_command(self, command, verbose=None): + cmd = re.sub("\"", "\\\"", command) # Escape all occurrences of ". 
+ ret_val, response = self.docker.execute_command(self.container_id, cmd) + return response + + def exec_multi_command(self, command, verbose=None): + return self.exec_command(command) + + def open_sftp(self): + return MockSFTP(self.docker, self.container_id) + + def connect(self, instance, endpoint, username=None, key_filename=None): + self.container_id = instance.provider_instance_identifier + + def connect_cluster_node(self, ip_address, port, username, keyfile): + raise DockerSSHException("This invocation means that an error has occurred.") + + def close(self): + self.container_id = None + + +class MockSFTPFileException(Exception): + pass + + +class DockerSSHException(Exception): + pass + + +class MockSFTP: + def __init__(self, docker, container_id): + self.docker = docker + self.container_id = container_id + + def file(self, filename, flag): + return MockSFTPFile(filename, flag, self.docker, self.container_id) + + def close(self): + pass + + +class MockSFTPFile: + def __init__(self, filename, flag, docker, container_id): + self.filename = filename # Absolute path of file. + self.file_contents = "" + self.docker = docker + self.container_id = container_id + if flag is 'w': + self.flag = flag + # else: + # print("WARNING Unrecognized file mode. Filename: {0}, Flag: {1}".format(filename, flag)) + + def write(self, write_this): + self.file_contents += write_this + + def close(self): + # Make tarfile. 
+ import uuid + rand_str = str(uuid.uuid4()) + temp_tar = "transport-{0}.tar".format(rand_str[:8]) + try: + tar = tarfile.TarFile(temp_tar, "w") + string = StringIO.StringIO() + string.write(self.file_contents) + string.seek(0) + tar_file_info = tarfile.TarInfo(name=os.path.basename(self.filename)) + tar_file_info.size = len(string.buf) + tar.addfile(tarinfo=tar_file_info, fileobj=string) + tar.close() + + path_to_file = os.path.dirname(self.filename) + + if not path_to_file.startswith("/home"): + path_to_file = os.path.join(self.docker.get_home_directory(self.container_id), path_to_file) + + with open(temp_tar, mode='rb') as f: + tar_file_bytes = f.read() + + # print("path to file: {0}".format(path_to_file)) + self.docker.put_archive(self.container_id, tar_file_bytes, path_to_file) + finally: + os.remove(temp_tar) # Remove temporary tar file. diff --git a/MolnsLib/EC2Provider.py b/MolnsLib/EC2Provider.py index 3451570..42f706f 100644 --- a/MolnsLib/EC2Provider.py +++ b/MolnsLib/EC2Provider.py @@ -169,7 +169,7 @@ def _get_image_name(self): ########################################## class EC2Controller(EC2Base): - """ Provider handle for an open stack controller. """ + """ Provider handle for an EC2 controller. """ OBJ_NAME = 'EC2Controller' @@ -272,7 +272,7 @@ def get_instance_status(self, instance): ########################################## class EC2WorkerGroup(EC2Controller): - """ Provider handle for an open stack controller. """ + """ Provider handle for EC2 worker group. 
""" OBJ_NAME = 'EC2WorkerGroup' @@ -464,7 +464,7 @@ def start_ec2_instances(self, image_id=None, key_name=None, group_name=None, num raise ProviderException("Could not find image_id={0}".format(image_id)) if img.state != "available": if img.state != "pending": - raise ProviderException("Image {0} is not available, it has state is {1}.".format(image_id, img.state)) + raise ProviderException("Image {0} is not available, it's state is {1}.".format(image_id, img.state)) while img.state == "pending": print "Image {0} has state {1}, waiting {2} seconds for it to become available.".format(image_id, img.state, self.PENDING_IMAGE_WAITTIME) time.sleep(self.PENDING_IMAGE_WAITTIME) diff --git a/MolnsLib/EucalyptusProvider.py b/MolnsLib/EucalyptusProvider.py new file mode 100644 index 0000000..b79e9f1 --- /dev/null +++ b/MolnsLib/EucalyptusProvider.py @@ -0,0 +1,685 @@ +import boto +import boto.ec2 +from boto.exception import EC2ResponseError +from boto.ec2.regioninfo import RegionInfo +import collections +import os +import time +import sys +import logging +from urlparse import urlparse +from collections import OrderedDict +import installSoftware +import ssh_deploy +from molns_provider import ProviderBase, ProviderException + +#logging.getLogger('boto').setLevel(logging.ERROR) +logging.getLogger('boto').setLevel(logging.CRITICAL) + + +########################################## +class EucalyptusBase(ProviderBase): + """ Abstract class for Eucalyptus. 
""" + + SSH_KEY_EXTENSION = ".pem" + PROVIDER_TYPE = 'Eucalyptus' + +#def EucalyptusProvider_config_get_region(): +# if os.environ.get('AWS_DEFAULT_REGION') is None: +# return 'us-east-1' +# return os.environ.get('AWS_DEFAULT_REGION') + +def EucalyptusProvider_config_get_ubuntu_images_by_region(conf=None): + if conf is not None: + access_key = conf['aws_access_key'] + secret_key = conf['aws_secret_key'] + ec2_url = conf['ec2_url'] + else: + access_key = os.environ.get('EC2_ACCESS_KEY') + secret_key = os.environ.get('EC2_SECRET_KEY') + ec2_url = os.environ.get('EC2_URL') + + try: + o = urlparse(ec2_url) + ec2_host = o.hostname + ec2_port = o.port + ec2_path = o.path + # Setup connection to Eucalyptus + conn = boto.connect_ec2(aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + is_secure=False, + region=RegionInfo(name="eucalyptus", endpoint=ec2_host), + port=ec2_port, + path=ec2_path) + + # Run commands + images = conn.get_all_images() + for i in images: + if 'trusty' in i.name.lower(): + return i.id + except Exception as e: + logging.debug('EucalyptusProvider_config_get_ubuntu_images_by_region() caught exception {0}'.format(e)) + return None + +def EucalyptusProvider_default_key_name(): + user = os.environ.get('USER') or 'USER' + return "{0}_molns_sshkey_{1}".format(user, hex(int(time.time())).replace('0x','')) +########################################## +class EucalyptusProvider(EucalyptusBase): + """ Provider handle for an Eucalyptus service. 
""" + + OBJ_NAME = 'EucalyptusProvider' + + CONFIG_VARS = OrderedDict( + [ + ('aws_access_key', + {'q':'Eucalyptus access Key', 'default':os.environ.get('EC2_ACCESS_KEY'), 'ask':True, 'obfuscate':True}), + ('aws_secret_key', + {'q':'Eucalyptus secret key', 'default':os.environ.get('EC2_SECRET_KEY'), 'ask':True, 'obfuscate':True}), + ('ec2_url', + {'q':'URL of Eucalyptus service (EC2_URL)', 'default':os.environ.get('EC2_URL'), 'ask':True, 'obfuscate':False}), + ('key_name', + {'q':'Eucalyptus Key Pair name', 'default':EucalyptusProvider_default_key_name(), 'ask':True}), + ('group_name', + {'q':'Eucalyptus Security Group name', 'default':'molns', 'ask':True}), + ('ubuntu_image_name', + {'q':'ID of the base Ubuntu image to use', 'default':EucalyptusProvider_config_get_ubuntu_images_by_region, 'ask':True}), + ('molns_image_name', + {'q':'ID of the MOLNs image (leave empty for none)', 'default':None, 'ask':True}), + ('default_instance_type', + {'q':'Default Instance Type', 'default':'c3.large', 'ask':True}), + ('login_username', + {'default':'ubuntu', 'ask':False}) + ]) + + def get_config_credentials(self): + """ Return a dict with the credentials necessary for authentication. """ + return { + 'aws_access_key_id' : self.config['aws_access_key'], + 'aws_secret_access_key' : self.config['aws_secret_key'] + } + + + def check_ssh_key(self): + """ Check that the SSH key is found locally and remotely. + Returns: + True if the key is valid, otherwise False. 
+ """ + ssh_key_dir = os.path.join(self.config_dir, self.name) + logging.debug('ssh_key_dir={0}'.format(ssh_key_dir)) + if not os.path.isdir(ssh_key_dir): + logging.debug('making ssh_key_dir={0}'.format(ssh_key_dir)) + os.makedirs(ssh_key_dir) + ssh_key_file = os.path.join(ssh_key_dir,self.config['key_name']+self.SSH_KEY_EXTENSION) + if not os.path.isfile(ssh_key_file): + logging.debug("ssh_key_file '{0}' not found".format(ssh_key_file)) + return False + self._connect() + return self.eucalyptus.keypair_exists(self.config['key_name']) + + def create_ssh_key(self): + """ Create the ssh key and write the file locally. """ + self._connect() + ssh_key_dir = os.path.join(self.config_dir, self.name) + logging.debug('creating ssh key {0} in dir{1}'.format(self.config['key_name'], ssh_key_dir)) + self.eucalyptus.create_keypair(self.config['key_name'], ssh_key_dir) + + def check_security_group(self): + """ Check if the security group is created. """ + self._connect() + return self.eucalyptus.security_group_exists(self.config['group_name']) + + def create_seurity_group(self): + """ Create the security group. """ + self._connect() + return self.eucalyptus.create_security_group(self.config['group_name']) + + def check_molns_image(self): + """ Check if the molns image is created. """ + if 'molns_image_name' in self.config and self.config['molns_image_name'] is not None and self.config['molns_image_name'] != '': + self._connect() + return self.eucalyptus.image_exists(self.config['molns_image_name']) + return False + + def create_molns_image(self): + """ Create the molns image is created. 
""" + self._connect() + # clear the network-related persisent udev rules: + #echo "" > /etc/udev/rules.d/70-persistent-net.rules + #echo "" > /lib/udev/rules.d/75-persistent-net-generator.rules + # + + # start vm + instances = self.eucalyptus.start_eucalyptus_instances(image_id=self.config["ubuntu_image_name"]) + instance = instances[0] + # get login ip + ip = instance.public_dns_name + # install software + try: + logging.debug("installing software on server (ip={0})".format(ip)) + install_vm_instance = installSoftware.InstallSW(ip, config=self) + #install_vm_instance.run_with_logging() + # create image + logging.debug("Shutting down instance") + self.eucalyptus.stop_eucalyptus_instances([instance]) + logging.debug("Creating image") + image_id = instance.create_image(name=self._get_image_name()) + #logging.debug("Finding volume of instance") + #vol = None + #for v in self.eucalyptus.conn.get_all_volumes(): + # if v.attach_data is not None and v.attach_data.instance_id == instance.id: + # vol = v + # break + #if vol is None: + # raise Exception("Can not find volume associated with instance. 
Base image must be an EBS backed image.") + #snap = vol.create_snapshot() + #logging.debug('Snapshot {0} of volume {1}'.format(snap.id, vol.id)) + #image_id = self.eucalyptus.conn.register_image(name=self._get_image_name(), snapshot_id=snap.id, delete_root_volume_on_termination=True) + ##deleteOnTermination + #image_id = self.eucalyptus.conn.register_image(name=self._get_image_name(), snapshot_id=snap.id) + logging.debug("Image created: {0}".format(image_id)) + except Exception as e: + logging.exception(e) + raise ProviderException("Failed to create molns image: {0}".format(e)) + finally: + logging.debug("terminating {0}".format(instance)) + self.eucalyptus.terminate_eucalyptus_instances([instance]) + return image_id + + def _connect(self): + if self.connected: return + self.eucalyptus = CreateVM(config=self) + self.connected = True + + def _get_image_name(self): + return "MOLNS_{0}_{1}_{2}".format(self.PROVIDER_TYPE, self.name, int(time.time())) + +########################################## +class EucalyptusController(EucalyptusBase): + """ Provider handle for an open stack controller. """ + + OBJ_NAME = 'EucalyptusController' + + CONFIG_VARS = OrderedDict( + [ + ('instance_type', + {'q':'Default Instance Type', 'default':'c3.large', 'ask':True}), + ]) + + def _connect(self): + if self.connected: return + self.eucalyptus = CreateVM(config=self.provider) + self.connected = True + + def start_instance(self, num=1): + """ Start or resume the controller. 
""" + try: + self._connect() + instances = self.eucalyptus.start_eucalyptus_instances(image_id=self.provider.config["molns_image_name"], num=int(num), instance_type=self.config["instance_type"]) + ret = [] + for instance in instances: + ip = instance.public_dns_name + i = self.datastore.get_instance(provider_instance_identifier=instance.id, ip_address=ip, provider_id=self.provider.id, controller_id=self.id) + ret.append(i) + if num == 1: + return ret[0] + else: + return ret + except Exception as e: + logging.exception(e) + raise ProviderException("Failed to start molns instance: {0}".format(e)) + + def resume_instance(self, instances): + self._connect() + if isinstance(instances, list): + eucalyptus_instances = [] + for instance in instances: + eucalyptus_instance = self.eucalyptus.get_instance(instance.provider_instance_identifier) + eucalyptus_instances.append(eucalyptus_instance) + new_eucalyptus_instances = self.eucalyptus.resume_eucalyptus_instances(eucalyptus_instances) + instances_to_update = list(instances) + while len(instances_to_update) > 0: + instance = instances_to_update.pop() + success=False + for eucalyptus_inst in new_eucalyptus_instances: + if eucalyptus_inst.id == instance.provider_instance_identifier: + instance.ip_address = eucalyptus_inst.public_dns_name + logging.debug("instance.id={0} updated with ip={1}".format(instance.provider_instance_identifier, instance.ip_address)) + success=True + break + if not success: + raise ProviderException("Could not update the IP of id={0} after resume".format(instance.provider_instance_identifier)) + else: + eucalyptus_instance = self.eucalyptus.get_instance(instances.provider_instance_identifier) + new_instance = self.eucalyptus.resume_eucalyptus_instances([eucalyptus_instance]) + instances.ip_address = new_instance[0].public_dns_name + logging.debug("instance.id={0} updated with ip={1}".format(instances.provider_instance_identifier, instances.ip_address)) + + def stop_instance(self, instances): + 
self._connect() + if isinstance(instances, list): + eucalyptus_instances = [] + for instance in instances: + eucalyptus_instance = self.eucalyptus.get_instance(instance.provider_instance_identifier) + eucalyptus_instances.append(eucalyptus_instance) + self.eucalyptus.stop_eucalyptus_instances(eucalyptus_instances) + else: + eucalyptus_instance = self.eucalyptus.get_instance(instances.provider_instance_identifier) + self.eucalyptus.stop_eucalyptus_instances([eucalyptus_instance]) + + def terminate_instance(self, instances): + self._connect() + if isinstance(instances, list): + eucalyptus_instances = [] + for instance in instances: + eucalyptus_instance = self.eucalyptus.get_instance(instances.provider_instance_identifier) + eucalyptus_instances.append(eucalyptus_instance) + self.datastore.delete_instance(instance) + self.eucalyptus.terminate_eucalyptus_instances(eucalyptus_instances) + else: + eucalyptus_instance = self.eucalyptus.get_instance(instances.provider_instance_identifier) + self.eucalyptus.terminate_eucalyptus_instances([eucalyptus_instance]) + self.datastore.delete_instance(instances) + + def get_instance_status(self, instance): + self._connect() + try: + status = self.eucalyptus.get_instance_status(instance.provider_instance_identifier) + except Exception as e: + #logging.exception(e) + return self.STATUS_TERMINATED + if status == 'running' or status == 'pending': + return self.STATUS_RUNNING + if status == 'stopped' or status == 'stopping': + return self.STATUS_STOPPED + if status == 'terminated' or status == 'shutting-down': + return self.STATUS_TERMINATED + raise ProviderException("EucalyptusController.get_instance_status() got unknown status '{0}'".format(status)) + + +########################################## +class EucalyptusWorkerGroup(EucalyptusController): + """ Provider handle for an open stack controller. 
""" + + OBJ_NAME = 'EucalyptusWorkerGroup' + + CONFIG_VARS = OrderedDict( + [ + ('instance_type', + {'q':'Default Instance Type', 'default':'c3.large', 'ask':True}), + ('num_vms', + {'q':'Number of virtual machines in group', 'default':'1', 'ask':True}), + ]) + + def start_instance(self, num=1): + """ Start worker group vms. """ + try: + self._connect() + instances = self.eucalyptus.start_eucalyptus_instances(image_id=self.provider.config["molns_image_name"], num=int(num), instance_type=self.config["instance_type"]) + ret = [] + for instance in instances: + ip = instance.public_dns_name + i = self.datastore.get_instance(provider_instance_identifier=instance.id, ip_address=ip, provider_id=self.provider.id, controller_id=self.controller.id, worker_group_id=self.id) + ret.append(i) + if num == 1: + return ret[0] + else: + return ret + except Exception as e: + logging.exception(e) + raise ProviderException("Failed to start molns instance: {0}".format(e)) + + def terminate_instance(self, instances): + self._connect() + if isinstance(instances, list): + eucalyptus_instances = [] + for instance in instances: + eucalyptus_instance = self.eucalyptus.get_instance(instance.provider_instance_identifier) + eucalyptus_instances.append(eucalyptus_instance) + self.datastore.delete_instance(instance) + self.eucalyptus.terminate_eucalyptus_instances(eucalyptus_instances) + else: + eucalyptus_instance = self.eucalyptus.get_instance(instances.provider_instance_identifier) + self.eucalyptus.terminate_eucalyptus_instances([eucalyptus_instance]) + self.datastore.delete_instance(instances) + + +########################################## +class CreateVM: + ''' + This class is used to create VMs for Eucalyptus + ''' + PENDING_IMAGE_WAITTIME = 60 + + def __init__(self, config=None, connect=True): + if config is not None: + self.config = config + if self.config['aws_access_key'] is None or self.config['aws_secret_key'] is None: + raise ProviderException("AWS_SECRET_KEY or AWS_ACCESS_KEY not 
set") + if connect: + self.connect() + + def connect(self): + #self.conn = boto.ec2.connect_to_region( + # self.config['aws_region'], + # aws_access_key_id=self.config['aws_access_key'], + # aws_secret_access_key=self.config['aws_secret_key'] + #) + access_key = self.config['aws_access_key'] + secret_key = self.config['aws_secret_key'] + ec2_url = self.config['ec2_url'] + o = urlparse(ec2_url) + ec2_host = o.hostname + ec2_port = o.port + ec2_path = o.path + # Setup connection to Eucalyptus + self.conn = boto.connect_ec2(aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + is_secure=False, + region=RegionInfo(name="eucalyptus", endpoint=ec2_host), + port=ec2_port, + path=ec2_path) + + + def get_instance(self, instance_id): + #logging.debug("get_instance(instance_id={0})".format(instance_id)) + try: + reservations = self.conn.get_all_reservations(instance_ids=[instance_id]) + except EC2ResponseError: + raise ProviderException("instance not found {0}".format(instance_id)) + #logging.debug("get_instance() reservations:{0}".format(reservations)) + for reservation in reservations: + #logging.debug("get_instance() reservation.instances:{0}".format(reservation.instances)) + for instance in reservation.instances: + if instance.id == instance_id: + return instance + raise ProviderException("instance not found {0}".format(instance_id)) + + def get_instance_status(self, instance_id): + return self.get_instance(instance_id).state + + + def get_vm_status(self, key_name=None, verbose=False, show_all=False): + if key_name is None: + key_name = self.config['key_name'] + reservations = self.conn.get_all_reservations() + stopped_vms = [] + running_vms = [] + for reservation in reservations: + for instance in reservation.instances: + if verbose and show_all: + print "{0}\t{1}\t{2}\t{3}".format(instance.id,instance.key_name,instance.state,instance.public_dns_name) + if instance.key_name == key_name: + if verbose and not show_all: + print 
"{0}\t{1}\t{2}\t{3}".format(instance.id,instance.key_name,instance.state,instance.public_dns_name) + if instance.state == 'running': + running_vms.append(instance) + elif instance.state == 'stopped': + stopped_vms.append(instance) + #return (stopped_vms, running_vms) + return (stopped_vms, sorted(running_vms, key=lambda vm: vm.id)) + + def image_exists(self, image_id): + try: + img = self.conn.get_all_images(image_ids=[image_id])[0] + return True + except IndexError: + return False + + def start_vms(self, image_id=None, key_name=None, group_name=None, num=None, instance_type=None): + if key_name is None: + key_name = self.config['key_name'] + if group_name is None: + group_name = self.config['group_name'] + if num is None: + num = 1 + if instance_type is None: + instance_type = self.config['default_instance_type'] + # Check the group + self.create_security_group(group_name) + + #(stopped_vms, running_vms) = self.get_vm_status(key_name) + #if len(running_vms) > 0: + # msg = "Error: {0} VMs are already running with key_name={1}".format(len(running_vms), + # key_name) + # print msg + # raise ProviderException(msg) + + if len(stopped_vms) > 0: + return self.resume_eucalyptus_instances(stopped_vms) + + if image_id is None: + raise ProviderException("Base Ubuntu image not specified.") + else: + self.image_id = image_id + + # Check image + try: + img = self.conn.get_all_images(image_ids=[self.image_id])[0] + except IndexError: + raise ProviderException("Could not find image_id={0}".format(self.image_id)) + + if img.state != "available": + if img.state != "pending": + raise ProviderException("Image {0} is not available, it has state is {1}.".format(self.image_id, img.state)) + while img.state == "pending": + print "Image {0} has state {1}, waiting {2} seconds for it to become available.".format(self.image_id, img.state, self.PENDING_IMAGE_WAITTIME) + time.sleep(self.PENDING_IMAGE_WAITTIME) + img.update() + + self.key_name = key_name + self.group_name = group_name + 
group_list = [] + for _ in range(num): + group_list.append(group_name) + + print "Starting {0} Eucalyptus instance(s). This will take a minute...".format(num) + reservation = self.conn.run_instances(self.image_id, min_count=num, max_count=num, key_name=key_name, security_groups=group_list, instance_type=instance_type) + + instances = reservation.instances + num_instance = len(instances) + num_running = 0 + while num_running < num_instance: + num_running = 0 + for instance in instances: + instance.update() + if instance.state == 'running': + num_running += 1 + if num_running < num_instance: + time.sleep(5) + print "Eucalyptus instances started." + return sorted(instances, key=lambda vm: vm.id) + + def start_eucalyptus_instances(self, image_id=None, key_name=None, group_name=None, num=1, instance_type=None): + if key_name is None: + key_name = self.config['key_name'] + if group_name is None: + group_name = self.config['group_name'] + if num is None: + num = 1 + if instance_type is None: + instance_type = self.config['default_instance_type'] + try: + img = self.conn.get_all_images(image_ids=[image_id])[0] + except IndexError: + raise ProviderException("Could not find image_id={0}".format(image_id)) + if img.state != "available": + if img.state != "pending": + raise ProviderException("Image {0} is not available, it has state is {1}.".format(image_id, img.state)) + while img.state == "pending": + print "Image {0} has state {1}, waiting {2} seconds for it to become available.".format(image_id, img.state, self.PENDING_IMAGE_WAITTIME) + time.sleep(self.PENDING_IMAGE_WAITTIME) + img.update() + print "Starting {0} Eucalyptus instance(s). 
This will take a minute...".format(num) + reservation = self.conn.run_instances(image_id, min_count=num, max_count=num, key_name=key_name, security_groups=[group_name], instance_type=instance_type) + instances = reservation.instances + num_instance = len(instances) + num_running = 0 + while num_running < num_instance: + num_running = 0 + for instance in instances: + instance.update() + if instance.state == 'running': + num_running += 1 + if num_running < num_instance: + time.sleep(5) + print "Eucalyptus instances started." + return sorted(instances, key=lambda vm: vm.id) + + def stop_vms(self, key_name=None): + if key_name is None: + key_name = self.config['key_name'] + (stopped_vms, running_vms) = self.get_vm_status(key_name) + self.stop_eucalyptus_instances(running_vms) + + def terminate_vms(self, key_name=None): + if key_name is None: + key_name = self.config['key_name'] + (stopped_vms, running_vms) = self.get_vm_status(key_name) + self.terminate_eucalyptus_instances(running_vms+stopped_vms) + + def resume_eucalyptus_instances(self, instances): + num_instance = len(instances) + print "Resuming Eucalyptus instance(s). This will take a minute..." + for instance in instances: + print "\t{0}.".format(instance.id) + instance.start() + num_running = 0 + while num_running < num_instance: + num_running = 0 + for instance in instances: + instance.update() + if instance.state == 'running': + num_running += 1 + if num_running < num_instance: + time.sleep(5) + print "Eucalyptus instances resumed." + return instances + + def stop_eucalyptus_instances(self, instances): + num_instance = len(instances) + print "Stopping Eucalyptus instance(s). This will take a minute..." 
+ for instance in instances: + print "\t{0}.".format(instance.id) + instance.stop() + num_stopped = 0 + while num_stopped < num_instance: + num_stopped = 0 + for instance in instances: + instance.update() + if instance.state == 'stopped': + num_stopped += 1 + if num_stopped < num_instance: + time.sleep(5) + print "Eucalyptus instances stopped." + + def terminate_eucalyptus_instances(self, instances): + num_instance = len(instances) + print "Terminating Eucalyptus instance(s). This will take a minute..." + for instance in instances: + print "\t{0}.".format(instance.id) + instance.terminate() + num_terminated = 0 + while num_terminated < num_instance: + num_terminated = 0 + for instance in instances: + instance.update() + if instance.state == 'terminated': + num_terminated += 1 + if num_terminated < num_instance: + time.sleep(5) + print "Eucalyptus instance terminated." + + def create_vm_image(self, image_name=None, key_name=None): + if key_name is None: + key_name = self.config['key_name'] + if image_name is None: + image_name = "MOLNS_{0}_{1}".format(key_name,int(time.time())) + (stopped_vms, running_vms) = self.get_vm_status(key_name) + if len(running_vms) != 1: + raise ProviderException("Expected only one running vm, {0} are running".format(len(running_vms))) + self.stop_eucalyptus_instances(running_vms) + instance = running_vms[0] + image_ami = instance.create_image(image_name) + print "Image created id={0} name={0}".format(image_ami, image_name) + self.terminate_eucalyptus_instances(running_vms) + return image_ami + + + + def keypair_exists(self, key_name): + for sg in self.conn.get_all_key_pairs(): + if sg.name == key_name: + return True + return False + + def keypair_file_exists(cls, key_name, conf_dir): + return os.path.exists(conf_dir + os.sep + key_name + ".pem") + + def create_keypair(self, key_name, conf_dir): + key_pair = self.conn.create_key_pair(key_name) + key_pair.save(conf_dir) + + def security_group_exists(self, group_name): + for sg in 
self.conn.get_all_security_groups(): + if sg.name == group_name: + return True + return False + + def create_security_group(self, group_name): + security_group = None + for sg in self.conn.get_all_security_groups(): + if sg.name == group_name: + security_group = sg + break + if security_group is None: + print "Security group not found, creating one." + security_group = self.conn.create_security_group(group_name, 'MOLNs Security Group') + self.set_security_group_rules(security_group) + elif not self.check_security_group_rules(security_group): + raise ProviderException("Security group {0} exists, but has the wrong firewall rules. Please delete the group, or choose a different one.") + return security_group + + + def set_security_group_rules(self, group, expected_rules=ProviderBase.FIREWALL_RULES): + for rule in expected_rules: + if not group.authorize(ip_protocol=rule.ip_protocol, + from_port=rule.from_port, + to_port=rule.to_port, + cidr_ip=rule.cidr_ip): + return False + return True + + def check_security_group_rules(self, group, expected_rules=ProviderBase.FIREWALL_RULES): + """ Check to be sure the expected_rules are set for this group. 
""" + ret = True + + current_rules = [] + for rule in group.rules: + if not rule.grants[0].cidr_ip: + current_rule = self.SecurityGroupRule(rule.ip_protocol, + rule.from_port, + rule.to_port, + "0.0.0.0/0", + rule.grants[0].name) + else: + current_rule = self.SecurityGroupRule(rule.ip_protocol, + rule.from_port, + rule.to_port, + rule.grants[0].cidr_ip, + None) + + if current_rule not in expected_rules: + print "Unexpected Rule: {0}".format(current_rule) + ret = False + else: + #print "Current Rule: {0}".format(current_rule) + current_rules.append(current_rule) + + for rule in expected_rules: + if rule not in current_rules: + print "Rule not found: {0}".format(rule) + ret = False + + return ret + diff --git a/MolnsLib/OpenStackProvider.py b/MolnsLib/OpenStackProvider.py index 26c8779..5a35c98 100644 --- a/MolnsLib/OpenStackProvider.py +++ b/MolnsLib/OpenStackProvider.py @@ -43,6 +43,8 @@ class OpenStackProvider(OpenStackBase): {'q':'OpenStack project_name', 'default':os.environ.get('OS_TENANT_NAME'), 'ask':True}), ('neutron_nic', {'q':'Network ID (leave empty if only one possible network)', 'default':None, 'ask':True}), + ('region_name', + {'q':'Specify the region (leave empty if only one region)', 'default':os.environ.get('OS_REGION_NAME'), 'ask':True}), ('floating_ip_pool', {'q':'Name of Floating IP Pool (leave empty if only one possible pool)', 'default':None, 'ask':True}), ('nova_version', @@ -192,6 +194,8 @@ def _connect(self): creds['api_key'] = self.config['nova_password'] creds['auth_url'] = self.config['nova_auth_url'] creds['project_id'] = self.config['nova_project_id'] + if 'region_name' in self.config and self.config['region_name'] is not None: + creds['region_name'] = self.config['region_name'] self.nova = novaclient.Client(self.config['nova_version'], **creds) self.connected = True diff --git a/MolnsLib/Utils.py b/MolnsLib/Utils.py new file mode 100644 index 0000000..a6b75f2 --- /dev/null +++ b/MolnsLib/Utils.py @@ -0,0 +1,45 @@ +def get_user_name(): 
+ try: + import os + return os.environ['SUDO_USER'] + except KeyError: + import getpass + return getpass.getuser() + + +def get_sudo_user_id(): + import pwd + u_name = get_user_name() + return pwd.getpwnam(u_name).pw_uid + + +def get_sudo_group_id(): + import grp + u_name = get_user_name() + return grp.getgrnam(u_name).gr_gid + + +def ensure_sudo_mode(some_function): + import os + import sys + if sys.platform.startswith("linux") and os.getuid() != 0: + pass + raise NoPrivilegedMode("\n\nOn Linux platforms, 'docker' is a priviledged command. " + "To use 'docker' functionality, please run in sudo mode or as root user.") + return some_function + + +class Log: + verbose = True + + def __init__(self): + pass + + @staticmethod + def write_log(message): + if Log.verbose: + print message + + +class NoPrivilegedMode(Exception): + pass diff --git a/MolnsLib/constants.py b/MolnsLib/constants.py new file mode 100644 index 0000000..7ac7ffb --- /dev/null +++ b/MolnsLib/constants.py @@ -0,0 +1,28 @@ +import os + + +class Constants: + DockerWorkingDirectoryPrefix = "working_dir_" + LOGGING_DIRECTORY = "~/MOLNS_LOG" + DOCKER_BASE_URL = "unix://var/run/docker.sock" + DOCKER_DEFAULT_IMAGE = "ubuntu:latest" + DOCKER_DEFAULT_PORT = '9000' + DOCKER_CONTAINER_RUNNING = "running" + DOCKER_CONTAINER_EXITED = "exited" + DOCKERFILE_NAME = "dockerfile_" + DOKCER_IMAGE_ID_LENGTH = 12 + DOCKER_IMAGE_PREFIX = "molns-docker-provider-" + DOCKER_PY_IMAGE_ID_PREFIX_LENGTH = 7 + DockerProvider = "Docker" + DockerNonExistentTag = "**NA**" + DockerImageDelimiter = "|||" + MolnsDockerContainerNamePrefix = "Molns-" + MolnsExecHelper = "molns_exec_helper.py" + DEFAULT_PRIVATE_NOTEBOOK_PORT = 8081 + DEFAULT_PUBLIC_NOTEBOOK_PORT = 443 + DEFAULT_PRIVATE_WEBSERVER_PORT = 8001 + DEFAULT_PUBLIC_WEBSERVER_PORT = 80 + DEFAULT_QSUB_SSH_PORT = 22 + ForbiddenVolumeNames = [".ssh", ".ipython", ".molns", "ipython", "localarea", "shared"] + ConfigDir = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".molns/") 
+ ClusterKeyFileNameOnController = "molns_cluster_secretkey" diff --git a/MolnsLib/installSoftware.py b/MolnsLib/installSoftware.py index 2094ccf..a89fc90 100644 --- a/MolnsLib/installSoftware.py +++ b/MolnsLib/installSoftware.py @@ -16,7 +16,7 @@ class InstallSW: This class is used for installing software ''' - # Install the necessary software for IPython and PyURDME. + # Contextualization, install the software for IPython and PyURDME. # Commands can be specified in 3 ways: # 1: a string # 2: a list a strings @@ -24,6 +24,8 @@ class InstallSW: # item is a 'check' command, which should error (return code 1) if the first item(s) did not # install correctly command_list = [ + + # Basic contextualization "curl http://www.ubuntu.com", # Check to make sure networking is up. "sudo apt-get update", "sudo apt-get -y install git", @@ -32,69 +34,115 @@ class InstallSW: "sudo apt-get -y install python-matplotlib python-numpy python-scipy", "sudo apt-get -y install make", "sudo apt-get -y install python-software-properties", - "sudo add-apt-repository ppa:fenics-packages/fenics", - "sudo apt-get update", - "sudo apt-get -y install fenics", "sudo apt-get -y install cython python-h5py", "sudo apt-get -y install python-pip python-dev build-essential", "sudo pip install pyzmq --upgrade", "sudo pip install dill cloud pygments", "sudo pip install tornado Jinja2", + + # Molnsutil develop + [ + "sudo pip install jsonschema jsonpointer", + # EC2/S3 and OpenStack APIs + "sudo pip install boto", + "sudo apt-get -y install pandoc", + # This set of packages is needed for OpenStack, as molns_util uses them for hybrid cloud deployment + "sudo apt-get -y install libxml2-dev libxslt1-dev python-dev", + "sudo pip install python-novaclient", + "sudo easy_install -U pip", + "sudo pip install python-keystoneclient", + "sudo pip install python-swiftclient", + ], + [ + "sudo rm -rf /usr/local/molnsutil;sudo mkdir -p /usr/local/molnsutil;sudo chown ubuntu /usr/local/molnsutil", + "cd /usr/local/ && 
git clone https://github.com/aviral26/molnsutil.git && cd /usr/local/molnsutil && git checkout qsub_support" + ], + + # Molns develop + [ + "sudo rm -rf /usr/local/molns;sudo mkdir -p /usr/local/molns;sudo chown ubuntu /usr/local/molns", + "cd /usr/local/ && git clone https://github.com/aviral26/molns.git && cd /usr/local/molns" + ], + + # Cluster execution + [ + "sudo rm -rf /usr/local/cluster_execution;sudo mkdir -p /usr/local/cluster_execution;sudo chown ubuntu /usr/local/cluster_execution", + "cd /usr/local/ && git clone https://github.com/aviral26/cluster_execution.git" + ], - - # For molnsutil - "sudo pip install jsonschema jsonpointer", - # S3 and OS APIs - "sudo pip install boto", - "sudo apt-get -y install pandoc", - # This set of packages is really only needed for OpenStack, but molnsutil uses them - "sudo apt-get -y install libxml2-dev libxslt1-dev python-dev", - "sudo pip install python-novaclient", - "sudo easy_install -U pip", - "sudo pip install python-keystoneclient", - "sudo pip install python-swiftclient", # So the workers can mount the controller via SSHfs [ "sudo apt-get -y install sshfs", "sudo gpasswd -a ubuntu fuse", + "mkdir -p /home/ubuntu/.ssh/", "echo 'ServerAliveInterval 60' >> /home/ubuntu/.ssh/config", ], - # FOR DEVELOPMENT, NEEDS TO BE TESTED - # High-performance ssh-hpn - #[ - # "sudo add-apt-repository ppa:w-rouesnel/openssh-hpn -y", - # "sudo apt-get update -y", - #], - - # IPython install + + # IPython [ "sudo rm -rf ipython;git clone --recursive https://github.com/Molns/ipython.git", "cd ipython && git checkout 3.0.0-molns_fixes && python setup.py submodule && sudo python setup.py install", "sudo rm -rf ipython", "ipython profile create default", "sudo pip install terminado", #Jupyter terminals + "python -c \"from IPython.external import mathjax; mathjax.install_mathjax(tag='2.2.0')\"" ], - [ "sudo rm -rf /usr/local/pyurdme;sudo mkdir -p /usr/local/pyurdme;sudo chown ubuntu /usr/local/pyurdme", - "cd /usr/local/ && git clone 
https://github.com/MOLNs/pyurdme.git", - "cd /usr/local/pyurdme && git checkout develop", + + + ### Simulation software related to pyurdme and StochSS + + # Gillespy + [ "sudo rm -rf /usr/local/StochKit;sudo mkdir -p /usr/local/StochKit;sudo chown ubuntu /usr/local/StochKit", + "cd /usr/local/ && git clone https://github.com/StochSS/stochkit.git StochKit", + "cd /usr/local/StochKit && ./install.sh", - "cp /usr/local/pyurdme/pyurdme/data/three.js_templates/js/* .ipython/profile_default/static/custom/", + #"wget https://github.com/StochSS/stochss/blob/master/ode-1.0.4.tgz?raw=true -q -O /tmp/ode.tgz", + "wget https://github.com/StochSS/StochKit_ode/archive/master.tar.gz?raw=true -q -O /tmp/ode.tgz", + "cd /tmp && tar -xzf /tmp/ode.tgz", + "sudo mv /tmp/StochKit_ode-master /usr/local/ode", + "rm /tmp/ode.tgz", + "cd /usr/local/ode/cvodes/ && tar -xzf \"cvodes-2.7.0.tar.gz\"", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && ./configure --prefix=\"/usr/local/ode/cvodes/cvodes-2.7.0/cvodes\" 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && make 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/cvodes/cvodes-2.7.0/ && make install 1>stdout.log 2>stderr.log", + "cd /usr/local/ode/ && STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE=/usr/local/ode/ make 1>stdout.log 2>stderr.log", + + "sudo rm -rf /usr/local/gillespy;sudo mkdir -p /usr/local/gillespy;sudo chown ubuntu /usr/local/gillespy", + "cd /usr/local/ && git clone https://github.com/briandrawert/gillespy.git", + "cd /usr/local/gillespy && sudo STOCHKIT_HOME=/usr/local/StochKit/ STOCHKIT_ODE_HOME=/usr/local/ode/ python setup.py install" + + ], + + # FeniCS/Dolfin/pyurdme + [ "sudo add-apt-repository -y ppa:fenics-packages/fenics", + "sudo apt-get update", + "sudo apt-get -y install fenics", + # Gmsh for Finite Element meshes + "sudo apt-get install -y gmsh", + ], + + ["sudo apt-get install docker", "sudo pip install docker", "sudo pip install sqlalchemy", + "sudo pip install boto", "sudo pip install 
python-novaclient", "sudo pip install paramiko"], + # pyurdme + [ "sudo rm -rf /usr/local/pyurdme && sudo mkdir -p /usr/local/pyurdme && sudo chown ubuntu /usr/local/pyurdme", + "cd /usr/local/ && git clone https://github.com/MOLNs/pyurdme.git", + #"cd /usr/local/pyurdme && git checkout develop", # for development only + "cp /usr/local/pyurdme/pyurdme/data/three.js_templates/js/* $HOME/.ipython/profile_default/static/custom/", "source /usr/local/pyurdme/pyurdme_init && python -c 'import pyurdme'", ], - [ "rm -rf MOLNS_notebooks;git clone https://github.com/Molns/MOLNS_notebooks.git", - "cp MOLNS_notebooks/*.ipynb .;rm -rf MOLNS_notebooks;", + + # example notebooks + [ "rm -rf MOLNS_notebooks && git clone https://github.com/Molns/MOLNS_notebooks.git", + "cp MOLNS_notebooks/*.ipynb . && rm -rf MOLNS_notebooks", "ls *.ipynb" ], - [ - "sudo rm -rf /usr/local/molnsutil;sudo mkdir -p /usr/local/molnsutil;sudo chown ubuntu /usr/local/molnsutil", - "cd /usr/local/ && git clone https://github.com/Molns/molnsutil.git", - "cd /usr/local/molnsutil && sudo python setup.py install" - ], - "python -c \"from IPython.external import mathjax; mathjax.install_mathjax(tag='2.2.0')\"", - - # Upgrade scipy from pip to get rid of six.py bug on Trusty + + # Upgrade scipy from pip to get rid of super-annoying six.py bug on Trusty "sudo apt-get -y remove python-scipy", "sudo pip install scipy", - - "sync", # This is critial for some infrastructures. + + "sudo pip install jsonschema jsonpointer", # redo this install to be sure it has not been removed. + "sudo pip install paramiko", + + "sync", # This is critical for some infrastructures. ] # How many time do we try to install each package. @@ -234,7 +282,6 @@ def exec_command_list_switch(self, command_list): raise SystemExit("CRITICAL ERROR: could not complete command '{0}'. 
Exiting.".format(command)) print "Installation complete in {0}s".format(time.time() - tic) - def log_exec(self, msg): if self.log_file is not None: self.log_file.write(msg) @@ -298,6 +345,11 @@ def exec_multi_command(self, command, next_command): print "FAILED......\t{0}:{1}\t{2}\t{3}".format(self.hostname, self.ssh_endpoint, command, e) raise InstallSWException() + @staticmethod + def get_command_list(): + """Returns the whole list of dependency installation commands. """ + return InstallSW.command_list + if __name__ == "__main__": print "{0}".format(InstallSW.command_list) print "len={0}".format(len(InstallSW.command_list)) @@ -308,4 +360,3 @@ def exec_multi_command(self, command, next_command): else: cnt += 1 print "cnt={0}".format(cnt) - diff --git a/MolnsLib/molns_datastore.py b/MolnsLib/molns_datastore.py index b54bf43..dee8428 100644 --- a/MolnsLib/molns_datastore.py +++ b/MolnsLib/molns_datastore.py @@ -1,15 +1,20 @@ #!/usr/bin/env python from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base + Base = declarative_base() from sqlalchemy import Column, Integer, String, Sequence from sqlalchemy.orm import sessionmaker import os import logging import sys +import uuid +import datetime + ############################################################# -#VALID_PROVIDER_TYPES = ['OpenStack', 'EC2', 'Rackspace'] -VALID_PROVIDER_TYPES = ['OpenStack', 'EC2'] +VALID_PROVIDER_TYPES = ['OpenStack', 'EC2', 'Eucalyptus', 'Docker'] + + ############################################################# #### SCHEMA ################################################# ############################################################# @@ -18,12 +23,13 @@ class Provider(Base): """ DB object for an infrastructure service provider. 
""" __tablename__ = 'providers' id = Column(Integer, Sequence('provider_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) def __str__(self): return "Provider({0}): name={1} type={2}".format(self.id, self.name, self.type) + class ProviderData(Base): """ DB object to store the key/value pairs for a service provider. """ __tablename__ = 'provider_data' @@ -33,19 +39,22 @@ class ProviderData(Base): value = Column(String) def __str__(self): - return "ProviderData({0}): provider_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "ProviderData({0}): provider_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, + self.value) + class Controller(Base): """ DB object for a MOLNS controller. """ __tablename__ = 'controllers' id = Column(Integer, Sequence('controller_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) provider_id = Column(Integer) - + def __str__(self): return "Controller({0}): name={1} provider_id={2}".format(self.id, self.name, self.provider_id) + class ControllerData(Base): """ DB object to store the key/value pairs for a controller. """ __tablename__ = 'controller_data' @@ -55,19 +64,24 @@ class ControllerData(Base): value = Column(String) def __str__(self): - return "ControllerData({0}): controller_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "ControllerData({0}): controller_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, + self.value) + class WorkerGroup(Base): """ DB object for a MOLNS WorkerGroup. 
""" __tablename__ = 'worker_groups' id = Column(Integer, Sequence('worker_group_id_seq'), primary_key=True) - type = Column(String) #'EC2', 'Azure', 'OpenStack' + type = Column(String) # 'EC2', 'Azure', 'OpenStack' name = Column(String) provider_id = Column(Integer) controller_id = Column(Integer) - + def __str__(self): - return "WorkerGroup({0}): name={1} provider_id={2} controller_id={3}".format(self.id, self.name, self.provider_id, self.controller_id) + return "WorkerGroup({0}): name={1} provider_id={2} controller_id={3}".format(self.id, self.name, + self.provider_id, + self.controller_id) + class WorkerGroupData(Base): """ DB object to store the key/value pairs for a worker groups. """ @@ -78,36 +92,54 @@ class WorkerGroupData(Base): value = Column(String) def __str__(self): - return "WorkerGrouprData({0}): worker_group_id={1} name={2} value={3}".format(self.id, self.parent_id, self.name, self.value) + return "WorkerGrouprData({0}): worker_group_id={1} name={2} value={3}".format(self.id, self.parent_id, + self.name, self.value) class Instance(Base): """ DB object for a MOLNS VM instance. """ __tablename__ = 'instances' id = Column(Integer, Sequence('instance_id_seq'), primary_key=True) - type = Column(String) #'head-node' or 'worker' + type = Column(String) # 'head-node' or 'worker' controller_id = Column(Integer) worker_group_id = Column(Integer) provider_id = Column(Integer) ip_address = Column(String) provider_instance_identifier = Column(String) - + + def __str__(self): + return "Instance({0}): provider_instance_identifier={1} provider_id={2} controller_id={3} worker_group_id={4}".format( + self.id, self.provider_instance_identifier, self.provider_id, self.controller_id, self.worker_group_id) + + +class ExecJob(Base): + """ DB object for MOLNS exec jobs. 
""" + __tablename__ = 'jobs' + id = Column(Integer, Sequence('instance_id_seq'), primary_key=True) + controller_id = Column(Integer) + exec_str = Column(String) + jobID = Column(String) + date = Column(String) + def __str__(self): - return "Instance({0}): provider_instance_identifier={1} provider_id={2} controller_id={3} worker_group_id={4}".format(self.id, self.provider_instance_identifier, self.provider_id, self.controller_id, self.worker_group_id) + return "ExecJob({0}): jobID={1} controller_id={2}, exec_str={3}".format(self.id, self.jobID, self.controller_id, + self.exec_str) class DatastoreException(Exception): pass + ############################################################# HANDLE_MAPPING = { - 'Provider':(Provider,ProviderData), - 'Controller':(Controller,ControllerData), - 'WorkerGroup':(WorkerGroup,WorkerGroupData), + 'Provider': (Provider, ProviderData), + 'Controller': (Controller, ControllerData), + 'WorkerGroup': (WorkerGroup, WorkerGroupData), } -#from OpenStackProvider import OpenStackProvider, OpenStackController, OpenStackWorkerGroup -#from EC2Provider import EC2Provider, EC2Controller, EC2WorkerGroup + +# from OpenStackProvider import OpenStackProvider, OpenStackController, OpenStackWorkerGroup +# from EC2Provider import EC2Provider, EC2Controller, EC2WorkerGroup def dynamic_module_import(name): mod = __import__(name) @@ -116,26 +148,30 @@ def dynamic_module_import(name): mod = getattr(mod, comp) return mod + def get_provider_handle(kind, ptype): """ Return object of 'kind' (Provider, Controller or WokerGroup) for provider of type 'ptype'. Load the module if necessary. 
""" - #logging.debug("get_provider_handle(kind={0}, ptype={1})".format(kind, ptype)) + # logging.debug("get_provider_handle(kind={0}, ptype={1})".format(kind, ptype)) valid_handles = ['Provider', 'Controller', 'WorkerGroup'] if kind not in valid_handles: raise DatastoreException("Unknown kind {0}".format(kind)) if ptype not in VALID_PROVIDER_TYPES: - raise DatastoreException("Unknown {1} type {0}".format(ptype, kind)) + # raise DatastoreException("Unknown {1} type {0}".format(ptype, kind)) + return None cls_name = "{0}{1}".format(ptype, kind) pkg_name = "MolnsLib.{0}Provider".format(ptype) if pkg_name not in sys.modules: logging.debug("loading {0} from {1}".format(cls_name, pkg_name)) + # pkg = dynamic_module_import(pkg_name) pkg = dynamic_module_import(pkg_name) try: - #logging.debug("dir(pkg={0})={1}".format(pkg, dir(pkg))) + # logging.debug("dir(pkg={0})={1}".format(pkg, dir(pkg))) mod = getattr(pkg, cls_name) except AttributeError: raise DatastoreException("module {0} does not contain {1}".format(pkg_name, cls_name)) return mod + ############################################################# @@ -148,24 +184,25 @@ def __init__(self, db_file=None, config_dir=None): """ Constructor. 
""" if db_file is not None: self.engine = create_engine('sqlite:///{0}'.format(db_file)) + if config_dir is None: + self.config_dir = os.path.abspath(os.path.dirname(db_file)) elif config_dir is not None: if not os.path.exists(config_dir): os.makedirs(config_dir) self.engine = create_engine('sqlite:///{0}/{1}'.format(config_dir, self.MOLNS_DATASTORE)) + self.config_dir = config_dir else: if not os.path.exists(self.MOLNS_CONFIG_DIR): os.makedirs(self.MOLNS_CONFIG_DIR) self.engine = create_engine('sqlite:///{0}/{1}'.format(self.MOLNS_CONFIG_DIR, self.MOLNS_DATASTORE)) - Base.metadata.create_all(self.engine) # Create all the tables + Base.metadata.create_all(self.engine) # Create all the tables Session = sessionmaker(bind=self.engine) self.session = Session() - self.config_dir = config_dir def __del__(self): """ Destructor. """ self.session.commit() - def list_objects(self, kind): """ Get all the currently configured objects of kind (Provider, Controller, WorkerGroup). @@ -195,16 +232,16 @@ def create_object(self, ptype, name, kind, **kwargs): raise DatastoreException("{1} {0} already exists with type".format(name, kind, p.type)) p_handle = get_provider_handle(kind, ptype) - #logging.debug("create_object() {1}(name={0})".format(name, p_handle)) + # logging.debug("create_object() {1}(name={0})".format(name, p_handle)) p = p_handle(name=name, config_dir=self.config_dir) if 'provider_id' in kwargs: p.provider_id = kwargs['provider_id'] - #logging.debug("create_object() provider_id={0}".format(kwargs['provider_id'])) + # logging.debug("create_object() provider_id={0}".format(kwargs['provider_id'])) if 'controller_id' in kwargs: p.controller_id = kwargs['controller_id'] - #logging.debug("create_object() controller_id={0}".format(kwargs['controller_id'])) + # logging.debug("create_object() controller_id={0}".format(kwargs['controller_id'])) return p - + def delete_object(self, name, kind): """ Delete a objects of kind (Provider, Controller, WorkerGroup). 
@@ -223,7 +260,7 @@ def delete_object(self, name, kind): logging.debug("Deleting entry: {0}".format(p)) self.session.delete(p) self.session.commit() - + def get_object(self, name, kind): """ Get a config object of of kind (Provider, Controller, WorkerGroup). @@ -259,7 +296,7 @@ def get_object_by_id(self, id, kind): (handle, d_handle) = HANDLE_MAPPING[kind] p = self.session.query(handle).filter_by(id=id).first() if p is None: - raise DatastoreException("{0} {1} not found".format(kind, name)) + raise DatastoreException("{0} {1} not found".format(kind, id)) return self._get_object_data(d_handle, kind, p.type, p) def _get_object_data(self, d_handle, kind, ptype, p): @@ -269,16 +306,26 @@ def _get_object_data(self, d_handle, kind, ptype, p): data[d.name] = d.value p_handle = get_provider_handle(kind, ptype) - #logging.debug("{2}(name={0}, data={1})".format(name,data,p_handle)) + # logging.debug("{2}(name={0}, data={1})".format(name,data,p_handle)) + if p_handle is None: + return None ret = p_handle(name=p.name, config=data, config_dir=self.config_dir) ret.id = p.id ret.datastore = self if 'provider_id' in p.__dict__: - #logging.debug("_get_object_data(): provider_id={0}".format(p.provider_id)) - ret.provider = self.get_object_by_id(id=p.provider_id, kind='Provider') + # logging.debug("_get_object_data(): provider_id={0}".format(p.provider_id)) + try: + ret.provider = self.get_object_by_id(id=p.provider_id, kind='Provider') + except DatastoreException as e: + logging.debug('Error: provider {0} not found'.format(p.provider_id)) + ret.provider = None if 'controller_id' in p.__dict__: - #logging.debug("_get_object_data(): controller_id={0}".format(p.controller_id)) - ret.controller = self.get_object_by_id(id=p.controller_id, kind='Controller') + # logging.debug("_get_object_data(): controller_id={0}".format(p.controller_id)) + try: + ret.controller = self.get_object_by_id(id=p.controller_id, kind='Controller') + except DatastoreException as e: + logging.debug('Error: 
controller {0} not found'.format(p.controller_id)) + ret.controller = None return ret def save_object(self, config, kind): @@ -296,15 +343,16 @@ def save_object(self, config, kind): # Add new entry. p = handle(name=config.name, type=config.type) self.session.add(p) - #logging.debug("Created new DB entry: {0}".format(p)) - #print "save_object() config.__dict__={0}".format(config.__dict__) + # logging.debug("Created new DB entry: {0}".format(p)) + # print "save_object() config.__dict__={0}".format(config.__dict__) if 'provider_id' in config.__dict__: - logging.debug("provider_id is in config.__dict__ {0} {1}".format(config.provider_id, type(config.provider_id))) + logging.debug( + "provider_id is in config.__dict__ {0} {1}".format(config.provider_id, type(config.provider_id))) p.provider_id = config.provider_id if 'controller_id' in config.__dict__: logging.debug("controller_id is in config.__dict__ {0}".format(config.controller_id)) p.controller_id = config.controller_id - #logging.debug("Updated DB entry: {0}".format(p)) + # logging.debug("Updated DB entry: {0}".format(p)) self.session.commit() data = config.config.copy() @@ -314,33 +362,34 @@ def save_object(self, config, kind): d.value = data[d.name] del data[d.name] else: - #logging.debug("Deleting entry: {0}".format(d)) + # logging.debug("Deleting entry: {0}".format(d)) self.session.delete(d) for d in data.keys(): dd = d_handle(parent_id=p.id, name=d, value=data[d]) - #logging.debug("Created new entry: {0}".format(dd)) + # logging.debug("Created new entry: {0}".format(dd)) self.session.add(dd) self.session.commit() - def get_instance_by_id(self, id): """ Create or get the value for an instance. 
""" return self.session.query(Instance).filter_by(id=id).first() - - def get_instance(self, provider_instance_identifier, ip_address, provider_id=None, controller_id=None, worker_group_id=None): + + def get_instance(self, provider_instance_identifier, ip_address, provider_id=None, controller_id=None, + worker_group_id=None, provider_type=None): """ Create or get the value for an instance. """ p = self.session.query(Instance).filter_by(provider_instance_identifier=provider_instance_identifier).first() if p is None: - p = Instance(provider_instance_identifier=provider_instance_identifier, ip_address=ip_address, provider_id=provider_id, controller_id=controller_id, worker_group_id=worker_group_id) + p = Instance(provider_instance_identifier=provider_instance_identifier, ip_address=ip_address, + provider_id=provider_id, controller_id=controller_id, worker_group_id=worker_group_id) self.session.add(p) self.session.commit() - #logging.debug("Creating instance: {0}".format(p)) + # logging.debug("Creating instance: {0}".format(p)) else: - #logging.debug("Fetching instance: {0}".format(p)) + # logging.debug("Fetching instance: {0}".format(p)) pass return p - def get_controller_instances(self,controller_id=None): + def get_controller_instances(self, controller_id=None): logging.debug("get_controller_instances by controller_id={0}".format(controller_id)) ret = self.session.query(Instance).filter_by(controller_id=controller_id, worker_group_id=None).all() if ret is None: @@ -348,24 +397,24 @@ def get_controller_instances(self,controller_id=None): else: return ret - def get_worker_instances(self,controller_id=None): - #logging.debug("get_worker_instances by controller_id={0}".format(controller_id)) - ret = self.session.query(Instance).filter_by(controller_id=controller_id).filter(Instance.worker_group_id!=None).all() + def get_worker_instances(self, controller_id=None): + # logging.debug("get_worker_instances by controller_id={0}".format(controller_id)) + ret = 
self.session.query(Instance).filter_by(controller_id=controller_id).filter( + Instance.worker_group_id != None).all() + if ret is None: + return [] + else: + return ret - def get_all_instances(self, provider_id=None, controller_id=None, worker_group_id=None): if provider_id is not None: - #logging.debug("get_all_instances by provider_id={0}".format(provider_id)) + # logging.debug("get_all_instances by provider_id={0}".format(provider_id)) ret = self.session.query(Instance).filter_by(provider_id=provider_id).all() elif controller_id is not None: - #logging.debug("get_all_instances by controller_id={0}".format(controller_id)) + # logging.debug("get_all_instances by controller_id={0}".format(controller_id)) ret = self.session.query(Instance).filter_by(controller_id=controller_id).all() elif worker_group_id is not None: - #logging.debug("get_all_instances by worker_group_id={0}".format(worker_group_id)) + # logging.debug("get_all_instances by worker_group_id={0}".format(worker_group_id)) ret = self.session.query(Instance).filter_by(worker_group_id=worker_group_id).all() else: ret = self.session.query(Instance).all() @@ -376,9 +425,43 @@ def get_all_instances(self, provider_id=None, controller_id=None, worker_group_i def delete_instance(self, instance): """ Delete an instance. """ - #logging.debug("Deleting instance: {0}".format(instance)) + # logging.debug("Deleting instance: {0}".format(instance)) self.session.delete(instance) self.session.commit() + def get_all_jobs(self, controller_id=None): + if controller_id is not None: + # logging.debug("get_all_instances by controller_id={0}".format(controller_id)) + ret = self.session.query(ExecJob).filter_by(controller_id=controller_id).all() + else: + ret = self.session.query(ExecJob).all() + if ret is None: + return [] + else: + return ret + def get_job(self, jobID): + """ Get the object for a job. 
""" + # logging.debug("get_job(jobID={0})".format(jobID)) + try: + id = int(jobID) + j = self.session.query(ExecJob).filter_by(id=id).first() + except Exception: + j = self.session.query(ExecJob).filter_by(jobID=jobID).first() + if j is None: + raise DatastoreException("Job {0} not found".format(jobID)) + return j + + def start_job(self, controller_id=None, exec_str=None): + """ Create the objet for a job. """ + date_str = str(datetime.datetime.now()) + jobID = str(uuid.uuid4()) + j = ExecJob(jobID=jobID, controller_id=controller_id, exec_str=exec_str, date=date_str) + self.session.add(j) + self.session.commit() + logging.debug("Creating ExecJob: {0}".format(j)) + return j + def delete_job(self, job): + self.session.delete(job) + self.session.commit() diff --git a/MolnsLib/molns_exec_helper.py b/MolnsLib/molns_exec_helper.py new file mode 100644 index 0000000..6f3a5b2 --- /dev/null +++ b/MolnsLib/molns_exec_helper.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import os +import subprocess +import shlex +import json +import traceback +import sys + + +def run_job(exec_str, stdout_file): + with open(stdout_file, 'w') as stdout_fh: + try: + p = subprocess.Popen( + shlex.split(exec_str), + stdout=stdout_fh, + stderr=stdout_fh, + ) + pid = p.pid + # create pid file + pid_file = ".molns/pid" + return_code_file = ".molns/return_value" + with open(pid_file, 'w+') as fd: + fd.write(str(pid)) + # Wait on program execution... 
+ return_code = p.wait() + print "Return code:", return_code + if return_code_file is not None: + with open(return_code_file, 'w+') as fd: + fd.write(str(return_code)) + except Exception as e: + stdout_fh.write('Error: {}'.format(str(e))) + stdout_fh.write(traceback.format_exc()) + raise sys.exc_info()[1], None, sys.exc_info()[2] + + +if __name__ == "__main__": + with open(".molns/cmd",'r') as fd: + exec_str = fd.read() + print "exec_str", exec_str + run_job(exec_str, ".molns/stdout") diff --git a/MolnsLib/molns_landing_page.py b/MolnsLib/molns_landing_page.py new file mode 100644 index 0000000..0238f26 --- /dev/null +++ b/MolnsLib/molns_landing_page.py @@ -0,0 +1,93 @@ +from pipes import quote + +class MolnsLandingPage: + def __init__(self, port): + self.molns_landing_page = quote(""" + + +
+ +Please note that due to the self-signed certificate, you will see a warning before you can view the page. Please accept the warning and proceed.
++
+Click here to Register MOLNs
+
+ MOLNs is open-source software, developed as part of the StochSS project, and we are relying on continued funding for sustained development. Please consider registering to show your support.
+
Advanced analysis with Python scientific libraries
+Large scale computational experiments made easy
+
+