Skip to content
This repository was archived by the owner on Jan 26, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,5 @@ _Pvt_Extensions

# Python
*.pyc
*.egg
binding/python/multiverso_python.egg-info/
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ if(USE_HDFS)
LINK_DIRECTORIES(${JVM_LIB})
endif(USE_HDFS)

# Expose the DC-ASGD updater: when configured with -DENABLE_DCASGD=ON,
# define ENABLE_DCASGD for every translation unit so the conditionally
# compiled DCASGD code paths (see src/updater/updater.cpp) are built in.
if(ENABLE_DCASGD)
add_definitions(-DENABLE_DCASGD)
endif(ENABLE_DCASGD)

include_directories(${PROJECT_SOURCE_DIR}/include)

set(MULTIVERSO_DIR ${PROJECT_SOURCE_DIR})
Expand All @@ -43,3 +47,4 @@ configure_file(

add_custom_target(uninstall
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake)

129 changes: 129 additions & 0 deletions binding/python/examples/torch/mnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable

import numpy as np
import multiverso as mv
from multiverso.torch_ext import torchmodel

# Initialize multiverso before anything else: synchronous mode with the
# plain SGD server-side updater.
mv.init(sync=True, updater='sgd')

# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
                    help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0, metavar='M',
                    help='SGD momentum (default: 0)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='how many batches to wait before logging training status')
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Seed both CPU and GPU RNGs so every worker starts from identical weights.
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)


kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=args.batch_size, shuffle=True, **kwargs)
# BUG FIX: the test loader previously reused args.batch_size, leaving the
# --test-batch-size option dead; use the dedicated test batch size here.
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])),
    batch_size=args.test_batch_size, shuffle=True, **kwargs)


class Net(nn.Module):
    """Small LeNet-style CNN for single-channel 28x28 MNIST digits.

    Two conv + max-pool stages followed by two fully connected layers;
    the forward pass returns log-probabilities over the 10 digit classes.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        # After two (conv k=5, pool 2) stages a 28x28 input is 20ch x 4 x 4 = 320.
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        # (N, 1, 28, 28) -> (N, 10, 12, 12)
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        # (N, 10, 12, 12) -> (N, 20, 4, 4)
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Explicit dim=1 matches the old implicit default for 2-D input and
        # avoids the "implicit dimension choice" deprecation warning.
        return F.log_softmax(x, dim=1)

model = torchmodel.MVTorchModel(Net())

if args.cuda:
model.cuda()

optimizer = optim.SGD(model.parameters(), lr=args.lr * mv.workers_num(), momentum=args.momentum)

def train(epoch):
    """Run one training epoch; each worker trains only its own interleaved
    shard of batches and syncs parameters through multiverso after each step."""
    model.train()
    workers = mv.workers_num()
    for step, (inputs, labels) in enumerate(train_loader):
        # Round-robin data partition: skip batches owned by other workers.
        if step % workers != mv.worker_id():
            continue
        if args.cuda:
            inputs, labels = inputs.cuda(), labels.cuda()
        inputs, labels = Variable(inputs), Variable(labels)
        optimizer.zero_grad()
        prediction = model(inputs)
        loss = F.nll_loss(prediction, labels)
        loss.backward()
        optimizer.step()

        # mv_sync works on CPU tensors, so hop off the GPU and back.
        model.cpu()
        model.mv_sync()
        model.cuda()

        if (step / workers) % args.log_interval == 0:
            print('Worker: {}\tTrain Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                mv.worker_id(), epoch, step * len(inputs), len(train_loader.dataset),
                100. * step / len(train_loader), loss.data[0]))

def test(epoch):
    """Evaluate the current model on the whole test set and print the
    average loss and accuracy for this worker."""
    model.eval()
    total_loss = 0
    hits = 0
    for inputs, labels in test_loader:
        if args.cuda:
            inputs, labels = inputs.cuda(), labels.cuda()
        # volatile: inference only, no autograd bookkeeping
        inputs, labels = Variable(inputs, volatile=True), Variable(labels)
        prediction = model(inputs)
        total_loss += F.nll_loss(prediction, labels).data[0]
        predicted_class = prediction.data.max(1)[1]  # get the index of the max log-probability
        hits += predicted_class.eq(labels.data).cpu().sum()

    # F.nll_loss already averages within a batch, so divide by batch count only.
    total_loss /= len(test_loader)
    print('\nWorker: {}\tTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        mv.worker_id(), total_loss, hits, len(test_loader.dataset),
        100. * hits / len(test_loader.dataset)))


# Train then evaluate once per epoch, and release multiverso's native
# resources when all epochs are done.
for epoch in range(1, args.epochs + 1):
    train(epoch)
    test(epoch)

mv.shutdown()
4 changes: 3 additions & 1 deletion binding/python/multiverso/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
mv_lib = Loader.get_lib()


def init(sync=False):
def init(sync=False, updater=None):
'''Initialize mutliverso.

This should be called only once before training at the beginning of the
Expand All @@ -29,6 +29,8 @@ def init(sync=False):
args = [b""] # the first argument will be ignored. So we put a placeholder here
if sync:
args.append(b"-sync=true")
if updater:
args.append(b"-updater_type="+updater)
n = len(args)
args_type = ctypes.c_char_p * n
mv_lib.MV_Init(ctypes.pointer(ctypes.c_int(n)), args_type(*[ctypes.c_char_p(arg) for arg in args]))
Expand Down
10 changes: 7 additions & 3 deletions binding/python/multiverso/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def get(self):
mv_lib.MV_GetArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
return data

def add(self, data, sync=False):
def add(self, data, sync=False, lr=0.1, mom=0.0, rho=0.0, lam=0.0):
'''add the data to the multiverso ArrayTable

Data type of `data` is numpy.ndarray with one-dimensional
Expand All @@ -76,9 +76,13 @@ def add(self, data, sync=False):
data = convert_data(data)
assert(data.size == self._size)
if sync:
mv_lib.MV_AddArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
# mv_lib.MV_AddArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
mv_lib.MV_AddArrayTableOption(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size,
ctypes.c_float(lr), ctypes.c_float(mom), ctypes.c_float(rho), ctypes.c_float(lam))
else:
mv_lib.MV_AddAsyncArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
# mv_lib.MV_AddAsyncArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
mv_lib.MV_AddAsyncArrayTableOption(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size,
ctypes.c_float(lr), ctypes.c_float(mom), ctypes.c_float(rho), ctypes.c_float(lam))


class MatrixTableHandler(TableHandler):
Expand Down
Empty file.
51 changes: 51 additions & 0 deletions binding/python/multiverso/torch_ext/torchmodel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python
# coding:utf8

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable

import numpy as np
import multiverso as mv


class MVTorchModel(object):
    """Wrap a torch ``nn.Module`` so its parameters live in multiverso
    server-side ArrayTables and can be synchronized across workers.

    Attribute access other than the wrapper's own bookkeeping is delegated
    to the wrapped module (via ``__getattribute__``), so the wrapper can be
    used anywhere the underlying module is expected.
    """

    def __init__(self, tmobj):
        # One multiverso ArrayTable per parameter tensor, seeded with the
        # module's initial (flattened) parameter values.
        assert(isinstance(tmobj, nn.Module))
        self._tmobj = tmobj
        self._mv_params = []
        for param in self._tmobj.parameters():
            self._mv_params.append(mv.ArrayTableHandler(param.data.numpy().size, param.data.numpy().reshape((-1,))))
        mv.barrier()
        # Cache the server-side values so mv_sync can send deltas later,
        # and overwrite local parameters with the agreed initial state.
        self._last_mv_params = []
        for mv_param in self._mv_params:
            self._last_mv_params.append(mv_param.get())
        for param, last_mv_param in zip(self._tmobj.parameters(), self._last_mv_params):
            param.data = torch.from_numpy(last_mv_param.reshape(param.data.numpy().shape))

    def mv_sync(self, lr=0.1, mom=0.0, rho=0.0, lam=0.0):
        """Push local parameter deltas to the server, then pull fresh values.

        NOTE(review): ``param.data.numpy()`` requires CPU tensors, so callers
        must move the model to CPU before syncing (see the mnist example).
        """
        # Delta = last known server value minus current local value; the
        # hyper-parameters are forwarded to the server-side updater.
        for mv_param, last_mv_param, param in zip(self._mv_params, self._last_mv_params, self._tmobj.parameters()):
            mv_param.add(last_mv_param - param.data.numpy().reshape((-1,)), lr=lr, mom=mom, rho=rho, lam=lam)

        # Refresh the cache and load the updated values into the module.
        for i, (mv_param, last_mv_param, param) in enumerate(zip(self._mv_params, self._last_mv_params, self._tmobj.parameters())):
            self._last_mv_params[i] = mv_param.get()
            param.data = torch.from_numpy(self._last_mv_params[i].reshape(param.data.numpy().shape))

    def __call__(self, *args, **kwargs):
        # Forward pass delegates straight to the wrapped module.
        return self._tmobj(*args, **kwargs)

    def __getstate__(self):
        # BUG FIX: ``self.__dict__`` used to be intercepted by
        # ``__getattribute__`` and delegated to the *wrapped module's*
        # ``__dict__``, so ``del odict['_mv_params']`` raised KeyError when
        # pickling.  Fetch the wrapper's own __dict__ explicitly instead.
        odict = object.__getattribute__(self, '__dict__').copy()
        # ArrayTableHandlers hold native resources and are not picklable.
        del odict['_mv_params']
        return odict

    def __getattribute__(self, attr):
        # Wrapper-private state is served directly ...
        if attr in ['_tmobj', '_mv_params', '_last_mv_params']:
            return object.__getattribute__(self, attr)
        # ... the wrapper's own methods are bound and returned ...
        elif attr in ['mv_sync', '__call__', '__getstate__']:
            return getattr(MVTorchModel, attr).__get__(self)
        # ... everything else (train(), eval(), cuda(), parameters(), ...)
        # is delegated to the wrapped nn.Module.
        else:
            return getattr(self._tmobj, attr)
2 changes: 1 addition & 1 deletion binding/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def readme():
url='https://github.com/Microsoft/multiverso',
author='Microsoft',
license='MIT',
packages=['multiverso', 'multiverso.theano_ext', 'multiverso.theano_ext.lasagne_ext'],
packages=['multiverso', 'multiverso.torch_ext', 'multiverso.theano_ext', 'multiverso.theano_ext.lasagne_ext'],
# TODO: The lasagne on pypi is too old. multiverso need some functions in
# lasagne-0.2 which is not released yet. Please replace the dev version
# with the stable release later.
Expand Down
3 changes: 3 additions & 0 deletions include/multiverso/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ DllExport void MV_GetArrayTable(TableHandler handler, float* data, int size);

DllExport void MV_AddArrayTable(TableHandler handler, float* data, int size);

DllExport void MV_AddArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda);

DllExport void MV_AddAsyncArrayTable(TableHandler handler, float* data, int size);

DllExport void MV_AddAsyncArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda);

// Matrix Table
DllExport void MV_NewMatrixTable(int num_row, int num_col, TableHandler* out);
Expand Down
2 changes: 1 addition & 1 deletion include/multiverso/updater/dcasgd
Submodule dcasgd updated 1 files
+2 −2 dcasgd_updater.h
4 changes: 2 additions & 2 deletions include/multiverso/updater/sgd_updater.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ template <typename T>
class SGDUpdater : public Updater<T> {
public:
explicit SGDUpdater(size_t){
Log::Debug("[SGDUpdater] Init. \n");
Log::Debug("[SGDUpdater] Init. \n");
}
void Update(size_t num_element, T* data, T* delta,
AddOption*, size_t offset) override {
Expand All @@ -28,4 +28,4 @@ class SGDUpdater : public Updater<T> {

}

#endif // MULTIVERSO_UPDATER_ASGD_UPDATER_H_
#endif // MULTIVERSO_UPDATER_ASGD_UPDATER_H_
22 changes: 22 additions & 0 deletions src/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "multiverso/table/array_table.h"
#include "multiverso/table/matrix_table.h"
#include "multiverso/util/log.h"
#include "multiverso/updater/updater.h"


extern "C" {
Expand Down Expand Up @@ -46,11 +47,32 @@ void MV_AddArrayTable(TableHandler handler, float* data, int size) {
worker->Add(data, size);
}

// Like MV_AddArrayTable, but forwards per-update hyper-parameters
// (learning rate, momentum, rho, lambda) to the server-side updater via
// an AddOption — presumably consumed by updaters such as DC-ASGD; the
// plain SGD updater ignores the option (TODO confirm against updaters).
void MV_AddArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda) {
  auto worker = reinterpret_cast<multiverso::ArrayWorker<float>*>(handler);
  multiverso::AddOption option;
  // Tag the update with this worker's id so the server can keep
  // per-worker state.
  option.set_worker_id(multiverso::MV_WorkerId());
  option.set_learning_rate(lr);
  option.set_momentum(mom);
  option.set_rho(rho);
  option.set_lambda(lambda);
  worker->Add(data, size, &option);
}

// Asynchronous counterpart of MV_AddArrayTable: submits `data` (length
// `size`) to the array table behind `handler` via ArrayWorker::AddAsync,
// with no per-update options.
void MV_AddAsyncArrayTable(TableHandler handler, float* data, int size) {
  auto worker = reinterpret_cast<multiverso::ArrayWorker<float>*>(handler);
  worker->AddAsync(data, size);
}

// Asynchronous counterpart of MV_AddArrayTableOption: same AddOption
// hyper-parameter forwarding (lr, momentum, rho, lambda), but submitted
// through ArrayWorker::AddAsync.
void MV_AddAsyncArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda) {
  auto worker = reinterpret_cast<multiverso::ArrayWorker<float>*>(handler);
  multiverso::AddOption option;
  // Tag the update with this worker's id so the server can keep
  // per-worker state.
  option.set_worker_id(multiverso::MV_WorkerId());
  option.set_learning_rate(lr);
  option.set_momentum(mom);
  option.set_rho(rho);
  option.set_lambda(lambda);
  worker->AddAsync(data, size, &option);
}

// MatrixTable
void MV_NewMatrixTable(int num_row, int num_col, TableHandler* out) {
Expand Down
6 changes: 2 additions & 4 deletions src/updater/updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ namespace multiverso {

MV_DEFINE_string(updater_type, "default", "multiverso server updater type");
MV_DEFINE_int(omp_threads, 4 , "#theads used by openMP for updater");
#ifdef ENABLE_DCASGD
MV_DEFINE_bool(is_pipelined, false, "Only used for CNTK - DCASGD");
#endif

template <typename T>
void Updater<T>::Update(size_t num_element, T* data, T* delta,
Expand Down Expand Up @@ -51,9 +48,10 @@ Updater<T>* Updater<T>::GetUpdater(size_t size) {
if (type == "adagrad") return new AdaGradUpdater<T>(size);
if (type == "momentum_sgd") return new MomentumUpdater<T>(size);
#ifdef ENABLE_DCASGD
if (type == "dcasgd") return new DCASGDUpdater<T>(size, MV_CONFIG_is_pipelined);
if (type == "dcasgd") return new DCASGDUpdater<T>(size);
#endif
// Default: simple updater
Log::Info("[Updater] Init. \n");
return new Updater<T>();
}

Expand Down