diff --git a/.gitignore b/.gitignore
index daa4ce1..46b63ec 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
 *.pb.cc
 .*
 /script/van*
+Makefile.config
diff --git a/Makefile b/Makefile
index 8db84a0..0923179 100644
--- a/Makefile
+++ b/Makefile
@@ -1,21 +1,26 @@
+
+CONFIG_FILE := Makefile.config
+include $(CONFIG_FILE)
+
 CC = g++
-# OPT = -O0 -ggdb
+# OPT = -O0 -ggdb -DDEBUG
 OPT = -O3 -ggdb
-THIRD_PATH=$(shell pwd)/third_party
+THIRD_PATH=$(shell pwd -L)/third_party
+
 STATIC_THIRD_LIB=0
 ifeq ($(STATIC_THIRD_LIB), 1)
 THIRD_LIB=$(addprefix $(THIRD_PATH)/lib/, libgflags.a libzmq.a libprotobuf.a libglog.a libz.a libsnappy.a)
 else
-THIRD_LIB=-L$(THIRD_PATH)/lib -lgflags -lzmq -lprotobuf -lglog -lz -lsnappy
+THIRD_LIB=-L$(THIRD_PATH)/lib -lgflags -lzmq -lprotobuf -lglog -lz -lsnappy -L$(CUDA_PATH)/lib64 -lcudart
 endif
 # THIRD_LIB+=-ltcmalloc_and_profiler
 
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
-INCPATH = -I./src -I$(THIRD_PATH)/include
+INCPATH = -I./src -I$(THIRD_PATH)/include -I/usr/include/eigen3 -I$(CAFFE_PATH)/include -I$(CAFFE_PATH)/build/src -I$(CUDA_PATH)/include
 CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt
+LDFLAGS += $(THIRD_LIB) -lboost_thread -lboost_system -lpthread -lrt -lcaffe -L$(CAFFE_PATH)/build/lib -Wl,-rpath=$(CAFFE_PATH)/build/lib -Wl,-rpath=$(THIRD_PATH)/lib
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
@@ -30,6 +35,20 @@ app: build/ps
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
 	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
+build/caffe: build/app/caffe/caffe_main.o $(PS_LIB)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
+build/caffe_sync: build/app/caffe/caffe_synced.o $(PS_LIB)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
+build/caffe_share: build/app/caffe/caffe_share.o $(PS_LIB)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
+build/caffe_async_share: build/app/caffe/caffe_async_share.o $(PS_LIB)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
+caffe_all: build/caffe build/caffe_sync build/caffe_share build/caffe_async_share
+
 sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \
 	$(wildcard src/system/*.cc) $(wildcard src/filter/*.cc)
 sys_protos = $(wildcard src/*/proto/*.proto)
diff --git a/Makefile.config.example b/Makefile.config.example
new file mode 100644
index 0000000..cd88d8c
--- /dev/null
+++ b/Makefile.config.example
@@ -0,0 +1,8 @@
+##
+
+# CUDA install path, for includes and libcudart
+CUDA_PATH := /usr/local/cuda
+
+# Caffe source path; Caffe must already be built successfully here
+CAFFE_PATH := /data/ML/caffe/caffe
diff --git a/conf/caffe.as.conf b/conf/caffe.as.conf
new file mode 100644
index 0000000..e4ba0ef
--- /dev/null
+++ b/conf/caffe.as.conf
@@ -0,0 +1,17 @@
+# async shared
+PS_PATH /home/immars/work/ML/distributed/parameter_server/build/caffe_async_share
+PUSH 4
+PULL 8
+SCHEDULER 192.168.1.108
+
+SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt 0 snapshot/bvlc_maxmap_iter_1210000.solverstate
+# SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt 0
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt 0 W0,W1,W2,W3
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
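+# Field layout, as parsed by script/caffe_lan.sh:
+#   SERVER <ip> <working dir> <solver prototxt> <gpu id> [solverstate to resume from]
+#   WORKER <ip> <working dir> <solver prototxt> <gpu id> [comma-separated forwarder subdirs]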
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt 0 W0,W1,W2,W3
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
+
diff --git a/conf/caffe.conf b/conf/caffe.conf
new file mode 100644
index 0000000..487d4b6
--- /dev/null
+++ b/conf/caffe.conf
@@ -0,0 +1,6 @@
+PS_PATH /home/immars/work/ML/distributed/parameter_server/build/caffe
+SCHEDULER 192.168.1.108
+
+SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_reference_caffenet/S0 solver.prototxt -1
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_reference_caffenet/W0 solver.prototxt 0
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_reference_caffenet/W1 solver.prototxt 1
diff --git a/conf/caffe.googlenet.conf b/conf/caffe.googlenet.conf
new file mode 100644
index 0000000..3e23438
--- /dev/null
+++ b/conf/caffe.googlenet.conf
@@ -0,0 +1,16 @@
+PS_PATH /home/immars/work/ML/distributed/parameter_server/build/caffe_sync
+PUSH 8
+PULL 8
+SCHEDULER 192.168.1.108
+
+# SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.adadelta.prototxt -1 snapshot/bvlc_googlenet_iter_220000.solverstate
+SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.adadelta.s.prototxt -1 snapshot/bvlc_googlenet_iter_140000.solverstate
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W0 solver.adadelta.c.prototxt 0
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.adadelta.c.prototxt 1
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.adadelta.c.prototxt 2
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.adadelta.c.prototxt 3
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W0 solver.adadelta.c.prototxt 0
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.adadelta.c.prototxt 1
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.adadelta.c.prototxt 2
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.adadelta.c.prototxt 3
+
diff --git a/conf/caffe.maxmap.conf b/conf/caffe.maxmap.conf
new file mode 100644
index 0000000..7a4038d
--- /dev/null
+++ b/conf/caffe.maxmap.conf
@@ -0,0 +1,15 @@
+PS_PATH /home/immars/work/ML/distributed/parameter_server/build/caffe_sync
+PUSH 8
+PULL 8
+SCHEDULER 192.168.1.108
+
+SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt -1
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W0 solver.maxmap.prototxt 0
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W0 solver.maxmap.prototxt 0
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
+
diff --git a/conf/caffe.share.conf b/conf/caffe.share.conf
new file mode 100644
index 0000000..562f9cc
--- /dev/null
+++ b/conf/caffe.share.conf
@@ -0,0 +1,15 @@
+PS_PATH /home/immars/work/ML/distributed/parameter_server/build/caffe_share
+PUSH 8
+PULL 8
+SCHEDULER 192.168.1.108
+
+SERVER 192.168.1.108 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt -1
+WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt -1 W0,W1,W2,W3
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.110 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
+WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML solver.maxmap.prototxt 0 W0,W1,W2,W3
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W1 solver.maxmap.prototxt 1
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W2 solver.maxmap.prototxt 2
+# WORKER 192.168.1.112 /home/immars/work/ML/caffe/caffe/models/bvlc_googlenet/ML/W3 solver.maxmap.prototxt 3
+
diff --git a/script/caffe_kill_lan.sh b/script/caffe_kill_lan.sh
new file mode 100755
index 0000000..d885c51
--- /dev/null
+++ b/script/caffe_kill_lan.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+# set -x
+# export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../third_party/lib
+
+if [ $# -lt 1 ]; then
+    echo "usage: ./caffe_kill_lan.sh conf_path"
+    echo "solver.prototxt resides in subdirectories [S0,S1,...,W0,W1,...] of root_dir"
+    exit -1;
+fi
+
+conf=$1
+tmp=$( mktemp )
+grep -v ^# $conf > $tmp
+conf=$tmp
+app=$( grep PS_PATH $conf | awk -F'/' '{print $NF;}' )
+echo "app: $app"
+echo "kill local"
+
+killall caffe || killall $app
+
+echo "kill servers"
+# kill servers
+grep -E "WORKER|SERVER" $conf | awk '{print $2;}' | sort | uniq | awk -v q="'" -v app="$app" '
+{
+    cmd="ssh immars@" $0 " \"killall caffe || killall " app " \" ";
+    print cmd;
+    system(cmd);
+}
+'
diff --git a/script/caffe_lan.sh b/script/caffe_lan.sh
new file mode 100755
index 0000000..66bfb95
--- /dev/null
+++ b/script/caffe_lan.sh
@@ -0,0 +1,62 @@
+#!/bin/bash
+# set -x
+# export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../third_party/lib
+if [ $# -lt 1 ]; then
+    echo "usage: ./caffe_lan.sh conf_path [args..]"
+    echo "solver.prototxt resides in subdirectories [S0,S1,...,W0,W1,...] of root_dir"
+    exit -1;
+fi
+
+conf=$1
+tmp=$( mktemp )
+grep -v ^# $conf > $tmp
+conf=$tmp
+
+bin=$( grep PS_PATH $conf | awk '{print $2;}' )
+sch_ip=$( grep SCHEDULER $conf | awk '{print $2;}' )
+num_servers=$( grep SERVER $conf | wc -l )
+num_workers=$( grep WORKER $conf | wc -l )
+pullstep=$( grep PULL $conf | awk '{print $2;}' )
+pushstep=$( grep PUSH $conf | awk '{print $2;}' )
+
+arg="-num_servers ${num_servers} -num_workers ${num_workers} $@" #" -app ${dir}/$@"
+
+echo "$bin $conf $sch_ip $num_servers $num_workers"
+killall -q caffe
+
+silence=">/dev/null 2>/dev/null"
+
+
+# start the scheduler
+Sch="role:SCHEDULER,hostname:'$sch_ip',port:8001,id:'H'"
+${bin} -my_node ${Sch} -scheduler ${Sch} ${arg} >/dev/null 2>/dev/null &
+
+# start servers
+grep SERVER $conf | awk -v bin=$bin -v sch=$Sch -v nums=$num_servers -v numw=$num_workers -vpullstep=$pullstep -vpushstep=$pushstep -v q="'" '
+BEGIN{port=9600;id=0;}
+{
+    ip=$2;wd=$3;solver=$4;gpu=$5;snapshot=$6;
+    if(""!=snapshot){
+        snapshot= " --snapshot=" snapshot;
+    }
+    cmd="ssh -f -n immars@" ip " \"source /etc/profile && cd " wd " && nohup " bin " -num_servers " nums " -num_workers " numw " -my_node \\\"role:SERVER,hostname:" q ip q ",port:" port ",id:" q "S" id q "\\\" -scheduler \\\"" sch "\\\" --solver=" solver " --pullstep=" pullstep " --pushstep=" pushstep " --gpu=" gpu " " snapshot " >" wd "/stdout.txt 2>&1 < /dev/null &\" ";
+    print cmd;
+    system(cmd);
+    port=port+1;id=id+1;
+}
+'
+
+grep WORKER $conf | awk -v bin=$bin -v sch=$Sch -v nums=$num_servers -v numw=$num_workers -vpullstep=$pullstep -vpushstep=$pushstep -v q="'" '
+BEGIN{port=9500;id=0;}
+{
+    ip=$2;wd=$3;solver=$4;gpu=$5;workers=$6;
+    if(""!=workers){
+        workers = " --workers=" workers;
+    }
+    cmd="ssh -f -n immars@" ip " \"source /etc/profile && cd " wd " && nohup " bin " -num_servers " nums " -num_workers " numw " -my_node \\\"role:WORKER,hostname:" q ip q ",port:" port ",id:" q "W" id q "\\\" -scheduler \\\"" sch "\\\" --solver=" solver " --pullstep=" pullstep " --pushstep=" pushstep " --synced=true --gpu=" gpu " " workers " >" wd "/stdout.txt 2>&1 < /dev/null &\" ";
+    print cmd;
+    system(cmd);
+    port=port+1;id=id+1;
+}
+'
+
diff --git a/script/caffe_local.sh b/script/caffe_local.sh
new file mode 100755
index 0000000..e748113
--- /dev/null
+++ b/script/caffe_local.sh
@@ -0,0 +1,54 @@
+#!/bin/bash
+# set -x
+# export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../third_party/lib
+if [ $# -lt 3 ]; then
+    echo "usage: ./caffe_local.sh num_servers num_workers root_dir solver.prototxt [args..]"
+    echo "solver.prototxt resides in subdirectories [S0,S1,...,W0,W1,...] of root_dir"
+    exit -1;
+fi
+
+bin=$( pwd -L )/build/caffe_sync
+num_servers=$1
+shift
+num_workers=$1
+shift
+root_dir=$1
+shift
+solver=$1
+shift
+arg="-num_servers ${num_servers} -num_workers ${num_workers} $@" #" -app ${dir}/$@"
+
+
+# killall -q $(basename ${bin})
+killall -q ${bin}
+sleep 1
+
+silence=">/dev/null 2>/dev/null"
+
+
+# start the scheduler
+Sch="role:SCHEDULER,hostname:'127.0.0.1',port:8001,id:'H'"
+${bin} -my_node ${Sch} -scheduler ${Sch} ${arg} &
+
+# start servers
+for ((i=0; i<${num_servers}; ++i)); do
+    port=$((9600 + ${i}))
+    id=S${i}
+    N="role:SERVER,hostname:'127.0.0.1',port:${port},id:'${id}'"
+    # HEAPPROFILE=/tmp/S${i} \
+    # CPUPROFILE=/tmp/S${i} \
+    echo "cd $root_dir/$id/ && ${bin} -my_node ${N} -scheduler ${Sch} --solver=$solver ${arg} >$root_dir/$id/stdout.txt 2>&1 &"
+    cd $root_dir/$id/ && ${bin} -my_node ${N} -scheduler ${Sch} --pullstep=2 --pushstep=2 --solver=$solver ${arg} >$root_dir/$id/stdout.txt 2>&1 &
+done
+
+# start workers
+for ((i=0; i<${num_workers}; ++i)); do
+    port=$((9500 + ${i}))
+    id=W${i}
+    N="role:WORKER,hostname:'127.0.0.1',port:${port},id:'${id}'"
+    # HEAPPROFILE=/tmp/W${i} \
+    # CPUPROFILE=/tmp/W${i} \
+    cd $root_dir/$id/ && ${bin} -my_node ${N} -scheduler ${Sch} --pullstep=2 --pushstep=2 --solver=$solver ${arg} >$root_dir/$id/stdout.txt 2>&1 &
+done
+
+wait
diff --git a/src/app/caffe/caffe_async_share.cc b/src/app/caffe/caffe_async_share.cc
new file mode 100644
index 0000000..88b26ce
--- /dev/null
+++ b/src/app/caffe/caffe_async_share.cc
@@ -0,0 +1,1011 @@
+#include <vector>
+#include <string>
+#include <sstream>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+
+#include <unistd.h>
+#include <cstring>
+#include "ps.h"
+#include "system/app.h"
+#include "parameter/v_vector.h"
+#include "parameter/kv_vector.h"
+#include "caffe/caffe.hpp"
+#include "caffe/common.hpp"
+#include "caffe/util/math_functions.hpp"
+#include "caffe/util/upgrade_proto.hpp"
+#include "app/caffe/util.h"
+
+using namespace caffe;
+using namespace std;
+using caffe::Blob;
+using caffe::Solver;
+using caffe::SolverParameter;
+using caffe::Caffe;
+using caffe::caffe_scal;
+using google::protobuf::io::CodedInputStream;
+using std::string;
+using std::vector;
+
+// caffe cmd flags
+
+DEFINE_int32(gpu, -1,
+    "Run in GPU mode on given device ID.");
+DEFINE_string(solver, "",
+    "The solver definition protocol buffer text file.");
+DEFINE_string(model, "",
+    "The model definition protocol buffer text file.");
+DEFINE_string(snapshot, "",
+    "Optional; the snapshot solver state to resume training.");
+DEFINE_string(workers, "W0",
+    "cwd of workers; subdirectories of the current directory.");
+DEFINE_bool(fb_only, true,
+    "DEPRECATED; workers only ForwardBackward.");
+DEFINE_bool(synced, false,
+    "DEPRECATED; pull/push synced with Forward");
+// client puller / pusher flags
+DEFINE_int32(pushstep, 3,
+    "interval, in minibatches, between push operations.");
+DEFINE_int32(pullstep, 3,
+    "DEPRECATED; interval, in minibatches, between pull operations.");
+
+caffe::SolverParameter solver_param;
+Solver<float>* initCaffeSolver(int id){
+
+  Solver<float>* solver;
+
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  if (id < 0) {
+    id = FLAGS_gpu;
+  }
+
+  if (id < 0
+      && solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
+    id = solver_param.device_id();
+  }
+
+  // Set device id and mode
+  if (id >= 0) {
+    LOG(INFO) << "Use GPU with device ID " << id;
+    Caffe::SetDevice(id);
+    Caffe::set_mode(Caffe::GPU);
+  } else {
+    LOG(INFO) << "Use CPU.";
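+    // no usable GPU id came from --gpu or the solver prototxt; run on the CPU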
+    Caffe::set_mode(Caffe::CPU);
+  }
+
+  solver = caffe::GetSolver<float>(solver_param);
+
+  if (FLAGS_snapshot.size()) {
+    LOG(INFO) << "Resuming from " << FLAGS_snapshot;
+    solver->Restore(FLAGS_snapshot.c_str());
+  }
+
+  return solver;
+}
+
+caffe::Net<float>* initCaffeNet(){
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  caffe::NetParameter net_param;
+  std::string net_path = solver_param.net();
+  caffe::ReadNetParamsFromTextFileOrDie(net_path, &net_param);
+  return new caffe::Net<float>(net_param);
+}
+
+#define V_WEIGHT "weight"
+#define V_DIFF "diff"
+#define V_SOLVER "solver"
+#define V_ITER "iteration"
+namespace PS {
+
+static std::mutex mu_pwd;
+Solver<float>* initCaffeSolverInDir(int id, string root){
+  Lock l(mu_pwd);
+  char* cwd = getcwd(nullptr,1024);
+  LL << "previous cwd: " << cwd << " root: " << root;
+  CHECK(cwd != nullptr);
+  CHECK(0 == chdir(root.c_str()));
+  Solver<float>* solver = initCaffeSolver(id);
+  CHECK(0 == chdir(cwd));
+  free(cwd);
+  return solver;
+}
+
+void enableP2P(int numGPUs){
+  for (int i = 0; i < numGPUs; i++){
+    CUDA_CHECK(cudaSetDevice(i));
+    for (int j = 0; j < numGPUs; j++){
+      if (i == j) continue;
+      int access = 0;
+      CUDA_CHECK(cudaDeviceCanAccessPeer(&access, i, j));
+      if (access) {
+        CUDA_CHECK(cudaDeviceEnablePeerAccess(j, 0));
+      }
+    }
+  }
+}
+
+void disableP2P(int numGPUs){
+  for (int i = 0; i < numGPUs; i++){
+    CUDA_CHECK(cudaSetDevice(i));
+    for (int j = 0; j < numGPUs; j++){
+      if (i == j) continue;
+      CUDA_CHECK(cudaDeviceDisablePeerAccess(j));
+    }
+  }
+}
+
+class CaffeServer : public App, public VVListener<float>, public VVListener<char> {
+ public:
+  CaffeServer(const string& name, const string& conf) : App(name) {
+    diffBlobFront = new std::vector<Blob<float>*>(); // for accumulate
+    diffBlobBack = new std::vector<Blob<float>*>();  // for push
+    start_update = false;
+  }
+  virtual ~CaffeServer() {
+    for(auto blob : (*diffBlobFront)){
+      delete blob;
+    }
+    for(auto blob : (*diffBlobBack)){
+      delete blob;
+    }
+    for(auto blob : diffBlobs){
+      delete blob;
+    }
+    delete diffBlobFront;
+    delete diffBlobBack;
+  }
+
+  virtual void init() {
+    LL << myNodeID() << ", this is server " << myRank();
+
+    solver = initCaffeSolver(-1);
+
+    // initialize the weight at server
+    int total_weight = 0;
+    weights = new VVector<float>(V_WEIGHT, true, this);
+    diffs = new VVector<float>(V_DIFF, false, this);
+    solver_states = new VVector<char>(V_SOLVER, true, this);
+    iterations = new VVector<int>(V_ITER, true, nullptr);
+    iterations->value(0) = {solver->iter()};
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).reset(blob->mutable_cpu_data(), blob->count(), false);
+      auto newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      memset(newBlob->mutable_cpu_diff(), 0, newBlob->diff()->size());
+      diffBlobFront->push_back(newBlob);
+      newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      memset(newBlob->mutable_cpu_diff(), 0, newBlob->diff()->size());
+      diffBlobBack->push_back(newBlob);
+
+      newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      memset(newBlob->mutable_cpu_diff(), 0, newBlob->diff()->size());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(diffBlobs[i]->mutable_cpu_diff(), diffBlobs[i]->count(), false);
+      total_weight += blob->data()->size();
+    }
+
+    LL << "total weight size:" << total_weight;
+  }
+
+  void testPhase(){
+    Lock l(mu_solver);
+    solver->TestAll();
+  }
+
+  void snapshotPhase(){
+    Lock l(mu_solver);
+    solver->Snapshot();
+  }
+
+  void run() {
+    LL << myNodeID() << ", server " << myRank() << " run()ing";
+    while(true){
+      if(diffCount == 0){
+        waitUpdateSignal();
+      }
+      computeUpdate();
+    }
+    LL << myNodeID() << ", server " << myRank() << " over";
+  }
+
+  void process(const MessagePtr& msg) {
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      {
+        Lock l(mu_solver);
+      }
+    }
+  }
+
+  /**
+   * by run()
+   */
+  void waitUpdateSignal(){
+    std::unique_lock<std::mutex> l(mu_update);
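+    // wait in a loop: condition_variable waits can wake spuriously,
+    // so re-check the start_update predicate each time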
+    while(!start_update){
+      cv_update.wait(l);
+    }
+  }
+
+  /**
+   * by run()
+   */
+  void signalUpdateEnd(){
+    std::unique_lock<std::mutex> l(mu_update);
+    start_update = false;
+    cv_update.notify_all();
+  }
+
+  /**
+   * by vectorChanged()
+   */
+  void signalUpdate() {
+    std::unique_lock<std::mutex> l(mu_update);
+    start_update = true;
+    cv_update.notify_all();
+  }
+
+  /**
+   * by run()
+   */
+  void computeUpdate() {
+    int count = 1;
+    {
+      Lock l(mu_diff);
+      if(diffCount < 1){
+        LL << "no diff accumulated!";
+        return;
+      }
+      auto temp = diffBlobFront; diffBlobFront = diffBlobBack; diffBlobBack = temp;
+      LL << "diff (" << diffCount << ") swapped to back";
+      count = diffCount;
+      diffCount = 0;
+    }
+    Lock l(mu_solver);
+//    float first,last, firstv, lastv;
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto dest = solver->net()->params()[i];
+      auto src = (*diffBlobBack)[i];
+      // copy the accumulated diff into the solver's params
+      memcpy(dest->mutable_cpu_diff(), src->cpu_diff(), dest->diff()->size());
+      // scale down?
+      if(count > 1){
+        caffe::caffe_scal(dest->count(), float(1.0 / count), dest->mutable_cpu_diff());
+      }
+/*      if(i==0){
+        first=blob->cpu_diff()[0];
+        firstv = src[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+        lastv = src[src.size() - 1];
+      }
+*/
+    }
+
+//    LL << "got diff[" << first << ",...," << last << "]";
+    solver->ComputeUpdateValue();
+    solver->net()->Update();
+    solver->snapshotPhase();
+    solver->stepEnd();
+
+    for (auto blob : (*diffBlobBack)){
+      memset(blob->mutable_cpu_diff(), 0, blob->diff()->size());
+    }
+    signalUpdateEnd();
+  }
+
+  void accumulateDiff() {
+    Lock l(mu_diff);
+    for(int i = 0; i < diffBlobFront->size(); i++){
+      auto src = diffs->value(i);
+      auto dest = (*diffBlobFront)[i];
+      caffe::caffe_add(dest->count(), src.data(), dest->cpu_diff(), dest->mutable_cpu_diff());
+    }
+    diffCount++;
+    signalUpdate();
+  }
+
+  void vectorChanged(VVector<float>* data){
+//    LL << "vector change received:" << data->name();
+    CHECK_EQ(data, this->diffs) << "server only accepts diff changes";
+    accumulateDiff();
+
+  }
+  void vectorChanged(VVector<char>* data){
+    CHECK(false) << "shouldn't be any VVector<char> change: " << data->name();
+  }
+
+  void vectorGetting(VVector<float>* data){
+    Lock l(mu_solver);
+    float first, last;
+    if (data == this->weights){
+      // need to sync to CPU
+      for (int i = 0; i < solver->net()->params().size(); i++){
+        auto blob = solver->net()->params()[i];
+        blob->cpu_data();
+        if (i==0) {
+          first = blob->cpu_data()[0];
+        } else if (i == solver->net()->params().size() - 1 ) {
+          last = blob->cpu_data()[blob->count()-1];
+        }
+      }
+      LL << "weight synced: [" << first << ",...," << last << "]";
+    } else {
+      CHECK(false) << "someone is getting a non-gettable vector! " << data->name();
+    }
+  }
+
+  void vectorGetting(VVector<char>* data){
+    LL << "getting char: " << data->name();
+    Lock l(mu_solver);
+    if (data == this->solver_states){
+      // need to serialize solver state into solver_states
+      caffe::SolverState state;
+      this->solver->SnapshotSolverState(&state);
+      state.set_iter(solver->iter());
+      state.set_current_step(solver->current_step());
+      string buf;
+      state.SerializeToString(&buf);
+      solver_states->value(0).resize( buf.size() );
+      memcpy(solver_states->value(0).data(), buf.data(), buf.size());
+      LL << "server solver state saved, history:" << state.history_size() << ", total:" << buf.size();
+    } else {
+      CHECK(false) << "someone is getting a non-gettable vector! " << data->name();
+    }
+  }
+
+
+ private:
+  VVector<char> *solver_states; // individual data ptr, solver state to initialize workers
+  VVector<float> *weights;      // shares data ptr with solver->net->params->cpu_data
+
+  VVector<float> *diffs;        // individual data ptr into diffBlobs
+  std::vector<Blob<float>*> diffBlobs; // for receiving from worker
+
+  std::mutex mu_diff; // protects changes to diffBlobFront and diffCount
+  int diffCount;      // how many diffBlobs accumulated in diffBlobFront
+  std::vector<Blob<float>*>* diffBlobFront; // for accumulating from diffBlobs
+
+  std::vector<Blob<float>*>* diffBlobBack;  // for copying into solver
+
+  std::mutex mu_solver;
+  caffe::Solver<float>* solver;
+
+  VVector<int> *iterations;
+
+  std::mutex mu_update;
+  std::condition_variable cv_update;
+  bool start_update;
+
+};
+
+App* CreateServerNode(const std::string& conf) {
+  return new CaffeServer("app", conf);
+}
+
+
+class CaffeWorker;
+
+class NetForwarder {
+
+  bool terminated;
+  int id;
+  CaffeWorker* worker;
+  string rootDir;
+  caffe::Solver<float>* solver;
+  int weightVersion; // current version
+  int wantedVersion; // wanted version; increases with iterations
+  std::mutex mu_forward;
+  std::condition_variable cv_forward;
+  bool start_forward;
+  std::unique_ptr<std::thread> internalThread;
+
+  bool needDisplay;
+
+public:
+  NetForwarder(CaffeWorker* parent, int id, string workerRoot, bool display):
+    id(id),worker(parent),rootDir(workerRoot),
+    solver(nullptr),weightVersion(-1),wantedVersion(0),
+    start_forward(false),needDisplay(display){
+  }
+
+  /**
+   * by CaffeForwarder
+   */
+  void waitForwardSignal(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    while(!start_forward){
+      cv_forward.wait(l);
+    }
+  }
+
+  /**
+   * by CaffeForwarder
+   */
+  void signalForwardEnd(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = false;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by CaffeWorker
+   */
+  void signalForward() {
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = true;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by CaffeWorker
+   */
+  void joinForwardEnd() {
+    if(!start_forward){
+      return;
+    }
+    {
+      std::unique_lock<std::mutex> l(mu_forward);
+      while(start_forward) {
+        cv_forward.wait(l);
+      }
+    }
+  }
+
+  void copyWeight();
+
+  void tryCopyWeight();
+
+  void accumulateDiff();
+
+  void pullIterations();
+
+  void start() {
+    struct timeval tv;
+    unsigned long long t0,t1,t2, t3, t4, t5;
+    if(nullptr == solver) {
+      solver = initCaffeSolverInDir(id, rootDir);
+      LL << "Inited solver On device id # " << id;
+    }
+    int iter = solver->param().max_iter() - solver->iter();
+    LL << "start training loop # " << id;
+    waitForwardSignal();
+    LL << "start() forward signal received";
+    copyWeight();
+    pullIterations();
+    for (int i = 0; i < iter; i++) {
+      t0 = tick(&tv);
+      // wait signal to forward
+      if(needDisplay){
+        solver->testPhase();
+      }
+      t1 = tick(&tv);
+      tryCopyWeight();
+      t2 = tick(&tv);
+//      LL << "forwarder # " << id;
+      solver->forwardBackwardPhase();
+      t3 = tick(&tv);
+      this->accumulateDiff();
+      t4 = tick(&tv);
+      if(needDisplay){
+        solver->displayPhase();
+      }
+      t5 = tick(&tv);
+      // bypass all of computeUpdateValue
+      solver->stepEnd();
+      /*
+      LL << "# " << id << "\ttestPhase\t" << (t1-t0)
+         << "\ttryCopyWeight\t" << (t2-t1)
+         << "\tforwardBackward\t" << (t3-t2)
+         << "\taccumulateDiff\t" << (t4-t3)
+         << "\tdisplayPhase\t" << (t5-t4);
+      */
+    }
+    LL << "Forwarder sending forward end signal";
+    signalForwardEnd();
+  }
+
+  void startAsync(){
+    if(!internalThread.get()){
+      internalThread.reset(new thread(&NetForwarder::start, this));
+    }
+  }
+
+  void stop() {
+    //TODO
+  }
+};
+
+std::vector<std::string> &split(const std::string &s, char delim,
+                                std::vector<std::string> &elems) {
+  std::stringstream ss(s);
+  std::string item;
+  while (std::getline(ss, item, delim)) {
+    elems.push_back(item);
+  }
+  return elems;
+}
+
+
+std::vector<std::string> split(const std::string &s, char delim) {
+  std::vector<std::string> elems;
+  split(s, delim, elems);
+  return elems;
+}
+
+class CaffeWorker: public App{
+private:
+
+
+
+  std::mutex mu_forward;
+  std::condition_variable cv_forward;
+  bool start_forward;
+
+  std::mutex mu_push;
+  std::condition_variable cv_push;
+
+  std::mutex mu_pull;
+  std::condition_variable cv_pull;
+  bool start_pull;
+
+  std::mutex mu_version; // protects changes to weightVersion and requestedVersion
+  int weightVersion;     // current version no. of weights, in iteration count
+  int requestedVersion;  // wanted version no. of weights, in iteration count
+
+  std::mutex mu_weight;    // protects writes to weights
+  VVector<float> *weights; // individual data ptr, same order/size as solver->net->params
+
+  std::mutex mu_diff;    // protects writes to diffs and diffCount
+  VVector<float> *diffs; // for accumulated diff, shares memory with the diff buffers (front/back)
+  int diffCount;         // accumulated diff count
+
+  std::vector<Blob<float>*>* diffBlobFront; // for accumulate
+  std::vector<Blob<float>*>* diffBlobBack;  // for push
+  caffe::Solver<float>* solver;
+
+  VVector<int>* iterations;
+
+  std::unique_ptr<std::thread> pusher;
+  std::unique_ptr<std::thread> puller;
+
+  volatile bool _terminate = false;
+
+  std::vector<NetForwarder*> forwarders;
+
+public:
+  CaffeWorker(const string& name, const string& conf):App(name){
+    weightVersion = 0;
+    requestedVersion = 0;
+    diffBlobFront = new std::vector<Blob<float>*>();
+    diffBlobBack = new std::vector<Blob<float>*>();
+  }
+  ~CaffeWorker(){
+    for(auto blob : (*diffBlobFront)){
+      delete blob;
+    }
+    for(auto blob : (*diffBlobBack)){
+      delete blob;
+    }
+    delete diffBlobFront;
+    delete diffBlobBack;
+  }
+
+  void init(){
+    LL << "worker init()";
+    start_forward = false;
+    start_pull = false;
+    solver = initCaffeSolver(-1);
+    // init shared parameters at worker
+    weights = new VVector<float>(V_WEIGHT);
+    diffs = new VVector<float>(V_DIFF);
+    iterations = new VVector<int>(V_ITER);
+    iterations->value(0) = {0};
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).resize(blob->count());
+      auto newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      memset(newBlob->mutable_cpu_diff(), 0, newBlob->diff()->size());
+      diffBlobFront->push_back(newBlob);
+      newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      memset(newBlob->mutable_cpu_diff(), 0, newBlob->diff()->size());
+      diffBlobBack->push_back(newBlob);
+      diffs->value(i).reset((*diffBlobBack)[i]->mutable_cpu_diff(), (*diffBlobBack)[i]->count(), false);
+    }
+
+    // init pusher/puller
+    pusher.reset(new std::thread(&CaffeWorker::pusherMain, this));
+    puller.reset(new std::thread(&CaffeWorker::pullerMain, this));
+
+    // init forwarders
+    vector<string> workerRoots = split(FLAGS_workers, ',');
+    char* cwd = getcwd(nullptr,1024);
+    LL << "cwd: " << cwd;
+    CHECK(cwd != nullptr);
+    string cwdString(cwd);
+    for (int id = 0; id < workerRoots.size(); id++){
+      bool display = id == 0;
+      string workerRoot = cwdString + "/" + workerRoots[id];
+      LL << "creating forwarder in: " << workerRoot;
+//      CHECK(0 == chdir(workerRoot.c_str()));
+      NetForwarder* forwarder = new NetForwarder(this, id, workerRoot, display);
+      forwarders.push_back(forwarder);
+      forwarder->startAsync();
+    }
+    enableP2P(forwarders.size());
+//    CHECK(0 == chdir(cwd));
+    free(cwd);
+    LL << "worker init() over";
+  }
+
+  /**
+   * by run() thread
+   */
+  void waitForwardSignal(){
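+    // parks the calling thread until process() receives UPDATE_MODEL and
+    // signalAndJoinForward() sets start_forward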
+    std::unique_lock<std::mutex> l(mu_forward);
+    while(!start_forward){
+      cv_forward.wait(l);
+    }
+  }
+
+  /**
+   * by run() thread
+   */
+  void signalForwardEnd(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = false;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by process() thread
+   */
+  void signalAndJoinForward() {
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = true;
+    cv_forward.notify_all();
+    while(start_forward) {
+      cv_forward.wait(l);
+    }
+  }
+
+  void pullerMain(){
+    LL << "puller start";
+    while(true){
+      waitPullSignal();
+      pullWeight();
+      signalPullEnd();
+    }
+    LL << "puller exit";
+  }
+
+  void pusherMain(){
+    LL << "pusher start";
+    CUDA_CHECK(cudaSetDevice(0));
+    while(true){
+      waitPushSignal();
+      pushDiff();
+    }
+    LL << "pusher exit";
+  }
+
+  /**
+   * by pusher thread
+   */
+  void waitPushSignal(){
+    std::unique_lock<std::mutex> l(mu_push);
+    cv_push.wait(l);
+    LL << "push signal received: " << diffCount;
+  }
+
+  void signalPush(){
+    std::unique_lock<std::mutex> l(mu_push);
+    LL << "signal push on: " << diffCount;
+    cv_push.notify_all();
+  }
+
+  /**
+   * by puller thread
+   */
+  void waitPullSignal(){
+    std::unique_lock<std::mutex> l(mu_pull);
+    while(!start_pull){
+      cv_pull.wait(l);
+    }
+    LL << "pull signal received: " << requestedVersion << " vs " << weightVersion;
+  }
+
+  /**
+   * by puller thread
+   */
+  void signalPullEnd(){
+    std::unique_lock<std::mutex> l(mu_pull);
+    start_pull = false;
+    cv_pull.notify_all();
+  }
+
+  /**
+   * by worker run(), wait for initial pull
+   */
+  void waitPullEnd(){
+    std::unique_lock<std::mutex> l(mu_pull);
+    while(start_pull){
+      cv_pull.wait(l);
+    }
+  }
+
+  /**
+   * by worker run() and forwarder.copy -> worker.tryCopyWeight()
+   */
+  void signalPull(){
+    std::unique_lock<std::mutex> l(mu_pull);
+    LL << "signal pull on: " << requestedVersion << " vs " << weightVersion;
+    start_pull = true;
+    cv_pull.notify_all();
+  }
+
+
+
+  /**
+   * by main
+   */
+  void run(){
+    LL << "worker run()";
+    this->requestedVersion = 0; // mark initial pull version as 0: default forwarder version is -1
+    signalPull();
+    waitPullEnd();
+    LL << "initial pull over";
+    for (int i = 0; i < forwarders.size(); i++){
+      NetForwarder* forwarder = forwarders[i];
+      forwarder->signalForward();
+    }
+    for (int i = 0; i < forwarders.size(); i++){
+      NetForwarder* forwarder = forwarders[i];
+      forwarder->joinForwardEnd();
+    }
+    disableP2P(forwarders.size());
+    LL << "worker run() over";
+  }
+
+  void process(const MessagePtr& msg) {
+    LL << "message received";
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      LL << "process() update model received";
+      signalAndJoinForward();
+      LL << "process() forward end received";
+    }
+  }
+
+  /**
+   * by forwarder
+   */
+  void gatherDiff(Solver<float>* another) {
+    struct timeval tv;
+    unsigned long long t0,t1,t2, t3, t4, t5;
+    t0 = tick(&tv);
+    Lock l(mu_diff);
+    t1 = tick(&tv);
+    for(int i = 0; i < another->net()->params().size(); i++){
+      auto acc = (*diffBlobFront)[i];
+      auto blob = another->net()->params()[i];
+      ostringstream name;
+      name << "gatherDiff:solver.blobs[" << i << "]";
+//      checkNAN(blob->count(), blob->cpu_diff(), name.str());
+      switch (Caffe::mode()) {
+      case Caffe::CPU:
+        caffe::caffe_add(acc->count(), blob->cpu_diff(), acc->cpu_diff(), acc->mutable_cpu_diff());
+        break;
+      case Caffe::GPU:
+        caffe::caffe_gpu_add(acc->count(), blob->gpu_diff(), acc->gpu_diff(), acc->mutable_gpu_diff());
+        break;
+      default:
+        LOG(FATAL) << "Unknown caffe mode: " << Caffe::mode();
+      }
+//      caffe::caffe_add(acc->count(), blob->cpu_diff(), acc->cpu_diff(), acc->mutable_cpu_diff());
+    }
+    diffCount++;
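+    // enough gradients buffered for one push: wake the pusher thread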
+    if(diffCount >= FLAGS_pushstep) {
+      signalPush();
+    }
+    t2 = tick(&tv);
+    if(t2 - t0 > 100000){
+      LL << "long accumulate diff:\tlock\t" << (t1-t0) << "\tadd\t" << (t2-t1);
+    }
+  }
+
+  /**
+   * by pusher, synchronized (block until message sent)
+   */
+  void pushDiff(){
+    {
+      // swap the front (accumulating) and back (pushing) diff buffers
+      Lock l(mu_diff);
+      float first, last;
+      auto temp = diffBlobFront; diffBlobFront = diffBlobBack; diffBlobBack = temp; // for accumulate
+      LL << "Worker diff(" << diffCount << ") swapped to back";
+      // clear diff count
+      diffCount = 0;
+    }
+    // reset diffs vector pointers; sync blob diff to cpu
+    for(int i = 0; i < diffBlobBack->size(); i++){
+      auto blob = (*diffBlobBack)[i];
+      diffs->value(i).reset(blob->mutable_cpu_diff(), blob->count(), false);
+    }
+    // push to the server group
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    msg->task.set_key_channel(0);
+    for(int i = 0; i < diffs->vcount(); i++){
+      auto acc = (*diffBlobBack)[i];
+      acc->cpu_diff(); // sync to cpu
+      auto diff = diffs->value(i);
+      CHECK_EQ(acc->cpu_diff(), diff.data());
+      msg->addValue(diff);
+    }
+    int push_time = diffs->push(msg);
+    diffs->waitOutMsg(kServerGroup, push_time);
+    // clear previous diff
+    for(auto acc : (*diffBlobBack)){
+      switch(Caffe::mode()){
+      case Caffe::CPU:
+        memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+        break;
+      case Caffe::GPU:
+        caffe_gpu_set(acc->count(), (float)0, acc->mutable_gpu_diff());
+        break;
+      }
+    }
+
+    LL << "Worker diff pushed to server";
+  }
+
+  /**
+   * by puller (except the initial pull), synchronized
+   */
+  void pullWeight(){
+    LL << "begin pull weight";
+//    Task task;
+//    task.mutable_sgd()->set_cmd(SGDCall::UPDATE_MODEL);
+//    port(kServerGroup)->submitAndWait(task);
+
+    Lock l(mu_weight);
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    LL << "begin pull";
+    int pull_time = weights->pull(msg);
+    LL << "begin waitOutMsg";
+    weights->waitOutMsg(kServerGroup, pull_time);
+    {
+      Lock l(mu_version);
+      this->weightVersion = this->requestedVersion;
+    }
+    LL << "weight pulled from server, total:" << weights->totalSize();
+  }
+
+  /**
+   * by main
+   */
+  void pullIterations(Solver<float>* another) {
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    int pull_time = iterations->pull(msg);
+    iterations->waitOutMsg(kServerGroup, pull_time);
+    Lock l(mu_weight);
+    SArray<int> src = iterations->value(0);
+    LL << "iteration got: " << src.size() << "," << src[0];
+    another->setIter(src[0]);
+  }
+
+
+  /**
+   * by forwarder
+   */
+  void copyWeight(Solver<float>* another, int* version){
+    Lock l(mu_weight); // lock weight, prevent pulling while copying
+    float first,last;
+    for (int i = 0; i < another->net()->params().size(); i++){
+      auto blob = another->net()->params()[i];
+      float* dest = blob->mutable_cpu_data();
+      auto src = weights->value(i);
+      memcpy(dest, src.data(), blob->data()->size());
+      //TODO direct copy to GPU?
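+      // first/last entries are kept only for the debug log below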
+      if(i == 0){
+        first = blob->cpu_data()[0];
+      }else if(i == another->net()->params().size()-1){
+        last = blob->cpu_data()[blob->count()-1];
+      }
+    }
+    *version = weightVersion;
+    LL << "weight from server:[" << first << ",...," << last << "]";
+  }
+
+  /**
+   * by forwarder: check the weight version & currently wanted newest version
+   * number against the worker's weight; copy if a newer version has arrived;
+   * mark the newer version as requested otherwise.
+   */
+  bool tryCopyWeight(Solver<float>* another, int* anotherCurrentVersion, int anotherWantedVersion){
+    if(requestedVersion < anotherWantedVersion){
+      // mark newer version requested
+      Lock l(mu_version);
+      if(requestedVersion < anotherWantedVersion){
+        requestedVersion = anotherWantedVersion;
+        if(requestedVersion - weightVersion >= FLAGS_pullstep){
+          signalPull();
+        }
+      }
+    }
+    if(weightVersion <= *anotherCurrentVersion){
+      // no need to copy
+      return false;
+    }
+    // need to copy
+    copyWeight(another, anotherCurrentVersion);
+    return true;
+  }
+};
+
+void NetForwarder::copyWeight() {
+  this->worker->copyWeight(this->solver, &this->weightVersion);
+}
+
+
+void NetForwarder::tryCopyWeight() {
+  if(this->worker->tryCopyWeight(this->solver, &this->weightVersion, this->wantedVersion)){
+    // copy successful; reset version counter to this newly copied version
+    this->wantedVersion = this->weightVersion;
+  }
+  this->wantedVersion ++;
+}
+
+void NetForwarder::accumulateDiff(){
+  this->worker->gatherDiff(this->solver);
+}
+
+void NetForwarder::pullIterations(){
+  this->worker->pullIterations(this->solver);
+}
+
+} // namespace PS
+
+namespace PS {
+App* App::create(const string& name, const string& conf) {
+  auto my_role = Postoffice::instance().myNode().role();
+  if (my_role == Node::SERVER) {
+    return new CaffeServer(name, conf);
+  } else if(my_role == Node::WORKER){
+    return new CaffeWorker(name, conf);
+  }else{
+    return new App();
+  }
+}
+} // namespace PS
+
+
+int main(int argc, char *argv[]) {
+
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& sys = PS::Postoffice::instance();
+  sys.start(&argc, &argv);
+
+  sys.stop();
+  LL << "system exit";
+  return 0;
+}
diff --git a/src/app/caffe/caffe_main.cc b/src/app/caffe/caffe_main.cc
new file mode 100644
index 0000000..74b56f6
--- /dev/null
+++ b/src/app/caffe/caffe_main.cc
@@ -0,0 +1,560 @@
+#include <thread>
+#include <mutex>
+#include <memory>
+#include <climits>
+
+#include "ps.h"
+#include "system/app.h"
+#include "parameter/v_vector.h"
+#include "parameter/kv_vector.h"
+#include "caffe/caffe.hpp"
+#include "caffe/util/math_functions.hpp"
+using caffe::Blob;
+using caffe::Solver;
+using caffe::SolverParameter;
+using caffe::Caffe;
+using caffe::caffe_scal;
+using google::protobuf::io::CodedInputStream;
+
+// caffe cmd flags
+
+DEFINE_int32(gpu, -1,
+    "Run in GPU mode on given device ID.");
+DEFINE_string(solver, "",
+    "The solver definition protocol buffer text file.");
+DEFINE_string(model, "",
+    "The model definition protocol buffer text file.");
+DEFINE_string(snapshot, "",
+    "Optional; the snapshot solver state to resume training.");
+
+DEFINE_bool(fb_only, true,
+    "Optional; workers only ForwardBackward.");
+DEFINE_bool(synced, false,
+    "Optional; pull/push synced with Forward");
+
+// client puller / pusher flags
+
+DEFINE_int32(pullstep, 4,
+    "interval, in minibatches, between pull operations.");
+DEFINE_int32(pushstep, 3,
+    "interval, in minibatches, between push operations.");
+
+
+
+caffe::SolverParameter solver_param;
+Solver<float>* initCaffeSolver(){
+
+  Solver<float>* solver;
+
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  if (FLAGS_gpu < 0
+      && solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
+    FLAGS_gpu = solver_param.device_id();
+  }
+
+  // Set device id and mode
+  if (FLAGS_gpu >= 0) {
+    LOG(INFO) << "Use GPU with device ID " << FLAGS_gpu;
+    Caffe::SetDevice(FLAGS_gpu);
+    Caffe::set_mode(Caffe::GPU);
+  } else {
+    LOG(INFO) << "Use CPU.";
+    Caffe::set_mode(Caffe::CPU);
+  }
+
+  solver = caffe::GetSolver<float>(solver_param);
+
+  if (FLAGS_snapshot.size()) {
+    LOG(INFO) << "Resuming from " << FLAGS_snapshot;
+    solver->Restore(FLAGS_snapshot.c_str());
+  }
+
+  return solver;
+}
+
+caffe::Net<float>* initCaffeNet(){
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  std::string net_path = solver_param.net();
+
+  return NULL; // TODO
+}
+
+#define V_WEIGHT "weight"
+#define V_DIFF "diff"
+#define V_SOLVER "solver"
+namespace PS {
+class CaffeServer : public App, public VVListener<float>, public VVListener<char> {
+ public:
+  CaffeServer(const string& name, const string& conf) : App(name) { }
+  virtual ~CaffeServer() { }
+
+  virtual void init() {
+    LL << myNodeID() << ", this is server " << myRank();
+
+    solver = initCaffeSolver();
+
+    // initialize the weight at server
+    int total_weight = 0;
+    weights = new VVector<float>(V_WEIGHT, true, this);
+    diffs = new VVector<float>(V_DIFF, false, this);
+    solver_states = new VVector<char>(V_SOLVER, true, this);
+
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).reset(blob->mutable_cpu_data(), blob->count(), false);
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+      total_weight += blob->data()->size();
+    }
+
+    LL << "total weight size:" << total_weight;
+  }
+
+  void testPhase(){
+    Lock l(mu_solver);
+    solver->TestAll();
+  }
+
+  void snapshotPhase(){
+    Lock l(mu_solver);
+    solver->Snapshot();
+  }
+
+
+  void run() {
+    LL << myNodeID() << ", server " << myRank() << " run()ing";
+    int nextTest=0,nextSnapshot=0;
+    auto param = solver->param();
+    bool needTest = param.test_interval() > 0;
+    bool needSnapshot = param.snapshot() > 0;
+    if(needTest){
+      nextTest = solver->iter() - solver->iter() % param.test_interval() + param.test_interval();
+    }
+    if(needSnapshot){
+      nextSnapshot = solver->iter() - solver->iter() % param.snapshot() + param.snapshot();
+    }
+    while(true){
+//      LL << "server looping: iter " << solver->iter() << " vs test " << nextTest << " vs snapshot " << nextSnapshot;
+      if (needTest && solver->iter() >= nextTest) {
+        nextTest = solver->iter() - solver->iter() % param.test_interval() + param.test_interval();
+        // testing here is too slow on the CPU, and a waste of cycles on the GPU
+//        testPhase();
+      }
+      // no loss to display at server
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+  }
+
+  void process(const MessagePtr& msg) {
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      {
+        Lock l(mu_solver);
+      }
+    }
+  }
+
+  void vectorChanged(VVector<float>* data){
+//    LL << "vector change received:" << data->name();
+    CHECK_EQ(data, this->diffs) << "server only accepts diff changes";
+
+    Lock l(mu_solver);
+//    float first,last, firstv, lastv;
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      float* dest = blob->mutable_cpu_diff();
+      auto src = diffs->value(i);
+      memcpy(dest, src.data(), blob->diff()->size());
+
+      // scale down?
+      if(FLAGS_pushstep != 0){
+        caffe::caffe_scal(blob->count(), float(1.0 / FLAGS_pushstep), dest);
+      }
+/*      if(i==0){
+        first=blob->cpu_diff()[0];
+        firstv = src[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+        lastv = src[src.size() - 1];
+      }
+*/
+    }
+
+//    LL << "got diff[" << first << ",...," << last << "]";
+    solver->ComputeUpdateValue();
+    solver->net()->Update();
+    solver->snapshotPhase();
+    solver->stepEnd();
+
+  }
+  void vectorChanged(VVector<char>* data){
+    CHECK(false) << "shouldn't be any VVector<char> change: " << data->name();
+  }
+
+  void vectorGetting(VVector<float>* data){
+    Lock l(mu_solver);
+    float first, last;
+    if (data == this->weights){
+      // need to sync to CPU
+      for (int i = 0; i < solver->net()->params().size(); i++){
+        auto blob = solver->net()->params()[i];
+        blob->cpu_data();
+        if (i==0) {
+          first = blob->cpu_data()[0];
+        } else if (i == solver->net()->params().size() - 1 ) {
+          last = blob->cpu_data()[blob->count()-1];
+        }
+      }
+      LL << "weight synced: [" << first << ",...," << last << "]";
+    } else {
+      CHECK(false) << "someone is getting a non-gettable vector! " << data->name();
+    }
+  }
+
+  void vectorGetting(VVector<char>* data){
+    LL << "getting char: " << data->name();
+    Lock l(mu_solver);
+    if (data == this->solver_states){
+      // need to serialize solver state into solver_states
+      caffe::SolverState state;
+      this->solver->SnapshotSolverState(&state);
+      state.set_iter(solver->iter());
+      state.set_current_step(solver->current_step());
+      string buf;
+      state.SerializeToString(&buf);
+      solver_states->value(0).resize( buf.size() );
+      memcpy(solver_states->value(0).data(), buf.data(), buf.size());
+      LL << "server solver state saved, history:" << state.history_size() << ", total:" << buf.size();
+    } else {
+      CHECK(false) << "someone is getting a non-gettable vector! " << data->name();
+    }
+  }
+
+
+ private:
+  VVector<char> *solver_states; // individual data ptr, solver state to initialize workers
+  VVector<float> *weights;      // shares data ptr with solver->net->params->cpu_data
+  VVector<float> *diffs;        // individual data ptr into diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+
+  std::mutex mu_solver;
+  caffe::Solver<float>* solver;
+};
+
+App* CreateServerNode(const std::string& conf) {
+  return new CaffeServer("app", conf);
+}
+
+
+class CaffeWorker: public App{
+private:
+
+  std::mutex mu_weight; // protects writes to weight_ready and weights
+  volatile bool weight_ready;
+  std::mutex mu_diff;   // protects writes to diffs
+
+  VVector<char> *solver_states; // individual data ptr, solver state to initialize workers
+
+  VVector<float> *weights; // individual data ptr, same order/size as solver->net->params
+  VVector<float> *diffs;   // for accumulated diff, shares memory with diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+  caffe::Solver<float>* solver;
+
+  std::unique_ptr<std::thread> pusher;
+  std::unique_ptr<std::thread> puller;
+
+  volatile unsigned int tickDiff=0, tickStep=0;
+
+  volatile bool _terminate = false;
+
+public:
+  CaffeWorker(const string& name, const string& conf):App(name){
+
+  }
+  ~CaffeWorker(){
+    if(pusher){
+      pusher->join();
+    }
+    if(puller){
+      puller->join();
+    }
+  }
+
+  void init(){
+
+  }
+
+  /**
+   * by main
+   */
+  void run(){
+    LL << "worker run()";
+
+    solver = initCaffeSolver();
+    // init shared parameters at worker
+    weights = new VVector<float>(V_WEIGHT);
+    diffs = new VVector<float>(V_DIFF);
+    solver_states = new VVector<char>(V_SOLVER);
+    solver_states->value(0) = {};
+    solver_states->setResizable(true);
+
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).resize(blob->count());
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+    }
+
+    // initial pull from server
+    pullSolverState();
+    pullWeight();
+    swapWeight();
+    if(!FLAGS_synced){
+      // start pusher/puller
+      pusher = std::unique_ptr<std::thread>(new std::thread(&CaffeWorker::pusherMain, this));
+      puller = std::unique_ptr<std::thread>(new std::thread(&CaffeWorker::pullerMain, this));
+    }
+    // start training loop
+    int iter = solver->param().max_iter() - solver->iter();
+    LL << "start training loop";
+    for (int i = 0; i < iter; i++) {
+      // solver->OneStep()
+      if(FLAGS_synced && tickStep % FLAGS_pullstep == 0){
+        pullWeight();
+      }
+      swapWeight();
+      solver->testPhase();
+      solver->forwardBackwardPhase();
+      this->accumulateDiff();
+      if(FLAGS_synced && tickDiff % FLAGS_pushstep == 0){
+        pushDiff();
+      }
+      solver->displayPhase();
+      // bypass all of them
+      if(!FLAGS_fb_only){
+        solver->ComputeUpdateValue();
+        solver->net()->Update();
+        solver->snapshotPhase();
+      }
+      solver->stepEnd();
+      stepEnd();
+    }
+    // If we haven't already, save a snapshot after optimization, unless
+    // overridden by setting snapshot_after_train := false
+    if (!FLAGS_fb_only && solver->param().snapshot_after_train()
+        && (!solver->param().snapshot() || solver->iter() % solver->param().snapshot() != 0)) {
+      solver->Snapshot();
+    }
+    // After the optimization is done, run an additional train and test pass to
+    // display the train and test loss/outputs if appropriate (based on the
+    // display and test_interval settings, respectively). Unlike in the rest of
+    // training, for the train net we only run a forward pass as we've already
+    // updated the parameters "max_iter" times -- this final pass is only done to
+    // display the loss, which is computed in the forward pass.
+    if (solver->param().display() && solver->iter() % solver->param().display() == 0) {
+      float loss;
+      solver->net()->ForwardPrefilled(&loss);
+      LOG(INFO) << "Iteration " << solver->iter() << ", loss = " << loss;
+    }
+    if (solver->param().test_interval() && solver->iter() % solver->param().test_interval() == 0) {
+      solver->TestAll();
+    }
+    terminate();
+  }
+
+  /**
+   * notify accumulateDiff end, for pusher counting
+   */
+  void diffEnd(){
+    tickDiff++;
+  }
+  /**
+   * for puller counting
+   */
+  void stepEnd(){
+    tickStep++;
+  }
+
+  void terminate(){
+    _terminate = true;
+    // TODO tell server I'm done
+  }
+
+  void pusherMain(){
+    LL << "pusher start";
+    int nextTick = tickDiff - tickDiff % FLAGS_pushstep + FLAGS_pushstep;
+    while(!_terminate){
+//      LL << "pusher check " << tickDiff;
+      if(this->tickDiff >= nextTick){
+        nextTick = tickDiff - tickDiff % FLAGS_pushstep + FLAGS_pushstep;
+        pushDiff();
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    LL << "pusher exit";
+  }
+
+  void pullerMain(){
+    LL << "puller start";
+    int nextTick = tickStep - tickStep % FLAGS_pullstep + FLAGS_pullstep;
+    while(!_terminate){
+//      LL << "puller check " << tickStep << " vs " << nextTick;
+      if(this->tickStep >= nextTick){
+        nextTick = tickStep - tickStep % FLAGS_pullstep + FLAGS_pullstep;
+        pullWeight();
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    LL << "puller exit";
+  }
+
+  /**
+   * by pusher, synchronized (block until message sent)
+   */
+  void pushDiff(){
+    Lock l(mu_diff);
+    // push to the server group
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    msg->task.set_key_channel(0);
+    for(int i = 0; i < diffs->vcount(); i++){
+      auto acc = diffBlobs[i];
+      acc->cpu_diff(); // sync to cpu
+      auto diff = diffs->value(i);
+      CHECK_EQ(acc->cpu_diff(), diff.data());
+      msg->addValue(diff);
+    }
+    int push_time = diffs->push(msg);
+    diffs->waitOutMsg(kServerGroup, push_time);
+    // clear previous diff
+    for(auto acc : diffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+  }
+
+  /**
+   * by main
+   */
+  void accumulateDiff(){
+    {
+      Lock l(mu_diff);
+      for(int i = 0; i < solver->net()->params().size(); i++){
+        auto acc = diffBlobs[i];
+        auto blob = solver->net()->params()[i];
+        switch (Caffe::mode()) {
+        case Caffe::CPU:
+          caffe::caffe_add(acc->count(), blob->cpu_diff(), acc->cpu_diff(), acc->mutable_cpu_diff());
+          break;
+        case Caffe::GPU:
+          caffe::caffe_gpu_add(acc->count(), blob->gpu_diff(), acc->gpu_diff(), acc->mutable_gpu_diff());
+          break;
+        default:
+          LOG(FATAL) << "Unknown caffe mode: " << Caffe::mode();
+        }
+      }
+    }
+    diffEnd();
+  }
+
+  /**
+   * by main
+   */
+  void pullSolverState() {
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    int pull_time = solver_states->pull(msg);
+    solver_states->waitOutMsg(kServerGroup, pull_time);
+    Lock l(mu_weight);
+    SArray<char> src = solver_states->value(0);
+    LL << "solver state got: " << src.size();
+    caffe::SolverState state;
+    CodedInputStream cis((const uint8*) src.data(), src.size());
+    cis.SetTotalBytesLimit(INT_MAX, 536870912);
+    state.ParseFromCodedStream(&cis);
+
+    solver->RestoreSolverState(state);
+    solver->setIter(state.iter());
+    solver->setCurrentStep(state.current_step());
+  }
+
+  /**
+   * by puller (except the initial pull), synchronized
+   */
+  void pullWeight(){
+//    Task task;
+//    task.mutable_sgd()->set_cmd(SGDCall::UPDATE_MODEL);
+//    port(kServerGroup)->submitAndWait(task);
+
+    Lock l(mu_weight);
+    if(weight_ready){
+      return;
+    }
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    int pull_time = weights->pull(msg);
+    weights->waitOutMsg(kServerGroup, pull_time);
+    weight_ready = true;
+//    LL << "weight pulled from server, total:" << weights->totalSize();
+  }
+  /**
+   * by main, copy received weight into solver->net
+   */
+  void swapWeight(){
+    Lock l(mu_weight);
+    if(!weight_ready){
+      return;
+    }
+    float first,last;
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      float* dest = blob->mutable_cpu_data();
+      auto src = weights->value(i);
+      memcpy(dest, src.data(), blob->data()->size());
+      //TODO direct copy to GPU?
+      if(i == 0){
+        first = blob->cpu_data()[0];
+      }else if(i == solver->net()->params().size()-1){
+        last = blob->cpu_data()[blob->count()-1];
+      }
+    }
+    LL << "weight from server:[" << first << ",...," << last << "]";
+    weight_ready = false;
+  }
+};
+} // namespace PS
+
+namespace PS {
+App* App::create(const string& name, const string& conf) {
+  auto my_role = Postoffice::instance().myNode().role();
+  if (my_role == Node::SERVER) {
+    return new CaffeServer(name, conf);
+  } else if(my_role == Node::WORKER){
+    return new CaffeWorker(name, conf);
+  }else{
+    return new App();
+  }
+}
+} // namespace PS
+
+
+int main(int argc, char *argv[]) {
+
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& sys = PS::Postoffice::instance();
+  sys.start(&argc, &argv);
+
+  sys.stop();
+  return 0;
+}
diff --git a/src/app/caffe/caffe_share.cc b/src/app/caffe/caffe_share.cc
new file mode 100644
index 0000000..8ee378e
--- /dev/null
+++ b/src/app/caffe/caffe_share.cc
@@ -0,0 +1,724 @@
+#include <unistd.h>
+#include <sstream>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+
+#include "ps.h"
+#include "system/app.h"
+#include "parameter/v_vector.h"
+#include "parameter/kv_vector.h"
+#include "caffe/caffe.hpp"
+#include "caffe/common.hpp"
+#include "caffe/util/math_functions.hpp"
+#include "caffe/util/upgrade_proto.hpp"
+#include "app/caffe/util.h"
+
+using namespace caffe;
+using namespace std;
+using caffe::Blob;
+using caffe::Solver;
+using caffe::SolverParameter;
+using caffe::Caffe;
+using caffe::caffe_scal;
+using google::protobuf::io::CodedInputStream;
+using std::string;
+using std::vector;
+
+// caffe cmd flags
+
+DEFINE_int32(gpu, -1,
+    "Run in GPU mode on given device ID.");
+DEFINE_string(solver, "",
+    "The solver definition protocol buffer text file.");
+DEFINE_string(model, "",
+    "The model definition protocol buffer text file.");
+DEFINE_string(snapshot, "",
+    "Optional; the snapshot solver state to resume training.");
+DEFINE_string(workers, "W0",
+    "cwd of workers; subdirectories of the current directory.");
+DEFINE_bool(fb_only, true,
+    "DEPRECATED; workers only ForwardBackward.");
+DEFINE_bool(synced, false,
+    "DEPRECATED; pull/push synced with Forward");
+// client puller / pusher flags
+DEFINE_int32(pushstep, 3,
+    "interval, in minibatches, between push operations.");
+DEFINE_int32(pullstep, 3,
+    "DEPRECATED; interval, in minibatches, between pull operations.");
+
+caffe::SolverParameter solver_param;
+Solver<float>* initCaffeSolver(int id){
+
+  Solver<float>* solver;
+
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  if (id < 0) {
+    id = FLAGS_gpu;
+  }
+
+  if (id < 0
+      && solver_param.solver_mode() ==
+          caffe::SolverParameter_SolverMode_GPU) {
+    id = solver_param.device_id();
+  }
+
+  // Set device id and mode
+  if (id >= 0) {
+    LOG(INFO) << "Use GPU with device ID " << id;
+    Caffe::SetDevice(id);
+    Caffe::set_mode(Caffe::GPU);
+  } else {
+    LOG(INFO) << "Use CPU.";
+    Caffe::set_mode(Caffe::CPU);
+  }
+
+  solver = caffe::GetSolver<float>(solver_param);
+
+  if (FLAGS_snapshot.size()) {
+    LOG(INFO) << "Resuming from " << FLAGS_snapshot;
+    solver->Restore(FLAGS_snapshot.c_str());
+  }
+
+  return solver;
+}
+
+caffe::Net<float>* initCaffeNet(){
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  caffe::NetParameter net_param;
+  std::string net_path = solver_param.net();
+  caffe::ReadNetParamsFromTextFileOrDie(net_path, &net_param);
+  return new caffe::Net<float>(net_param);
+}
+
+#define V_WEIGHT "weight"
+#define V_DIFF "diff"
+#define V_SOLVER "solver"
+namespace PS {
+
+static std::mutex mu_pwd;
+Solver<float>* initCaffeSolverInDir(int id, string root){
+  Lock l(mu_pwd);
+  char* cwd = getcwd(nullptr,1024);
+  LL << "cwd: " << cwd;
+  CHECK(cwd != nullptr);
+  CHECK(0 == chdir(root.c_str()));
+  Solver<float>* solver = initCaffeSolver(id);
+  CHECK(0 == chdir(cwd));
+  free(cwd);
+  return solver;
+}
+
+
+class CaffeServer : public App, public VVListener<float>, public VVListener<char> {
+ public:
+  CaffeServer(const string& name, const string& conf) : App(name) { }
+  virtual ~CaffeServer() { }
+
+  virtual void init() {
+    LL << myNodeID() << ", this is server " << myRank();
+
+    solver = initCaffeSolver(-1);
+
+    // initialize the weight at server
+    int total_weight = 0;
+    weights = new VVector<float>(V_WEIGHT, true, this);
+    diffs = new VVector<float>(V_DIFF, false, this);
+    solver_states = new VVector<char>(V_SOLVER, true, this);
+
+    for (int i = 0; i < solver->net()->params().size(); i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).reset(blob->mutable_cpu_data(), blob->count(), false);
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+      Blob<float>* accBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      accDiffBlobs.push_back(accBlob);
+      total_weight += blob->data()->size();
+    }
+
+    LL << "total weight size:" << total_weight;
+  }
+
+  void testPhase(){
+    Lock l(mu_solver);
+    solver->TestAll();
+  }
+
+  void snapshotPhase(){
+    Lock l(mu_solver);
+    solver->Snapshot();
+  }
+
+  void resetDiffCount() {
+    Lock l(mu_accDiff);
+    accDiffCount = 0;
+    for(auto acc : diffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+    for(auto acc : accDiffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+  }
+
+  void signalWorkersStart(){
+    resetDiffCount();
+    Task task;
+    auto sgd = task.mutable_sgd();
+    sgd->set_cmd(SGDCall::UPDATE_MODEL);
+    port(kWorkerGroup)->submitAndWait(task);
+//    port(kWorkerGroup)->submit(task);
+    LL << "signalWorkersStart returned";
+  }
+
+  void gatherDiffs(){
+    //TODO do we even need to synchronize all workers?
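+    // the spin-wait below stays disabled: signalWorkersStart() uses
+    // submitAndWait(), which already blocks until every worker has replied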
+    /*
+    while(true){
+      {
+        Lock l0(mu_accDiff);
+        if(accDiffCount == sys_.yp().num_workers()){
+          break;
+        }
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    */
+//    LL << "all diffs gathered: " << accDiffCount;
+    Lock l(mu_solver);
+    Lock l2(mu_accDiff);
+    CHECK_EQ(accDiffCount, sys_.yp().num_workers()) << "accDiffCount should be the same as num_workers!";
+//    float first,last, firstv, lastv;
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      float* dest = blob->mutable_cpu_diff();
+      auto src = accDiffBlobs[i];
+      memcpy(dest, src->cpu_diff(), blob->diff()->size());
+
+/*    if(i==0){
+        first=blob->cpu_diff()[0];
+        firstv = src[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+        lastv = src[src.size() - 1];
+      }
+*/
+    }
+    LL << "accDiffs copied to net->params->diff";
+
+  }
+
+
+  void run() {
+    LL << myNodeID() << ", server " << myRank() << " run()ing";
+    auto param = solver->param();
+    int iter = solver->param().max_iter() - solver->iter();
+    LL << "start training loop";
+    for (int i = 0; i < iter; i++) {
+      signalWorkersStart();
+      gatherDiffs();
+      solver->ComputeUpdateValue();
+      solver->net()->Update();
+      solver->snapshotPhase();
+      solver->stepEnd();
+    }
+  }
+
+  void process(const MessagePtr& msg) {
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      {
+        Lock l(mu_solver);
+      }
+    }
+  }
+
+  void vectorChanged(VVector<float>* data){
+//    LL << "vector change received:" << data->name();
+    CHECK_EQ(data, this->diffs) << "the server only accepts diff changes";
+
+    Lock l(mu_accDiff);
+    // append diffs to accDiffs
+    float first,last;
+    for (int i = 0; i < diffBlobs.size();i++) {
+      auto blob = diffBlobs[i];
+      float* dest = accDiffBlobs[i]->mutable_cpu_diff();
+      float* src = diffBlobs[i]->mutable_cpu_diff();
+      if(i==0){
+        first=blob->cpu_diff()[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+      }
+      //check
+      ostringstream nameSt;
+      nameSt << "diffBlob[" << i <<"]";
+//      checkNAN(blob->count(), diffBlobs[i]->cpu_diff(), nameSt.str());
+      //scale down?
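+      // A worker pushes the sum of FLAGS_pushstep minibatch gradients, so
+      // scaling by 1/pushstep (below) averages that sum over the minibatches
+      // before it is folded into accDiffBlobs.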
+      if(FLAGS_pushstep != 0){
+        caffe::caffe_scal(blob->count(), float(1.0 / FLAGS_pushstep), src);
+      }
+
+      caffe::caffe_add(blob->count(), src, dest, dest);
+
+    }
+    accDiffCount++;
+//    LL << "diff gathered: # " << accDiffCount;
+    LL << "got diff[" << first << ",...," << last << "]";
+  }
+
+  void vectorChanged(VVector<char>* data){
+    CHECK(false) << "shouldn't be any VVector change: "<< data->name();
+  }
+
+  void vectorGetting(VVector<float>* data){
+    Lock l(mu_solver);
+    float first, last;
+    if (data == this->weights){
+      // need to sync to CPU
+      for (int i = 0; i < solver->net()->params().size();i++){
+        auto blob = solver->net()->params()[i];
+        blob->cpu_data();
+        if (i==0) {
+          first = blob->cpu_data()[0];
+        } else if (i == solver->net()->params().size() - 1 ) {
+          last = blob->cpu_data()[blob->count()-1];
+        }
+      }
+      LL << "weight synced: [" << first << ",...," << last << "]";
+    } else {
+      CHECK(false) << "someone is getting non-gettable! " << data->name();
+    }
+  }
+
+  void vectorGetting(VVector<char>* data){
+    LL << "getting char: "<< data->name();
+    Lock l(mu_solver);
+    if (data == this->solver_states){
+      // need to serialize solver state into solver_states
+      caffe::SolverState state;
+      this->solver->SnapshotSolverState(&state);
+      state.set_iter(solver->iter());
+      state.set_current_step(solver->current_step());
+      string buf;
+      state.SerializeToString(&buf);
+      solver_states->value(0).resize( buf.size() );
+      memcpy(solver_states->value(0).data(), buf.data(), buf.size());
+      LL << "server solver state saved, history:" << state.history_size() << ", total:" << buf.size();
+    } else {
+      CHECK(false) << "someone is getting non-gettable! " << data->name();
+    }
+  }
+
+
+ private:
+  VVector<char>* solver_states; // individual data ptr, solver state to initialize workers
+  VVector<float>* weights;  // shares data ptr with solver->net->params->cpu_data
+  VVector<float>* diffs;    // individual data ptr, shared with diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+
+  std::vector<Blob<float>*> accDiffBlobs;
+  int accDiffCount;
+  std::mutex mu_accDiff; // guards accDiffCount and accDiffBlobs
+
+  std::mutex mu_solver;
+  caffe::Solver<float>* solver;
+};
+
+App* CreateServerNode(const std::string& conf) {
+  return new CaffeServer("app", conf);
+}
+
+
+class CaffeWorker;
+
+class NetForwarder {
+
+  bool terminated;
+  int id;
+  CaffeWorker* worker;
+  string rootDir;
+  caffe::Solver<float>* solver;
+  std::mutex mu_forward;
+  std::condition_variable cv_forward;
+  bool start_forward;
+  std::unique_ptr<std::thread> internalThread;
+
+  bool needDisplay;
+
+public:
+  NetForwarder(CaffeWorker* parent, int id, string workerRoot, bool display):
+    id(id),worker(parent),rootDir(workerRoot),
+    solver(nullptr),start_forward(false),needDisplay(display){
+  }
+
+  /**
+   * by forwarder thread
+   */
+  void waitForwardSignal(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    while(!start_forward){
+      cv_forward.wait(l);
+    }
+  }
+
+  /**
+   * by forwarder thread
+   */
+  void signalForwardEnd(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = false;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by CaffeWorker
+   */
+  void signalForward() {
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = true;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by CaffeWorker
+   */
+  void joinForwardEnd() {
+    if(!start_forward){
+      return;
+    }
+    {
+      std::unique_lock<std::mutex> l(mu_forward);
+      while(start_forward) {
+        cv_forward.wait(l);
+      }
+    }
+  }
+
+  void copyWeight();
+
+  void accumulateDiff();
+
+  void start() {
+    if(nullptr == solver) {
+      solver = initCaffeSolverInDir(id, rootDir);
+      LL << "Inited solver on device id # " << id;
+    }
+    /*
+    if (this->id >= 0) {
+      LOG(INFO) << "Net Use GPU with device ID " << this->id;
+      Caffe::SetDevice(this->id);
+      Caffe::set_mode(Caffe::GPU);
+    } else {
+      LOG(INFO) << "Use CPU.";
+      Caffe::set_mode(Caffe::CPU);
+    }
+    */
+//    this->net = initCaffeNet();
+    std::vector<Blob<float>*> bottom_vec;
+    while(true) {
+      // wait for the signal to forward
+      waitForwardSignal();
+      LL << "start() forward signal received";
+      copyWeight();
+      for (int i = 0; i < FLAGS_pushstep; i++) {
+        if(needDisplay){
+          solver->testPhase();
+        }
+        solver->forwardBackwardPhase();
+        this->accumulateDiff();
+        if(needDisplay){
+          solver->displayPhase();
+        }
+        // bypass all of computeUpdateValue
+        solver->stepEnd();
+      }
+      LL << "pushstep " << FLAGS_pushstep << " reached.";
+      LL << "Forwarder sending forward end signal";
+      signalForwardEnd();
+    }
+  }
+
+  void startAsync(){
+    if(!internalThread.get()){
+      internalThread.reset(new thread(&NetForwarder::start, this));
+    }
+  }
+
+  void stop() {
+    //TODO
+  }
+};
+
+std::vector<std::string> &split(const std::string &s, char delim, std::vector<std::string> &elems) {
+  std::stringstream ss(s);
+  std::string item;
+  while (std::getline(ss, item, delim)) {
+    elems.push_back(item);
+  }
+  return elems;
+}
+
+
+std::vector<std::string> split(const std::string &s, char delim) {
+  std::vector<std::string> elems;
+  split(s, delim, elems);
+  return elems;
+}
+
+class CaffeWorker: public App{
+private:
+
+  std::mutex mu_weight; // protects writes to weight_ready and weights
+  std::mutex mu_diff;   // protects writes to diffs
+
+
+  std::mutex mu_forward;
+  std::condition_variable cv_forward;
+  bool start_forward;
+
+  VVector<float>* weights; // individual data ptr, same order/size as solver->net->params
+  VVector<float>* diffs;   // for accumulated diff, shares memory with diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+  caffe::Solver<float>* solver;
+
+  volatile bool _terminate = false;
+
+  std::vector<NetForwarder*> forwarders;
+
+public:
+  CaffeWorker(const string& name, const string& conf):App(name){
+
+  }
+  ~CaffeWorker(){
+
+  }
+
+  void init(){
+    LL << "worker init()";
+    start_forward = false;
+    solver = initCaffeSolver(-1);
+    // init shared parameters at worker
+    weights = new VVector<float>(V_WEIGHT);
+    diffs = new VVector<float>(V_DIFF);
+
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).resize(blob->count());
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+    }
+
+    // init forwarders
+    vector<string> workerRoots = split(FLAGS_workers, ',');
+    char* cwd = getcwd(nullptr,1024);
+    LL << "cwd: " << cwd;
+    CHECK(cwd != nullptr);
+    string cwdString(cwd);
+    for (int id = 0; id < workerRoots.size(); id++){
+      bool display = id == 0;
+      string workerRoot = cwdString + "/" + workerRoots[id];
+      LL << "creating forwarder in: " << workerRoot;
+//      CHECK(0 == chdir(workerRoot.c_str()));
+      NetForwarder* forwarder = new NetForwarder(this, id, workerRoot, display);
+      forwarders.push_back(forwarder);
+      forwarder->startAsync();
+    }
+//    CHECK(0 == chdir(cwd));
+    free(cwd);
+    LL << "worker init() over";
+  }
+
+  /**
+   * by run() thread
+   */
+  void waitForwardSignal(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    while(!start_forward){
+      cv_forward.wait(l);
+    }
+  }
+
+  /**
+   * by run() thread
+   */
+  void signalForwardEnd(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = false;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by process() thread
+   */
+  void signalAndJoinForward() {
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = true;
+    cv_forward.notify_all();
+    while(start_forward) {
+      cv_forward.wait(l);
+    }
+  }
+
+
+  /**
+   * by main
+   */
+  void run(){
+    LL << "worker run()";
+    while(true) {
+      // wait for the signal to forward
+      waitForwardSignal();
+      LL << "run() forward signal received";
+      pullWeight();
+      for (int i = 0; i < forwarders.size(); i++){
+        NetForwarder* forwarder = forwarders[i];
+        forwarder->signalForward();
+      }
+      for (int i = 0; i < forwarders.size(); i++){
+        NetForwarder* forwarder = forwarders[i];
+        forwarder->joinForwardEnd();
+      }
+      LL << "all forwarders joined: " << forwarders.size();
+      pushDiff();
+      LL << "Worker sending forward end signal";
+      signalForwardEnd();
+    }
+    LL << "worker run() over";
+  }
+
+  void process(const MessagePtr& msg) {
+    LL << "message received";
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      LL << "process() update model received";
+      signalAndJoinForward();
+      LL << "process() forward end received";
+    }
+  }
+
+  /**
+   * by forwarder
+   */
+  void gatherDiff(Solver<float>* another) {
+    Lock l(mu_diff);
+    for(int i = 0; i < another->net()->params().size(); i++){
+      auto acc = diffBlobs[i];
+      auto blob = another->net()->params()[i];
+      ostringstream name;
+      name << "gatherDiff:solver.blobs[" << i << "]";
+//      checkNAN(blob->count(), blob->cpu_diff(), name.str());
+      caffe::caffe_add(acc->count(), blob->cpu_diff(), acc->cpu_diff(), acc->mutable_cpu_diff());
+    }
+  }
+
+  /**
+   * by pusher, synchronized (blocks until the message is sent)
+   */
+  void pushDiff(){
+    Lock l(mu_diff);
+    //push to app instead of
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    msg->task.set_key_channel(0);
+    float first, last;
+    for(int i = 0; i < diffs->vcount();i++){
+      auto acc = diffBlobs[i];
+      acc->cpu_diff(); // sync to cpu
+      auto diff = diffs->value(i);
+      CHECK_EQ(acc->cpu_diff(), diff.data());
+      msg->addValue(diff);
+      if(i == 0){
+        first = acc->cpu_diff()[0];
+      }else if(i == diffs->vcount() - 1){
+        last = acc->cpu_diff()[acc->count()-1];
+      }
+    }
+    int push_time = diffs->push(msg);
+    diffs->waitOutMsg(kServerGroup, push_time);
+    LL << "Worker diff pushed:[" << first <<"," << last << "]";
+    // clear previous diff
+    for(auto acc : diffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+  }
+
+  /**
+   * by puller (except the initial pull), synchronized
+   */
+  void pullWeight(){
+    LL << "begin pull weight";
+//    Task task;
+//    task.mutable_sgd()->set_cmd(SGDCall::UPDATE_MODEL);
+//    port(kServerGroup)->submitAndWait(task);
+
+    Lock l(mu_weight);
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    LL << "begin pull";
+    int pull_time = weights->pull(msg);
+    LL << "begin waitOutMsg";
+    weights->waitOutMsg(kServerGroup, pull_time);
+    LL << "weight pulled from server, total:" << weights->totalSize();
+  }
+
+  /**
+   * by forwarder
+   */
+  void copyWeight(Solver<float>* another){
+    float first,last;
+    for (int i = 0; i < another->net()->params().size();i++){
+      auto blob = another->net()->params()[i];
+      float* dest = blob->mutable_cpu_data();
+      auto src = weights->value(i);
+      memcpy(dest, src.data(), blob->data()->size());
+      //TODO direct copy to GPU?
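+      // memcpy() writes through mutable_cpu_data(), so SyncedMemory marks the
+      // blob's head at CPU and re-uploads lazily on the next gpu_data() access;
+      // a direct copy into mutable_gpu_data() would avoid the extra host copy.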
+      if(i == 0){
+        first = blob->cpu_data()[0];
+      }else if(i == another->net()->params().size()-1){
+        last = blob->cpu_data()[blob->count()-1];
+      }
+    }
+    LL << "weight from server:[" << first << ",...," << last << "]";
+  }
+};
+void NetForwarder::copyWeight() {
+  this->worker->copyWeight(this->solver);
+}
+
+void NetForwarder::accumulateDiff(){
+  this->worker->gatherDiff(this->solver);
+}
+
+} // namespace PS
+
+namespace PS {
+App* App::create(const string& name, const string& conf) {
+  auto my_role = Postoffice::instance().myNode().role();
+  if (my_role == Node::SERVER) {
+    return new CaffeServer(name, conf);
+  } else if(my_role == Node::WORKER){
+    return new CaffeWorker(name, conf);
+  }else{
+    return new App();
+  }
+}
+} // namespace PS
+
+
+int main(int argc, char *argv[]) {
+
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& sys = PS::Postoffice::instance();
+  sys.start(&argc, &argv);
+
+  sys.stop();
+  return 0;
+}
+
diff --git a/src/app/caffe/caffe_synced.cc b/src/app/caffe/caffe_synced.cc
new file mode 100644
index 0000000..670e85b
--- /dev/null
+++ b/src/app/caffe/caffe_synced.cc
@@ -0,0 +1,593 @@
+#include <cstring>
+#include <sstream>
+#include <mutex>
+#include <condition_variable>
+
+#include "ps.h"
+#include "system/app.h"
+#include "parameter/v_vector.h"
+#include "parameter/kv_vector.h"
+#include "caffe/caffe.hpp"
+#include "caffe/util/math_functions.hpp"
+#include "caffe/util/upgrade_proto.hpp"
+#include "app/caffe/util.h"
+using caffe::Blob;
+using caffe::Solver;
+using caffe::SolverParameter;
+using caffe::Caffe;
+using caffe::caffe_scal;
+using google::protobuf::io::CodedInputStream;
+
+// caffe cmd flags
+
+DEFINE_int32(gpu, -1,
+    "Run in GPU mode on given device ID.");
+DEFINE_string(solver, "",
+    "The solver definition protocol buffer text file.");
+DEFINE_string(model, "",
+    "The model definition protocol buffer text file.");
+DEFINE_string(snapshot, "",
+    "Optional; the snapshot solver state to resume training.");
+
+
+DEFINE_bool(fb_only, true,
+    "DEPRECATED; workers only ForwardBackward.");
+DEFINE_bool(synced, false,
+    "DEPRECATED; pull/push synced with Forward");
+// client puller / pusher flags
+DEFINE_int32(pushstep, 3,
+    "interval, in minibatches, between push operations.");
+DEFINE_int32(pullstep, 3,
+    "DEPRECATED; interval, in minibatches, between pull operations.");
+
+
+
+caffe::SolverParameter solver_param;
+Solver<float>* initCaffeSolver(){
+
+  Solver<float>* solver;
+
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  if (FLAGS_gpu < 0
+      && solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
+    FLAGS_gpu = solver_param.device_id();
+  }
+
+  // Set device id and mode
+  if (FLAGS_gpu >= 0) {
+    LOG(INFO) << "Use GPU with device ID " << FLAGS_gpu;
+    Caffe::SetDevice(FLAGS_gpu);
+    Caffe::set_mode(Caffe::GPU);
+  } else {
+    LOG(INFO) << "Use CPU.";
+    Caffe::set_mode(Caffe::CPU);
+  }
+
+  solver = caffe::GetSolver<float>(solver_param);
+
+  if (FLAGS_snapshot.size()) {
+    LOG(INFO) << "Resuming from " << FLAGS_snapshot;
+    solver->Restore(FLAGS_snapshot.c_str());
+  }
+
+  return solver;
+}
+
+caffe::Net<float>* initCaffeNet(){
+  CHECK_GT(FLAGS_solver.size(), 0) << "Need a solver definition to train.";
+
+  caffe::ReadProtoFromTextFileOrDie(FLAGS_solver, &solver_param);
+
+  caffe::NetParameter net_param;
+  std::string net_path = solver_param.net();
+  caffe::ReadNetParamsFromTextFileOrDie(net_path, &net_param);
+  return new caffe::Net<float>(net_param);
+}
+
+#define V_WEIGHT "weight"
+#define V_DIFF "diff"
+#define V_SOLVER "solver"
+namespace PS {
+class CaffeServer : public App, public VVListener<float>, public VVListener<char> {
+ public:
+  CaffeServer(const string& name, const string& conf) : App(name) { }
+  virtual ~CaffeServer() { }
+
+  virtual void init() {
+    LL << myNodeID() << ", this is server " << myRank();
+
+    solver = initCaffeSolver();
+
+    // initialize the weight at server
+    int total_weight = 0;
+    weights = new VVector<float>(V_WEIGHT, true, this);
+    diffs = new VVector<float>(V_DIFF, false, this);
+    solver_states = new VVector<char>(V_SOLVER, true, this);
+
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).reset(blob->mutable_cpu_data(), blob->count(), false);
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+      Blob<float>* accBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      accDiffBlobs.push_back(accBlob);
+      total_weight += blob->data()->size();
+    }
+
+    LL << "total weight size:" << total_weight;
+  }
+
+  void testPhase(){
+    Lock l(mu_solver);
+    solver->TestAll();
+  }
+
+  void snapshotPhase(){
+    Lock l(mu_solver);
+    solver->Snapshot();
+  }
+
+  void resetDiffCount() {
+    Lock l(mu_accDiff);
+    accDiffCount = 0;
+    for(auto acc : diffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+    for(auto acc : accDiffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+  }
+
+  void signalWorkersStart(){
+    resetDiffCount();
+    Task task;
+    auto sgd = task.mutable_sgd();
+    sgd->set_cmd(SGDCall::UPDATE_MODEL);
+    port(kWorkerGroup)->submitAndWait(task);
+//    port(kWorkerGroup)->submit(task);
+    LL << "signalWorkersStart returned";
+  }
+
+  void gatherDiffs(){
+    //TODO no need to synchronize all workers?
+    /*
+    while(true){
+      {
+        Lock l0(mu_accDiff);
+        if(accDiffCount == sys_.yp().num_workers()){
+          break;
+        }
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    */
+//    LL << "all diffs gathered: " << accDiffCount;
+    Lock l(mu_solver);
+    Lock l2(mu_accDiff);
+    CHECK_EQ(accDiffCount, sys_.yp().num_workers()) << "accDiffCount should be the same as num_workers!";
+//    float first,last, firstv, lastv;
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      float* dest = blob->mutable_cpu_diff();
+      auto src = accDiffBlobs[i];
+      memcpy(dest, src->cpu_diff(), blob->diff()->size());
+
+/*    if(i==0){
+        first=blob->cpu_diff()[0];
+        firstv = src[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+        lastv = src[src.size() - 1];
+      }
+*/
+    }
+    LL << "accDiffs copied to net->params->diff";
+
+  }
+
+
+  void run() {
+    LL << myNodeID() << ", server " << myRank() << " run()ing";
+    auto param = solver->param();
+    int iter = solver->param().max_iter() - solver->iter();
+    LL << "start training loop";
+    for (int i = 0; i < iter; i++) {
+      signalWorkersStart();
+      gatherDiffs();
+      solver->ComputeUpdateValue();
+      solver->net()->Update();
+      solver->snapshotPhase();
+      solver->stepEnd();
+    }
+  }
+
+  void process(const MessagePtr& msg) {
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      {
+        Lock l(mu_solver);
+      }
+    }
+  }
+
+  void vectorChanged(VVector<float>* data){
+//    LL << "vector change received:" << data->name();
+    CHECK_EQ(data, this->diffs) << "the server only accepts diff changes";
+
+    Lock l(mu_accDiff);
+    // append diffs to accDiffs
+//    float first,last, firstv, lastv;
+    for (int i = 0; i < diffBlobs.size();i++) {
+      auto blob = diffBlobs[i];
+      float* dest = accDiffBlobs[i]->mutable_cpu_diff();
+      float* src = diffBlobs[i]->mutable_cpu_diff();
+      //scale down?
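+      // caffe_add(n, src, dest, dest) below keeps a running sum in accDiffBlobs
+      // across all workers' pushes; gatherDiffs() later memcpy()s that sum into
+      // net->params()[i]->mutable_cpu_diff() for ComputeUpdateValue().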
+      if(FLAGS_pushstep != 0){
+        caffe::caffe_scal(blob->count(), float(1.0 / FLAGS_pushstep), src);
+      }
+
+      caffe::caffe_add(blob->count(), src, dest, dest);
+
+/*    if(i==0){
+        first=blob->cpu_diff()[0];
+        firstv = src[0];
+      }else if(i == solver->net()->params().size()-1){
+        last=blob->cpu_diff()[blob->count()-1];
+        lastv = src[src.size() - 1];
+      }
+*/
+    }
+    accDiffCount++;
+//    LL << "diff gathered: # " << accDiffCount;
+//    LL << "got diff[" << first << ",...," << last << "]";
+  }
+
+  void vectorChanged(VVector<char>* data){
+    CHECK(false) << "shouldn't be any VVector change: "<< data->name();
+  }
+
+  void vectorGetting(VVector<float>* data){
+    Lock l(mu_solver);
+    float first, last;
+    if (data == this->weights){
+      // need to sync to CPU
+      for (int i = 0; i < solver->net()->params().size();i++){
+        auto blob = solver->net()->params()[i];
+        blob->cpu_data();
+        if (i==0) {
+          first = blob->cpu_data()[0];
+        } else if (i == solver->net()->params().size() - 1 ) {
+          last = blob->cpu_data()[blob->count()-1];
+        }
+      }
+      LL << "weight synced: [" << first << ",...," << last << "]";
+    } else {
+      CHECK(false) << "someone is getting non-gettable! " << data->name();
+    }
+  }
+
+  void vectorGetting(VVector<char>* data){
+    LL << "getting char: "<< data->name();
+    Lock l(mu_solver);
+    if (data == this->solver_states){
+      // need to serialize solver state into solver_states
+      caffe::SolverState state;
+      this->solver->SnapshotSolverState(&state);
+      state.set_iter(solver->iter());
+      state.set_current_step(solver->current_step());
+      string buf;
+      state.SerializeToString(&buf);
+      solver_states->value(0).resize( buf.size() );
+      memcpy(solver_states->value(0).data(), buf.data(), buf.size());
+      LL << "server solver state saved, history:" << state.history_size() << ", total:" << buf.size();
+    } else {
+      CHECK(false) << "someone is getting non-gettable! " << data->name();
+    }
+  }
+
+
+ private:
+  VVector<char>* solver_states; // individual data ptr, solver state to initialize workers
+  VVector<float>* weights;  // shares data ptr with solver->net->params->cpu_data
+  VVector<float>* diffs;    // individual data ptr, shared with diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+
+  std::vector<Blob<float>*> accDiffBlobs;
+  int accDiffCount;
+  std::mutex mu_accDiff; // guards accDiffCount and accDiffBlobs
+
+  std::mutex mu_solver;
+  caffe::Solver<float>* solver;
+};
+
+App* CreateServerNode(const std::string& conf) {
+  return new CaffeServer("app", conf);
+}
+
+class CaffeWorker: public App{
+private:
+
+  std::mutex mu_weight; // protects writes to weight_ready and weights
+  volatile bool weight_ready;
+  std::mutex mu_diff;   // protects writes to diffs
+
+
+  std::mutex mu_forward;
+  std::condition_variable cv_forward;
+  bool start_forward;
+
+  VVector<char>* solver_states; // individual data ptr, solver state to initialize workers
+
+  VVector<float>* weights; // individual data ptr, same order/size as solver->net->params
+  VVector<float>* diffs;   // for accumulated diff, shares memory with diffBlobs
+  std::vector<Blob<float>*> diffBlobs;
+  caffe::Solver<float>* solver;
+
+  volatile unsigned int tickDiff=0, tickStep=0;
+
+  volatile bool _terminate = false;
+
+public:
+  CaffeWorker(const string& name, const string& conf):App(name){
+
+  }
+  ~CaffeWorker(){
+
+  }
+
+  void init(){
+    LL << "worker init()";
+    weight_ready = false;
+    start_forward = false;
+    solver = initCaffeSolver();
+    // init shared parameters at worker
+    weights = new VVector<float>(V_WEIGHT);
+    diffs = new VVector<float>(V_DIFF);
+    solver_states = new VVector<char>(V_SOLVER);
+    solver_states->value(0) = {};
+    solver_states->setResizable(true);
+
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      weights->value(i).resize(blob->count());
+      Blob<float>* newBlob = new Blob<float>(blob->num(), blob->channels(), blob->height(), blob->width());
+      diffBlobs.push_back(newBlob);
+      diffs->value(i).reset(newBlob->mutable_cpu_diff(), newBlob->count(), false);
+    }
+    LL << "worker init() over";
+  }
+
+  /**
+   * by run() thread
+   */
+  void waitForwardSignal(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    while(!start_forward){
+      cv_forward.wait(l);
+    }
+  }
+
+  /**
+   * by run() thread
+   */
+  void signalForwardEnd(){
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = false;
+    cv_forward.notify_all();
+  }
+
+  /**
+   * by process() thread
+   */
+  void signalAndJoinForward() {
+    std::unique_lock<std::mutex> l(mu_forward);
+    start_forward = true;
+    cv_forward.notify_all();
+    while(start_forward) {
+      cv_forward.wait(l);
+    }
+  }
+
+
+  /**
+   * by main
+   */
+  void run(){
+    LL << "worker run()";
+    while(true) {
+      // wait for the signal to forward
+      waitForwardSignal();
+      LL << "run() forward signal received";
+      pullWeight();
+      swapWeight();
+      for (int i = 0; i < FLAGS_pushstep; i++) {
+        solver->testPhase();
+        solver->forwardBackwardPhase();
+        this->accumulateDiff();
+        solver->displayPhase();
+        // bypass all of computeUpdateValue
+        solver->stepEnd();
+        stepEnd();
+      }
+      LL << "pushstep " << FLAGS_pushstep << " reached.";
+      pushDiff();
+      LL << "run() sending forward end signal";
+      signalForwardEnd();
+    }
+    LL << "worker run() over";
+  }
+
+  void process(const MessagePtr& msg) {
+    LL << "message received";
+    auto sgd = msg->task.sgd();
+    if (sgd.cmd() == SGDCall::UPDATE_MODEL) { // sync param to memory
+      LL << "process() update model received";
+      signalAndJoinForward();
+      LL << "process() forward end received";
+    }
+  }
+
+  /**
+   * notify accumulateDiff end, for pusher counting
+   */
+  void diffEnd(){
+    tickDiff++;
+  }
+  /**
+   * for puller counting
+   */
+  void stepEnd(){
+    tickStep++;
+  }
+
+  /**
+   * by pusher, synchronized (blocks until the message is sent)
+   */
+  void pushDiff(){
+    Lock l(mu_diff);
+    //push to app instead of
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    msg->task.set_key_channel(0);
+    for(int i = 0; i < diffs->vcount();i++){
+      auto acc = diffBlobs[i];
+      acc->cpu_diff(); // sync to cpu
+      auto diff = diffs->value(i);
+      CHECK_EQ(acc->cpu_diff(), diff.data());
+      msg->addValue(diff);
+    }
+    int push_time = diffs->push(msg);
+    diffs->waitOutMsg(kServerGroup, push_time);
+    // clear previous diff
+    for(auto acc : diffBlobs){
+      memset(acc->mutable_cpu_diff(), 0, acc->diff()->size());
+    }
+  }
+
+  /**
+   * by main
+   */
+  void accumulateDiff(){
+    {
+      Lock l(mu_diff);
+      for(int i = 0; i < solver->net()->params().size(); i++){
+        auto acc = diffBlobs[i];
+        auto blob = solver->net()->params()[i];
+        ostringstream name;
+        name << "accumulateDiff:net.params[" << i << "].diff";
+        checkNAN(blob->count(), blob->cpu_diff(), name.str());
+        switch (Caffe::mode()) {
+        case Caffe::CPU:
+          caffe::caffe_add(acc->count(), blob->cpu_diff(), acc->cpu_diff(), acc->mutable_cpu_diff());
+          break;
+        case Caffe::GPU:
+          caffe::caffe_gpu_add(acc->count(), blob->gpu_diff(), acc->gpu_diff(), acc->mutable_gpu_diff());
+          break;
+        default:
+          LOG(FATAL) << "Unknown caffe mode: " << Caffe::mode();
+        }
+      }
+    }
+    diffEnd();
+  }
+
+  /**
+   * by main
+   */
+  void pullSolverState() {
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    int pull_time = solver_states->pull(msg);
+    solver_states->waitOutMsg(kServerGroup, pull_time);
+    Lock l(mu_weight);
+    SArray<char> src = solver_states->value(0);
+    LL << "solver state got: " << src.size();
+    caffe::SolverState state;
+    CodedInputStream cis((const uint8*) src.data(), src.size());
+    cis.SetTotalBytesLimit(INT_MAX, 536870912);
+    state.ParseFromCodedStream(&cis);
+
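+    // The parse above raises protobuf's default 64MB message-size cap via
+    // SetTotalBytesLimit(INT_MAX, 536870912), so a large solver history
+    // (e.g. per-blob momentum state) can be deserialized; 536870912 is the
+    // 512MB warning threshold.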
+    solver->RestoreSolverState(state);
+    solver->setIter(state.iter());
+    solver->setCurrentStep(state.current_step());
+  }
+
+  /**
+   * by puller (except the initial pull), synchronized
+   */
+  void pullWeight(){
+    LL << "begin pull weight";
+//    Task task;
+//    task.mutable_sgd()->set_cmd(SGDCall::UPDATE_MODEL);
+//    port(kServerGroup)->submitAndWait(task);
+
+    Lock l(mu_weight);
+    if(weight_ready){
+      LL << "weight_ready!";
+      return;
+    }
+    MessagePtr msg(new Message(kServerGroup));
+    msg->key = {0};
+    LL << "begin pull";
+    int pull_time = weights->pull(msg);
+    LL << "begin waitOutMsg";
+    weights->waitOutMsg(kServerGroup, pull_time);
+    weight_ready = true;
+    LL << "weight pulled from server, total:" << weights->totalSize();
+  }
+  /**
+   * by main, copy received weight into solver->net
+   */
+  void swapWeight(){
+    Lock l(mu_weight);
+    if(!weight_ready){
+      return;
+    }
+    float first,last;
+    for (int i = 0; i < solver->net()->params().size();i++){
+      auto blob = solver->net()->params()[i];
+      float* dest = blob->mutable_cpu_data();
+      auto src = weights->value(i);
+      memcpy(dest, src.data(), blob->data()->size());
+      //TODO direct copy to GPU?
+      if(i == 0){
+        first = blob->cpu_data()[0];
+      }else if(i == solver->net()->params().size()-1){
+        last = blob->cpu_data()[blob->count()-1];
+      }
+    }
+    LL << "weight from server:[" << first << ",...," << last << "]";
+    weight_ready = false;
+  }
+};
+} // namespace PS
+
+namespace PS {
+App* App::create(const string& name, const string& conf) {
+  auto my_role = Postoffice::instance().myNode().role();
+  if (my_role == Node::SERVER) {
+    return new CaffeServer(name, conf);
+  } else if(my_role == Node::WORKER){
+    return new CaffeWorker(name, conf);
+  }else{
+    return new App();
+  }
+}
+} // namespace PS
+
+
+int main(int argc, char *argv[]) {
+
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& sys = PS::Postoffice::instance();
+  sys.start(&argc, &argv);
+
+  sys.stop();
+  return 0;
+}
+
diff --git a/src/app/caffe/util.h b/src/app/caffe/util.h
new file mode 100644
index 0000000..43261d3
--- /dev/null
+++ b/src/app/caffe/util.h
@@ -0,0 +1,38 @@
+/*
+ * util.h
+ *
+ *  Created on: Apr 21, 2015
+ *      Author: immars
+ */
+
+#ifndef SRC_APP_CAFFE_UTIL_H_
+#define SRC_APP_CAFFE_UTIL_H_
+#include <math.h>
+#include <string>
+#include <sys/time.h>
+#include "util/common.h"
+
+using namespace std;
+
+inline void checkNAN(int count, const float* data, string blobName){
+  bool isNan = false;
+  int nanIndex = -1;
+  int nanCount = 0;
+  for (int j = 0; j < count; j++){
+    if(isnan(data[j])){
+      isNan = true;
+      nanIndex = j;
+      nanCount++;
+    }
+  }
+  if(isNan){
+    LL << nanCount << " NANs in " << blobName << "[" << nanIndex << "]!";
+  }
+}
+
+inline unsigned long long tick(struct timeval* tv) {
+  gettimeofday(tv, NULL);
+  return tv->tv_sec * 1000000 + tv->tv_usec;
+}
+
+#endif /* SRC_APP_CAFFE_UTIL_H_ */
diff --git a/src/parameter/v_vector.h b/src/parameter/v_vector.h
new file mode 100644
index 0000000..8ad3b99
--- /dev/null
+++ b/src/parameter/v_vector.h
@@ -0,0 +1,145 @@
+#pragma once
+#include "Eigen/Dense"
+#include "parameter/shared_parameter.h"
+#include "util/parallel_ordered_match.h"
+#include "caffe/util/math_functions.hpp"
+namespace PS {
+
+template <typename V> class VVector;
+template <typename V> using VVectorPtr = std::shared_ptr<VVector<V>>;
+
+
+#define USING_SHARED_PARAMETER_INT8             \
+  using Customer::port;                         \
+  using Customer::myNodeID;                     \
+  using SharedParameter<uint8>::get;            \
+  using SharedParameter<uint8>::set;            \
+  using SharedParameter<uint8>::myKeyRange;     \
+  using SharedParameter<uint8>::keyRange;       \
+  using SharedParameter<uint8>::sync
+
+using caffe::caffe_copy;
+using caffe::caffe_add;
+
+// value vector only; access ignores keys/key ranges entirely. The key defaults to 0.
+// Values are stored in arrays.
+
+template <typename V>
+class VVListener{
+public:
+  // before the value is about to change
+//  virtual void vectorChanging(VVector<V>* data) = 0;
+  // after the value is set
+  virtual void vectorChanged(VVector<V>* data) = 0;
+  // before someone requests this vector
+  virtual void vectorGetting(VVector<V>* data) = 0;
+  // after the value is got from this vector
+//  virtual void vectorGot(VVector<V>* data) = 0;
+
+  virtual ~VVListener() {};
+};
+
+template <typename V>
+class VVector : public SharedParameter<uint8> {
+ public:
+  VVector(const string& my_name, bool readonly = false, VVListener<V>* listener = nullptr, const string& parent_name = FLAGS_app_name, int k = 1) :
+      SharedParameter<uint8>(my_name, parent_name), val_entry_size_(k) {
+    this->listener = listener;
+    this->readonly = readonly;
+    this->resizable = false;
+  }
+  // VVector() : val_entry_size_(1) { }
+  // VVector(int k) : val_entry_size_(k) { }
+  virtual ~VVector() { }
+
+  SArray<V>& value(int block_index = 0) { Lock l(mu_); return val_[block_index]; }
+  void clear(int channel = 0) { Lock l(mu_); val_.erase(channel); }
+
+  // find the local positions of a global key range
+//  SizeR find(int channel, const Range& key_range) {
+//    return key(channel).findRange(key_range);
+//  }
+
+  int valueEntrySize() const { return val_entry_size_; }
+
+  int vcount() {return this->val_.size();}
+
+  void setResizable(bool resize) { this->resizable = resize; }
+  bool isResizable() { return resizable; }
+
+  int totalSize() {
+    int sum = 0;
+    for(int i = 0; i < val_.size(); i++){
+      auto v = value(i);
+      sum += v.size();
+    }
+    return sum;
+  }
+  // functions that will be used by the system
+  MessagePtrList slice(const MessagePtr& msg, const KeyRangeList& sep);
+  void getValue(const MessagePtr& msg);
+  void setValue(const MessagePtr& msg);
+
+  USING_SHARED_PARAMETER_INT8;
+ protected:
+  std::mutex mu_;
+  std::unordered_map<int, SArray<V>> val_;
+  int val_entry_size_;
+
+  VVListener<V>* listener;
+  bool readonly;
+  bool resizable;
+};
+
+template <typename V>
+void VVector<V>::setValue(const MessagePtr& msg) {
+  CHECK(!readonly) << "VVector[" << name() << "] is read only!";
+//  LL << "VVector::setValue received";
+  // do check
+  CHECK_EQ(msg->value.size(), val_.size()) << "my size(" << val_.size() << ") != message size(" << msg->value.size() << ")";
+  for(int i = 0; i < msg->value.size(); i++){
+    SArray<V> recv_val(msg->value[i]);
+    auto& my_val = value(i);
+    if (get(msg).has_tail_filter() || get(msg).gather()) {
+      // join the received data with my current data
+      SArray<V> new_val;
+      if (recv_val.empty()) {
+        CHECK(my_val.empty());
+      } else {
+        LL << "VVector::setValue before caffe_add";
+        SArray<uint8> key = {0};
+        parallelUnion(key, my_val, key, recv_val, &key, &new_val);
+        my_val = new_val;
+      }
+    } else {
+      CHECK(isResizable() || my_val.size() == recv_val.size()) << "VVector only supports receiving a whole VVector";
+      my_val.copyFrom(recv_val);
+    }
+  }
+  if(listener){
+    listener->vectorChanged(this);
+  }
+//  LL << "VVector::setValue left";
+}
+
+template <typename V>
+void VVector<V>::getValue(const MessagePtr& msg) {
+  if(listener){
+    listener->vectorGetting(this);
+  }
+//  LL << "VVector::getValue received";
+  // get the data
+  msg->clearValue();
+  for(int i = 0; i < val_.size(); i++){
+    msg->addValue(this->value(i));
+  }
+}
+
+// sep is a sorted list of key ranges
+template <typename V>
+MessagePtrList VVector<V>::slice(const MessagePtr& msg, const KeyRangeList& sep) {
+  if (get(msg).replica()) return Customer::slice(msg, sep);
+  return sliceKeyOrderedMsg(msg, sep);
+}
+
+} // namespace PS
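For orientation, here is a minimal sketch (not part of this patch) of how an application node can expose a parameter block through the VVector/VVListener API defined above. The ToyServer class and the "toy" vector name are hypothetical; only calls that appear in v_vector.h and the apps above are used, and the LL logging macro is assumed to come in via util/common.h:

#include "system/app.h"
#include "parameter/v_vector.h"

namespace PS {

// Hypothetical app node: owns one float block and logs every remote update.
class ToyServer : public App, public VVListener<float> {
 public:
  explicit ToyServer(const string& name) : App(name) {
    // writable block 0 with 16 entries; this object receives the callbacks
    vec_ = new VVector<float>("toy", false, this);
    vec_->value(0).resize(16);
  }
  virtual void vectorChanged(VVector<float>* v) {
    // fires after a remote push has been copied into value(0)
    LL << v->name() << " changed, total size: " << v->totalSize();
  }
  virtual void vectorGetting(VVector<float>* v) {
    // fires before a remote pull is served; sync data here if needed
  }
 private:
  VVector<float>* vec_;
};

} // namespace PS

A peer node would create its own VVector<float>("toy"), resize value(0) to match, and move data with the same push()/pull() plus waitOutMsg(kServerGroup, t) pattern used by CaffeWorker::pushDiff() and pullWeight() above.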