-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_network.py
176 lines (139 loc) · 5.58 KB
/
custom_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import copy
from mimetypes import init
from utils import findConv2dOutShape
import torch.nn.functional as F
import torch
import torch.nn as nn
from utils import loss_batch, get_lr
# Prefer the GPU when one is available; otherwise fall back to the CPU so that
# the `.to(device)` calls below still work on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Net(nn.Module):
    """Convolutional classifier: four conv layers followed by two linear layers.

    In ``forward`` every 3x3 convolution is followed by the configured
    activation and a 2x2 max-pool. The result is flattened, passed through
    ``fc1`` (activation + dropout) and ``fc2``, and returned as
    log-probabilities over the classes.

    Args:
        params: Mapping with the keys ``"input_shape"`` (unpacked as
            ``C_in, H_in, W_in``), ``"initial_filters"``, ``"num_fc1"``,
            ``"num_classes"``, ``"dropout_rate"`` and ``"activation_func"``
            (one of ``tanh``, ``sigmoid``, ``relu``, ``leaky_relu``,
            ``silu``).
    """

    # Activation names resolved on the ``torch`` namespace.
    _TORCH_ACTIVATIONS = ("tanh", "sigmoid")
    # Activation names resolved on ``torch.nn.functional``.
    _FUNCTIONAL_ACTIVATIONS = ("relu", "leaky_relu", "silu")

    def __init__(self, params):
        super().__init__()
        C_in, H_in, W_in = params["input_shape"]
        init_f = params["initial_filters"]
        num_fc1 = params["num_fc1"]
        num_classes = params["num_classes"]
        self.dropout_rate = params["dropout_rate"]
        self.activation_str = params['activation_func']

        # The channel count doubles at every convolution. After each layer
        # the spatial size is re-derived with findConv2dOutShape.
        # NOTE(review): presumably that helper also accounts for the 2x2
        # max-pool applied in forward() - confirm in utils.
        self.conv1 = nn.Conv2d(C_in, init_f, kernel_size=3)
        h, w = findConv2dOutShape(H_in, W_in, self.conv1)
        self.conv2 = nn.Conv2d(init_f, 2 * init_f, kernel_size=3)
        h, w = findConv2dOutShape(h, w, self.conv2)
        self.conv3 = nn.Conv2d(2 * init_f, 4 * init_f, kernel_size=3)
        h, w = findConv2dOutShape(h, w, self.conv3)
        self.conv4 = nn.Conv2d(4 * init_f, 8 * init_f, kernel_size=3)
        h, w = findConv2dOutShape(h, w, self.conv4)

        # Number of features fed to the first linear layer.
        self.num_flatten = h * w * 8 * init_f
        self.fc1 = nn.Linear(self.num_flatten, num_fc1)
        self.fc2 = nn.Linear(num_fc1, num_classes)

    @staticmethod
    def activation_func(act_str):
        """Return the activation callable named by ``act_str``.

        Args:
            act_str: ``"tanh"`` or ``"sigmoid"`` (taken from ``torch``), or
                ``"relu"``, ``"leaky_relu"`` or ``"silu"`` (taken from
                ``torch.nn.functional``).

        Returns:
            The matching function object.

        Raises:
            ValueError: If ``act_str`` is not one of the supported names.
        """
        # Plain attribute lookup instead of eval(): same objects, but no
        # string is ever executed as code.
        if act_str in Net._TORCH_ACTIVATIONS:
            return getattr(torch, act_str)
        if act_str in Net._FUNCTIONAL_ACTIVATIONS:
            return getattr(F, act_str)
        supported = Net._TORCH_ACTIVATIONS + Net._FUNCTIONAL_ACTIVATIONS
        raise ValueError(
            "Unsupported activation %r; expected one of %s"
            % (act_str, ", ".join(supported))
        )

    def forward(self, x):
        """Compute class log-probabilities for the input batch ``x``.

        Returns:
            ``log_softmax`` over dimension 1 of the ``fc2`` output.
        """
        # Resolved on every call so a changed ``activation_str`` is honoured,
        # but only once per call rather than once per layer.
        act = self.activation_func(self.activation_str)

        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.max_pool2d(act(conv(x)), 2, 2)

        x = x.view(-1, self.num_flatten)
        x = act(self.fc1(x))
        # Dropout is only active while the module is in training mode.
        x = F.dropout(x, self.dropout_rate, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
def loss_epoch(model, loss_func, dataset_dl, sanity_check=False, opt=None):
    """Run one pass over ``dataset_dl`` and return the averaged loss and metric.

    Args:
        model: Callable applied to each input batch after it has been moved
            to ``device``.
        loss_func: Forwarded unchanged to ``loss_batch``.
        dataset_dl: Iterable yielding ``(xb, yb)`` batches. Both elements must
            support ``.to(device)`` and ``yb`` must support ``.size(0)``.
        sanity_check: When exactly ``True``, stop after the first batch.
        opt: Forwarded unchanged to ``loss_batch``.
            NOTE(review): presumably an optimizer that makes ``loss_batch``
            perform a parameter update when given - confirm in utils.

    Returns:
        A ``(loss, metric)`` tuple: the summed batch losses divided by the
        number of batches processed, and the summed batch metrics divided by
        the number of samples processed.

    Raises:
        ValueError: If no samples were processed, so no average exists.
    """
    running_loss = 0.0
    running_metric = 0.0
    data_steps = 0  # batches processed
    total = 0       # samples processed

    for xb, yb in dataset_dl:
        # move batch to device
        xb = xb.to(device)
        yb = yb.to(device)

        # get model output
        output = model(xb)

        # get loss per batch
        loss_b, metric_b = loss_batch(loss_func, output, yb, opt)

        # update running loss
        running_loss += loss_b
        data_steps += 1
        total += yb.size(0)

        # update running metric
        if metric_b is not None:
            running_metric += metric_b

        # break the loop in case of sanity check
        if sanity_check is True:
            break

    # Without this guard an empty loader ends in a bare ZeroDivisionError.
    if data_steps == 0 or total == 0:
        raise ValueError(
            "loss_epoch received no samples: dataset_dl yielded no data"
        )

    # The loss is averaged per batch, the metric per sample.
    loss = running_loss / float(data_steps)
    metric = running_metric / float(total)
    return loss, metric
def train_val(model, params):
    """Train ``model`` and validate it after every epoch.

    Args:
        model: Module to train; it is modified in place.
        params: Mapping with the keys ``"num_epochs"``, ``"loss_func"``,
            ``"optimizer"``, ``"train_dl"``, ``"val_dl"``, ``"sanity_check"``,
            ``"lr_scheduler"``, ``"path2weights"`` and ``"save_weights"``.

    Returns:
        ``(model, loss_history, metric_history)``. The model has the weights
        of the lowest-validation-loss epoch loaded. Each history is a dict
        with ``"train"`` and ``"val"`` lists holding one entry per epoch.
    """
    # Unpack the run configuration.
    n_epochs = params["num_epochs"]
    criterion = params["loss_func"]
    optimizer = params["optimizer"]
    train_loader = params["train_dl"]
    val_loader = params["val_dl"]
    quick_run = params["sanity_check"]
    scheduler = params["lr_scheduler"]
    weights_path = params["path2weights"]
    save_to_disk = params["save_weights"]

    # Per-epoch loss and metric values for both phases.
    loss_history = {phase: [] for phase in ("train", "val")}
    metric_history = {phase: [] for phase in ("train", "val")}

    # Snapshot of the best weights seen so far, and the loss that earned it.
    best_weights = copy.deepcopy(model.state_dict())
    lowest_val_loss = float('inf')

    for epoch in range(n_epochs):
        lr_before = get_lr(optimizer)
        print('Epoch {}/{}, current lr={}'.format(epoch, n_epochs - 1, lr_before))

        # Training pass.
        model.train()
        train_loss, train_metric = loss_epoch(
            model, criterion, train_loader, quick_run, optimizer
        )
        loss_history["train"].append(train_loss)
        metric_history["train"].append(train_metric)

        # Validation pass, with gradient tracking disabled.
        model.eval()
        with torch.no_grad():
            val_loss, val_metric = loss_epoch(
                model, criterion, val_loader, quick_run
            )
        loss_history["val"].append(val_loss)
        metric_history["val"].append(val_metric)

        # Keep the weights whenever the validation loss improves.
        if val_loss < lowest_val_loss:
            lowest_val_loss = val_loss
            best_weights = copy.deepcopy(model.state_dict())
            # Writing to disk is optional.
            if save_to_disk:
                torch.save(model.state_dict(), weights_path)
            print("Copied best model weights!")

        # The scheduler is stepped with the validation loss. If that changed
        # the learning rate, continue from the best weights seen so far.
        scheduler.step(val_loss)
        if lr_before != get_lr(optimizer):
            print("Loading best model weights!")
            model.load_state_dict(best_weights)

        print("train loss: %.6f, val loss: %.6f, val accuracy: %.2f" % (train_loss, val_loss, 100 * val_metric))
        print("-" * 10)

    # Hand back the model carrying the best weights.
    model.load_state_dict(best_weights)
    return model, loss_history, metric_history