Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: user-scheduled tasks #120

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
/docgen-md
/docgen-openrpc
/deps.json
/userschedule

extern/filecoin-ffi/rust/target
**/*.a
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -79,6 +79,11 @@ sptool: $(BUILD_DEPS)
.PHONY: sptool
BINS+=sptool

userschedule:
rm -f userschedule
$(GOCC) build $(GOFLAGS) -o userschedule ./cmd/userschedule
.PHONY: userschedule

ifeq ($(shell uname),Linux)

batchdep: build/.supraseal-install
2 changes: 2 additions & 0 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/lib/curiochain"
"github.com/filecoin-project/curio/lib/fastparamfetch"
@@ -208,6 +209,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
// (we could have just appended to this list in the reverse order, but defining
// tasks in pipeline order is more intuitive)

usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduler, dependencies.DB, dependencies.ListenAddr)
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
if err != nil {
return nil, err
100 changes: 100 additions & 0 deletions cmd/userschedule/userschedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// This is an example round-robin scheduler.
// It can be used by modifying Curio's base configuration Subsystems.UserSchedule
// to point to a machine named myscheduler with URL:
// http://myscheduler:7654/userschedule
// Be sure to open the selected port on the machine running this scheduler.
//
// Usage:
//
// Fork the repo from Github and clone it to your local machine,
// Edit this file as needed to implement your own scheduling logic,
// build with 'make userschedule' then run with ./userschedule
package main

import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"runtime/debug"
"runtime/pprof"
"sync"
"syscall"

"golang.org/x/xerrors"
)

const WorkerBusyTimeout = 60 // Seconds until Curio asks again for this task.
func sched(w http.ResponseWriter, r *http.Request) {
var input struct {
TaskID string `json:"task_id"`
TaskType string `json:"task_type"`
Workers []string `json:"workers"`
}
// Parse the request
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
OrHTTPFail(w, xerrors.Errorf("failed to parse request: %s", err))
}

// Scheduler Logic goes here
selectedWorker := roundRobin(input.TaskType, input.Workers)

// Respond to Curio
w.WriteHeader(http.StatusOK)
err := json.NewEncoder(w).Encode(struct {
Worker string `json:"worker"`
Timeout int `json:"timeout"`
}{selectedWorker, WorkerBusyTimeout})
if err != nil {
OrHTTPFail(w, err)
}
}

// ///////// Round Robin Scheduler ///////// //
var mx sync.Mutex
var m = make(map[string]int)

func roundRobin(taskType string, workers []string) string {
mx.Lock()
defer mx.Unlock()
selectedWorker := workers[m[taskType]%len(workers)]
m[taskType]++
return selectedWorker
}

// ///////////////////////////////////
// Everything below this line is boilerplate code.
// ///////////////////////////////////

func main() {
setupCloseHandler()
mux := http.NewServeMux()
mux.HandleFunc("/userschedule", func(w http.ResponseWriter, r *http.Request) {
defer func() { _ = recover() }()
sched(w, r)
})
fmt.Println(http.ListenAndServe(":7654", mux))
}

// Intentionally inlined dependencies to make it easy to copy-paste into your own codebase.
func OrHTTPFail(w http.ResponseWriter, err error) {
if err != nil {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
log.Printf("http fail. err %s, stack %s", err, string(debug.Stack()))
panic(1)
}
}

func setupCloseHandler() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\r- Ctrl+C pressed in Terminal")
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
panic(1)
}()
}
39 changes: 39 additions & 0 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
@@ -264,9 +264,53 @@ type CurioSubsystemsConfig struct {
// also be bounded by resources available on the machine.
SyntheticPoRepMaxTasks int

// UserScheduler allows for the user to schedule tasks on specific machines of their choice.
// This http endpoint gets a POST request with the following JSON body:
// {
// "task_id": "task_id",
// "task_type": "task_type",
// "workers": ["worker1", "worker2"]
// }
// And looks for a 200 response with the following JSON body:
// {
// "worker": "worker1"
// "timeout": 60
// }
// Timeout in seconds until it will be rescheduled.
UserScheduler []UserSchedule

// Batch Seal
EnableBatchSeal bool
}

// UserSchedule allows for the user to schedule a task on specific machines of their choice.
// This http endpoint gets a POST request with the following JSON body:
//
// {
// "task_id": "task_id",
// "task_type": "task_type",
// "workers": ["worker1", "worker2"]
// }
//
// And looks for a 200 response with the following JSON body:
//
// {
// "worker": "worker1"
// "timeout": 60
// }
//
// Timeout in seconds until it will be rescheduled.
type UserSchedule struct {
// TaskName as listed in the GUI. Ex: SDR
TaskName string

// URL to http(s) user scheduler
URL string

// HaltOnSchedulerDown - If true, the tasks will not run when the URL response is not usable.
// The False value is recommended to keep scheduling working even if the UserScheduler service is down.
HaltOnSchedulerDown bool
}
type CurioFees struct {
DefaultMaxFee types.FIL
MaxPreCommitGasFee types.FIL
7 changes: 7 additions & 0 deletions harmony/harmonydb/sql/20240724-user_sched.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE harmony_task_user (
task_id INTEGER PRIMARY KEY,
owner TEXT NOT NULL,
expiration INTEGER NOT NULL,
ignore_userscheduler BOOLEAN NOT NULL DEFAULT FALSE,
FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE CASCADE
);
27 changes: 16 additions & 11 deletions harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
@@ -212,13 +212,15 @@ canAcceptAgain:
}
return owner == h.TaskEngine.ownerID
})
if doErr != nil {
if doErr != nil && doErr != ErrReturnToPoolPlease {
log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr)
}
}()
return true
}

var ErrReturnToPoolPlease = errors.New("return to pool")

func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
retryWait := time.Millisecond * 100
@@ -246,15 +248,9 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done

retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}
result := "unspecified error"
if done {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
_, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {

return false, fmt.Errorf("could not log completion: %w", err)
@@ -268,9 +264,9 @@ retryRecordCompletion:
result = "error: " + doErr.Error()
}
var deleteTask bool
if h.MaxFailures > 0 {
if doErr != ErrReturnToPoolPlease && h.MaxFailures > 0 {
ct := uint(0)
err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history
err := tx.QueryRow(`SELECT count(*) FROM harmony_task_history
WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct)
if err != nil {
return false, fmt.Errorf("could not read task history: %w", err)
@@ -280,7 +276,7 @@ retryRecordCompletion:
}
}
if deleteTask {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
_, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
return false, fmt.Errorf("could not delete failed job: %w", err)
}
@@ -292,6 +288,15 @@ retryRecordCompletion:
}
}
}
if doErr == ErrReturnToPoolPlease {
return true, nil
}
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}
_, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result)
170 changes: 170 additions & 0 deletions harmony/taskhelp/usertaskmgt/usertaskmgt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Package usertaskmgt provides a way to wrap tasks with a URL that can be called to assign the task to a worker.
Timeline
- UrlTask accepts everything
- once accepted, UrlTask.Do() finds who should own the task and updates the DB:
- harmony_task_user.owner_id & expiration_time
- harmony_task releases the task (without err)
- The poller will see the task & call CanAccept()
- CanAccept() will see the owner_id and call the deeper canaccept() if it's us.
- If it's not us, check the expiration time and release the task by deleting the row.
- The task will be done by the worker who was told to do it, or eventually reassigned.
Pitfalls:
- If the user's URL is down, the task will be stuck in the DB.
- Turnaround time is slowed by the additional trip through the poller.
- Full task resources are claimed by the URL runner, so the task needs a full capacity.
*/
package usertaskmgt

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"strconv"
"strings"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
)

var log = logging.Logger("userTaskMgt")

func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduler []config.UserSchedule, db *harmonydb.DB, hostAndPort string) {
m := lo.SliceToMap(UserScheduler, func(s config.UserSchedule) (string, *config.UserSchedule) {
_, err := url.Parse(s.URL)
if err != nil {
log.Errorf("Invalid UserScheduleUrl: %s. Expected: taskName,url", s)
return "", nil
}
return s.TaskName, &s
})
for i, task := range tasks {
if s, ok := m[task.TypeDetails().Name]; ok {
tasks[i] = &UrlTask{
TaskInterface: task,
UserScheduleUrl: s.URL,
name: task.TypeDetails().Name,
db: db,
hostAndPort: hostAndPort,
haltOnSchedulerDown: s.HaltOnSchedulerDown,
}
}
}
}

type UrlTask struct {
harmonytask.TaskInterface
db *harmonydb.DB
UserScheduleUrl string
name string
hostAndPort string
haltOnSchedulerDown bool
}

// CanAccept should accept all IF no harmony_task_user row exists, ELSE
// if us, try CanAccept() until expiration hits.
func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := tids[0]
var owner string
var expiration int64
var ignoreUserScheduler bool
err := t.db.QueryRow(context.Background(), `SELECT
COALESCE(owner,''),
COALESCE(expiration, 0),
COALESCE(ignore_userscheduler,false)
from harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration, &ignoreUserScheduler)
if err != nil {
return nil, xerrors.Errorf("could not get owner: %w", err)
}
if owner != "" {
if owner == t.hostAndPort || ignoreUserScheduler {
return t.TaskInterface.CanAccept(tids, te)
}
if expiration < time.Now().Unix() {
_, err = t.db.Exec(context.Background(), `DELETE FROM harmony_task_user WHERE task_id=$1`, id)
if err != nil {
return nil, xerrors.Errorf("could not delete from harmony_task_user: %w", err)
}
}
}
return &id, nil
}

var client = &http.Client{Timeout: time.Second * 10}

func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (b bool, err error) {
defer func() {
if err != harmonytask.ErrReturnToPoolPlease && !t.haltOnSchedulerDown {
log.Error("Proceeding without user scheduler service running (as configured)")
log.Error(err)
_, err = t.db.Exec(context.Background(),
`INSERT INTO harmony_task_user (task_id, owner, expiration, ignore_userscheduler)
VALUES ($1, '-', 0, true)`, id)
if err != nil {
log.Error("Could not insert into harmony_task_user: ", err)
return
}
err = harmonytask.ErrReturnToPoolPlease
}
}()
var owner string
err = t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,'') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner)
if err != nil {
return false, xerrors.Errorf("could not get owner: %w", err)
}
if owner == t.hostAndPort {
return t.TaskInterface.Do(id, stillMe)
}
var workerList []string
err = t.db.Select(context.Background(), &workerList, `SELECT host_and_port
FROM harmony_machines m JOIN harmony_machine_details d ON d.machine_id=m.id
WHERE tasks LIKE $1`, "%,"+t.name+",%")
if err != nil {
return false, xerrors.Errorf("could not get worker list: %w", err)
}

resp, err := client.Post(t.UserScheduleUrl, "application/json", bytes.NewReader([]byte(`
{
"task_type": "`+t.name+`",
"task_id": `+strconv.Itoa(int(id))+`,
"workers": [`+strings.Join(workerList, ",")+`],
}
`)))
if err != nil {
return false, xerrors.Errorf("could not call user defined URL: %w", err)
}
if resp.StatusCode != http.StatusOK {
return false, xerrors.Errorf("User defined URL returned non-200 status code: %d", resp.StatusCode)
}
var respData struct {
Worker string
Timeout int
}
defer resp.Body.Close()
err = json.NewDecoder(resp.Body).Decode(&respData)
if err != nil {
return false, xerrors.Errorf("could not decode user defined URL response: %w", err)
}

// If it's us, we cannot shortcut because we don't have CanAccept's 2nd arg.

expires := time.Now().Add(time.Second * time.Duration(respData.Timeout))
_, err = t.db.Exec(context.Background(), `INSERT INTO harmony_task_user (task_id, owner, expiration) VALUES ($1,$2)`, id, respData.Worker, expires)
if err != nil {
return false, xerrors.Errorf("could not insert into harmony_task_user: %w", err)
}

return false, harmonytask.ErrReturnToPoolPlease
}
84 changes: 84 additions & 0 deletions itests/userschedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package itests

import (
"os/exec"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt"
)

func TestUserTaskMgt(t *testing.T) {
gopath, err := exec.LookPath("go")
require.NoError(t, err)
require.NoError(t, exec.Command(gopath, "run", "cmd/userschedule").Run()) // round-robin scheduler

db := dbSetup(t)
harmonytask.POLL_DURATION = 200 * time.Millisecond
output := ""
var instances []*ut
for a := 0; a < 3; a++ { // make 3 "machines"
inst := &ut{db: db, f: func() { output += strconv.Itoa(a) }}
instances = append(instances, inst)

host := "foo:" + strconv.Itoa(a)
tasks := []harmonytask.TaskInterface{inst}
usertaskmgt.WrapTasks(tasks, []config.UserSchedule{
{TaskName: "foo", URL: "http://localhost:7654"},
}, db, host)
_, err := harmonytask.New(db, tasks, host)
require.NoError(t, err)
}

for a := 0; a < 5; a++ { // schedule 5 tasks
instances[0].myAddTask(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return true, nil
})
}
require.Equal(t, "01201", output)
}

type ut struct {
db *harmonydb.DB
f func()
myAddTask harmonytask.AddTaskFunc
}

func (u *ut) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "foo",
Cost: resources.Resources{},
}
}
func (u *ut) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &tids[0], nil
}
func (u *ut) Adder(f harmonytask.AddTaskFunc) {
u.myAddTask = f
}
func (u *ut) Do(tID harmonytask.TaskID, _ func() bool) (bool, error) {
u.f()
time.Sleep(time.Second) // so there's no chance that a later task will finish first.
return true, nil
}

func dbSetup(t *testing.T) *harmonydb.DB {
sharedITestID := harmonydb.ITestNewID()
dbConfig := config.HarmonyDB{
Hosts: []string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")},
Database: "yugabyte",
Username: "yugabyte",
Password: "yugabyte",
Port: "5433",
}
db, err := harmonydb.NewFromConfigWithITestID(t, dbConfig, sharedITestID)
require.NoError(t, err)
return db
}