Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 33 additions & 38 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def remote_wrapper(event):
# Handler to sigchld when child exits.
def sigchld_handler(signum, frame):
# write any byte to signal that there's at least 1 child
os.writev(w, [b"a"])
# if this fails, the notification mechanism has failed and the error will be raised normally
os.write(w, b"a")


# Read data from worker, start function, and dump result to `outfile`.
Expand Down Expand Up @@ -150,74 +151,58 @@ def start_function(in_pipe_fd, thread_limit=1):
raise

except Exception as e:
stdout_timed_message(
f"Library code: Function call failed due to {e}",
file=sys.stderr,
)
stdout_timed_message(f"Library code: Function call failed due to {e}")
sys.exit(1)
finally:
os.chdir(library_sandbox)
return -1, function_id
else:
elif exec_method == "fork":
try:
arg_infile = os.path.join(function_sandbox, "infile")
with open(arg_infile, "rb") as f:
event = cloudpickle.load(f)
except Exception:
stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {arg_infile}")
return
return -1, function_id
p = os.fork()
if p == 0:
exit_status = 1

try:
# change the working directory to the function's sandbox
os.chdir(function_sandbox)

stdout_timed_message(f"TASK {function_id} {function_name} arrives, starting to run in process {os.getpid()}")

try:
exit_status = 1
except Exception:
stdout_timed_message(f"TASK {function_id} error: can't load the arguments from infile")
exit_status = 2
raise

try:
# setup stdout/err for a function call so we can capture them.
function_stdout_fd = os.open(
function_stdout_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC
)
# store the library's stdout fd
library_fd = os.dup(sys.stdout.fileno())
# each child process independently redirects its own stdout/stderr.
with open(function_stdout_filename, "wb") as f:
os.dup2(f.fileno(), sys.stdout.fileno()) # redirect stdout
os.dup2(f.fileno(), sys.stderr.fileno()) # redirect stderr

# only redirect the stdout of a specific FunctionCall task into its own stdout fd,
# otherwise use the library's stdout
os.dup2(function_stdout_fd, sys.stdout.fileno())
os.dup2(function_stdout_fd, sys.stderr.fileno())
# once the file descriptors are redirected, we can start the function execution
stdout_timed_message(f"TASK {function_id} {function_name} starts in PID {os.getpid()}")
result = globals()[function_name](event)
stdout_timed_message(f"TASK {function_id} {function_name} finished")

# restore to the library's stdout fd on completion
os.dup2(library_fd, sys.stdout.fileno())
except Exception:
stdout_timed_message(f"TASK {function_id} error: can't execute this function")
exit_status = 3
stdout_timed_message(f"TASK {function_id} error: can't execute {function_name} due to {traceback.format_exc()}")
exit_status = 2
raise
finally:
if function_stdout_fd in locals():
os.close(function_stdout_fd)

try:
with open("outfile", "wb") as f:
cloudpickle.dump(result, f)
except Exception:
stdout_timed_message(f"TASK {function_id} error: can't load the result from outfile")
exit_status = 4
if os.path.exits("outfile"):
exit_status = 3
if os.path.exists("outfile"):
os.remove("outfile")
raise

try:
if not result["Success"]:
exit_status = 5
exit_status = 4
except Exception:
stdout_timed_message(f"TASK {function_id} error: the result is invalid")
exit_status = 5
Expand All @@ -232,13 +217,14 @@ def start_function(in_pipe_fd, thread_limit=1):
os._exit(exit_status)
elif p < 0:
stdout_timed_message(f"TASK {function_id} error: unable to fork to execute {function_name}")
return -1
return -1, function_id

# return pid and function id of child process to parent.
else:
return p, function_id

return -1
else:
stdout_timed_message(f"error: invalid execution method {exec_method}")
return -1, function_id


# Send result of a function execution to worker. Wake worker up to do work with SIGCHLD.
Expand Down Expand Up @@ -431,7 +417,16 @@ def main():
)
else:
pid, func_id = start_function(in_pipe_fd, thread_limit)
pid_to_func_id[pid] = func_id
# pid == -1 indicates a failure during fork/setup of the function execution
if pid == -1:
send_result(
out_pipe_fd,
args.worker_pid,
func_id,
1,
)
else:
pid_to_func_id[pid] = func_id
else:
# at least 1 child exits, reap all.
# read only once as os.read is blocking if there's nothing to read.
Expand Down