diff --git a/.gitignore b/.gitignore index 7b5af5f..895ccad 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,6 @@ mf/*.pbs dataset/* jobs/* mf/*.json -*.json *.csv raw_data/*/* baselines/* @@ -39,3 +38,7 @@ ae-hmf/* rank/* ae-word2vec/* dlstm/* +log/* +config/environment/* + +!config/environment/local.json diff --git a/attributes/input_attribute.py b/attributes/input_attribute.py index 4add081..57af604 100644 --- a/attributes/input_attribute.py +++ b/attributes/input_attribute.py @@ -1,72 +1,73 @@ from comb_attribute import HET, MIX import attribute import cPickle as pickle -import sys, os +import sys +import os sys.path.insert(0, '../utils') from load_data import load_raw_data -def read_data(raw_data_dir='../raw_data/data/', data_dir='../cache/data/', - combine_att='mix', logits_size_tr='10000', - thresh=2, use_user_feature=True, use_item_feature=True, no_user_id=False, - test=False, mylog=None): +def read_data(raw_data_dir='../raw_data/data/', data_dir='../cache/data/', + combine_att='mix', logits_size_tr='10000', + thresh=2, use_user_feature=True, use_item_feature=True, no_user_id=False, + test=False, mylog=None, config=None): - if not mylog: - def mylog(val): - print(val) + if not mylog: + def mylog(val): + print(val) - data_filename = os.path.join(data_dir, 'data') - if os.path.isfile(data_filename): - mylog("data file {} exists! loading cached data. \nCaution: change cached data dir (--data_dir) if new data (or new preprocessing) is used.".format(data_filename)) - (data_tr, data_va, u_attr, i_attr, item_ind2logit_ind, - logit_ind2item_ind, user_index, item_index) = pickle.load( - open(data_filename, 'rb')) - # u_attr.overview(mylog) - # i_attr.overview(mylog) + data_filename = os.path.join(data_dir, 'data') + if os.path.isfile(data_filename): + mylog("data file {} exists! loading cached data. 
\nCaution: change cached data dir (--data_dir) if new data (or new preprocessing) is used.".format(data_filename)) + (data_tr, data_va, u_attr, i_attr, item_ind2logit_ind, + logit_ind2item_ind, user_index, item_index) = pickle.load( + open(data_filename, 'rb')) + # u_attr.overview(mylog) + # i_attr.overview(mylog) - else: - if not os.path.exists(data_dir): - os.mkdir(data_dir) - _submit = 1 if test else 0 - (users, items, data_tr, data_va, user_features, item_features, - user_index, item_index) = load_raw_data(data_dir=raw_data_dir, _submit=_submit) - if not use_user_feature: - n = len(users) - users = users[:, 0].reshape(n, 1) - user_features = ([user_features[0][0]], [user_features[1][0]]) - if not use_item_feature: - m = len(items) - items = items[:, 0].reshape(m, 1) - item_features = ([item_features[0][0]], [item_features[1][0]]) + else: + if not os.path.exists(data_dir): + os.mkdir(data_dir) + _submit = 1 if test else 0 + (users, items, data_tr, data_va, user_features, item_features, + user_index, item_index) = load_raw_data(data_dir=raw_data_dir, _submit=_submit, config=config, mylog=mylog) + if not use_user_feature: + n = len(users) + users = users[:, 0].reshape(n, 1) + user_features = ([user_features[0][0]], [user_features[1][0]]) + if not use_item_feature: + m = len(items) + items = items[:, 0].reshape(m, 1) + item_features = ([item_features[0][0]], [item_features[1][0]]) - if no_user_id: - users[:, 0] = 0 - - if combine_att == 'het': - het = HET(data_dir=data_dir, logits_size_tr=logits_size_tr, threshold=thresh) - u_attr, i_attr, item_ind2logit_ind, logit_ind2item_ind = het.get_attributes( - users, items, data_tr, user_features, item_features) - elif combine_att == 'mix': - mix = MIX(data_dir=data_dir, logits_size_tr=logits_size_tr, - threshold=thresh) - users2, items2, user_features, item_features = mix.mix_attr(users, items, - user_features, item_features) - (u_attr, i_attr, item_ind2logit_ind, - logit_ind2item_ind) = mix.get_attributes(users2, items2, data_tr, - user_features, item_features) + if no_user_id: + users[:, 0] = 0 - mylog("saving data format to data directory") - from preprocess import pickle_save - pickle_save((data_tr, data_va, u_attr, i_attr, - item_ind2logit_ind, logit_ind2item_ind, user_index, item_index), data_filename) + if combine_att == 'het': + het = HET(data_dir=data_dir, + logits_size_tr=logits_size_tr, threshold=thresh) + u_attr, i_attr, item_ind2logit_ind, logit_ind2item_ind = het.get_attributes( + users, items, data_tr, user_features, item_features) + elif combine_att == 'mix': + mix = MIX(data_dir=data_dir, logits_size_tr=logits_size_tr, + threshold=thresh) + users2, items2, user_features, item_features = mix.mix_attr(users, items, + user_features, item_features) + (u_attr, i_attr, item_ind2logit_ind, + logit_ind2item_ind) = mix.get_attributes(users2, items2, data_tr, + user_features, item_features) - mylog('length of item_ind2logit_ind: {}'.format(len(item_ind2logit_ind))) + mylog("saving data format to data directory") + from preprocess import pickle_save + pickle_save((data_tr, data_va, u_attr, i_attr, + item_ind2logit_ind, logit_ind2item_ind, user_index, item_index), data_filename) - # if FLAGS.dataset in ['ml', 'yelp']: - # mylog('disabling the lstm-rec fake feature') - # u_attr.num_features_cat = 1 + mylog('length of item_ind2logit_ind: {}'.format(len(item_ind2logit_ind))) - return (data_tr, data_va, u_attr, i_attr, item_ind2logit_ind, - logit_ind2item_ind, user_index, item_index) + # if FLAGS.dataset in ['ml', 'yelp']: + # mylog('disabling 
the lstm-rec fake feature') + # u_attr.num_features_cat = 1 + return (data_tr, data_va, u_attr, i_attr, item_ind2logit_ind, + logit_ind2item_ind, user_index, item_index) diff --git a/config/environment/local.json b/config/environment/local.json new file mode 100644 index 0000000..66b97a7 --- /dev/null +++ b/config/environment/local.json @@ -0,0 +1,14 @@ +{ + "recommendations": { + "host": "localhost", + "user": "root", + "password": "", + "database": "recommendations" + }, + "recommendation_service": { + "host": "localhost", + "user": "root", + "password": "", + "database": "recommendation_service" + } +} \ No newline at end of file diff --git a/config/user_item_recommender_config.json b/config/user_item_recommender_config.json new file mode 100644 index 0000000..790c50c --- /dev/null +++ b/config/user_item_recommender_config.json @@ -0,0 +1,92 @@ +{ + "model": "lstm", + "dataset_name": "ml1m", + "raw_data_dir": "examples/dataset", + "cache_dir": "examples/cache/lstm_ml1m", + "train_dir": "examples/train/lstm_ml1m", + "test": false, + "combine_att": "mix", + "use_user_feature": true, + "use_item_feature": true, + "user_vocab_size": 150000, + "item_vocab_size": 3100, + "vocab_min_thresh": 1, + + "loss": "ce", + "loss_func": "log", + "loss_exp_p": 1.0005, + "learning_rate": 1, + "keep_prob": 0.5, + "learning_rate_decay_factor": 1.0, + "batch_size": 64, + "size": 64, + "max_gradient_norm": 5.0, + "patience": 20, + "power": 0.5, + "num_layers": 1, + "n_epoch": 10, + "steps_per_checkpoint": 5, + "L": 30, + "n_bucket": 10, + + "recommend": true, + "saverec": false, + "top_N_items": 100, + "topk": 100, + "recommend_new": false, + + "ensemble": false, + "ensemble_suffix": "", + "seed": 0, + + "nonlinear": "linear", + "hidden_size": 500, + + "n_resample": 50, + "n_sampled": 1024, + + "sample_type": "random", + "user_sample": 1.0, + + "output_feat": 1, + "use_sep_item": false, + "no_input_item_feature": false, + "use_concat": false, + "no_user_id": true, + + "N": "000", + "withAdagrad": true, + "fromScratch": true, + "saveCheckpoint": false, + + "gpu": -1, + "profile": false, + "device_log": false, + "eval": true, + "use_more_train": false, + "model_option": "loss", + + "ta": 1, + "after40": false, + "split": "last", + + "beam_search": false, + "beam_size": 10, + + "max_train_data_size": 0, + "old_att": false, + + "entity1": "users", + "entity2": "items", + "entity1_ID": "userId", + "entity2_ID": "itemId", + "recs_past_N_days": 3, + "source_data_from_S3": true, + "data_source_config": { "mys3bucket": "recommendation-engine", + "src_directory": "data_user_item/", + "src_file_prefix": "data_raw_" + }, + "rec_output_config": { "output_table": "recommended_items", + "archive_table": "recommended_items_archive" + } +} diff --git a/hmf/hmf_class.py b/hmf/hmf_class.py new file mode 100644 index 0000000..3865785 --- /dev/null +++ b/hmf/hmf_class.py @@ -0,0 +1,627 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import math +import os +import sys +import random +import time +import logging +import numpy as np +from six.moves import xrange # pylint: disable=redefined-builtin +import tensorflow as tf + +sys.path.insert(0, '../utils') +sys.path.insert(0, '../attributes') + +from input_attribute import read_data +from prepare_train import positive_items, item_frequency, sample_items +from attrdict import AttrDict +from pandatools import pd +from load_config import load_configurations +from sqlalchemy import create_engine + + +class hmf(object): 
+ """ hmf class object used to recommend "entity2" to "entity1" + using Hybrid matrix factorization model (with deep layer extensions) on an implicit rating dataset. + """ + + def __init__(self, config): + """ Class constructor, initializes key components + """ + # setup + self.config = config + self.config['db_config'] = load_configurations( + 'config/environment/' + self.config['environment'] + '.json') + self.ent1 = self.config['entity1_ID'] + self.ent2 = self.config['entity2_ID'] + self.FLAGS = AttrDict() + # self.config['db_config'] = load_configurations( 'config/environment/'+self.config['environment']+'.json' ) + # datasets, paths, and preprocessing + self.FLAGS.dataset = self.config['dataset_name'] + self.FLAGS.raw_data = self.config['raw_data_dir'] + self.FLAGS.data_dir = self.config['cache_dir'] + self.FLAGS.train_dir = self.config['train_dir'] + self.FLAGS.test = self.config['test'] + self.FLAGS.combine_att = self.config['combine_att'] + self.FLAGS.use_item_feature = self.config['use_item_feature'] + self.FLAGS.use_user_feature = self.config['use_user_feature'] + self.FLAGS.user_vocab_size = self.config['user_vocab_size'] + self.FLAGS.item_vocab_size = self.config['item_vocab_size'] + self.FLAGS.vocab_min_thresh = self.config['vocab_min_thresh'] + + # tuning hypers + self.FLAGS.loss = self.config['loss'] + self.FLAGS.loss_func = self.config['loss_func'] + self.FLAGS.loss_exp_p = self.config['loss_exp_p'] + self.FLAGS.learning_rate = self.config['learning_rate'] + self.FLAGS.keep_prob = self.config['keep_prob'] + self.FLAGS.learning_rate_decay_factor = self.config['learning_rate_decay_factor'] + self.FLAGS.batch_size = self.config['batch_size'] + self.FLAGS.size = self.config['size'] + self.FLAGS.patience = self.config['patience'] + self.FLAGS.n_epoch = self.config['n_epoch'] + self.FLAGS.steps_per_checkpoint = self.config['steps_per_checkpoint'] + + # to recommend + self.FLAGS.recommend = self.config['recommend'] + self.FLAGS.saverec = self.config['saverec'] + self.FLAGS.top_N_items = self.config['top_N_items'] + self.FLAGS.recommend_new = self.config['recommend_new'] + + # nonlinear + self.FLAGS.nonlinear = self.config['nonlinear'] + self.FLAGS.hidden_size = self.config['hidden_size'] + self.FLAGS.num_layers = self.config['num_layers'] + + # algorithms with sampling + self.FLAGS.power = self.config['power'] + self.FLAGS.n_resample = self.config['n_resample'] + self.FLAGS.n_sampled = self.config['n_sampled'] + + self.FLAGS.sample_type = self.config['sample_type'] + self.FLAGS.user_sample = self.config['user_sample'] + self.FLAGS.seed = self.config['seed'] + + # + self.FLAGS.gpu = self.config['gpu'] + self.FLAGS.profile = self.config['profile'] + self.FLAGS.device_log = self.config['device_log'] + self.FLAGS.eval = self.config['eval'] + self.FLAGS.use_more_train = self.config['use_more_train'] + self.FLAGS.model_option = self.config['model_option'] + + # self.FLAGS.max_train_data_size = self.config['max_train_data_size'] + # Xing related + # self.FLAGS.ta = self.config['ta'] + + if self.FLAGS.test: + if self.FLAGS.data_dir[-1] == '/': + self.FLAGS.data_dir = self.FLAGS.data_dir[:-1] + '_test' + else: + self.FLAGS.data_dir = self.FLAGS.data_dir + '_test' + + if not os.path.exists(self.FLAGS.train_dir): + os.mkdir(self.FLAGS.train_dir) + + return + + def mylog(self, msg): + logger = logging.getLogger('main') + logger.info(msg) + + return + + def create_model(self, session, u_attributes=None, i_attributes=None, + item_ind2logit_ind=None, logit_ind2item_ind=None, 
logit_size_test=None, ind_item=None): + loss = self.FLAGS.loss + gpu = None if self.FLAGS.gpu == -1 else self.FLAGS.gpu + n_sampled = self.FLAGS.n_sampled if self.FLAGS.loss in [ + 'mw', 'mce'] else None + import hmf_model + model = hmf_model.LatentProductModel(self.FLAGS.user_vocab_size, + self.FLAGS.item_vocab_size, self.FLAGS.size, self.FLAGS.num_layers, + self.FLAGS.batch_size, self.FLAGS.learning_rate, + self.FLAGS.learning_rate_decay_factor, u_attributes, i_attributes, + item_ind2logit_ind, logit_ind2item_ind, loss_function=loss, GPU=gpu, + logit_size_test=logit_size_test, nonlinear=self.FLAGS.nonlinear, + dropout=self.FLAGS.keep_prob, n_sampled=n_sampled, indices_item=ind_item, + top_N_items=self.FLAGS.top_N_items, hidden_size=self.FLAGS.hidden_size, + loss_func=self.FLAGS.loss_func, loss_exp_p=self.FLAGS.loss_exp_p) + + if not os.path.isdir(self.FLAGS.train_dir): + os.mkdir(self.FLAGS.train_dir) + + ckpt = tf.train.get_checkpoint_state(self.FLAGS.train_dir) + + if ckpt: + self.mylog("Reading model parameters from %s" % + ckpt.model_checkpoint_path) + model.saver.restore(session, ckpt.model_checkpoint_path) + else: + self.mylog("Created model with fresh parameters.") + # session.run(tf.global_variables_initializer()) + session.run(tf.global_variables_initializer()) + + return model + + def train(self): + raw_data = self.FLAGS.raw_data + train_dir = self.FLAGS.train_dir + data_dir = self.FLAGS.data_dir + combine_att = self.FLAGS.combine_att + test = self.FLAGS.test + logits_size_tr = self.FLAGS.item_vocab_size + thresh = self.FLAGS.vocab_min_thresh + use_user_feature = self.FLAGS.use_user_feature + use_item_feature = self.FLAGS.use_item_feature + batch_size = self.FLAGS.batch_size + steps_per_checkpoint = self.FLAGS.steps_per_checkpoint + loss_func = self.FLAGS.loss + max_patience = self.FLAGS.patience + go_test = self.FLAGS.test + max_epoch = self.FLAGS.n_epoch + sample_type = self.FLAGS.sample_type + power = self.FLAGS.power + use_more_train = self.FLAGS.use_more_train + profile = self.FLAGS.profile + device_log = self.FLAGS.device_log + + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=device_log)) as sess: + run_options = None + run_metadata = None + + if profile: + # in order to profile + from tensorflow.python.client import timeline + run_options = tf.RunOptions( + trace_level=tf.RunOptions.FULL_TRACE) + run_metadata = tf.RunMetadata() + steps_per_checkpoint = 30 + + self.mylog("reading data") + (data_tr, data_va, u_attributes, i_attributes, item_ind2logit_ind, + logit_ind2item_ind, _, _) = read_data( + raw_data_dir=raw_data, + data_dir=data_dir, + combine_att=combine_att, + logits_size_tr=logits_size_tr, + thresh=thresh, + use_user_feature=use_user_feature, + use_item_feature=use_item_feature, + test=test, + mylog=self.mylog, + config=self.config) + + self.mylog("train/dev size: %d/%d" % (len(data_tr), len(data_va))) + + ''' + remove some rare items in both train and valid set + this helps make train/valid set distribution similar + to each other + ''' + self.mylog("original train/dev size: %d/%d" % + (len(data_tr), len(data_va))) + data_tr = [p for p in data_tr if (p[1] in item_ind2logit_ind)] + data_va = [p for p in data_va if (p[1] in item_ind2logit_ind)] + self.mylog("new train/dev size: %d/%d" % + (len(data_tr), len(data_va))) + + random.seed(self.FLAGS.seed) + + item_pop, p_item = item_frequency(data_tr, power) + + if use_more_train: + item_population = range(len(item_ind2logit_ind)) + else: + item_population = item_pop + + model = 
self.create_model(sess, u_attributes, i_attributes, item_ind2logit_ind, + logit_ind2item_ind, ind_item=item_population) + + pos_item_list, pos_item_list_val = None, None + + if loss_func in ['warp', 'mw', 'rs', 'rs-sig', 'rs-sig2', 'bbpr']: + pos_item_list, pos_item_list_val = positive_items( + data_tr, data_va) + model.prepare_warp(pos_item_list, pos_item_list_val) + + self.mylog('started training') + step_time, loss, current_step, auc = 0.0, 0.0, 0, 0.0 + + repeat = 5 if loss_func.startswith('bpr') else 1 + patience = max_patience + + if os.path.isfile(os.path.join(train_dir, 'auc_train.npy')): + auc_train = list( + np.load(os.path.join(train_dir, 'auc_train.npy'))) + auc_dev = list(np.load(os.path.join(train_dir, 'auc_dev.npy'))) + previous_losses = list(np.load(os.path.join(train_dir, + 'loss_train.npy'))) + losses_dev = list( + np.load(os.path.join(train_dir, 'loss_dev.npy'))) + best_auc = max(auc_dev) + best_loss = min(losses_dev) + else: + previous_losses, auc_train, auc_dev, losses_dev = [], [], [], [] + best_auc, best_loss = -1, 1000000 + + item_sampled, item_sampled_id2idx = None, None + + if sample_type == 'random': + get_next_batch = model.get_batch + elif sample_type == 'permute': + get_next_batch = model.get_permuted_batch + else: + print('not implemented!') + exit() + + train_total_size = float(len(data_tr)) + n_epoch = max_epoch + steps_per_epoch = int(1.0 * train_total_size / batch_size) + total_steps = steps_per_epoch * n_epoch + + self.mylog("Train:") + self.mylog("total: {}".format(train_total_size)) + self.mylog("Steps_per_epoch: {}".format(steps_per_epoch)) + self.mylog("Total_steps:{}".format(total_steps)) + self.mylog("Dev:") + self.mylog("total: {}".format(len(data_va))) + + self.mylog("\n\ntraining start!") + + while True: + ranndom_number_01 = np.random.random_sample() + start_time = time.time() + (user_input, item_input, neg_item_input) = get_next_batch(data_tr) + + if loss_func in ['mw', 'mce'] and current_step % self.FLAGS.n_resample == 0: + item_sampled, item_sampled_id2idx = sample_items( + item_population, self.FLAGS.n_sampled, p_item) + else: + item_sampled = None + + step_loss = model.step(sess, user_input, item_input, + neg_item_input, item_sampled, item_sampled_id2idx, loss=loss_func, + run_op=run_options, run_meta=run_metadata) + + step_time += (time.time() - start_time) / \ + steps_per_checkpoint + loss += step_loss / steps_per_checkpoint + current_step += 1 + + if model.global_step.eval() > total_steps: + self.mylog( + "Training reaches maximum steps. Terminating...") + break + + if current_step % steps_per_checkpoint == 0: + + if loss_func in ['ce', 'mce']: + perplexity = math.exp( + loss) if loss < 300 else float('inf') + self.mylog("global step %d learning rate %.4f step-time %.4f perplexity %.2f" % + (model.global_step.eval(), model.learning_rate.eval(), step_time, perplexity)) + else: + self.mylog("global step %d learning rate %.4f step-time %.4f loss %.3f" % + (model.global_step.eval(), model.learning_rate.eval(), step_time, loss)) + if profile: + # Create the Timeline object, and write it to a json + tl = timeline.Timeline(run_metadata.step_stats) + ctf = tl.generate_chrome_trace_format() + with open('timeline.json', 'w') as f: + f.write(ctf) + exit() + + # Decrease learning rate if no improvement was seen over last 3 times. + if len(previous_losses) > 2 and loss > max(previous_losses[-3:]): + sess.run(model.learning_rate_decay_op) + + previous_losses.append(loss) + auc_train.append(auc) + + # Reset timer and loss. 
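                    # (below: when eval is enabled, the dev loss drives best-model
                    # checkpointing and the patience-based early stop; in test mode
                    # (go_test) the model is saved at every checkpoint and early
                    # stopping is effectively disabled)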
+ step_time, loss, auc = 0.0, 0.0, 0.0 + + if not self.FLAGS.eval: + continue + + # Run evals on development set and print their loss. + l_va = len(data_va) + eval_loss, eval_auc = 0.0, 0.0 + count_va = 0 + start_time = time.time() + for idx_s in range(0, l_va, batch_size): + idx_e = idx_s + batch_size + if idx_e > l_va: + break + lt = data_va[idx_s:idx_e] + user_va = [x[0] for x in lt] + item_va = [x[1] for x in lt] + for _ in range(repeat): + item_va_neg = None + the_loss = 'warp' if loss_func == 'mw' else loss_func + eval_loss0 = model.step(sess, user_va, item_va, item_va_neg, + None, None, forward_only=True, + loss=the_loss) + eval_loss += eval_loss0 + count_va += 1 + + eval_loss /= count_va + eval_auc /= count_va + step_time = ( + time.time() - start_time) / count_va + if loss_func in ['ce', 'mce']: + eval_ppx = math.exp( + eval_loss) if eval_loss < 300 else float('inf') + self.mylog(" dev: perplexity %.2f eval_auc(not computed) %.4f step-time %.4f" % ( + eval_ppx, eval_auc, step_time)) + else: + self.mylog(" dev: loss %.3f eval_auc(not computed) %.4f step-time %.4f" % (eval_loss, + eval_auc, step_time)) + + if eval_loss < best_loss and not go_test: + best_loss = eval_loss + patience = max_patience + checkpoint_path = os.path.join( + train_dir, "best.ckpt") + self.mylog('Saving best model...') + model.saver.save(sess, checkpoint_path, + global_step=0, write_meta_graph=False) + + if go_test: + checkpoint_path = os.path.join( + train_dir, "best.ckpt") + self.mylog('Saving best model...') + model.saver.save(sess, checkpoint_path, + global_step=0, write_meta_graph=False) + + if eval_loss > best_loss: + patience -= 1 + + auc_dev.append(eval_auc) + losses_dev.append(eval_loss) + + if patience < 0 and not go_test: + self.mylog( + "no improvement for too long.. 
terminating..") + self.mylog("best loss %.4f" % best_loss) + break + return + + def recommend(self, target_uids=[]): + raw_data = self.FLAGS.raw_data + data_dir = self.FLAGS.data_dir + combine_att = self.FLAGS.combine_att + logits_size_tr = self.FLAGS.item_vocab_size + item_vocab_min_thresh = self.FLAGS.vocab_min_thresh + use_user_feature = self.FLAGS.use_user_feature + use_item_feature = self.FLAGS.use_item_feature + batch_size = self.FLAGS.batch_size + loss = self.FLAGS.loss + top_n = self.FLAGS.top_N_items + test = self.FLAGS.test + device_log = self.FLAGS.device_log + + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=device_log)) as sess: + self.mylog("reading data") + (_, _, u_attributes, i_attributes, item_ind2logit_ind, logit_ind2item_ind, user_index, item_index) = read_data( + raw_data_dir=raw_data, + data_dir=data_dir, + combine_att=combine_att, + logits_size_tr=logits_size_tr, + thresh=item_vocab_min_thresh, + use_user_feature=use_user_feature, + use_item_feature=use_item_feature, + test=test, + mylog=self.mylog, + config=self.config) + + model = self.create_model(sess, u_attributes, i_attributes, + item_ind2logit_ind, logit_ind2item_ind, ind_item=None) + + Uinds = [user_index[v] for v in target_uids] + + N = len(Uinds) + self.mylog("%d target users to recommend" % N) + rec = np.zeros((N, top_n), dtype=int) + + count = 0 + time_start = time.time() + for idx_s in range(0, N, batch_size): + count += 1 + if count % 100 == 0: + self.mylog("idx: %d, c: %d" % (idx_s, count)) + + idx_e = idx_s + batch_size + if idx_e <= N: + users = Uinds[idx_s: idx_e] + recs = model.step(sess, users, None, None, forward_only=True, + recommend=True) + rec[idx_s:idx_e, :] = recs + else: + users = range(idx_s, N) + [0] * (idx_e - N) + users = [Uinds[t] for t in users] + recs = model.step(sess, users, None, None, forward_only=True, + recommend=True) + idx_e = N + rec[idx_s:idx_e, :] = recs[:(idx_e - idx_s), :] + + time_end = time.time() + self.mylog("Time used %.1f" % (time_end - time_start)) + + # transform result to a dictionary + # R[user_id] = [item_id1, item_id2, ...] 
+ + ind2id = {} + for iid in item_index: + uind = item_index[iid] + assert(uind not in ind2id) + ind2id[uind] = iid + R = {} + for i in xrange(N): + uid = target_uids[i] + R[uid] = [ind2id[logit_ind2item_ind[v]] + for v in list(rec[i, :])] + + self.recs = pd.DataFrame.from_dict(R, orient="index") + self.recs = self.recs.stack().reset_index() + self.recs.columns = [self.ent1, 'slot', self.ent2] + + return R + + def compute_scores(self): + raw_data = self.FLAGS.raw_data + data_dir = self.FLAGS.data_dir + dataset = self.FLAGS.dataset + save_recommendation = self.FLAGS.saverec + train_dir = self.FLAGS.train_dir + test = self.FLAGS.test + + from evaluate import Evaluation as Evaluate + evaluation = Evaluate(raw_data, test=test, + config=self.config, mylog=self.mylog) + + # if filter: + # get past N days of recommendations, these will be filtered out + # from the final list to avoid making duplicate recommendations + # self.mylog('backing up most recent recommendations & fetch recommendations in the past %d days' %self.config['recs_past_N_days']) + # self.get_past_rec() + + R = self.recommend(evaluation.get_uids()) + + evaluation.eval_on(R) + scores_self, scores_ex = evaluation.get_scores() + self.mylog( + "====evaluation scores (NDCG, RECALL, PRECISION, MAP) @ 2,5,10,20,30====") + self.mylog("METRIC_FORMAT (self): {}".format(scores_self)) + self.mylog("METRIC_FORMAT (ex ): {}".format(scores_ex)) + + if self.FLAGS.saverec: + self.write_to_db(self.recs) + + def write_to_db(self, df): + """ write recommenations to database + """ + + # form the connection string used to connect to recommendation_service DB + try: + cxn_string = "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s/%(database)s" % \ + self.config['db_config']['recommendation_service'] + engine = create_engine(cxn_string) + except: + self.mylog( + 'error creating connection engine, check connection string: %s' % cxn_string) + self.mylog(sys.exc_info()) + sys.exit() + + # write new records to target output DB table + try: + start = time.time() + self.mylog('writing new recommendations to db') + + # decode entity1 and entity2 back into alpha-numeric IDs + #df[self.ent1] = self.ent1_encoder.inverse_transform(df[self.ent1]) + #df[self.ent2] = self.ent2_encoder.inverse_transform(df[self.ent2]) + # add timestamp to mark when the recommendations were made + df['createdAt'] = time.strftime('%Y-%m-%d %H:%M:%S') + df = df[['createdAt', self.ent1, self.ent2, 'slot']] + + # write to output table 5000 rows at a time; note that column "rank" is ommitted + df[[self.ent1, self.ent2, 'createdAt']].to_sql(index=False, + name=self.config['rec_output_config']['output_table'], + con=engine, + if_exists='append', + chunksize=5000) + self.mylog('writing new recommendations took %.2f seconds.' % + (time.time() - start)) + + except: + self.mylog('failed writing new recommendations to \"recommendation_service.%s\"' % + self.config['rec_output_config']['output_table']) + self.mylog(sys.exc_info()) + sys.exit() + + # clear up old recommendations + try: + start = time.time() + self.mylog('clearing up previous recommendations') + + engine.execute('delete from %(table)s where createdAt < \'%(timestamp)s\';' % + {'table': self.config['rec_output_config']['output_table'], + 'timestamp': df.createdAt.values.tolist()[0]}) + self.mylog('clearing up old recommendations took %.2f seconds.' 
% ( + time.time() - start)) + + except: + self.mylog('failed clearing up old recommendations before %s from %s' % (df.createdAt.values.tolist()[0], + self.config['rec_output_config']['output_table'])) + self.mylog(sys.exc_info()) + sys.exit() + + return + + def get_past_rec(self): + """ Fetches past recommendations from the database + """ + + # form the connection string used to connect to recommendation_service DB + try: + cxn_string = "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s/%(database)s" % \ + self.config['db_config']['recommendation_service'] + engine = create_engine(cxn_string) + except: + self.mylog( + 'error creating connection engine, check connection string: %s' % cxn_string) + self.mylog(sys.exc_info()) + sys.exit() + + # backup the current recommendations in recommendation_service + # this moves current recommendations to the archive table + try: + self.mylog('backing up current recommendations to archive') + start = time.time() + query = 'SELECT * FROM %s' % self.config['rec_output_config']['output_table'] + data_iterator = pd.read_sql_query(query, engine, chunksize=5000) + for records in data_iterator: + # copy records to archive + records[['createdAt', self.ent1, self.ent2]].to_sql(index=False, + name=self.config['rec_output_config']['archive_table'], + con=engine, + if_exists='append') + except: + self.mylog('failed backing up old recommendations') + self.mylog(sys.exc_info()) + sys.exit() + + # get list of recommendations in past N days from archive table + try: + self.mylog('fetching past recommendations') + start = time.time() + query = """SELECT %(entity_one)s, %(entity_two)s + FROM %(table)s + WHERE createdAt >= DATE_SUB(CURDATE(),interval %(recs_past_N_days)s day)"""\ + % {'entity_one': self.ent1_view.columns[0], + 'entity_two': self.ent2_view.columns[0], + 'table': self.config['rec_output_config']['archive_table'], + 'recs_past_N_days': self.config['recs_past_N_days'] + } + self.past_rec = pd.read_sql_query(query, cxn_string) + self.mylog('fetching past recommendations took %.2f seconds' % ( + time.time() - start)) + except: + self.mylog('error executing query to fetch past recommendations') + self.mylog(sys.exc_info()) + sys.exit() + + # filter out ent1 and ent2 IDs in past rec that are not present in the current dataset + # (this is more of a safety net mechanism) + # self.past_rec = self.past_rec[self.past_rec[self.ent1].isin(self.ent1_encoder.classes_.tolist())] + # self.past_rec = self.past_rec[self.past_rec[self.ent2].isin(self.ent2_encoder.classes_.tolist())] + # encode the ent1 and ent2 IDs from past rec + # self.past_rec[self.ent1] = self.ent1_encoder.transform(self.past_rec[self.ent1]) + # self.past_rec[self.ent2] = self.ent2_encoder.transform(self.past_rec[self.ent2]) + + return diff --git a/hmf/run_hmf.py b/hmf/run_hmf.py index a6a69b7..35e7a55 100644 --- a/hmf/run_hmf.py +++ b/hmf/run_hmf.py @@ -25,7 +25,7 @@ tf.app.flags.DEFINE_boolean("use_item_feature", True, "RT") tf.app.flags.DEFINE_integer("user_vocab_size", 150000, "User vocabulary size.") tf.app.flags.DEFINE_integer("item_vocab_size", 50000, "Item vocabulary size.") -tf.app.flags.DEFINE_integer("item_vocab_min_thresh", 2, "filter inactive tokens.") +tf.app.flags.DEFINE_integer("vocab_min_thresh", 2, "filter inactive tokens.") # tuning hypers tf.app.flags.DEFINE_string("loss", 'ce', "loss function: ce, warp, (mw, mce, bpr)") @@ -67,7 +67,7 @@ tf.app.flags.DEFINE_float("user_sample", 1.0, "user sample rate.") tf.app.flags.DEFINE_integer("seed", 0, "mini batch sampling random seed.") -# 
+# tf.app.flags.DEFINE_integer("gpu", -1, "gpu card number") tf.app.flags.DEFINE_boolean("profile", False, "False = no profile, True = profile") tf.app.flags.DEFINE_boolean("device_log", False, @@ -93,20 +93,20 @@ def mylog(msg): logging.info(msg) return -def create_model(session, u_attributes=None, i_attributes=None, - item_ind2logit_ind=None, logit_ind2item_ind=None, - loss = FLAGS.loss, logit_size_test=None, ind_item = None): +def create_model(session, u_attributes=None, i_attributes=None, + item_ind2logit_ind=None, logit_ind2item_ind=None, + loss = FLAGS.loss, logit_size_test=None, ind_item = None): gpu = None if FLAGS.gpu == -1 else FLAGS.gpu n_sampled = FLAGS.n_sampled if FLAGS.loss in ['mw', 'mce'] else None import hmf_model - model = hmf_model.LatentProductModel(FLAGS.user_vocab_size, - FLAGS.item_vocab_size, FLAGS.size, FLAGS.num_layers, - FLAGS.batch_size, FLAGS.learning_rate, - FLAGS.learning_rate_decay_factor, u_attributes, i_attributes, - item_ind2logit_ind, logit_ind2item_ind, loss_function = loss, GPU=gpu, - logit_size_test=logit_size_test, nonlinear=FLAGS.nonlinear, + model = hmf_model.LatentProductModel(FLAGS.user_vocab_size, + FLAGS.item_vocab_size, FLAGS.size, FLAGS.num_layers, + FLAGS.batch_size, FLAGS.learning_rate, + FLAGS.learning_rate_decay_factor, u_attributes, i_attributes, + item_ind2logit_ind, logit_ind2item_ind, loss_function = loss, GPU=gpu, + logit_size_test=logit_size_test, nonlinear=FLAGS.nonlinear, dropout=FLAGS.keep_prob, n_sampled=n_sampled, indices_item=ind_item, - top_N_items=FLAGS.top_N_items, hidden_size=FLAGS.hidden_size, + top_N_items=FLAGS.top_N_items, hidden_size=FLAGS.hidden_size, loss_func= FLAGS.loss_func, loss_exp_p = FLAGS.loss_exp_p) if not os.path.isdir(FLAGS.train_dir): @@ -125,16 +125,16 @@ def create_model(session, u_attributes=None, i_attributes=None, return model def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, - data_dir=FLAGS.data_dir, combine_att=FLAGS.combine_att, test=FLAGS.test, - logits_size_tr=FLAGS.item_vocab_size, thresh=FLAGS.item_vocab_min_thresh, - use_user_feature=FLAGS.use_user_feature, + data_dir=FLAGS.data_dir, combine_att=FLAGS.combine_att, test=FLAGS.test, + logits_size_tr=FLAGS.item_vocab_size, thresh=FLAGS.vocab_min_thresh, + use_user_feature=FLAGS.use_user_feature, use_item_feature=FLAGS.use_item_feature, - batch_size=FLAGS.batch_size, steps_per_checkpoint=FLAGS.steps_per_checkpoint, + batch_size=FLAGS.batch_size, steps_per_checkpoint=FLAGS.steps_per_checkpoint, loss_func=FLAGS.loss, max_patience=FLAGS.patience, go_test=FLAGS.test, - max_epoch=FLAGS.n_epoch, sample_type=FLAGS.sample_type, power=FLAGS.power, - use_more_train=FLAGS.use_more_train, profile=FLAGS.profile, + max_epoch=FLAGS.n_epoch, sample_type=FLAGS.sample_type, power=FLAGS.power, + use_more_train=FLAGS.use_more_train, profile=FLAGS.profile, device_log=FLAGS.device_log): - with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=device_log)) as sess: run_options = None run_metadata = None @@ -144,25 +144,25 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) run_metadata = tf.RunMetadata() steps_per_checkpoint = 30 - + mylog("reading data") - (data_tr, data_va, u_attributes, i_attributes,item_ind2logit_ind, + (data_tr, data_va, u_attributes, i_attributes,item_ind2logit_ind, logit_ind2item_ind, _, _) = read_data( - raw_data_dir=raw_data, - 
data_dir=data_dir, - combine_att=combine_att, - logits_size_tr=logits_size_tr, - thresh=thresh, + raw_data_dir=raw_data, + data_dir=data_dir, + combine_att=combine_att, + logits_size_tr=logits_size_tr, + thresh=thresh, use_user_feature=use_user_feature, - use_item_feature=use_item_feature, - test=test, + use_item_feature=use_item_feature, + test=test, mylog=mylog) mylog("train/dev size: %d/%d" %(len(data_tr),len(data_va))) ''' remove some rare items in both train and valid set - this helps make train/valid set distribution similar + this helps make train/valid set distribution similar to each other ''' mylog("original train/dev size: %d/%d" %(len(data_tr),len(data_va))) @@ -189,14 +189,14 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, mylog('started training') step_time, loss, current_step, auc = 0.0, 0.0, 0, 0.0 - - repeat = 5 if loss_func.startswith('bpr') else 1 + + repeat = 5 if loss_func.startswith('bpr') else 1 patience = max_patience if os.path.isfile(os.path.join(train_dir, 'auc_train.npy')): auc_train = list(np.load(os.path.join(train_dir, 'auc_train.npy'))) auc_dev = list(np.load(os.path.join(train_dir, 'auc_dev.npy'))) - previous_losses = list(np.load(os.path.join(train_dir, + previous_losses = list(np.load(os.path.join(train_dir, 'loss_train.npy'))) losses_dev = list(np.load(os.path.join(train_dir, 'loss_dev.npy'))) best_auc = max(auc_dev) @@ -232,14 +232,14 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, ranndom_number_01 = np.random.random_sample() start_time = time.time() (user_input, item_input, neg_item_input) = get_next_batch(data_tr) - + if loss_func in ['mw', 'mce'] and current_step % FLAGS.n_resample == 0: - item_sampled, item_sampled_id2idx = sample_items(item_population, + item_sampled, item_sampled_id2idx = sample_items(item_population, FLAGS.n_sampled, p_item) else: item_sampled = None - step_loss = model.step(sess, user_input, item_input, + step_loss = model.step(sess, user_input, item_input, neg_item_input, item_sampled, item_sampled_id2idx, loss=loss_func, run_op=run_options, run_meta=run_metadata) @@ -267,7 +267,7 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, # Decrease learning rate if no improvement was seen over last 3 times. if len(previous_losses) > 2 and loss > max(previous_losses[-3:]): - sess.run(model.learning_rate_decay_op) + sess.run(model.learning_rate_decay_op) previous_losses.append(loss) auc_train.append(auc) @@ -276,8 +276,8 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, if not FLAGS.eval: continue - - + + # Run evals on development set and print their loss. 
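      # (the dev loss below is averaged over full batches of data_va; 'mw' falls back
      # to the warp loss for evaluation, and eval_auc is logged but never computed)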
l_va = len(data_va) eval_loss, eval_auc = 0.0, 0.0 @@ -294,7 +294,7 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, item_va_neg = None the_loss = 'warp' if loss_func == 'mw' else loss_func eval_loss0 = model.step(sess, user_va, item_va, item_va_neg, - None, None, forward_only=True, + None, None, forward_only=True, loss=the_loss) eval_loss += eval_loss0 count_va += 1 @@ -306,22 +306,22 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, mylog(" dev: perplexity %.2f eval_auc(not computed) %.4f step-time %.4f" % ( eval_ppx, eval_auc, step_time)) else: - mylog(" dev: loss %.3f eval_auc(not computed) %.4f step-time %.4f" % (eval_loss, + mylog(" dev: loss %.3f eval_auc(not computed) %.4f step-time %.4f" % (eval_loss, eval_auc, step_time)) sys.stdout.flush() - + if eval_loss < best_loss and not go_test: best_loss = eval_loss patience = max_patience checkpoint_path = os.path.join(train_dir, "best.ckpt") mylog('Saving best model...') - model.saver.save(sess, checkpoint_path, + model.saver.save(sess, checkpoint_path, global_step=0, write_meta_graph = False) if go_test: checkpoint_path = os.path.join(train_dir, "best.ckpt") mylog('Saving best model...') - model.saver.save(sess, checkpoint_path, + model.saver.save(sess, checkpoint_path, global_step=0, write_meta_graph = False) if eval_loss > best_loss: @@ -337,27 +337,27 @@ def train(raw_data=FLAGS.raw_data, train_dir=FLAGS.train_dir, mylog=mylog, break return -def recommend(target_uids=[], raw_data=FLAGS.raw_data, data_dir=FLAGS.data_dir, - combine_att=FLAGS.combine_att, logits_size_tr=FLAGS.item_vocab_size, - item_vocab_min_thresh=FLAGS.item_vocab_min_thresh, loss=FLAGS.loss, +def recommend(target_uids=[], raw_data=FLAGS.raw_data, data_dir=FLAGS.data_dir, + combine_att=FLAGS.combine_att, logits_size_tr=FLAGS.item_vocab_size, + vocab_min_thresh=FLAGS.vocab_min_thresh, loss=FLAGS.loss, top_n=FLAGS.top_N_items, test=FLAGS.test, mylog=mylog, - use_user_feature=FLAGS.use_user_feature, + use_user_feature=FLAGS.use_user_feature, use_item_feature=FLAGS.use_item_feature, batch_size=FLAGS.batch_size, device_log=FLAGS.device_log): - with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=device_log)) as sess: mylog("reading data") - (_, _, u_attributes, i_attributes, item_ind2logit_ind, + (_, _, u_attributes, i_attributes, item_ind2logit_ind, logit_ind2item_ind, user_index, item_index) = read_data( - raw_data_dir=raw_data, - data_dir=data_dir, - combine_att=combine_att, - logits_size_tr=logits_size_tr, - thresh=item_vocab_min_thresh, + raw_data_dir=raw_data, + data_dir=data_dir, + combine_att=combine_att, + logits_size_tr=logits_size_tr, + thresh=vocab_min_thresh, use_user_feature=use_user_feature, - use_item_feature=use_item_feature, - test=test, + use_item_feature=use_item_feature, + test=test, mylog=mylog) model = create_model(sess, u_attributes, i_attributes, item_ind2logit_ind, @@ -368,24 +368,24 @@ def recommend(target_uids=[], raw_data=FLAGS.raw_data, data_dir=FLAGS.data_dir, N = len(Uinds) mylog("%d target users to recommend" % N) rec = np.zeros((N, top_n), dtype=int) - + count = 0 time_start = time.time() for idx_s in range(0, N, batch_size): count += 1 if count % 100 == 0: mylog("idx: %d, c: %d" % (idx_s, count)) - + idx_e = idx_s + batch_size if idx_e <= N: users = Uinds[idx_s: idx_e] - recs = model.step(sess, users, None, None, forward_only=True, + recs = model.step(sess, users, None, None, 
forward_only=True, recommend=True) rec[idx_s:idx_e, :] = recs else: users = range(idx_s, N) + [0] * (idx_e - N) users = [Uinds[t] for t in users] - recs = model.step(sess, users, None, None, forward_only=True, + recs = model.step(sess, users, None, None, forward_only=True, recommend=True) idx_e = N rec[idx_s:idx_e, :] = recs[:(idx_e-idx_s),:] @@ -395,7 +395,7 @@ def recommend(target_uids=[], raw_data=FLAGS.raw_data, data_dir=FLAGS.data_dir, # transform result to a dictionary # R[user_id] = [item_id1, item_id2, ...] - + ind2id = {} for iid in item_index: uind = item_index[iid] @@ -411,12 +411,12 @@ def recommend(target_uids=[], raw_data=FLAGS.raw_data, data_dir=FLAGS.data_dir, def compute_scores(raw_data_dir=FLAGS.raw_data, data_dir=FLAGS.data_dir, dataset=FLAGS.dataset, save_recommendation=FLAGS.saverec, train_dir=FLAGS.train_dir, test=FLAGS.test): - + from evaluate import Evaluation as Evaluate evaluation = Evaluate(raw_data_dir, test=test) - + R = recommend(evaluation.get_uids(), data_dir=data_dir) - + evaluation.eval_on(R) scores_self, scores_ex = evaluation.get_scores() mylog("====evaluation scores (NDCG, RECALL, PRECISION, MAP) @ 2,5,10,20,30====") @@ -427,7 +427,7 @@ def compute_scores(raw_data_dir=FLAGS.raw_data, data_dir=FLAGS.data_dir, np.save(name_inds, rec) def main(_): - + if FLAGS.test: if FLAGS.data_dir[-1] == '/': FLAGS.data_dir = FLAGS.data_dir[:-1] + '_test' @@ -450,4 +450,3 @@ def main(_): if __name__ == "__main__": tf.app.run() - diff --git a/lstm/lstm_class.py b/lstm/lstm_class.py new file mode 100644 index 0000000..613ca12 --- /dev/null +++ b/lstm/lstm_class.py @@ -0,0 +1,955 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import math +import os +import random +import sys +import time + +import numpy as np +from six.moves import xrange # pylint: disable=redefined-builtin +import tensorflow as tf +import logging + +from seqModel import SeqModel + +# import pandas as pd +# import configparser +# import env + +sys.path.insert(0, '../utils') +sys.path.insert(0, '../attributes') + +import embed_attribute +from input_attribute import read_data as read_attributed_data + + +import data_iterator +from data_iterator import DataIterator +from best_buckets import * +from tensorflow.python.client import timeline +from prepare_train import positive_items, item_frequency, sample_items, to_week +from attrdict import AttrDict +from pandatools import pd +from load_config import load_configurations +from sqlalchemy import create_engine + + +class lstm(object): + """ lstm class object used to recommend "entity2" to "entity1" + using LSTM-based seq2seq model on an implicit rating dataset. 
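    Typical flow (illustrative sketch only; assumes the config dict also carries an
    'environment' key naming a file under config/environment/):

        config = ...           # dict with the keys in config/user_item_recommender_config.json
        config['environment'] = 'local'
        model = lstm(config)   # copies settings into model.FLAGS and loads DB credentials
        model.train()          # checkpoints the best model under config['train_dir']
        model.recommend()      # evaluates and, if config['saverec'], writes recs to the DB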
+ """ + + def __init__(self, config): + """ Class constructor, initializes key components + """ + # setup + self.config = config + self.config['db_config'] = load_configurations( + 'config/environment/' + self.config['environment'] + '.json') + self.ent1 = self.config['entity1_ID'] + self.ent2 = self.config['entity2_ID'] + self.FLAGS = AttrDict() + # self.config['db_config'] = load_configurations( 'config/environment/'+self.config['environment']+'.json' ) + # datasets, paths, and preprocessing + self.FLAGS.dataset = self.config['dataset_name'] + self.FLAGS.raw_data = self.config['raw_data_dir'] + self.FLAGS.data_dir = self.config['cache_dir'] + self.FLAGS.train_dir = self.config['train_dir'] + self.FLAGS.test = self.config['test'] + self.FLAGS.combine_att = self.config['combine_att'] + self.FLAGS.use_item_feature = self.config['use_item_feature'] + self.FLAGS.use_user_feature = self.config['use_user_feature'] + self.FLAGS.item_vocab_size = self.config['item_vocab_size'] + self.FLAGS.vocab_min_thresh = self.config['vocab_min_thresh'] + + # tuning hypers + self.FLAGS.loss = self.config['loss'] + self.FLAGS.learning_rate = self.config['learning_rate'] + self.FLAGS.learning_rate_decay_factor = self.config['learning_rate_decay_factor'] + self.FLAGS.max_gradient_norm = self.config['max_gradient_norm'] + self.FLAGS.keep_prob = self.config['keep_prob'] + self.FLAGS.power = self.config['power'] + self.FLAGS.batch_size = self.config['batch_size'] + self.FLAGS.size = self.config['size'] + self.FLAGS.num_layers = self.config['num_layers'] + self.FLAGS.n_epoch = self.config['n_epoch'] + self.FLAGS.L = self.config['L'] + self.FLAGS.n_bucket = self.config['n_bucket'] + self.FLAGS.patience = self.config['patience'] + # tf.app.flags.DEFINE_integer("steps_per_checkpoint", self.config['steps_per_checkpoint'],"How many training steps to do per checkpoint.") + + # recommendation + self.FLAGS.recommend = self.config['recommend'] + self.FLAGS.saverec = self.config['saverec'] + self.FLAGS.recommend_new = self.config['recommend_new'] + self.FLAGS.topk = self.config['topk'] + + # for ensemble + self.FLAGS.ensemble = self.config['ensemble'] + self.FLAGS.ensemble_suffix = self.config['ensemble_suffix'] + self.FLAGS.seed = self.config['seed'] + + # attribute model variants + self.FLAGS.output_feat = self.config['output_feat'] + self.FLAGS.use_sep_item = self.config['use_sep_item'] + self.FLAGS.no_input_item_feature = self.config['no_input_item_feature'] + self.FLAGS.use_concat = self.config['use_concat'] + self.FLAGS.no_user_id = self.config['no_user_id'] + + # devices + self.FLAGS.N = self.config['N'] + + # + self.FLAGS.withAdagrad = self.config['withAdagrad'] + self.FLAGS.fromScratch = self.config['fromScratch'] + self.FLAGS.saveCheckpoint = self.config['saveCheckpoint'] + self.FLAGS.profile = self.config['profile'] + + # others... 
+ self.FLAGS.ta = self.config['ta'] + self.FLAGS.user_sample = self.config['user_sample'] + + self.FLAGS.after40 = self.config['after40'] + self.FLAGS.split = self.config['split'] + + self.FLAGS.n_sampled = self.config['n_sampled'] + self.FLAGS.n_resample = self.config['n_resample'] + + # for beam_search + self.FLAGS.beam_search = self.config['beam_search'] + self.FLAGS.beam_size = self.config['beam_size'] + + self.FLAGS.max_train_data_size = self.config['max_train_data_size'] + self.FLAGS.old_att = self.config['old_att'] + + _buckets = [] + + if self.FLAGS.test: + if self.FLAGS.data_dir[-1] == '/': + self.FLAGS.data_dir = self.FLAGS.data_dir[:-1] + '_test' + else: + self.FLAGS.data_dir = self.FLAGS.data_dir + '_test' + + if not os.path.exists(self.FLAGS.train_dir): + os.mkdir(self.FLAGS.train_dir) + + if self.FLAGS.beam_search: + self.FLAGS.batch_size = 1 + self.FLAGS.n_bucket = 1 + + return + + def mylog(self, msg): + logger = logging.getLogger('main') + logger.info(msg) + + def split_buckets(self, array, buckets): + """ + array : [(user,[items])] + return: + d : [[(user, [items])]] + """ + d = [[] for i in xrange(len(buckets))] + for u, items in array: + index = self.get_buckets_id(len(items), buckets) + if index >= 0: + d[index].append((u, items)) + return d + + def get_buckets_id(self, l, buckets): + id = -1 + for i in xrange(len(buckets)): + if l <= buckets[i]: + id = i + break + return id + + def form_sequence_prediction(self, data, uids, maxlen, START_ID): + """ + Args: + data = [(user_id,[item_id])] + uids = [user_id] + Return: + d : [(user_id,[item_id])] + """ + d = [] + m = {} + for uid, items in data: + m[uid] = items + for uid in uids: + if uid in m: + items = [START_ID] + m[uid][-(maxlen - 1):] + else: + items = [START_ID] + d.append((uid, items)) + + return d + + def form_sequence(self, data, maxlen=100): + """ + Args: + data = [(u,i,week)] + Return: + d : [(user_id, [item_id])] + """ + + users = [] + items = [] + d = {} # d[u] = [(i,week)] + for u, i, week in data: + if not u in d: + d[u] = [] + d[u].append((i, week)) + + dd = [] + n_all_item = 0 + n_rest_item = 0 + for u in d: + tmp = sorted(d[u], key=lambda x: x[1]) + n_all_item += len(tmp) + while True: + new_tmp = [x[0] for x in tmp][:maxlen] + n_rest_item += len(new_tmp) + # make sure every sequence has at least one item + if len(new_tmp) > 0: + dd.append((u, new_tmp)) + if len(tmp) <= maxlen: + break + else: + if len(tmp) - maxlen <= 7: + tmp = tmp[maxlen - 10:] + else: + tmp = tmp[maxlen:] + + # count below not valid any more + # mylog("All item: {} Rest item: {} Remove item: {}".format(n_all_item, n_rest_item, n_all_item - n_rest_item)) + + return dd + + def prepare_warp(self, embAttr, data_tr, data_va): + pos_item_list, pos_item_list_val = {}, {} + for t in data_tr: + u, i_list = t + pos_item_list[u] = list(set(i_list)) + for t in data_va: + u, i_list = t + pos_item_list_val[u] = list(set(i_list)) + embAttr.prepare_warp(pos_item_list, pos_item_list_val) + + def get_device_address(self, s): + add = [] + if s == "": + for i in xrange(3): + add.append("/cpu:0") + else: + add = ["/gpu:{}".format(int(x)) for x in s] + self.mylog(add) + return add + + def split_train_dev(self, seq_all, ratio=0.05): + random.seed(self.FLAGS.seed) + seq_tr, seq_va = [], [] + for item in seq_all: + r = random.random() + if r < ratio: + seq_va.append(item) + else: + seq_tr.append(item) + return seq_tr, seq_va + + def get_data(self, raw_data, recommend=False): + data_dir = self.FLAGS.data_dir + combine_att = self.FLAGS.combine_att + test = 
self.FLAGS.test + logits_size_tr = self.FLAGS.item_vocab_size + thresh = self.FLAGS.vocab_min_thresh + use_user_feature = self.FLAGS.use_user_feature + use_item_feature = self.FLAGS.use_item_feature + no_user_id = self.FLAGS.no_user_id + + (data_tr, data_va, u_attr, i_attr, item_ind2logit_ind, logit_ind2item_ind, + user_index, item_index) = read_attributed_data( + raw_data_dir=raw_data, + data_dir=data_dir, + combine_att=combine_att, + logits_size_tr=logits_size_tr, + thresh=thresh, + use_user_feature=use_user_feature, + use_item_feature=use_item_feature, + no_user_id=no_user_id, + test=test, + mylog=self.mylog, + config=self.config) + + # remove unk + data_tr = [p for p in data_tr if (p[1] in item_ind2logit_ind)] + + # remove items before week 40 + if self.FLAGS.after40: + data_tr = [p for p in data_tr if (to_week(p[2]) >= 40)] + + # item frequency (for sampling) + item_population, p_item = item_frequency(data_tr, self.FLAGS.power) + + # UNK and START + # mylog(len(item_ind2logit_ind)) + # mylog(len(logit_ind2item_ind)) + # mylog(len(item_index)) + START_ID = len(item_index) + # START_ID = i_attr.get_item_last_index() + item_ind2logit_ind[START_ID] = 0 + seq_all = self.form_sequence(data_tr, maxlen=self.FLAGS.L) + seq_tr0, seq_va0 = self.split_train_dev(seq_all, ratio=0.05) + + # calculate buckets + global _buckets + _buckets = calculate_buckets( + seq_tr0 + seq_va0, self.FLAGS.L, self.FLAGS.n_bucket) + _buckets = sorted(_buckets) + + # split_buckets + seq_tr = self.split_buckets(seq_tr0, _buckets) + seq_va = self.split_buckets(seq_va0, _buckets) + + # get test data + if recommend: + from evaluate import Evaluation as Evaluate + evaluation = Evaluate(raw_data, test=test, + config=self.config, mylog=self.mylog) + uinds = evaluation.get_uinds() + seq_test = self.form_sequence_prediction( + seq_all, uinds, self.FLAGS.L, START_ID) + _buckets = calculate_buckets( + seq_test, self.FLAGS.L, self.FLAGS.n_bucket) + _buckets = sorted(_buckets) + seq_test = self.split_buckets(seq_test, _buckets) + else: + seq_test = [] + evaluation = None + uinds = [] + + # create embedAttr + + devices = self.get_device_address(self.FLAGS.N) + with tf.device(devices[0]): + u_attr.set_model_size(self.FLAGS.size) + i_attr.set_model_size(self.FLAGS.size) + + embAttr = embed_attribute.EmbeddingAttribute( + u_attr, i_attr, self.FLAGS.batch_size, self.FLAGS.n_sampled, _buckets[-1], self.FLAGS.use_sep_item, item_ind2logit_ind, logit_ind2item_ind, devices=devices) + + if self.FLAGS.loss in ["warp", 'mw']: + self.prepare_warp(embAttr, seq_tr0, seq_va0) + + return seq_tr, seq_va, seq_test, embAttr, START_ID, item_population, p_item, evaluation, uinds, user_index, item_index, logit_ind2item_ind + + def create_model(self, session, embAttr, START_ID, run_options, run_metadata): + devices = self.get_device_address(self.FLAGS.N) + dtype = tf.float32 + model = SeqModel(_buckets, + self.FLAGS.size, + self.FLAGS.num_layers, + self.FLAGS.max_gradient_norm, + self.FLAGS.batch_size, + self.FLAGS.learning_rate, + self.FLAGS.learning_rate_decay_factor, + embAttr, + withAdagrad=self.FLAGS.withAdagrad, + num_samples=self.FLAGS.n_sampled, + dropoutRate=self.FLAGS.keep_prob, + START_ID=START_ID, + loss=self.FLAGS.loss, + dtype=dtype, + devices=devices, + use_concat=self.FLAGS.use_concat, + no_user_id=False, # to remove this argument + output_feat=self.FLAGS.output_feat, + no_input_item_feature=self.FLAGS.no_input_item_feature, + topk_n=self.FLAGS.topk, + run_options=run_options, + run_metadata=run_metadata + ) + + ckpt = 
tf.train.get_checkpoint_state(self.FLAGS.train_dir) + # if FLAGS.recommend or (not FLAGS.fromScratch) and ckpt and tf.gfile.Exists(ckpt.model_checkpoint_path): + + if self.FLAGS.recommend or self.FLAGS.beam_search or self.FLAGS.ensemble or (not self.FLAGS.fromScratch) and ckpt: + self.mylog("Reading model parameters from %s" % + ckpt.model_checkpoint_path) + model.saver.restore(session, ckpt.model_checkpoint_path) + else: + self.mylog("Created model with fresh parameters.") + session.run(tf.global_variables_initializer()) + return model + + def show_all_variables(self): + all_vars = tf.global_variables() + for var in all_vars: + self.mylog(var.name) + + def train(self): + raw_data = self.FLAGS.raw_data + + # Read Data + self.mylog("Reading Data...") + train_set, dev_set, test_set, embAttr, START_ID, item_population, p_item, _, _, _, _, _ = self.get_data( + raw_data) + n_targets_train = np.sum( + [np.sum([len(items) for uid, items in x]) for x in train_set]) + train_bucket_sizes = [len(train_set[b]) for b in xrange(len(_buckets))] + train_total_size = float(sum(train_bucket_sizes)) + train_buckets_scale = [sum(train_bucket_sizes[:i + 1]) / + train_total_size for i in xrange(len(train_bucket_sizes))] + dev_bucket_sizes = [len(dev_set[b]) for b in xrange(len(_buckets))] + dev_total_size = int(sum(dev_bucket_sizes)) + + # steps + batch_size = self.FLAGS.batch_size + n_epoch = self.FLAGS.n_epoch + steps_per_epoch = int(train_total_size / batch_size) + steps_per_dev = int(dev_total_size / batch_size) + + steps_per_checkpoint = int(steps_per_epoch / 2) + total_steps = steps_per_epoch * n_epoch + + # reports + self.mylog(_buckets) + self.mylog("Train:") + self.mylog("total: {}".format(train_total_size)) + self.mylog("bucket sizes: {}".format(train_bucket_sizes)) + self.mylog("Dev:") + self.mylog("total: {}".format(dev_total_size)) + self.mylog("bucket sizes: {}".format(dev_bucket_sizes)) + self.mylog("") + self.mylog("Steps_per_epoch: {}".format(steps_per_epoch)) + self.mylog("Total_steps:{}".format(total_steps)) + self.mylog("Steps_per_checkpoint: {}".format(steps_per_checkpoint)) + + # with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement = False, device_count={'CPU':8, 'GPU':1})) as sess: + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess: + + # runtime profile + if self.FLAGS.profile: + run_options = tf.RunOptions( + trace_level=tf.RunOptions.FULL_TRACE) + run_metadata = tf.RunMetadata() + else: + run_options = None + run_metadata = None + + self.mylog("Creating Model.. 
(this can take a few minutes)") + model = self.create_model(sess, embAttr, START_ID, + run_options, run_metadata) + self.show_all_variables() + + # Data Iterators + dite = DataIterator(model, train_set, len( + train_buckets_scale), batch_size, train_buckets_scale) + + iteType = 0 + if iteType == 0: + self.mylog("withRandom") + ite = dite.next_random() + elif iteType == 1: + self.mylog("withSequence") + ite = dite.next_sequence() + + # statistics during training + step_time, loss = 0.0, 0.0 + current_step = 0 + previous_losses = [] + his = [] + low_ppx = float("inf") + low_ppx_step = 0 + steps_per_report = 30 + n_targets_report = 0 + report_time = 0 + n_valid_sents = 0 + patience = self.FLAGS.patience + item_sampled, item_sampled_id2idx = None, None + + while current_step < total_steps: + + # start + start_time = time.time() + + # re-sample every once a while + if self.FLAGS.loss in ['mw', 'mce'] and current_step % self.FLAGS.n_resample == 0: + item_sampled, item_sampled_id2idx = sample_items( + item_population, self.FLAGS.n_sampled, p_item) + else: + item_sampled = None + + # data and train + users, inputs, outputs, weights, bucket_id = ite.next() + + L = model.step(sess, users, inputs, outputs, weights, bucket_id, + item_sampled=item_sampled, item_sampled_id2idx=item_sampled_id2idx) + + # loss and time + step_time += (time.time() - start_time) / steps_per_checkpoint + + loss += L + current_step += 1 + n_valid_sents += np.sum(np.sign(weights[0])) + + # for report + report_time += (time.time() - start_time) + n_targets_report += np.sum(weights) + + if current_step % steps_per_report == 0: + self.mylog("--------------------" + "Report" + + str(current_step) + "-------------------") + self.mylog("StepTime: {} Speed: {} targets / sec in total {} targets".format( + report_time / steps_per_report, n_targets_report * 1.0 / report_time, n_targets_train)) + + report_time = 0 + n_targets_report = 0 + + # Create the Timeline object, and write it to a json + if self.FLAGS.profile: + tl = timeline.Timeline(run_metadata.step_stats) + ctf = tl.generate_chrome_trace_format() + with open('timeline.json', 'w') as f: + f.write(ctf) + exit() + + if current_step % steps_per_checkpoint == 0: + self.mylog("--------------------" + "TRAIN" + + str(current_step) + "-------------------") + # Print statistics for the previous epoch. + + loss = loss / n_valid_sents + perplexity = math.exp( + float(loss)) if loss < 300 else float("inf") + self.mylog("global step %d learning rate %.4f step-time %.2f perplexity " "%.2f" % + (model.global_step.eval(), model.learning_rate.eval(), step_time, perplexity)) + + train_ppx = perplexity + + # Save checkpoint and zero timer and loss. + step_time, loss, n_valid_sents = 0.0, 0.0, 0 + + # dev data + self.mylog("--------------------" + "DEV" + + str(current_step) + "-------------------") + eval_loss, eval_ppx = self.evaluate( + sess, model, dev_set, item_sampled_id2idx=item_sampled_id2idx) + self.mylog("dev: ppx: {}".format(eval_ppx)) + + his.append([current_step, train_ppx, eval_ppx]) + + if eval_ppx < low_ppx: + patience = self.FLAGS.patience + low_ppx = eval_ppx + low_ppx_step = current_step + checkpoint_path = os.path.join( + self.FLAGS.train_dir, "best.ckpt") + self.mylog("Saving best model....") + s = time.time() + model.saver.save(sess, checkpoint_path, + global_step=0, write_meta_graph=False) + self.mylog("Best model saved using {} sec".format( + time.time() - s)) + else: + patience -= 1 + + if patience <= 0: + self.mylog( + "Training finished. 
Running out of patience.") + break + + sys.stdout.flush() + + def evaluate(self, sess, model, data_set, item_sampled_id2idx=None): + # Run evals on development set and print their perplexity/loss. + dropoutRateRaw = self.FLAGS.keep_prob + sess.run(model.dropout10_op) + + start_id = 0 + loss = 0.0 + n_steps = 0 + n_valids = 0 + batch_size = self.FLAGS.batch_size + + dite = DataIterator(model, data_set, len(_buckets), batch_size, None) + ite = dite.next_sequence(stop=True) + + for users, inputs, outputs, weights, bucket_id in ite: + L = model.step(sess, users, inputs, outputs, + weights, bucket_id, forward_only=True) + loss += L + n_steps += 1 + n_valids += np.sum(np.sign(weights[0])) + + loss = loss / (n_valids) + ppx = math.exp(loss) if loss < 300 else float("inf") + + sess.run(model.dropoutAssign_op) + + return loss, ppx + + def recommend(self): + raw_data = self.FLAGS.raw_data + + # Read Data + self.mylog("recommend") + self.mylog("Reading Data...") + _, _, test_set, embAttr, START_ID, _, _, evaluation, uinds, user_index, item_index, logit_ind2item_ind = self.get_data( + raw_data, recommend=True) + test_bucket_sizes = [len(test_set[b]) for b in xrange(len(_buckets))] + test_total_size = int(sum(test_bucket_sizes)) + + # reports + self.mylog(_buckets) + self.mylog("Test:") + self.mylog("total: {}".format(test_total_size)) + self.mylog("buckets: {}".format(test_bucket_sizes)) + + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess: + + # runtime profile + if self.FLAGS.profile: + run_options = tf.RunOptions( + trace_level=tf.RunOptions.FULL_TRACE) + run_metadata = tf.RunMetadata() + else: + run_options = None + run_metadata = None + + self.mylog("Creating Model") + model = self.create_model(sess, embAttr, START_ID, + run_options, run_metadata) + self.show_all_variables() + + sess.run(model.dropoutRate.assign(1.0)) + + start_id = 0 + n_steps = 0 + batch_size = self.FLAGS.batch_size + + dite = DataIterator(model, test_set, len( + _buckets), batch_size, None) + ite = dite.next_sequence(stop=True, recommend=True) + + n_total_user = len(uinds) + n_recommended = 0 + uind2rank = {} + for r, uind in enumerate(uinds): + uind2rank[uind] = r + rec = np.zeros((n_total_user, self.FLAGS.topk), dtype=int) + rec_value = np.zeros((n_total_user, self.FLAGS.topk), dtype=float) + start = time.time() + + for users, inputs, positions, valids, bucket_id in ite: + results = model.step_recommend( + sess, users, inputs, positions, bucket_id) + for i, valid in enumerate(valids): + if valid == 1: + n_recommended += 1 + if n_recommended % 1000 == 0: + self.mylog("Evaluating n {} bucket_id {}".format( + n_recommended, bucket_id)) + uind, topk_values, topk_indexes = results[i] + rank = uind2rank[uind] + rec[rank, :] = topk_indexes + rec_value[rank, :] = topk_values + n_steps += 1 + end = time.time() + self.mylog("Time used {} sec for {} steps {} users ".format( + end - start, n_steps, n_recommended)) + + ind2id = {} + for iid in item_index: + iind = item_index[iid] + assert(iind not in ind2id) + ind2id[iind] = iid + + uind2id = {} + for uid in user_index: + uind = user_index[uid] + assert(uind not in uind2id) + uind2id[uind] = uid + + R = {} + for i in xrange(n_total_user): + uid = uind2id[uinds[i]] + R[uid] = [ind2id[logit_ind2item_ind[v]] + for v in list(rec[i, :])] + + self.recs = pd.DataFrame.from_dict(R, orient="index") + self.recs = self.recs.stack().reset_index() + self.recs.columns = [self.ent1, 'slot', self.ent2] + + evaluation.eval_on(R) + + scores_self, 
scores_ex = evaluation.get_scores() + self.mylog( + "====evaluation scores (NDCG, RECALL, PRECISION, MAP) @ 2,5,10,20,30====") + self.mylog("METRIC_FORMAT (self): {}".format(scores_self)) + self.mylog("METRIC_FORMAT (ex ): {}".format(scores_ex)) + + if self.FLAGS.saverec: + self.write_to_db(self.recs) + + def ensemble(self): + raw_data = self.FLAGS.raw_data + # Read Data + self.mylog("Ensemble {} {}".format( + self.FLAGS.train_dir, self.FLAGS.ensemble_suffix)) + self.mylog("Reading Data...") + # task = Task(FLAGS.dataset) + _, _, test_set, embAttr, START_ID, _, _, evaluation, uinds, user_index, item_index, logit_ind2item_ind = self.get_data( + raw_data, recommend=True) + test_bucket_sizes = [len(test_set[b]) for b in xrange(len(_buckets))] + test_total_size = int(sum(test_bucket_sizes)) + + # reports + self.mylog(_buckets) + self.mylog("Test:") + self.mylog("total: {}".format(test_total_size)) + self.mylog("buckets: {}".format(test_bucket_sizes)) + + # load top_index, and top_value + suffixes = self.FLAGS.ensemble_suffix.split(',') + top_indexes = [] + top_values = [] + for suffix in suffixes: + # dir_path = FLAGS.train_dir+suffix + dir_path = self.FLAGS.train_dir.replace('seed', 'seed' + suffix) + self.mylog("Loading results from {}".format(dir_path)) + index_path = os.path.join( + dir_path, "top{}_index.npy".format(self.FLAGS.topk)) + value_path = os.path.join( + dir_path, "top{}_value.npy".format(self.FLAGS.topk)) + top_index = np.load(index_path) + top_value = np.load(value_path) + top_indexes.append(top_index) + top_values.append(top_value) + + # ensemble + rec = np.zeros(top_indexes[0].shape) + for row in xrange(rec.shape[0]): + v = {} + for i in xrange(len(suffixes)): + for col in xrange(rec.shape[1]): + index = top_indexes[i][row, col] + value = top_values[i][row, col] + if index not in v: + v[index] = 0 + v[index] += value + items = [(index, v[index] / len(suffixes)) for index in v] + items = sorted(items, key=lambda x: -x[1]) + rec[row:] = [x[0] for x in items][:FLAGS.topk] + if row % 1000 == 0: + self.mylog("Ensembling n {}".format(row)) + + ind2id = {} + for iid in item_index: + uind = item_index[iid] + assert(uind not in ind2id) + ind2id[uind] = iid + + uind2id = {} + for uid in user_index: + uind = user_index[uid] + assert(uind not in uind2id) + uind2id[uind] = uid + + R = {} + for i in xrange(n_total_user): + uind = uinds[i] + uid = uind2id[uind] + R[uid] = [ind2id[logit_ind2item_ind[v]] for v in list(rec[i, :])] + + evaluation.eval_on(R) + + scores_self, scores_ex = evaluation.get_scores() + self.mylog( + "====evaluation scores (NDCG, RECALL, PRECISION, MAP) @ 2,5,10,20,30====") + self.mylog("METRIC_FORMAT (self): {}".format(scores_self)) + self.mylog("METRIC_FORMAT (ex ): {}".format(scores_ex)) + + def beam_search(self): + self.mylog("Reading Data...") + task = Task(self.FLAGS.dataset) + _, _, test_set, embAttr, START_ID, _, _, evaluation, uids = read_data( + task, test=True) + test_bucket_sizes = [len(test_set[b]) for b in xrange(len(_buckets))] + test_total_size = int(sum(test_bucket_sizes)) + + # reports + self.mylog(_buckets) + self.mylog("Test:") + self.mylog("total: {}".format(test_total_size)) + self.mylog("buckets: {}".format(test_bucket_sizes)) + + with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess: + + # runtime profile + if self.FLAGS.profile: + run_options = tf.RunOptions( + trace_level=tf.RunOptions.FULL_TRACE) + run_metadata = tf.RunMetadata() + else: + run_options = None + run_metadata = None + + 
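# A minimal standalone sketch (hypothetical helper, NumPy only) of the per-row
# merge performed in ensemble() above: each candidate item's score is averaged
# across the models' top-k lists and the candidates are re-ranked. Here `topk`
# plays the role of self.FLAGS.topk; names and shapes are assumptions, not the
# diff's own API.
import numpy as np

def merge_topk(top_indexes, top_values, topk):
    """top_indexes / top_values: one [n_users, topk] array per model."""
    n_users = top_indexes[0].shape[0]
    merged = np.zeros((n_users, topk), dtype=int)
    for row in range(n_users):
        scores = {}
        for idx, val in zip(top_indexes, top_values):
            for col in range(idx.shape[1]):
                item = idx[row, col]
                # accumulate this model's score for the candidate item
                scores[item] = scores.get(item, 0.0) + val[row, col]
        # average over models and keep the topk highest-scoring candidates
        ranked = sorted(scores, key=lambda k: -scores[k] / len(top_indexes))
        merged[row, :] = ranked[:topk]
    return merged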
self.mylog("Creating Model") + model = self.create_model(sess, embAttr, START_ID, + run_options, run_metadata) + self.show_all_variables() + model.init_beam_decoder() + + sess.run(model.dropoutRate.assign(1.0)) + + start_id = 0 + n_steps = 0 + batch_size = self.FLAGS.batch_size + + dite = DataIterator(model, test_set, len( + _buckets), batch_size, None) + ite = dite.next_sequence(stop=True, recommend=True) + + n_total_user = len(uids) + n_recommended = 0 + uid2rank = {} + for r, uid in enumerate(uids): + uid2rank[uid] = r + rec = np.zeros((n_total_user, self.FLAGS.topk), dtype=int) + rec_value = np.zeros((n_total_user, self.FLAGS.topk), dtype=float) + start = time.time() + + for users, inputs, positions, valids, bucket_id in ite: + self.mylog(inputs) + self.mylog(positions) + results = model.beam_step( + sess, index=0, user_input=users, item_inputs=inputs, sequence_length=positions, bucket_id=bucket_id) + break + + def write_to_db(self, df): + """ write recommenations to database + """ + + # form the connection string used to connect to recommendation_service DB + try: + cxn_string = "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s/%(database)s" % \ + self.config['db_config']['recommendation_service'] + engine = create_engine(cxn_string) + except: + self.mylog( + 'error creating connection engine, check connection string: %s' % cxn_string) + self.mylog(sys.exc_info()) + sys.exit() + + # write new records to target output DB table + try: + start = time.time() + self.mylog('writing new recommendations to db') + + # decode entity1 and entity2 back into alpha-numeric IDs + #df[self.ent1] = self.ent1_encoder.inverse_transform(df[self.ent1]) + #df[self.ent2] = self.ent2_encoder.inverse_transform(df[self.ent2]) + # add timestamp to mark when the recommendations were made + df['createdAt'] = time.strftime('%Y-%m-%d %H:%M:%S') + df = df[['createdAt', self.ent1, self.ent2, 'slot']] + + # write to output table 5000 rows at a time; note that column "rank" is ommitted + df[[self.ent1, self.ent2, 'createdAt']].to_sql(index=False, + name=self.config['rec_output_config']['output_table'], + con=engine, + if_exists='append', + chunksize=5000) + self.mylog('writing new recommendations took %.2f seconds.' % + (time.time() - start)) + + except: + self.mylog('failed writing new recommendations to \"recommendation_service.%s\"' % + self.config['rec_output_config']['output_table']) + self.mylog(sys.exc_info()) + sys.exit() + + # clear up old recommendations + try: + start = time.time() + self.mylog('clearing up previous recommendations') + + engine.execute('delete from %(table)s where createdAt < \'%(timestamp)s\';' % + {'table': self.config['rec_output_config']['output_table'], + 'timestamp': df.createdAt.values.tolist()[0]}) + self.mylog('clearing up old recommendations took %.2f seconds.' 
% ( + time.time() - start)) + + except: + self.mylog('failed clearing up old recommendations before %s from %s' % (df.createdAt.values.tolist()[0], + self.config['rec_output_config']['output_table'])) + self.mylog(sys.exc_info()) + sys.exit() + + return + + def get_past_rec(self): + """ Fetches past recommendations from the database + """ + + # form the connection string used to connect to recommendation_service DB + try: + cxn_string = "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s/%(database)s" % \ + self.config['db_config']['recommendation_service'] + engine = create_engine(cxn_string) + except: + self.mylog( + 'error creating connection engine, check connection string: %s' % cxn_string) + self.mylog(sys.exc_info()) + sys.exit() + + # backup the current recommendations in recommendation_service + # this moves current recommendations to the archive table + try: + self.mylog('backing up current recommendations to archive') + start = time.time() + query = 'SELECT * FROM %s' % self.config['rec_output_config']['output_table'] + data_iterator = pd.read_sql_query(query, engine, chunksize=5000) + for records in data_iterator: + # copy records to archive + records[['createdAt', self.ent1, self.ent2]].to_sql(index=False, + name=self.config['rec_output_config']['archive_table'], + con=engine, + if_exists='append') + except: + self.mylog('failed backing up old recommendations') + self.mylog(sys.exc_info()) + sys.exit() + + # get list of recommendations in past N days from archive table + try: + self.mylog('fetching past recommendations') + start = time.time() + query = """SELECT %(entity_one)s, %(entity_two)s + FROM %(table)s + WHERE createdAt >= DATE_SUB(CURDATE(),interval %(recs_past_N_days)s day)"""\ + % {'entity_one': self.ent1_view.columns[0], + 'entity_two': self.ent2_view.columns[0], + 'table': self.config['rec_output_config']['archive_table'], + 'recs_past_N_days': self.config['recs_past_N_days'] + } + self.past_rec = pd.read_sql_query(query, cxn_string) + self.mylog('fetching past recommendations took %.2f seconds' % ( + time.time() - start)) + except: + self.mylog('error executing query to fetch past recommendations') + self.mylog(sys.exc_info()) + sys.exit() + + # filter out ent1 and ent2 IDs in past rec that are not present in the current dataset + # (this is more of a safety net mechanism) + # self.past_rec = self.past_rec[self.past_rec[self.ent1].isin(self.ent1_encoder.classes_.tolist())] + # self.past_rec = self.past_rec[self.past_rec[self.ent2].isin(self.ent2_encoder.classes_.tolist())] + # encode the ent1 and ent2 IDs from past rec + # self.past_rec[self.ent1] = self.ent1_encoder.transform(self.past_rec[self.ent1]) + # self.past_rec[self.ent2] = self.ent2_encoder.transform(self.past_rec[self.ent2]) + + return diff --git a/lstm/seqModel.py b/lstm/seqModel.py index c9c0d44..9037581 100755 --- a/lstm/seqModel.py +++ b/lstm/seqModel.py @@ -21,8 +21,9 @@ import data_iterator # import env + class SeqModel(object): - + def __init__(self, buckets, size, @@ -32,23 +33,23 @@ def __init__(self, learning_rate, learning_rate_decay_factor, embeddingAttribute, - withAdagrad = True, + withAdagrad=True, num_samples=512, forward_only=False, - dropoutRate = 1.0, - START_ID = 0, - loss = "ce", - devices = "", - run_options = None, - run_metadata = None, - use_concat = True, - output_feat = 1, - no_input_item_feature = False, - no_user_id = True, - topk_n = 30, + dropoutRate=1.0, + START_ID=0, + loss="ce", + devices="", + run_options=None, + run_metadata=None, + use_concat=True, + 
output_feat=1, + no_input_item_feature=False, + no_user_id=True, + topk_n=30, dtype=tf.float32): """Create the model. - + Args: buckets: a list of pairs (I, O), where I specifies maximum input length that will be processed in that bucket, and O specifies maximum output @@ -86,7 +87,7 @@ def __init__(self, with tf.device(devices[0]): self.dropoutRate = tf.Variable( - float(dropoutRate), trainable=False, dtype=dtype) + float(dropoutRate), trainable=False, dtype=dtype) self.dropoutAssign_op = self.dropoutRate.assign(dropoutRate) self.dropout10_op = self.dropoutRate.assign(1.0) self.learning_rate = tf.Variable( @@ -96,72 +97,80 @@ def __init__(self, self.global_step = tf.Variable(0, trainable=False) with tf.device(devices[1]): - single_cell = tf.contrib.rnn.core_rnn_cell.LSTMCell(size, state_is_tuple=True) - single_cell = tf.contrib.rnn.core_rnn_cell.DropoutWrapper(single_cell,input_keep_prob = self.dropoutRate) + single_cell = tf.contrib.rnn.LSTMCell(size, state_is_tuple=True) + single_cell = tf.contrib.rnn.DropoutWrapper( + single_cell, input_keep_prob=self.dropoutRate) if num_layers >= 1: - single_cell = tf.contrib.rnn.core_rnn_cell.MultiRNNCell([single_cell] * num_layers, state_is_tuple=True) - single_cell = tf.contrib.rnn.core_rnn_cell.DropoutWrapper(single_cell, output_keep_prob = self.dropoutRate) - + single_cell = tf.contrib.rnn.MultiRNNCell( + [single_cell] * num_layers, state_is_tuple=True) + single_cell = tf.contrib.rnn.DropoutWrapper( + single_cell, output_keep_prob=self.dropoutRate) + self.single_cell = single_cell - - + # Feeds for inputs. with tf.device(devices[2]): self.targets = [] self.target_ids = [] self.target_weights = [] - # target: 1 2 3 4 + # target: 1 2 3 4 # inputs: go 1 2 3 # weights:1 1 1 1 for i in xrange(buckets[-1]): - self.targets.append(tf.placeholder(tf.int32, - shape=[self.batch_size], name = "target{}".format(i))) - self.target_ids.append(tf.placeholder(tf.int32, - shape=[self.batch_size], name = "target_id{}".format(i))) - self.target_weights.append(tf.placeholder(dtype, - shape = [self.batch_size], name="target_weight{}".format(i))) + self.targets.append(tf.placeholder(tf.int32, + shape=[self.batch_size], name="target{}".format(i))) + self.target_ids.append(tf.placeholder(tf.int32, + shape=[self.batch_size], name="target_id{}".format(i))) + self.target_weights.append(tf.placeholder(dtype, + shape=[self.batch_size], name="target_weight{}".format(i))) with tf.device(devices[0]): self.inputs = [] if use_concat: - user_embed, _ = self.embeddingAttribute.get_batch_user(1.0,concat = True, no_id = no_user_id) + user_embed, _ = self.embeddingAttribute.get_batch_user( + 1.0, concat=True, no_id=no_user_id) user_embed_size = self.embeddingAttribute.get_user_model_size( - no_id = no_user_id, concat = True) + no_id=no_user_id, concat=True) item_embed_size = self.embeddingAttribute.get_item_model_size( concat=True) - w_input_user = tf.get_variable("w_input_user",[user_embed_size, size], dtype = dtype) - w_input_item = tf.get_variable("w_input_item",[item_embed_size, size], dtype = dtype) + w_input_user = tf.get_variable( + "w_input_user", [user_embed_size, size], dtype=dtype) + w_input_item = tf.get_variable( + "w_input_item", [item_embed_size, size], dtype=dtype) user_embed_transform = tf.matmul(user_embed, w_input_user) for i in xrange(buckets[-1]): name = "input{}".format(i) item_embed, _ = self.embeddingAttribute.get_batch_item(name, - self.batch_size, concat = True, no_attribute=self.no_input_item_feature) + self.batch_size, concat=True, 
no_attribute=self.no_input_item_feature) item_embed_transform = tf.matmul(item_embed, w_input_item) input_embed = user_embed_transform + item_embed_transform self.inputs.append(input_embed) else: - user_embed, _ = self.embeddingAttribute.get_batch_user(1.0,concat = False, no_id = no_user_id) - + user_embed, _ = self.embeddingAttribute.get_batch_user( + 1.0, concat=False, no_id=no_user_id) + for i in xrange(buckets[-1]): name = "input{}".format(i) item_embed, _ = self.embeddingAttribute.get_batch_item(name, - self.batch_size, concat = False, no_attribute=self.no_input_item_feature) + self.batch_size, concat=False, no_attribute=self.no_input_item_feature) item_embed = tf.reduce_mean(item_embed, 0) input_embed = tf.reduce_mean([user_embed, item_embed], 0) self.inputs.append(input_embed) - - self.outputs, self.losses, self.outputs_full, self.losses_full, self.topk_values, self.topk_indexes = self.model_with_buckets(self.inputs,self.targets, self.target_weights, self.buckets, single_cell,self.embeddingAttribute, dtype, devices = devices) + + self.outputs, self.losses, self.outputs_full, self.losses_full, self.topk_values, self.topk_indexes = self.model_with_buckets( + self.inputs, self.targets, self.target_weights, self.buckets, single_cell, self.embeddingAttribute, dtype, devices=devices) # for warp if self.loss in ["warp", "mw"]: - self.set_mask, self.reset_mask = self.embeddingAttribute.get_warp_mask(device = self.devices[2]) + self.set_mask, self.reset_mask = self.embeddingAttribute.get_warp_mask( + device=self.devices[2]) - #with tf.device(devices[0]): + # with tf.device(devices[0]): # train with tf.device(devices[0]): params = tf.trainable_variables() @@ -176,19 +185,21 @@ def __init__(self, opt = tf.train.GradientDescentOptimizer(self.learning_rate) for b in xrange(len(buckets)): - gradients = tf.gradients(self.losses[b], params, colocate_gradients_with_ops=True) - clipped_gradients, norm = tf.clip_by_global_norm(gradients, max_gradient_norm) + gradients = tf.gradients( + self.losses[b], params, colocate_gradients_with_ops=True) + clipped_gradients, norm = tf.clip_by_global_norm( + gradients, max_gradient_norm) self.gradient_norms.append(norm) - self.updates.append(opt.apply_gradients(zip(clipped_gradients, params), global_step=self.global_step)) + self.updates.append(opt.apply_gradients( + zip(clipped_gradients, params), global_step=self.global_step)) self.saver = tf.train.Saver(tf.global_variables()) - - def init_beam_decoder(self,beam_size=10, max_steps = 30): + def init_beam_decoder(self, beam_size=10, max_steps=30): # a non bucket design - # - # how to feed in: + # + # how to feed in: # user_history = [1,2,3,4] # inputs = [GO, 1, 2, 3], sequene_length = [4-1] @@ -206,78 +217,91 @@ def init_beam_decoder(self,beam_size=10, max_steps = 30): # two variable: before_state, after_state for i, state_tuple in enumerate(init_state): - cb = tf.get_variable("before_c_{}".format(i), shape, initializer=tf.constant_initializer(0.0), trainable = False) - hb = tf.get_variable("before_h_{}".format(i), shape, initializer=tf.constant_initializer(0.0), trainable = False) - sb = tf.contrib.rnn.core_rnn_cell.LSTMStateTuple(cb,hb) - ca = tf.get_variable("after_c_{}".format(i), shape, initializer=tf.constant_initializer(0.0), trainable = False) - ha = tf.get_variable("after_h_{}".format(i), shape, initializer=tf.constant_initializer(0.0), trainable = False) - sa = tf.contrib.rnn.core_rnn_cell.LSTMStateTuple(ca,ha) + cb = tf.get_variable("before_c_{}".format( + i), shape, 
initializer=tf.constant_initializer(0.0), trainable=False) + hb = tf.get_variable("before_h_{}".format( + i), shape, initializer=tf.constant_initializer(0.0), trainable=False) + sb = tf.contrib.rnn.LSTMStateTuple(cb, hb) + ca = tf.get_variable("after_c_{}".format( + i), shape, initializer=tf.constant_initializer(0.0), trainable=False) + ha = tf.get_variable("after_h_{}".format( + i), shape, initializer=tf.constant_initializer(0.0), trainable=False) + sa = tf.contrib.rnn.core_rnn_cell.LSTMStateTuple(ca, ha) self.before_state.append(sb) - self.after_state.append(sa) - - # a new place holder for sequence_length - self.sequence_length = tf.placeholder(tf.int32, shape=[1], name = "sequence_length") - - # the final_state after processing the start state - with tf.variable_scope("",reuse=True): - _, self.beam_final_state = tf.contrib.rnn.static_rnn(self.single_cell,self.inputs,initial_state = init_state, sequence_length = self.sequence_length) - + self.after_state.append(sa) + + # a new place holder for sequence_length + self.sequence_length = tf.placeholder( + tf.int32, shape=[1], name="sequence_length") + + # the final_state after processing the start state + with tf.variable_scope("", reuse=True): + _, self.beam_final_state = tf.contrib.rnn.static_rnn( + self.single_cell, self.inputs, initial_state=init_state, sequence_length=self.sequence_length) + with tf.variable_scope("beam_search"): # copy the final_state to before_state - self.final2before_ops = [] # an operation sequence + self.final2before_ops = [] # an operation sequence for i in xrange(len(self.before_state)): final_c = self.beam_final_state[i].c final_h = self.beam_final_state[i].h - final_c_expand = tf.nn.embedding_lookup(final_c,[0] * self.beam_size) - final_h_expand = tf.nn.embedding_lookup(final_h,[0] * self.beam_size) + final_c_expand = tf.nn.embedding_lookup( + final_c, [0] * self.beam_size) + final_h_expand = tf.nn.embedding_lookup( + final_h, [0] * self.beam_size) copy_c = self.before_state[i].c.assign(final_c_expand) copy_h = self.before_state[i].h.assign(final_h_expand) self.final2before_ops.append(copy_c) self.final2before_ops.append(copy_h) # operation: copy after_state to before_state according to a ma - self.beam_parent = tf.placeholder(tf.int32, shape=[self.beam_size], name = "beam_parent") - self.after2before_ops = [] # an operation sequence + self.beam_parent = tf.placeholder( + tf.int32, shape=[self.beam_size], name="beam_parent") + self.after2before_ops = [] # an operation sequence for i in xrange(len(self.before_state)): after_c = self.after_state[i].c after_h = self.after_state[i].h - after_c_expand = tf.nn.embedding_lookup(after_c,self.beam_parent) - after_h_expand = tf.nn.embedding_lookup(after_h,self.beam_parent) + after_c_expand = tf.nn.embedding_lookup( + after_c, self.beam_parent) + after_h_expand = tf.nn.embedding_lookup( + after_h, self.beam_parent) copy_c = self.before_state[i].c.assign(after_c_expand) copy_h = self.before_state[i].h.assign(after_h_expand) self.after2before_ops.append(copy_c) self.after2before_ops.append(copy_h) - - # operation: one step RNN - with tf.variable_scope("",reuse=True): - self.beam_step_outputs, self.beam_step_state = tf.contrib.rnn.static_rnn(self.single_cell,self.beam_step_inputs,initial_state = self.before_state) + # operation: one step RNN + with tf.variable_scope("", reuse=True): + self.beam_step_outputs, self.beam_step_state = tf.contrib.rnn.static_rnn( + self.single_cell, self.beam_step_inputs, initial_state=self.before_state) with tf.variable_scope("beam_search"): # 
operate: copy beam_step_state to after_state - self.beam2after_ops = [] # an operation sequence + self.beam2after_ops = [] # an operation sequence for i in xrange(len(self.after_state)): - copy_c = self.after_state[i].c.assign(self.beam_step_state[i].c) - copy_h = self.after_state[i].h.assign(self.beam_step_state[i].h) + copy_c = self.after_state[i].c.assign( + self.beam_step_state[i].c) + copy_h = self.after_state[i].h.assign( + self.beam_step_state[i].h) self.beam2after_ops.append(copy_c) self.beam2after_ops.append(copy_h) - def show_before_state(self): for i in xrange(self.before_state): print(self.before_state[i].c.eval()) print(self.before_state[i].h.eval()) - def beam_step(self, session, index = 0, beam_input = None, user_input=None, item_inputs=None,sequence_length = None, bucket_id = 0): + def beam_step(self, session, index=0, beam_input=None, user_input=None, item_inputs=None, sequence_length=None, bucket_id=0): if index == 0: length = self.buckets[bucket_id] - - input_feed = {} - (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input(input_feed, user_input, item_inputs, forward_only = True, recommend = True) + + input_feed = {} + (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input( + input_feed, user_input, item_inputs, forward_only=True, recommend=True) input_feed[self.sequence_length.name] = sequence_length - + output_feed = [self.final2before.ops] - + self.show_before_state() _ = session.run(output_feed, input_feed) self.show_before_state() @@ -285,9 +309,8 @@ def beam_step(self, session, index = 0, beam_input = None, user_input=None, item else: pass - - def step(self,session, user_input, item_inputs, targets, target_weights, - bucket_id, item_sampled=None, item_sampled_id2idx = None, forward_only = False, recommend = False): + def step(self, session, user_input, item_inputs, targets, target_weights, + bucket_id, item_sampled=None, item_sampled_id2idx=None, forward_only=False, recommend=False): length = self.buckets[bucket_id] @@ -299,10 +322,11 @@ def step(self,session, user_input, item_inputs, targets, target_weights, if self.loss in ['mw', 'ce']: input_feed[self.target_ids[l].name] = targets[l] - (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input(input_feed, user_input, item_inputs, forward_only = forward_only, recommend = recommend, loss = self.loss, item_sampled_id2idx=item_sampled_id2idx) + (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input(input_feed, user_input, + item_inputs, forward_only=forward_only, recommend=recommend, loss=self.loss, item_sampled_id2idx=item_sampled_id2idx) if self.loss in ["warp", "mw"]: session.run(self.set_mask[self.loss], input_feed_warp) - + if item_sampled is not None and self.loss in ['mw', 'mce']: session.run(update_sampled, input_feed_sampled) @@ -311,19 +335,21 @@ def step(self,session, user_input, item_inputs, targets, target_weights, output_feed = [self.losses_full[bucket_id]] else: output_feed = [self.losses[bucket_id]] - output_feed += [self.updates[bucket_id], self.gradient_norms[bucket_id]] + output_feed += [self.updates[bucket_id], + self.gradient_norms[bucket_id]] if self.loss in ["warp", "mw"]: session.run(self.set_mask[self.loss], input_feed_warp) - outputs = session.run(output_feed, input_feed, options = self.run_options, run_metadata = self.run_metadata) + outputs = session.run( + output_feed, input_feed, options=self.run_options, run_metadata=self.run_metadata) if self.loss 
in ["warp", "mw"]: session.run(self.reset_mask[self.loss], input_feed_warp) return outputs[0] - - def step_recommend(self,session, user_input, item_inputs, positions, bucket_id): + + def step_recommend(self, session, user_input, item_inputs, positions, bucket_id): length = self.buckets[bucket_id] if bucket_id == 0: pre_length = 0 @@ -332,31 +358,33 @@ def step_recommend(self,session, user_input, item_inputs, positions, bucket_id): input_feed = {} - (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input(input_feed, user_input, item_inputs, forward_only = True, recommend = True, loss = self.loss) + (update_sampled, input_feed_sampled, input_feed_warp) = self.embeddingAttribute.add_input( + input_feed, user_input, item_inputs, forward_only=True, recommend=True, loss=self.loss) # output_feed output_feed = {} - for pos in range(pre_length,length): - output_feed[pos] = [self.topk_values[bucket_id][pos], self.topk_indexes[bucket_id][pos]] + for pos in range(pre_length, length): + output_feed[pos] = [self.topk_values[bucket_id] + [pos], self.topk_indexes[bucket_id][pos]] + + outputs = session.run( + output_feed, input_feed, options=self.run_options, run_metadata=self.run_metadata) - outputs = session.run(output_feed, input_feed, options = self.run_options, run_metadata = self.run_metadata) - # results = [(uid, [value], [index])] results = [] for i, pos in enumerate(positions): uid = user_input[i] - values = outputs[pos][0][i,:] - indexes = outputs[pos][1][i,:] - results.append((uid,values,indexes)) + values = outputs[pos][0][i, :] + indexes = outputs[pos][1][i, :] + results.append((uid, values, indexes)) return results - - def get_batch(self, data_set, bucket_id, start_id = None): + def get_batch(self, data_set, bucket_id, start_id=None): length = self.buckets[bucket_id] - users, item_inputs,item_outputs, weights = [], [], [], [] + users, item_inputs, item_outputs, weights = [], [], [], [] for i in xrange(self.batch_size): if start_id == None: @@ -366,8 +394,8 @@ def get_batch(self, data_set, bucket_id, start_id = None): user, item_seq = data_set[bucket_id][start_id + i] else: user = self.USER_PAD_ID - item_seq = [] - + item_seq = [] + pad_seq = [self.PAD_ID] * (length - len(item_seq)) if len(item_seq) == 0: item_input_seq = [self.START_ID] + pad_seq[1:] @@ -380,7 +408,7 @@ def get_batch(self, data_set, bucket_id, start_id = None): item_inputs.append(item_input_seq) item_outputs.append(item_output_seq) weights.append(target_weight) - + # Now we create batch-major vectors from the data selected above. 
def batch_major(l): output = [] @@ -390,25 +418,23 @@ def batch_major(l): temp.append(l[j][i]) output.append(temp) return output - + batch_user = users batch_item_inputs = batch_major(item_inputs) batch_item_outputs = batch_major(item_outputs) batch_weights = batch_major(weights) - + finished = False if start_id != None and start_id + self.batch_size >= len(data_set[bucket_id]): finished = True - return batch_user, batch_item_inputs, batch_item_outputs, batch_weights, finished - - def get_batch_recommend(self, data_set, bucket_id, start_id = None): + def get_batch_recommend(self, data_set, bucket_id, start_id=None): length = self.buckets[bucket_id] - + users, item_inputs, positions, valids = [], [], [], [] - + for i in xrange(self.batch_size): if start_id == None: user, item_seq = random.choice(data_set[bucket_id]) @@ -421,17 +447,17 @@ def get_batch_recommend(self, data_set, bucket_id, start_id = None): position = len(item_seq) - 1 else: user = self.USER_PAD_ID - item_seq = [] + item_seq = [] valid = 0 - position = length-1 - + position = length - 1 + pad_seq = [self.PAD_ID] * (length - len(item_seq)) item_input_seq = item_seq + pad_seq valids.append(valid) users.append(user) positions.append(position) item_inputs.append(item_input_seq) - + # Now we create batch-major vectors from the data selected above. def batch_major(l): output = [] @@ -441,19 +467,18 @@ def batch_major(l): temp.append(l[j][i]) output.append(temp) return output - + batch_item_inputs = batch_major(item_inputs) - + finished = False if start_id != None and start_id + self.batch_size >= len(data_set[bucket_id]): finished = True return users, batch_item_inputs, positions, valids, finished - def model_with_buckets(self, inputs, targets, weights, buckets, cell, embeddingAttribute, dtype, - per_example_loss=False, name=None, devices = None): + per_example_loss=False, name=None, devices=None): all_inputs = inputs + targets + weights losses = [] @@ -462,26 +487,30 @@ def model_with_buckets(self, inputs, targets, weights, outputs_full = [] topk_values = [] topk_indexes = [] - softmax_loss_function = lambda x,y: self.embeddingAttribute.compute_loss(x ,y, loss=self.loss, device = devices[2]) + + def softmax_loss_function(x, y): return self.embeddingAttribute.compute_loss( + x, y, loss=self.loss, device=devices[2]) with tf.device(devices[1]): init_state = cell.zero_state(self.batch_size, dtype) - with tf.name_scope(name, "model_with_buckets", all_inputs): - # with ops.op_scope(all_inputs, name, "model_with_buckets"): + # with ops.op_scope(all_inputs, name, "model_with_buckets"): for j, bucket in enumerate(buckets): - with variable_scope.variable_scope(variable_scope.get_variable_scope(),reuse=True if j > 0 else None): - + with variable_scope.variable_scope(variable_scope.get_variable_scope(), reuse=True if j > 0 else None): + with tf.device(devices[1]): - bucket_outputs, _ = tf.contrib.rnn.static_rnn(cell,inputs[:bucket],initial_state = init_state) + bucket_outputs, _ = tf.contrib.rnn.static_rnn( + cell, inputs[:bucket], initial_state=init_state) with tf.device(devices[2]): - bucket_outputs_full = [self.embeddingAttribute.get_prediction(x, device=devices[2], output_feat=self.output_feat) for x in bucket_outputs] - + bucket_outputs_full = [self.embeddingAttribute.get_prediction( + x, device=devices[2], output_feat=self.output_feat) for x in bucket_outputs] + if self.loss in ['warp', 'ce']: t = targets - bucket_outputs = [self.embeddingAttribute.get_prediction(x, device=devices[2], output_feat=self.output_feat) for x in 
bucket_outputs] + bucket_outputs = [self.embeddingAttribute.get_prediction( + x, device=devices[2], output_feat=self.output_feat) for x in bucket_outputs] elif self.loss in ['mw']: # bucket_outputs0 = [self.embeddingAttribute.get_prediction(x, pool='sampled', device=devices[2]) for x in bucket_outputs] t, bucket_outputs0 = [], [] @@ -489,8 +518,10 @@ def model_with_buckets(self, inputs, targets, weights, for i in xrange(len(bucket_outputs)): x = bucket_outputs[i] ids = self.target_ids[i] - bucket_outputs0.append(self.embeddingAttribute.get_prediction(x, pool='sampled', device=devices[2], output_feat=self.output_feat)) - t.append(self.embeddingAttribute.get_target_score(x, ids, device=devices[2])) + bucket_outputs0.append(self.embeddingAttribute.get_prediction( + x, pool='sampled', device=devices[2], output_feat=self.output_feat)) + t.append(self.embeddingAttribute.get_target_score( + x, ids, device=devices[2])) bucket_outputs = bucket_outputs0 outputs.append(bucket_outputs) @@ -498,107 +529,109 @@ def model_with_buckets(self, inputs, targets, weights, if per_example_loss: losses.append(sequence_loss_by_example( - outputs[-1], t[:bucket], weights[:bucket], - softmax_loss_function=softmax_loss_function)) + outputs[-1], t[:bucket], weights[:bucket], + softmax_loss_function=softmax_loss_function)) losses_full.append(sequence_loss_by_example( - outputs_full[-1], t[:bucket], weights[:bucket], - softmax_loss_function=softmax_loss_function)) + outputs_full[-1], t[:bucket], weights[:bucket], + softmax_loss_function=softmax_loss_function)) else: losses.append(sequence_loss( - outputs[-1], t[:bucket], weights[:bucket], - softmax_loss_function=softmax_loss_function)) + outputs[-1], t[:bucket], weights[:bucket], + softmax_loss_function=softmax_loss_function)) losses_full.append(sequence_loss( - outputs_full[-1], t[:bucket], weights[:bucket],softmax_loss_function=softmax_loss_function)) + outputs_full[-1], t[:bucket], weights[:bucket], softmax_loss_function=softmax_loss_function)) topk_value, topk_index = [], [] for full_logits in outputs_full[-1]: - value, index = tf.nn.top_k(tf.nn.softmax(full_logits), self.topk_n, sorted = True) + value, index = tf.nn.top_k(tf.nn.softmax( + full_logits), self.topk_n, sorted=True) topk_value.append(value) topk_index.append(index) topk_values.append(topk_value) topk_indexes.append(topk_index) - + return outputs, losses, outputs_full, losses_full, topk_values, topk_indexes def sequence_loss_by_example(logits, targets, weights, average_across_timesteps=True, softmax_loss_function=None, name=None): - """Weighted cross-entropy loss for a sequence of logits (per example). - - Args: - logits: List of 2D Tensors of shape [batch_size x num_decoder_symbols]. - targets: List of 1D batch-sized int32 Tensors of the same length as logits. - weights: List of 1D batch-sized float-Tensors of the same length as logits. - average_across_timesteps: If set, divide the returned cost by the total - label weight. - softmax_loss_function: Function (inputs-batch, labels-batch) -> loss-batch - to be used instead of the standard softmax (the default if this is None). - name: Optional name for this operation, default: "sequence_loss_by_example". - - Returns: - 1D batch-sized float Tensor: The log-perplexity for each sequence. - - Raises: - ValueError: If len(logits) is different from len(targets) or len(weights). - """ - if len(targets) != len(logits) or len(weights) != len(logits): - raise ValueError("Lengths of logits, weights, and targets must be the same " - "%d, %d, %d." 
% (len(logits), len(weights), len(targets))) - with tf.name_scope(name, "sequence_loss_by_example", logits + targets + weights): - # with ops.op_scope(logits + targets + weights,name, "sequence_loss_by_example"): - log_perp_list = [] - for logit, target, weight in zip(logits, targets, weights): - if softmax_loss_function is None: - # TODO(irving,ebrevdo): This reshape is needed because - # sequence_loss_by_example is called with scalars sometimes, which - # violates our general scalar strictness policy. - target = array_ops.reshape(target, [-1]) - crossent = nn_ops.sparse_softmax_cross_entropy_with_logits( - logit, target) - else: - crossent = softmax_loss_function(logit, target) - log_perp_list.append(crossent * weight) - - log_perps = math_ops.add_n(log_perp_list) - if average_across_timesteps: - total_size = math_ops.add_n(weights) - total_size += 1e-12 # Just to avoid division by 0 for all-0 weights. - log_perps /= total_size - return log_perps + """Weighted cross-entropy loss for a sequence of logits (per example). + + Args: + logits: List of 2D Tensors of shape [batch_size x num_decoder_symbols]. + targets: List of 1D batch-sized int32 Tensors of the same length as logits. + weights: List of 1D batch-sized float-Tensors of the same length as logits. + average_across_timesteps: If set, divide the returned cost by the total + label weight. + softmax_loss_function: Function (inputs-batch, labels-batch) -> loss-batch + to be used instead of the standard softmax (the default if this is None). + name: Optional name for this operation, default: "sequence_loss_by_example". + + Returns: + 1D batch-sized float Tensor: The log-perplexity for each sequence. + + Raises: + ValueError: If len(logits) is different from len(targets) or len(weights). + """ + if len(targets) != len(logits) or len(weights) != len(logits): + raise ValueError("Lengths of logits, weights, and targets must be the same " + "%d, %d, %d." % (len(logits), len(weights), len(targets))) + with tf.name_scope(name, "sequence_loss_by_example", logits + targets + weights): + # with ops.op_scope(logits + targets + weights,name, "sequence_loss_by_example"): + log_perp_list = [] + for logit, target, weight in zip(logits, targets, weights): + if softmax_loss_function is None: + # TODO(irving,ebrevdo): This reshape is needed because + # sequence_loss_by_example is called with scalars sometimes, which + # violates our general scalar strictness policy. + target = array_ops.reshape(target, [-1]) + crossent = nn_ops.sparse_softmax_cross_entropy_with_logits( + logit, target) + else: + crossent = softmax_loss_function(logit, target) + log_perp_list.append(crossent * weight) + + log_perps = math_ops.add_n(log_perp_list) + if average_across_timesteps: + total_size = math_ops.add_n(weights) + # Just to avoid division by 0 for all-0 weights. + total_size += 1e-12 + log_perps /= total_size + return log_perps def sequence_loss(logits, targets, weights, average_across_timesteps=True, average_across_batch=False, softmax_loss_function=None, name=None): - """Weighted cross-entropy loss for a sequence of logits, batch-collapsed. - - Args: - logits: List of 2D Tensors of shape [batch_size x num_decoder_symbols]. - targets: List of 1D batch-sized int32 Tensors of the same length as logits. - weights: List of 1D batch-sized float-Tensors of the same length as logits. - average_across_timesteps: If set, divide the returned cost by the total - label weight. - average_across_batch: If set, divide the returned cost by the batch size. 
- softmax_loss_function: Function (inputs-batch, labels-batch) -> loss-batch - to be used instead of the standard softmax (the default if this is None). - name: Optional name for this operation, defaults to "sequence_loss". - - Returns: - A scalar float Tensor: The average log-perplexity per symbol (weighted). - - Raises: - ValueError: If len(logits) is different from len(targets) or len(weights). - """ - - with tf.name_scope(name, "sequence_loss", logits + targets + weights): - # with ops.op_scope(logits + targets + weights, name, "sequence_loss"): - cost = math_ops.reduce_sum(sequence_loss_by_example( - logits, targets, weights, - average_across_timesteps=average_across_timesteps, - softmax_loss_function=softmax_loss_function)) - if average_across_batch: - total_size = tf.reduce_sum(tf.sign(weights[0])) - return cost / math_ops.cast(total_size, cost.dtype) - else: - return cost + """Weighted cross-entropy loss for a sequence of logits, batch-collapsed. + + Args: + logits: List of 2D Tensors of shape [batch_size x num_decoder_symbols]. + targets: List of 1D batch-sized int32 Tensors of the same length as logits. + weights: List of 1D batch-sized float-Tensors of the same length as logits. + average_across_timesteps: If set, divide the returned cost by the total + label weight. + average_across_batch: If set, divide the returned cost by the batch size. + softmax_loss_function: Function (inputs-batch, labels-batch) -> loss-batch + to be used instead of the standard softmax (the default if this is None). + name: Optional name for this operation, defaults to "sequence_loss". + + Returns: + A scalar float Tensor: The average log-perplexity per symbol (weighted). + + Raises: + ValueError: If len(logits) is different from len(targets) or len(weights). + """ + + with tf.name_scope(name, "sequence_loss", logits + targets + weights): + # with ops.op_scope(logits + targets + weights, name, "sequence_loss"): + cost = math_ops.reduce_sum(sequence_loss_by_example( + logits, targets, weights, + average_across_timesteps=average_across_timesteps, + softmax_loss_function=softmax_loss_function)) + if average_across_batch: + total_size = tf.reduce_sum(tf.sign(weights[0])) + return cost / math_ops.cast(total_size, cost.dtype) + else: + return cost diff --git a/main.py b/main.py new file mode 100644 index 0000000..80528cb --- /dev/null +++ b/main.py @@ -0,0 +1,73 @@ +import os +import sys +import logging +import time +import datetime +import psutil +from logging import FileHandler + +sys.path.insert(0, './utils') +sys.path.insert(0, './hmf') +sys.path.insert(0, './lstm') +sys.path.insert(0, './attributes') + +from hmf_class import hmf +from lstm_class import lstm +from load_config import config_ENV_setup +import tensorflow as tf + + +def main(_): + # create directories if they don't exist already + # ---------------------------------------------- + if not os.path.exists('log'): + os.makedirs('log') + + # setup logging, create log file handler & formatter + # ---------------------------------------------- + logFileHandler = FileHandler( + filename='log/rec_pipeline_%s.log' % (time.strftime('%Y-%m-%d_%H-%M-%S'))) + logFormatter = logging.Formatter( + "'%(asctime)s - %(name)s - %(levelname)s - %(message)s'") + # create and configure logger obj + logger = logging.getLogger('main') + logger.setLevel(logging.DEBUG) + logFileHandler.setFormatter(logFormatter) + logger.addHandler(logFileHandler) + + # log initial CPU and RAM status + logger.info('starting main process') + logger.info('cpu at %.2f%%' % 
(psutil.cpu_percent())) + logger.info('memory state: %s' % str(psutil.virtual_memory())) + + # load config files + config_file_path_dict = {'user_item_recommender_config': 'config/user_item_recommender_config.json' + } + configs = config_ENV_setup(config_file_path_dict) + + # build recommendations + + if configs['user_item_recommender_config']['model'] == "hmf": + hmf_model = hmf(configs['user_item_recommender_config']) + + if configs['user_item_recommender_config']['recommend']: + hmf_model.compute_scores() + else: + hmf_model.train() + else: + lstm_model = lstm(configs['user_item_recommender_config']) + + if configs['user_item_recommender_config']['beam_search']: + lstm_model.beam_search() + + if configs['user_item_recommender_config']['ensemble']: + lstm_model.ensemble() + + if configs['user_item_recommender_config']['recommend']: + lstm_model.recommend() + else: + lstm_model.train() + + +if __name__ == "__main__": + tf.app.run() diff --git a/utils/evaluate.py b/utils/evaluate.py index 033bd84..971edad 100644 --- a/utils/evaluate.py +++ b/utils/evaluate.py @@ -6,18 +6,18 @@ class Evaluation(object): - def __init__(self, raw_data_dir, test=False): + def __init__(self, raw_data_dir, test=False, config=None, mylog=None): res_filename = 'res_T_test.csv' if test else 'res_T.csv' if not isfile(join(raw_data_dir, res_filename)): print('eval file does not exist. creating ... ') - self.create_eval_file(raw_data_dir) + self.create_eval_file(raw_data_dir, config, mylog) self.T = load_submit(res_filename, submit_dir=raw_data_dir) hist_filename = 'historical_train_test.csv' if test else 'historical_train.csv' self.hist = load_submit(hist_filename, submit_dir=raw_data_dir) - - self.Iatt, _, self.Iid2ind = load_items(raw_data_dir) - self.Uatt, _, self.Uid2ind = load_users(raw_data_dir) + + self.Iatt, _, self.Iid2ind = load_items(raw_data_dir, config, mylog) + self.Uatt, _, self.Uid2ind = load_users(raw_data_dir, config, mylog) self.Uids = self.get_uids() self.Uinds = [self.Uid2ind[v] for v in self.Uids] @@ -34,7 +34,7 @@ def get_uinds(self): return self.Uinds def eval_on(self, rec): - + self.res = rec tmp_filename = 'rec' @@ -46,7 +46,7 @@ def eval_on(self, rec): # for k, v in rec.items(): # rec[k] = v.split(',') - r_ex = self.combine_sub(self.hist, rec, 1, users = self.Uatt) + r_ex = self.combine_sub(self.hist, rec, 1, users=self.Uatt) result = metrics(rec, self.T) l = result.values() @@ -60,55 +60,56 @@ def get_scores(self): def set_uinds(self, uinds): self.Uinds = uinds - - def create_eval_file(self, raw_data): + + def create_eval_file(self, raw_data, config, mylog): DIR = raw_data - interact, names = load_interactions(data_dir=DIR) + interact, names = load_interactions( + data_dir=DIR, config=config, mylog=mylog) interact_tr, interact_va, interact_te = interact - data_tr = zip(list(interact_tr[:, 0]), list(interact_tr[:, 1]), list(interact_tr[:, 2])) - data_va = zip(list(interact_va[:, 0]), list(interact_va[:, 1]), list(interact_va[:, 2])) - data_te = zip(list(interact_te[:, 0]), list(interact_te[:, 1]), list(interact_te[:, 2])) + data_tr = zip(list(interact_tr[:, 0]), list( + interact_tr[:, 1]), list(interact_tr[:, 2])) + data_va = zip(list(interact_va[:, 0]), list( + interact_va[:, 1]), list(interact_va[:, 2])) + data_te = zip(list(interact_te[:, 0]), list( + interact_te[:, 1]), list(interact_te[:, 2])) seq_tr, seq_va, seq_te = {}, {}, {} - for u, i , t in data_tr: + for u, i, t in data_tr: if u not in seq_tr: seq_tr[u] = [] seq_tr[u].append((i, t)) - for u, i , t in data_va: + for u, i, t in 
data_va: if u not in seq_va: seq_va[u] = [] - seq_va[u].append((i,t)) + seq_va[u].append((i, t)) - for u, i , t in data_te: + for u, i, t in data_te: if u not in seq_te: seq_te[u] = [] seq_te[u].append(i) for u, v in seq_tr.items(): - l = sorted(v, key = lambda x:x[1], reverse=True) + l = sorted(v, key=lambda x: x[1], reverse=True) seq_tr[u] = ','.join([str(p[0]) for p in l]) for u, v in seq_va.items(): - l = sorted(v, key = lambda x:x[1], reverse=True) + l = sorted(v, key=lambda x: x[1], reverse=True) seq_va[u] = ','.join(str(p[0]) for p in l) for u, v in seq_te.items(): seq_te[u] = ','.join(str(p) for p in seq_te[u]) - - format_submit(seq_tr, 'historical_train.csv', submit_dir=DIR) + format_submit(seq_tr, 'historical_train.csv', submit_dir=DIR) format_submit(seq_va, 'res_T.csv', submit_dir=DIR) format_submit(seq_te, 'res_T_test.csv', submit_dir=DIR) seq_va_tr = seq_va for u in seq_tr: if u in seq_va: - seq_va_tr[u] = seq_va[u] +','+ seq_tr[u] + seq_va_tr[u] = seq_va[u] + ',' + seq_tr[u] format_submit(seq_va_tr, 'historical_train_test.csv', submit_dir=DIR) return - - diff --git a/utils/fetch_data_from_s3.py b/utils/fetch_data_from_s3.py new file mode 100644 index 0000000..8cb4bbd --- /dev/null +++ b/utils/fetch_data_from_s3.py @@ -0,0 +1,120 @@ +import time +import datetime +import logging +import os.path +import pickle +import sys +import pandas as pd +from io import BytesIO +from s3_utility import Connect2S3 + + +def ReadFromS3(mys3bucket, file_prefix, logger, filename=None): + """ Read data file off of specified S3 bucket. + If filename is None, then download the file following the "file_prefix" + with the latest timestamp. Else, download the file specified by "filename" + + Args: + mys3bucket : name of the AWS S3 bucket to access + file_prefix : source directory + file prefix used to filter + logger : for logging + filename : name of file to read (default is None) + + Returns: + data : raw bytes data from the S3 source data file + """ + # start timer! + start = time.time() + + # connect to source s3 bucket + bucket = Connect2S3(mys3bucket).Bucket(mys3bucket) + + # if filename is not specified + if filename is None: + + # retrieve max timestamp on all obj following "file_prefix" + timestamps = [] + + # iterate over all objects within the bucket satisfying the prefix + for obj in bucket.objects.filter(Prefix=file_prefix): + try: + # collect the timestamps + ts = datetime.datetime.strptime( + str(obj.key)[-23:-4], '%Y-%m-%d-%H-%M-%S') + timestamps.append(ts) + except: + pass + + # error if no objects within the bucket satify the prefix filter criteria + try: + # get max timestamp + max_timestamp = datetime.datetime.strftime( + max(timestamps), '%Y-%m-%d-%H-%M-%S') + except: + logger("no file with prefix \"%s\" was found." % + (file_prefix)) + sys.exit() + + # try loading the obj following "file_prefix" + "max_timestamp" + ".csv" name + for obj in bucket.objects.filter(Prefix=file_prefix + max_timestamp + '.csv'): + # read source file + data = obj.get()['Body'].read() + logger("reading file: " + file_prefix + + max_timestamp + '.csv') + logger("read file took %.2f seconds." % (time.time() - start)) + try: + return data + except: + logger("no file: \"%s\" was found." 
% + (file_prefix + max_timestamp + '.csv')) + sys.exit() + + # if filename is specified + else: + # find source file obj within the s3 bucket using the specific filename + # for filter criteria + for obj in bucket.objects.filter(Prefix=filename): + # read source file + data = obj.get()['Body'].read() + logger("reading file: %s" % filename) + logger("read file took %d seconds." % (time.time() - start)) + + try: + return data + except: + logger.error("target file \"%s\" does not exist." % (filename)) + sys.exit() + + +def fetch_data_from_s3(mys3bucket, file_prefix, filename, logger): + """ Intermediary wrapper function that calls "ReadFromS3" to read static + data file off of AWS S3. Converts the raw bytes into dataframe obj and + perform formating (rename columsn & typecasting) to downstream processes. + + Args: + mys3bucket : name of AWS S3 bucket to access + file_prefix : file-prefix to perform filter to locate the + desired data file in the S3 bucket + filename : name of data file in the AWS S3 bucket + column_names : list of strings specifying data column names + logger : for logging + + Return: + df : processed dataframe obj containing source data + """ + + start = time.time() + + # read source data off of s3 bucket + data = ReadFromS3(mys3bucket, file_prefix, logger, filename) + + try: + # convert bytes data into pandas dataframe + df = pd.read_csv(BytesIO(data), sep="\t", header=0) + logger("load data & convert to dataframe took %.2f seconds" % + (time.time() - start)) + return df + except: + logger('failed converting raw bytes to dataframe') + logger(sys.exc_info()) + sys.exit() diff --git a/utils/load_config.py b/utils/load_config.py new file mode 100644 index 0000000..de03a51 --- /dev/null +++ b/utils/load_config.py @@ -0,0 +1,87 @@ +import os +import sys +import json +import logging + + +def load_configurations(config_filepath): + """ wrapper function for setting up environment and configuration + related information. + + Args: + config_filepath : local filepath to the config file + Returns: + ENV : specifies which environment to work in + config : holds config info + """ + + logger = logging.getLogger('main.load_config.load_configurations') + + # load configuration files from local filepath + logger.info('loading %s' % config_filepath) + # get data source & model configuration settings + try: + with open(config_filepath) as f: + config = json.load(f) + except: + logger.info('error load config: %s' % config_filepath) + logger.error(sys.exc_info()) + sys.exit() + + return config + + +def get_ENV(): + """ reads and check the ENV parameter for validity. + + Args: None + Returns: + ENV : specifies the envrionment the script is to run on; + valid choices are "local", "development", "qa", + "staging", and "production". + """ + + logger = logging.getLogger('main.load_config.get_ENV') + + # loads the environment variable that is passed to the shell script. + try: + ENV = os.environ['ENV'].lower() + except: + logger.info('no ENV value specified, default to \"local\"') + ENV = 'local' + return ENV + + if ENV in ['local', 'development', 'qa', 'staging', 'production']: + logger.info('using environment: %s' % ENV) + return ENV + else: + logger.error('invalid ENV specified') + sys.exit() + + +def config_ENV_setup(config_file_path_dict): + """ wrapper script to handle loading of necessary config files and + environments for the main program + + Args: + config_file_path_dict : dictionary where each key is the + name of the config file, + values are the local filepath for + the config file. 
+    Returns:
+        config_dict : dictionary where each key is the name of
+                      the config file, values are the json object
+                      associated with the config. Also provides an
+                      additional "environment" key that holds the
+                      ENV parameter.
+    """
+
+    config_dict = {}
+
+    for config_name in config_file_path_dict.keys():
+
+        config_filepath = config_file_path_dict[config_name]
+        config_dict[config_name] = load_configurations(config_filepath)
+        config_dict[config_name]['environment'] = get_ENV()
+
+    return config_dict
diff --git a/utils/load_data.py b/utils/load_data.py
index 0b50097..c1d55e6 100644
--- a/utils/load_data.py
+++ b/utils/load_data.py
@@ -1,96 +1,163 @@
 from os.path import join, isfile
 import pandas as pd
 import numpy as np
+from fetch_data_from_s3 import fetch_data_from_s3
+
 
 def build_index(values):
-  count, index = 0, {}
-  opt = 1 if values.shape[1] else 0
-  for v in values:
-    if opt == 1:
-      index[v[0]] = count
-    elif opt == 0:
-      index[v] = count
-    count += 1
-  return index
-
-def load_csv(filename, indexing=True, sep = '\t', header=0):
-  if not isfile(filename):
-    return [],None, None if indexing else [], None
-  data = pd.read_csv(filename, delimiter=sep, header=header)
-  columns = list(data.columns)
-  values = data.values
-  if indexing:
-    index = build_index(values)
-    return values, columns, index
-  else:
-    return values, columns
+    count, index = 0, {}
+    opt = 1 if values.shape[1] else 0
+    for v in values:
+        if opt == 1:
+            index[v[0]] = count
+        elif opt == 0:
+            index[v] = count
+        count += 1
+    return index
+
+
+def load_csv(filename, config, mylog, indexing=True, sep='\t', header=0):
+    if config and config['source_data_from_S3']:
+        data = fetch_data_from_s3(mys3bucket=config['data_source_config']['mys3bucket'],
+                                  file_prefix=filename,
+                                  filename=None,
+                                  logger=mylog)
+    else:
+        if not isfile(filename):
+            return ([], None, None) if indexing else ([], None)
+        data = pd.read_csv(filename, delimiter=sep, header=header)
+
+    columns = list(data.columns)
+    values = data.values
+
+    if indexing:
+        index = build_index(values)
+        return values, columns, index
+    else:
+        return values, columns
+

 def file_check(filename):
-  if not isfile(filename):
-    print("Error: user file {} does not exit!".format(filename))
-    exit(1)
-  return
-
-def load_users(data_dir, sep='\t'):
-  filename = join(data_dir, 'u.csv')
-  file_check(filename)
-  users, attr_names, user_index = load_csv(filename)
-  filename = join(data_dir, 'u_attr.csv')
-  if isfile(filename):
-    vals, _ = load_csv(filename, False)
-    attr_types = vals.flatten().tolist()
-  else:
-    attr_types = [0] * len(attr_names)
-  return users, (attr_names, attr_types), user_index
-
-def load_items(data_dir, sep='\t'):
-  filename = join(data_dir, 'i.csv')
-  file_check(filename)
-  items, attr_names, item_index = load_csv(filename)
-  filename = join(data_dir, 'i_attr.csv')
-  if isfile(filename):
-    vals, _ = load_csv(filename, False)
-    attr_types = vals.flatten().tolist()
-  else:
-    attr_types = [0] * len(attr_names)
-  return items, (attr_names, attr_types), item_index
-
-def load_interactions(data_dir, sep='\t'):
-  filename0 = join(data_dir, 'obs_')
-  suffix = ['tr.csv', 'va.csv', 'te.csv']
-  ints, names = [], []
-  for s in suffix:
-    filename = filename0 + s
-    interact, name = load_csv(filename, False)
-    assert(interact.shape[1] >= 2)
-    if interact.shape[1] == 2:
-      l = interact.shape[0]
-      interact = np.append(interact, np.zeros((l, 1), dtype=int), 1)
-    ints.append(interact)
-    names.append(name)
-  return ints, names[0]
-
-def load_raw_data(data_dir, _submit=0):
-  users, u_attr, user_index = load_users(data_dir)
-  items, i_attr, item_index = load_items(data_dir)
-  ints, names = load_interactions(data_dir)
-  for v in ints:
-    for i in range(len(v)):
-      v[i][0] = user_index[v[i][0]]
-      v[i][1] = item_index[v[i][1]]
-  interact_tr, interact_va, interact_te = ints
-
-  data_va, data_te = None, None
-  if _submit == 1:
-    interact_tr = np.append(interact_tr, interact_va, 0)
-    data_tr = zip(list(interact_tr[:, 0]), list(interact_tr[:, 1]),
-      list(interact_tr[:, 2]))
-    data_va = zip(list(interact_te[:, 0]), list(interact_te[:, 1]),
-      list(interact_te[:, 2]))
-  else:
-    data_tr = zip(list(interact_tr[:, 0]), list(interact_tr[:, 1]),
-      list(interact_tr[:, 2]))
-    data_va = zip(list(interact_va[:, 0]), list(interact_va[:, 1]),
-      list(interact_va[:, 2]))
-  return users, items, data_tr, data_va, u_attr, i_attr, user_index, item_index
+    if not isfile(filename):
+        print("Error: user file {} does not exist!".format(filename))
+        exit(1)
+    return
+
+
+def load_users(data_dir, config, mylog, sep='\t'):
+    if config and config['source_data_from_S3']:
+        filename = config['data_source_config']['src_directory'] + \
+            config['environment'] + '/' + \
+            config['data_source_config']['src_file_prefix'] + 'u_'
+    else:
+        filename = join(data_dir, 'u.csv')
+        file_check(filename)
+
+    users, attr_names, user_index = load_csv(filename, config, mylog)
+
+    if config and config['source_data_from_S3']:
+        filename = config['data_source_config']['src_directory'] + \
+            config['environment'] + '/' + \
+            config['data_source_config']['src_file_prefix'] + 'u_attr_'
+
+        vals, _ = load_csv(filename, config, mylog, False)
+        attr_types = vals.flatten().tolist()
+    else:
+        filename = join(data_dir, 'u_attr.csv')
+
+        if isfile(filename):
+            vals, _ = load_csv(filename, config, mylog, False)
+            attr_types = vals.flatten().tolist()
+        else:
+            attr_types = [0] * len(attr_names)
+
+    return users, (attr_names, attr_types), user_index
+
+
+def load_items(data_dir, config, mylog, sep='\t'):
+    if config and config['source_data_from_S3']:
+        filename = config['data_source_config']['src_directory'] + \
+            config['environment'] + '/' + \
+            config['data_source_config']['src_file_prefix'] + 'i_'
+    else:
+        filename = join(data_dir, 'i.csv')
+        file_check(filename)
+
+    items, attr_names, item_index = load_csv(filename, config, mylog)
+
+    if config and config['source_data_from_S3']:
+        filename = config['data_source_config']['src_directory'] + \
+            config['environment'] + '/' + \
+            config['data_source_config']['src_file_prefix'] + 'i_attr_'
+
+        vals, _ = load_csv(filename, config, mylog, False)
+        attr_types = vals.flatten().tolist()
+    else:
+        filename = join(data_dir, 'i_attr.csv')
+
+        if isfile(filename):
+            vals, _ = load_csv(filename, config, mylog, False)
+            attr_types = vals.flatten().tolist()
+        else:
+            attr_types = [0] * len(attr_names)
+
+    return items, (attr_names, attr_types), item_index
+
+
+def load_interactions(data_dir, config, mylog, sep='\t'):
+    if config and config['source_data_from_S3']:
+        filename0 = config['data_source_config']['src_directory'] + \
+            config['environment'] + '/' + \
+            config['data_source_config']['src_file_prefix'] + 'obs_'
+
+        suffix = ['tr_', 'va_', 'te_']
+        ints, names = [], []
+        for s in suffix:
+            filename = filename0 + s
+            interact, name = load_csv(filename, config, mylog, False)
+            assert(interact.shape[1] >= 2)
+            if interact.shape[1] == 2:
+                l = interact.shape[0]
+                interact = np.append(interact, np.zeros((l, 1), dtype=int), 1)
+            ints.append(interact)
+            names.append(name)
+    else:
+        filename0 = join(data_dir, 'obs_')
+
+        suffix = ['tr.csv', 'va.csv', 'te.csv']
+        ints, names = [], []
+        for s in suffix:
+            filename = filename0 + s
+            interact, name = load_csv(filename, config, mylog, False)
+            assert(interact.shape[1] >= 2)
+            if interact.shape[1] == 2:
+                l = interact.shape[0]
+                interact = np.append(interact, np.zeros((l, 1), dtype=int), 1)
+            ints.append(interact)
+            names.append(name)
+    return ints, names[0]
+
+
+def load_raw_data(data_dir, _submit=0, config=None, mylog=None):
+    users, u_attr, user_index = load_users(data_dir, config, mylog)
+    items, i_attr, item_index = load_items(data_dir, config, mylog)
+    ints, names = load_interactions(data_dir, config, mylog)
+    for v in ints:
+        for i in range(len(v)):
+            v[i][0] = user_index[v[i][0]]
+            v[i][1] = item_index[v[i][1]]
+    interact_tr, interact_va, interact_te = ints
+    data_va, data_te = None, None
+    if _submit == 1:
+        interact_tr = np.append(interact_tr, interact_va, 0)
+        data_tr = zip(list(interact_tr[:, 0]), list(interact_tr[:, 1]),
+                      list(interact_tr[:, 2]))
+        data_va = zip(list(interact_te[:, 0]), list(interact_te[:, 1]),
+                      list(interact_te[:, 2]))
+    else:
+        data_tr = zip(list(interact_tr[:, 0]), list(interact_tr[:, 1]),
+                      list(interact_tr[:, 2]))
+        data_va = zip(list(interact_va[:, 0]), list(interact_va[:, 1]),
+                      list(interact_va[:, 2]))
+    return users, items, data_tr, data_va, u_attr, i_attr, user_index, item_index
diff --git a/utils/s3_utility.py b/utils/s3_utility.py
new file mode 100644
index 0000000..1955bb7
--- /dev/null
+++ b/utils/s3_utility.py
@@ -0,0 +1,96 @@
+import boto3
+import botocore
+import time
+import datetime
+
+
+def Connect2S3(mys3bucket):
+    """ Checks for the existence of the given AWS S3 bucket and, if it
+    exists, connects to S3 and returns the s3 connection obj.
+    If the bucket does not exist, returns None.
+    Note: the system should have the AWS CLI installed and credentials
+    set up properly through the AWS CLI's "aws configure" command.
+    See: http://boto3.readthedocs.io/en/latest/guide/quickstart.html#guide-quickstart
+
+    Args:
+        mys3bucket: name for the S3 bucket to connect to
+
+    Returns:
+        boto3.s3 connection obj
+    """
+
+    # instantiate s3 obj
+    s3 = boto3.resource('s3')
+    # attempt to locate mys3bucket
+    bucket = s3.Bucket(mys3bucket)
+    try:
+        s3.meta.client.head_bucket(Bucket=mys3bucket)
+        return s3
+    except botocore.exceptions.ClientError as e:
+        # check if error is a 404 error, indicating bucket nonexistence
+        if e.response['Error']['Code']=='404':
+            print("[Error] Bucket \"%s\" does not exist." %mys3bucket)
+        return None
+
+
+def DownloadFromS3(mys3bucket, src_dir, dest_dir, filename):
+    """ download the specified file from the specified AWS S3 bucket
+    and directory.
+
+    Args:
+        mys3bucket: name of the AWS S3 bucket to access
+        src_dir:    source directory location in the AWS S3 bucket
+        dest_dir:   destination directory to download the file to
+        filename:   name of file to pull
+
+    Returns:
+        status: boolean value for success(True)/fail(False)
+    """
+
+    # establish connection to s3 bucket
+    s3 = Connect2S3(mys3bucket)
+    # attempt to download file
+    try:
+        s3.meta.client.download_file(Bucket=mys3bucket,
+                                     Key=src_dir+filename,
+                                     Filename=dest_dir+filename)
+        return True
+    except AttributeError:
+        print("Could not connect to S3 bucket, check if bucket \"%s\" exists"%mys3bucket)
+    except botocore.exceptions.ClientError as e:
+        if e.response['Error']['Code']=='404':
+            print("[Error] Source \"%s\" or destination \"%s\" does not exist."%(src_dir+filename, dest_dir+filename))
+    return False
+
+
+def UploadToS3(mys3bucket, src_dir, dest_dir, filename):
+    """ push the specified dir/file up onto the specified S3 bucket.
+
+    Args:
+        mys3bucket: name of the AWS S3 bucket to access
+        src_dir:    local source directory containing the file to push
+        dest_dir:   destination directory in the S3 bucket to push to
+        filename:   name of file to push
+
+    Returns:
+        status: boolean value for success(True)/fail(False)
+    """
+    # establish connection to s3 bucket
+    s3 = Connect2S3(mys3bucket)
+    # attempt to push file
+    try:
+        data = open(src_dir+filename, 'rb')
+        # attempt to upload to S3
+        try:
+            s3.Bucket(mys3bucket).put_object(Key=dest_dir+filename, Body=data)
+            return True
+        except botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code']=='404':
+                print("The bucket or directory does not exist.")
+            return False
+    except IOError:
+        print("[Error] Source \"%s\" does not exist."%(src_dir+filename))
+        return False
+
+
\ No newline at end of file
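For reference, a minimal usage sketch of the config-driven loading path introduced above, assuming it is run from the utils/ directory (or with utils/ on sys.path). The config file path 'config/data_source.json', the 'data' key, and the data directory 'path/to/data/' are illustrative placeholders, not values defined by this change; the expected JSON keys ('source_data_from_S3', 'data_source_config' with 'mys3bucket', 'src_directory', 'src_file_prefix') come from load_data.py and the ENV handling from load_config.py.

    # usage sketch (illustrative): wire config_ENV_setup into load_raw_data
    import logging

    from load_config import config_ENV_setup
    from load_data import load_raw_data

    logging.basicConfig(level=logging.INFO)


    def mylog(msg):
        print(msg)

    # load the json config; config_ENV_setup attaches an 'environment' value
    # taken from the ENV environment variable (defaulting to 'local')
    config_dict = config_ENV_setup({'data': 'config/data_source.json'})
    config = config_dict['data']

    # when config['source_data_from_S3'] is true, the loaders build S3 prefixes
    # from 'data_source_config'; otherwise they fall back to local csv files
    (users, items, data_tr, data_va, u_attr, i_attr,
     user_index, item_index) = load_raw_data(data_dir='path/to/data/',
                                             _submit=0, config=config,
                                             mylog=mylog)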