diff --git a/rl4rs/env/base.py b/rl4rs/env/base.py
index e589d9b..7187298 100644
--- a/rl4rs/env/base.py
+++ b/rl4rs/env/base.py
@@ -207,10 +207,10 @@ def __init__(self, recsim: RecSimBase):
             action_feature_size = len(self.obs[0]['action_mask'])
             self.observation_space = gym.spaces.Dict({
                 "action_mask": gym.spaces.Box(0, 1, shape=(action_feature_size,)),
-                "obs": gym.spaces.Box(-1000.0, 1000.0, shape=(len(self.obs[0]["obs"]),))
+                "obs": gym.spaces.Box(-100000.0, 100000.0, shape=(len(self.obs[0]["obs"]),))
             })
         else:
-            self.observation_space = gym.spaces.Box(-1000.0, 1000.0, shape=(len(self.obs[0]),))
+            self.observation_space = gym.spaces.Box(-100000.0, 100000.0, shape=(len(self.obs[0]),))
         if self.config.get("support_conti_env", False):
             self.action_space = gym.spaces.Box(-1, 1, shape=(self.config['action_emb_size'],))
         else:
diff --git a/rl4rs/env/slate.py b/rl4rs/env/slate.py
index c6f22e9..df8da47 100644
--- a/rl4rs/env/slate.py
+++ b/rl4rs/env/slate.py
@@ -19,6 +19,10 @@ def __init__(self, config, records):
         self.cur_steps = 0
         iteminfo_file = config["iteminfo_file"]
         self.item_info_d, self.action_emb = self.get_iteminfo_from_file(iteminfo_file, self.action_size)
+        if config.get('support_onehot_action', False):
+            config['action_emb_size'] = self.action_size
+            self.action_emb_size = self.action_size
+            self.action_emb = np.eye(self.action_size)
         self.location_mask, self.special_items = self.get_mask_from_file(iteminfo_file, self.action_size)

     @staticmethod
@@ -146,9 +150,15 @@ def get_violation(self):
     def offline_action(self):
         cur_step = self.cur_steps
         if cur_step < self.max_steps:
-            action = [int(x.split('@')[3].split(',')[cur_step]) for x in self.records]
+            if self.config.get("support_conti_env", False):
+                action = [self.action_emb[int(x.split('@')[3].split(',')[cur_step])] for x in self.records]
+            else:
+                action = [int(x.split('@')[3].split(',')[cur_step]) for x in self.records]
         else:
-            action = [0, ] * self.batch_size
+            if self.config.get("support_conti_env", False):
+                action = [self.action_emb[0], ] * self.batch_size
+            else:
+                action = [0, ] * self.batch_size
         return action

     @property
@@ -290,8 +300,8 @@ def forward(self, model, samples):
         [samples.info[i].update({'click_p': probs[i]}) for i in range(len(probs))]
         reward = np.sum(price * probs, axis=1)
-        if self.config.get("support_rllib_mask", False) or \
-                self.config.get("support_d3rl_mask", False):
-            violation = samples.get_violation()
-            reward[violation < 0.5] = 0
+        # rule violations now always zero out the reward, regardless of the
+        # rllib/d3rl mask flags
+        violation = samples.get_violation()
+        reward[violation < 0.5] = 0
         return reward.tolist()
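As a quick illustration of the support_onehot_action path added to slate.py above (a minimal sketch with illustrative values, not part of the patch): the action-embedding table becomes an identity matrix, so a logged discrete action id maps to its one-hot row, and offline_action can emit vectors instead of ids.

    import numpy as np

    action_size = 284                  # dataset-a action count, per the config elsewhere in this patch
    action_emb = np.eye(action_size)   # the identity table slate.py installs

    logged_id = 42                     # hypothetical logged discrete action id
    onehot = action_emb[logged_id]     # what offline_action now yields per record
    assert onehot.sum() == 1.0 and onehot.argmax() == logged_id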
diff --git a/rl4rs/tool/__init__.py b/rl4rs/mdpchecker/__init__.py
similarity index 100%
rename from rl4rs/tool/__init__.py
rename to rl4rs/mdpchecker/__init__.py
diff --git a/rl4rs/tool/decoder.py b/rl4rs/mdpchecker/decoder.py
similarity index 50%
rename from rl4rs/tool/decoder.py
rename to rl4rs/mdpchecker/decoder.py
index b908afc..210bf0b 100644
--- a/rl4rs/tool/decoder.py
+++ b/rl4rs/mdpchecker/decoder.py
@@ -1,5 +1,5 @@
 import numpy as np
-
+import bottleneck

 def token_probs(model,
                 batch_inputs,
@@ -12,19 +12,27 @@ def decode_step(model,
                 batch_outputs,
                 candidates=None,
                 beam_size=1):
-    predicts = model.predict([np.array(batch_inputs), np.array(batch_outputs)])
-    tmp = []
-    for i in range(len(predicts)):
-        probs = [(prob, j) for j, prob in enumerate(predicts[i][-1])]
-        if candidates is not None:
-            probs = [x if x[1] in candidates[i] else (0, x[1]) for x in probs]
-        probs.sort(reverse=True)
-        probs = probs[:beam_size]
-        tmp.append(probs)
-    return np.array(tmp)
+    # predicts: (batch_size, token_size), scores for the last decoded position
+    predicts = model.predict([np.array(batch_inputs), np.array(batch_outputs)])[:, -1]
+    batch_size, token_size = predicts.shape
+    if candidates is not None:
+        # zero out every token that is not a per-sample candidate
+        mask = np.zeros(predicts.shape)
+        inds = np.array([[i] * len(candidates[i]) for i in range(len(candidates))]).flatten()
+        mask[inds, candidates.flatten().astype(int)] = 1
+        predicts = predicts * mask
+    # bottleneck.(arg)partition is a faster drop-in for np.(arg)partition here
+    index = bottleneck.argpartition(-predicts, beam_size, axis=1)[:, :beam_size]
+    probs = -bottleneck.partition(-predicts, beam_size, axis=1)[:, :beam_size]
+    # partition leaves the top-k slice unordered, so sort it per row
+    inds = np.array([[i] * len(probs[i]) for i in range(len(probs))])
+    inds_sorted = np.argsort(-probs, axis=1)[:, :beam_size]
+    index = index[inds, inds_sorted]
+    probs = probs[inds, inds_sorted]
+    # (batch_size, beam_size, 2) pairs of (prob, token_id)
+    return np.array(list(zip(probs.flatten(), index.flatten()))).reshape((batch_size, beam_size, 2))

-def beam_search(model, encode_input, beam_size, target_len, use_candidates=False):
+def beam_search(model, encode_input, beam_size, target_len, use_candidates=False, candidates_size=None):
     batch_size = len(encode_input)
     output_topk = np.zeros((batch_size, beam_size, target_len + 1), dtype=np.int)
     beam_score = np.ones((batch_size, beam_size))
@@ -33,11 +41,13 @@ def beam_search(model, encode_input, beam_size, target_len, use_candidates=False
         candidates = None
     prob = decode_step(model, encode_input, output_topk[:, 0, :1], candidates=candidates, beam_size=beam_size)
     if use_candidates:
-        candidates = prob[:, :, 1]
+        # restrict later steps to the top candidates_size tokens of the first step
+        probs_first_step = decode_step(model, encode_input, output_topk[:, 0, :1], candidates=candidates, beam_size=candidates_size)
+        candidates = probs_first_step[:, :, 1]
     output_topk[:, :, 1] = prob[:, :, 1]
     beam_score[:, :] = prob[:, :, 0]
     for i in range(1, target_len):
-        # print(i)
+        print('beam_search: decoding position', i)
         probs = []
         for j in range(beam_size):
             # batch_size,k,2
diff --git a/rl4rs/policy/policy_model.py b/rl4rs/policy/policy_model.py
index 4f2fdc4..170d3ae 100644
--- a/rl4rs/policy/policy_model.py
+++ b/rl4rs/policy/policy_model.py
@@ -8,13 +8,16 @@ class policy_model(object):

     def __init__(self, model, config = {}):
         self.policy = model
+        self.config = config
         self.page_items = int(config.get('page_items', 9))
         self.mask_size = self.page_items+1
         self.location_mask = config.get('location_mask', None)
         self.special_items = config.get('special_items', None)

     def predict_with_mask(self, obs):
-        if isinstance(self.policy, d3rlpy.algos.AlgoBase):
+        if self.config.get("support_conti_env", False):
+            return self.predict(obs)
+        elif isinstance(self.policy, d3rlpy.algos.AlgoBase):
             obs = np.array(obs)
             action_probs = np.array(self.action_probs(obs))
             batch_size = len(obs)
diff --git a/rl4rs/server/gymHttpServer.py b/rl4rs/server/gymHttpServer.py
index 79d2708..47a78f6 100644
--- a/rl4rs/server/gymHttpServer.py
+++ b/rl4rs/server/gymHttpServer.py
@@ -5,6 +5,7 @@
 import gym
 import
numpy as np import six +import time import argparse import sys import json @@ -37,6 +38,7 @@ class Envs(object): def __init__(self): self.envs = {} self.id_len = 8 + self.env_lasttime = {} def _lookup_env(self, instance_id): try: @@ -51,6 +53,13 @@ def _remove_env(self, instance_id): raise InvalidUsage('Instance_id {} unknown'.format(instance_id)) def create(self, env_id, config={}, seed=None): + instance_ids = list(self.list_all().keys()) + print('list all envs ', instance_ids) + for instance_id in instance_ids[:-2]: + active_time = self.env_lasttime.get(instance_id, 0) + if abs(time.time()-active_time)>=300: + self.env_close(instance_id) + print('env_close envs ', instance_id) try: if env_id == 'SlateRecEnv-v0': config['gpu'] = False @@ -64,11 +73,14 @@ def create(self, env_id, config={}, seed=None): env = gym.make(env_id) if seed: env.seed(seed) + except gym.error.Error: raise InvalidUsage("Attempted to look up malformed environment ID '{}'".format(env_id)) instance_id = str(uuid.uuid4().hex)[:self.id_len] self.envs[instance_id] = env + self.env_lasttime[instance_id] = time.time() + # assert len(list(self.list_all().keys())) <= 3 return instance_id def list_all(self): @@ -80,6 +92,7 @@ def reset(self, instance_id): return env.observation_space.to_jsonable(obs) def step(self, instance_id, action, render): + self.env_lasttime[instance_id] = time.time() env = self._lookup_env(instance_id) if isinstance(action, six.integer_types): nice_action = action diff --git a/script/batchrl_train.py b/script/batchrl_train.py index 5b710c3..7888614 100644 --- a/script/batchrl_train.py +++ b/script/batchrl_train.py @@ -8,11 +8,13 @@ from rl4rs.env.seqslate import SeqSlateRecEnv, SeqSlateState from script import batchrl_trainer from d3rlpy.dataset import MDPDataset -from rl4rs.nets.cql.encoder import CustomVectorEncoderFactory -from rl4rs.nets.cql.q_function import CustomMeanQFunctionFactory from script.offline_evaluation import ope_eval from rl4rs.policy.behavior_model import behavior_model from rl4rs.policy.policy_model import policy_model +from rl4rs.nets.cql.encoder import CustomVectorEncoderFactory +from d3rlpy.metrics.scorer import dynamics_observation_prediction_error_scorer +from d3rlpy.metrics.scorer import dynamics_reward_prediction_error_scorer +from d3rlpy.metrics.scorer import dynamics_prediction_variance_scorer algo = sys.argv[1] stage = sys.argv[2] @@ -21,7 +23,7 @@ config = {"epoch": 4, "maxlen": 64, "batch_size": 2048, "action_size": 284, "class_num": 2, "dense_feature_num": 432, "category_feature_num": 21, "category_hash_size": 100000, "seq_num": 2, "emb_size": 128, "hidden_units": 128, "max_steps": 9, "sample_file": '../dataset/rl4rs_dataset_a_shuf.csv', - "model_file": "../output/rl4rs_dataset_a_dnn/model", 'gpu': True, "page_items": 9, + "model_file": "../output/rl4rs_dataset_a_dnn/model", 'gpu': True, "page_items": 9, 'action_emb_size':32, "iteminfo_file": '../dataset/item_info.csv', "support_d3rl_mask": True, "is_eval": True, "CQL_alpha": 1, 'env': 'SlateRecEnv-v0', 'trial_name': 'a_all'} @@ -39,78 +41,73 @@ else: assert config['env'] in ('SlateRecEnv-v0', 'SeqSlateRecEnv-v0') +if algo in ('MOPO', 'COMBO') or 'conti' in algo: + config["support_conti_env"] = True + if not config.get('gpu', True): os.environ['CUDA_VISIBLE_DEVICES'] = '-1' torch.cuda.is_available = lambda: False print('CUDA_VISIBLE_DEVICES', torch.cuda.is_available()) -trail_name = config['env'] + '_' + config['trial_name'] + '.h5' +if not config.get("support_conti_env",False): + trail_name = config['env'] + '_' + 
config['trial_name'] + '.h5' +elif config.get("support_onehot_action", False): + config['action_emb_size'] = config["action_size"] + trail_name = config['env'] + '_' + config['trial_name'] + '_onehot.h5' +else: + trail_name = config['env'] + '_' + config['trial_name'] + '_conti.h5' dataset_dir = os.environ['rl4rs_dataset_dir'] output_dir = os.environ['rl4rs_output_dir'] dataset_save_path = dataset_dir + '/' + trail_name +dynamics_save_path = output_dir + '/' + 'dynamics' + '_' + trail_name model_save_path = output_dir + '/' + algo + '_' + trail_name scaler = None - print(trail_name, config) -if algo == 'BC': - encoder_factory = CustomVectorEncoderFactory( - config, - action_size=config["action_size"], - mask_size=int(config['page_items'])+1, - with_q=True, - hidden_units=[256] - ) - model = d3rlpy.algos.DiscreteBC( - batch_size=256, - beta=0, - encoder_factory=encoder_factory, - use_gpu=config['gpu'] - ) -elif algo == 'BCQ': - encoder_factory = CustomVectorEncoderFactory( - config, - action_size=config["action_size"], - mask_size=int(config['page_items'])+1, - with_q=True, - hidden_units=[256] - ) - model = d3rlpy.algos.DiscreteBCQ( - batch_size=256, - encoder_factory=encoder_factory, - use_gpu=config['gpu'] - ) -elif algo == 'CQL': - encoder_factory = CustomVectorEncoderFactory( - config, - action_size=config["action_size"], - mask_size=int(config['page_items'])+1, - with_q=True, - hidden_units=[256] - ) - model = d3rlpy.algos.DiscreteCQL( - batch_size=256, - q_func_factory=CustomMeanQFunctionFactory(share_encoder=True), - encoder_factory=encoder_factory, - gamma=1.0, - alpha=config["CQL_alpha"], - reward_scaler='standard', - use_gpu=config['gpu'] - ) -else: - assert algo in ('BC', 'BCQ', 'CQL') +try: + dataset = MDPDataset.load(dataset_save_path) +except Exception: + dataset = None + +try: + dynamics = batchrl_trainer.get_model(config, 'dynamics') + dynamics = batchrl_trainer.build_with_dataset(dynamics, dataset) + dynamics.load_model(dynamics_save_path) +except Exception: + dynamics = None if stage == 'dataset_generate': if config['env'] == 'SlateRecEnv-v0': - batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path) + if not config.get("support_conti_env",False): + batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path) + else: + batchrl_trainer.data_generate_rl4rs_a_conti(config, dataset_save_path) elif config['env'] == 'SeqSlateRecEnv-v0': - batchrl_trainer.data_generate_rl4rs_b(config, dataset_save_path) + if not config.get("support_conti_env",False): + batchrl_trainer.data_generate_rl4rs_b(config, dataset_save_path) + else: + batchrl_trainer.data_generate_rl4rs_b_conti(config, dataset_save_path) else: batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path) assert config['env'] in ('SlateRecEnv-v0', 'SeqSlateRecEnv-v0') +if stage == 'train_dynamics' or (stage == 'train' and algo == 'dynamics'): + dynamics = batchrl_trainer.get_model(config, 'dynamics') + print('get_action_size', dataset.episodes[0].get_action_size()) + dynamics.fit(dataset, + eval_episodes=dataset.episodes[-3000:], + n_epochs=10, + show_progress=False, + scorers={ + 'observation_error': dynamics_observation_prediction_error_scorer, + 'reward_error': dynamics_reward_prediction_error_scorer, + 'variance': dynamics_prediction_variance_scorer, + } + ) + dynamics.save_model(dynamics_save_path) + if stage == 'train': - dataset = MDPDataset.load(dataset_save_path) + model = batchrl_trainer.get_model(config, algo, dynamics) model.fit(dataset, eval_episodes=dataset.episodes[-3000:], 
                  n_epochs=config['epoch'],
@@ -122,24 +119,20 @@
         if config['env'] == 'SlateRecEnv-v0' \
         else 90 * 2
     soft_opc_score = config.get('soft_opc_score', default_soft_opc_score)
-    dataset = MDPDataset.load(dataset_save_path)
-    model.build_with_dataset(dataset)
+    model = batchrl_trainer.get_model(config, algo, dynamics)
+    model = batchrl_trainer.build_with_dataset(model, dataset)
     model.load_model(model_save_path)
     eval_episodes = random.sample(dataset.episodes, 2048 * 4)
     policy = policy_model(model, config=config)
-    batchrl_trainer.d3rlpy_eval(eval_episodes, policy, soft_opc_score)
+    # batchrl_trainer.d3rlpy_eval(eval_episodes, policy, soft_opc_score)
     batchrl_trainer.evaluate(config, policy)

 if stage == 'ope':
     dataset_dir = os.environ['rl4rs_dataset_dir']
     sample_model = behavior_model(config, modelfile=dataset_dir + '/logged_policy.h5')
-    dataset = MDPDataset.load(dataset_save_path)
-    model.build_with_dataset(dataset)
+    model = batchrl_trainer.get_model(config, algo, dynamics)
+    model = batchrl_trainer.build_with_dataset(model, dataset)
     model.load_model(model_save_path)
-    if model.reward_scaler is not None:
-        model.reward_scaler.fit(dataset)
-        print('reward_scaler_mean', model.reward_scaler._mean)
-        print('reward_scaler_std', model.reward_scaler._std)
     eval_config = config.copy()
     eval_config["is_eval"] = True
     eval_config["batch_size"] = 2048
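The batchrl_train.py changes above wire a dataset -> train_dynamics -> model-based-algo pipeline, and the batchrl_trainer.py section below supplies get_model and build_with_dataset. A minimal d3rlpy sketch of the same two-stage flow, assuming d3rlpy 1.x; the paths and epoch counts are illustrative, not the repo's actual configuration:

    import d3rlpy
    from d3rlpy.dataset import MDPDataset

    dataset = MDPDataset.load('SlateRecEnv-v0_a_all_conti.h5')   # hypothetical path

    # stage 1: fit the probabilistic ensemble on the logged transitions
    dynamics = d3rlpy.dynamics.ProbabilisticEnsembleDynamics(learning_rate=1e-4, use_gpu=False)
    dynamics.fit(dataset, n_epochs=1, show_progress=False)
    dynamics.save_model('dynamics.pt')

    # stage 2: MOPO augments the real data with short rollouts from the ensemble
    mopo = d3rlpy.algos.MOPO(dynamics=dynamics, batch_size=256)
    mopo.fit(dataset, n_epochs=1)
    mopo.save_model('mopo.pt')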
diff --git a/script/batchrl_trainer.py b/script/batchrl_trainer.py
index 251b224..1809c31 100644
--- a/script/batchrl_trainer.py
+++ b/script/batchrl_trainer.py
@@ -7,6 +7,166 @@
 from d3rlpy.dataset import MDPDataset, Episode
 from rl4rs.policy.policy_model import policy_model
 from rl4rs.utils import d3rlpy_scorer
+from rl4rs.nets.cql.encoder import CustomVectorEncoderFactory
+
+
+def get_model(config, algo, dynamics=None):
+    if algo == 'dynamics':
+        discrete_action = not config.get("support_conti_env", False)
+        encoder_factory = CustomVectorEncoderFactory(
+            config,
+            action_size=config["action_size"],
+            mask_size=int(config['page_items']) + 1,
+            with_q=False,
+            hidden_units=[256, 128],
+            use_batch_norm=True,
+            dropout_rate=0.2,
+            use_dense=True
+        )
+        model = d3rlpy.dynamics.ProbabilisticEnsembleDynamics(
+            batch_size=512,
+            discrete_action=discrete_action,
+            encoder_factory=encoder_factory,
+            learning_rate=1e-4,
+            scaler='min_max',
+            reward_scaler='standard',
+            use_gpu=config['gpu']
+        )
+    elif algo == 'BC':
+        encoder_factory = CustomVectorEncoderFactory(
+            config,
+            action_size=config["action_size"],
+            mask_size=int(config['page_items']) + 1,
+            with_q=True,
+            hidden_units=[256]
+        )
+        model = d3rlpy.algos.DiscreteBC(
+            batch_size=256,
+            beta=0,
+            encoder_factory=encoder_factory,
+            use_gpu=config['gpu']
+        )
+    elif algo == 'BCQ':
+        encoder_factory = CustomVectorEncoderFactory(
+            config,
+            action_size=config["action_size"],
+            mask_size=int(config['page_items']) + 1,
+            with_q=True,
+            hidden_units=[256]
+        )
+        model = d3rlpy.algos.DiscreteBCQ(
+            batch_size=256,
+            encoder_factory=encoder_factory,
+            use_gpu=config['gpu']
+        )
+    elif algo == 'BCQ-conti':
+        # NOTE: CustomVectorEncoderFactory is not used for the continuous-action variants
+        model = d3rlpy.algos.BCQ(
+            batch_size=256,
+            use_gpu=config['gpu']
+        )
+    elif algo == 'CQL':
+        # NOTE: the custom encoder/Q-function factories are currently disabled for CQL
+        model = d3rlpy.algos.DiscreteCQL(
+            batch_size=256,
+            gamma=1.0,
+            alpha=config["CQL_alpha"],
+            reward_scaler='standard',
+            use_gpu=config['gpu']
+        )
+    elif algo == 'CQL-conti':
+        model = d3rlpy.algos.CQL(
+            batch_size=256,
+            gamma=1.0,
+            alpha=config["CQL_alpha"],
+            reward_scaler='standard',
+            use_gpu=config['gpu']
+        )
+    elif algo == 'MOPO':
+        model = d3rlpy.algos.MOPO(
+            dynamics=dynamics,
+            batch_size=256,
+            update_actor_interval=2000,
+            gamma=1.0,
+            scaler='min_max',
+            reward_scaler='standard',
+            use_gpu=config['gpu']
+        )
+    elif algo == 'COMBO':
+        model = d3rlpy.algos.COMBO(
+            dynamics=dynamics,
+            batch_size=256,
+            update_actor_interval=2000,
+            gamma=1.0,
+            scaler='min_max',
+            reward_scaler='standard',
+            use_gpu=config['gpu']
+        )
+    else:
+        assert algo in ('dynamics', 'BC', 'BCQ', 'BCQ-conti', 'CQL', 'CQL-conti', 'MOPO', 'COMBO')
+    return model
+
+
+def build_with_dataset(model, dataset):
+    model.build_with_dataset(dataset)
+    # initialize observation scaler
+    if model._scaler:
+        model._scaler.fit(dataset)
+    # initialize action scaler
+    if model._action_scaler:
+        model._action_scaler.fit(dataset)
+    # initialize reward scaler
+    if model._reward_scaler:
+        model._reward_scaler.fit(dataset)
+    return model


 def data_generate_rl4rs_a(config, datasetfile):
@@ -57,6 +217,58 @@ def data_generate_rl4rs_a(config, datasetfile):
     dataset.dump(datasetfile)


+def data_generate_rl4rs_a_conti(config, datasetfile):
+    config["support_conti_env"] = 1
+    if config.get('gpu', 0) < 1:
+        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
+    if config.get("support_onehot_action", False):
+        config['action_emb_size'] = config["action_size"]
+
+    batch_size = config["batch_size"]
+    sim = SlateRecEnv(config, state_cls=SlateState)
+    env = gym.make('SlateRecEnv-v0', recsim=sim)
+    epoch = 1000000 // batch_size
+    observations = np.zeros((epoch, batch_size, 10, 256 + 9 + 1), 'float32')
+    actions = np.zeros((epoch, batch_size, 10, config['action_emb_size']), 'float32')
+    rewards = np.zeros((epoch, batch_size, 10), 'float32')
+    terminals = np.zeros((epoch, batch_size, 10), 'float32')
+    for i in range(epoch):
+        obs = env.reset()
+        observations[i, :, 0, :] = obs
+        action = env.offline_action
+        actions[i, :, 0] = action
+        rewards[i, :, 0] = [0] * batch_size
+        terminals[i, :, 0] = [0] * batch_size
+        for j in range(9):
+            obs, reward, done, info = env.step(action)
observations[i, :, j + 1, :] = obs + action = env.offline_action + actions[i, :, j + 1] = action + rewards[i, :, j + 1] = env.offline_reward + terminals[i, :, j + 1] = done + + print('max', np.max(actions)) + print('actions shape', actions.shape) + + # shuffle + p = np.random.permutation(epoch) + observations = observations[p] + actions = actions[p] + rewards = rewards[p] + terminals = terminals[p] + + # reshape + observations.shape = (epoch * batch_size * 10, -1) + actions.shape = (epoch * batch_size * 10, -1) + rewards.shape = (epoch * batch_size * 10,) + terminals.shape = (epoch * batch_size * 10,) + + dataset = MDPDataset(observations, actions, rewards, terminals, discrete_action=False) + print('get_action_size', dataset.episodes[0].get_action_size()) + + # # save as HDF5 + dataset.dump(datasetfile) + def data_generate_rl4rs_b(config, datasetfile): if config.get('gpu', 0) < 1: os.environ['CUDA_VISIBLE_DEVICES'] = '-1' @@ -108,6 +320,60 @@ def data_generate_rl4rs_b(config, datasetfile): dataset.dump(datasetfile) +def data_generate_rl4rs_b_conti(config, datasetfile): + config["support_conti_env"] = 1 + if config.get('gpu', 0) < 1: + os.environ['CUDA_VISIBLE_DEVICES'] = '-1' + if config.get("support_onehot_action", False): + config['action_emb_size'] = config["action_size"] + + batch_size = config["batch_size"] + sim = SeqSlateRecEnv(config, state_cls=SeqSlateState) + env = gym.make('SeqSlateRecEnv-v0', recsim=sim) + epoch = 500000 // batch_size + max_step = config['max_steps'] + + observations = np.zeros((epoch, batch_size, max_step + 1, 256 + 9 + 1), 'float32') + actions = np.zeros((epoch, batch_size, max_step + 1, config['action_emb_size']), 'float32') + rewards = np.zeros((epoch, batch_size, max_step + 1), 'float32') + terminals = np.zeros((epoch, batch_size, max_step + 1), 'float32') + for i in range(epoch): + print('epoch', i) + obs = env.reset() + observations[i, :, 0, :] = obs + action = env.offline_action + actions[i, :, 0] = action + rewards[i, :, 0] = [0] * batch_size + terminals[i, :, 0] = [0] * batch_size + for j in range(max_step): + obs, reward, done, info = env.step(action) + observations[i, :, j + 1, :] = obs + action = env.offline_action + actions[i, :, j + 1] = action + rewards[i, :, j + 1] = env.offline_reward + terminals[i, :, j + 1] = done + + print('max', np.max(actions)) + + # shuffle + p = np.random.permutation(epoch) + observations = observations[p] + actions = actions[p] + rewards = rewards[p] + terminals = terminals[p] + + # reshape + observations.shape = (epoch * batch_size * (max_step + 1), -1) + actions.shape = (epoch * batch_size * (max_step + 1), config['action_emb_size']) + rewards.shape = (epoch * batch_size * (max_step + 1),) + terminals.shape = (epoch * batch_size * (max_step + 1),) + + dataset = MDPDataset(observations, actions, rewards, terminals, discrete_action=False) + + # # save as HDF5 + dataset.dump(datasetfile) + + def d3rlpy_eval(eval_episodes, policy: policy_model, soft_opc_score=90): if isinstance(policy.policy, d3rlpy.algos.DiscreteBC): scorers = { diff --git a/script/tool/mdp_checker.py b/script/mdpchecker/mdp_checker.py similarity index 86% rename from script/tool/mdp_checker.py rename to script/mdpchecker/mdp_checker.py index 65c9f95..6071b2d 100644 --- a/script/tool/mdp_checker.py +++ b/script/mdpchecker/mdp_checker.py @@ -3,11 +3,13 @@ import random from scipy.stats import spearmanr from keras_transformer import get_model, decode -from rl4rs.tool.decoder import beam_search, token_probs +from rl4rs.mdpchecker.decoder import 
beam_search, token_probs

 # dataset_file = 'recsys15.csv'
 # dataset_file = 'movielens.csv'
 # dataset_file = 'rl4rs.csv'
+# dataset_file = 'lastfm.csv'
+# dataset_file = 'cikm2016.csv'
 dataset_file = sys.argv[1] + '.csv'
 dataset_dir = sys.argv[2]
@@ -16,6 +18,8 @@
 # increase the sample size
 if 'recsys15' in dataset_file:
     source_len = 8
+elif 'cikm2016' in dataset_file:
+    source_len = 5
 else:
     source_len = 16
 target_len = 5
@@ -28,16 +32,19 @@
 for sample in data:
     user_id, items = sample.split(' ')
     item_list = items.split(',')
-    assert len(item_list) >= source_len + target_len
-    i = 0
-    if 'rl4rs' in dataset_file:
-        source_tokens.append(item_list[:source_len])
-        target_tokens.append(item_list[source_len:source_len + target_len])
+    if len(item_list) >= source_len + target_len:
+        i = 0
+        if 'rl4rs' in dataset_file or 'cikm2016' in dataset_file:
+            source_tokens.append(item_list[:source_len])
+            target_tokens.append(item_list[source_len:source_len + target_len])
+        else:
+            while i + source_len + target_len < len(item_list):
+                source_tokens.append(item_list[i: i + source_len])
+                target_tokens.append(item_list[i + source_len: i + source_len + target_len])
+                i = i + np.random.randint(source_len, source_len + target_len) // 6
     else:
-        while i + source_len + target_len < len(item_list):
-            source_tokens.append(item_list[i: i + source_len])
-            target_tokens.append(item_list[i + source_len: i + source_len + target_len])
-            i = i + np.random.randint(source_len, source_len + target_len) // 6
+        print('skipped: len(item_list) < source_len + target_len in', '\t', sample)

 # Generate dictionaries
 token_dict = {
@@ -128,6 +135,8 @@ def build_token_dict(token_list):
 beam_size = 100
 # use 20 hot items since rl4rs has only 200+ items
 hot_beam_size = 20 if 'rl4rs' in dataset_file else beam_size
+# cikm2016 has 60853 items, so widen the candidate pool to 6000
+candidates_size = 6000 if 'cikm2016' in dataset_file else hot_beam_size
 random.seed(1)
 encode_input = random.sample(encode_input[-10000:], batch_size)
 output_greedy, greedy_score = beam_search(model, encode_input, beam_size=1, target_len=target_len)
@@ -139,7 +148,7 @@ def build_token_dict(token_list):
 output_topk_5, beam_score_5 = output_topk[:, :int(beam_size * 0.05)], beam_score[:, :int(beam_size * 0.05)]
 output_topk_20, beam_score_20 = output_topk[:, :int(beam_size * 0.2)], beam_score[:, :int(beam_size * 0.2)]

-output_topk_hot, beam_score_hot = beam_search(model, encode_input, beam_size=hot_beam_size, target_len=target_len, use_candidates=True)
+output_topk_hot, beam_score_hot = beam_search(model, encode_input, beam_size=hot_beam_size, target_len=target_len, use_candidates=True, candidates_size=candidates_size)
 output_topk_hot5, beam_score_hot5 = output_topk_hot[:, :int(beam_size * 0.05)], beam_score_hot[:, :int(beam_size * 0.05)]
 output_topk_hot20, beam_score_hot20 = output_topk_hot[:, :int(beam_size * 0.2)], beam_score_hot[:, :int(beam_size * 0.2)]
diff --git a/script/mdpchecker/preprocess.py b/script/mdpchecker/preprocess.py
new file mode 100644
index 0000000..f290c33
--- /dev/null
+++ b/script/mdpchecker/preprocess.py
@@ -0,0 +1,269 @@
+import sys
+import pandas as pd
+import pandasql as ps
+
+dataset_file = sys.argv[1]
+dataset_dir = sys.argv[2]
+
+pysqldf = lambda q: ps.sqldf(q, globals())
+
+# lastfm-1K
+if 'lastfm' in dataset_file:
+    df = pd.read_csv(dataset_dir + '/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv',
+                     names=["userid", "timestamp", "artid", "artname", "traid", "traname"],
+                     sep='\t')
+
+    sql0 = """
select userid as sessionid, min(timestamp) as timestamp, artid + from + df a + group by userid, artid, substr(timestamp,1,12) + """ + + sql1 = """ + select a.sessionid, a.timestamp, b.item + from + df a + join + (select artid, ROW_NUMBER() OVER(ORDER BY artid) AS item + from ( + select artid + from + df a + group by artid + having count(*)>=30 + )aa + )b + on a.artid=b.artid + """ + + sql2 = """ + select sessionid, group_concat(item) as items + from( + select * + from + df2 + order by timestamp asc + )a + group by sessionid + + """ + + df = pysqldf(sql0) + + df2 = pysqldf(sql1) + + df3 = pysqldf(sql2) + + print('items num.', df2['item'].value_counts().count()) + print('max item id', df2['item'].max()) + print('sessionid num.', df2['sessionid'].value_counts().count()) + + df3.to_csv(dataset_dir + '/' + dataset_file + '.csv', sep=' ', header=True, index=False, encoding='utf-8') + +if 'cikm2016' in dataset_file: + # queryId;sessionId;userId;timeframe;duration;eventdate;searchstring.tokens;categoryId;items;is.test + queries_df = pd.read_csv(dataset_dir + '/CIKMCUP2016_Track2/train-queries.csv',sep=';') + # queryId;timeframe;itemId + click_df = pd.read_csv(dataset_dir + '/CIKMCUP2016_Track2/train-clicks.csv',sep=';') + # sessionId;userId;itemId;timeframe;eventdate + pv_df = pd.read_csv(dataset_dir + '/CIKMCUP2016_Track2/train-item-views.csv',sep=';') + + # sql0 = """ + # select a.sessionId as sessionid, min(b.timeframe) as timestamp, b.itemId, a.items as pv_items + # from + # queries_df a + # join click_df b + # on a.queryId = b.queryId + # group by b.queryId, b.itemId, cast(b.timeframe/1000 as int) + # """ + + df_click_sql = """ + select a.sessionId as sessionid, min(cast(b.timeframe as int)) as timestamp, b.itemId as item + from + queries_df a + join click_df b + on a.queryId = b.queryId + join (select sessionId from pv_df group by sessionId)c + on a.sessionId = c.sessionId + group by a.sessionId, b.itemId, cast(b.timeframe/1000 as int) + """ + + df_pv_sql = """ + select a.sessionId as sessionid, min(cast(c.timeframe as int)) as timestamp, c.itemId as item + from + queries_df a + join (select queryId from click_df group by queryId) b + on a.queryId = b.queryId + join pv_df c + on a.sessionId = c.sessionId + group by a.sessionId, c.itemId, cast(c.timeframe/1000 as int) + """ + + df_sql = """ + select aa.sessionid, group_concat(c.item|| ':' ||c.timestamp) as pv_items, aa.click_items + from + ( + select a.sessionid,a.timestamp,a.item,group_concat(b.item|| ':' ||b.timestamp) as click_items from + df_click a + join df_click b + on a.sessionid=b.sessionid and a.timestamp<=b.timestamp + group by a.sessionid,a.item + )aa + + join df_pv c + on aa.sessionid=c.sessionid and aa.timestamp>c.timestamp + group by aa.sessionid,aa.click_items + """ + + df_click = pysqldf(df_click_sql) + df_pv = pysqldf(df_pv_sql) + df = pysqldf(df_sql) + + tmp = [] + items = set() + for x in df.values: + if len(x[1].split(','))>=5 and len(x[2].split(','))>=5: + [items.add(x.split(':')[0]) for x in x[1].split(',')] + [items.add(x.split(':')[0]) for x in x[2].split(',')] + + # item2id=dict([(x,str(i)) for i,x in enumerate(items)]) + # item2id_fn = lambda x:item2id[x] + + for x in df.values: + if len(x[1].split(','))>=5 and len(x[2].split(','))>=5: + pv_items = x[1].split(',') + sorted_pv_items = sorted(pv_items, key=lambda x:int(x.split(':')[1]))[-5:] + sorted_pv_items = [x.split(':')[0] for x in sorted_pv_items] + click_items = x[2].split(',') + sorted_click_items = sorted(click_items, key=lambda x:int(x.split(':')[1]))[:5] + 
sorted_click_items = [x.split(':')[0] for x in sorted_click_items] + tmp.append([x[0], ','.join(sorted_pv_items), ','.join(sorted_click_items)]) + + print('items num.', len(items)) + print('max item id', len(items)-1) + print('sessionid num.', len(tmp)) + + with open(dataset_dir + '/' + dataset_file + '.csv', 'w') as f: + f.write('sessionid items'+'\n') + f.write('\n'.join([str(x[0])+' '+x[1]+','+x[2] for x in tmp])) + + +# recsys15-click +if 'recsys15' in dataset_file: + df = pd.read_csv(dataset_dir + '/yoochoose-clicks.dat', names=["sessionid", "timestamp", "item", "Category"]) + + sql0 = """ + select sessionid, min(timestamp) as timestamp, item, Category + from + df a + group by sessionid, item, substr(timestamp,1,12) + """ + + sql1 = """ + select a.* + from + df a + join + (SELECT item FROM df group by item having count(*)>=1000)b + on a.item=b.item + """ + + sql2 = """ + select a.* + from + df2 a + join + (SELECT sessionid FROM df2 group by sessionid having count(*)>=13)b + on a.sessionid=b.sessionid + """ + + sql3 = """ + select sessionid, group_concat(item) as items + from + df3 + group by sessionid + order by timestamp asc + """ + + df = pysqldf(sql0) + + df2 = pysqldf(sql1) + + df3 = pysqldf(sql2) + + df4 = pysqldf(sql3) + + print('items num.', df3['item'].value_counts().count()) + print('max item id', df3['item'].max()) + print('sessionid num.', df3['sessionid'].value_counts().count()) + + df4.to_csv(dataset_dir + '/' + dataset_file + '.csv', sep=' ', header=True, index=False, encoding='utf-8') + +if 'movielens' in dataset_file: + # movielens-25m + df = pd.read_csv(dataset_dir + '/ml-25m/ratings.csv') + # userId,movieId,rating,timestamp + sql0 = """ + select * + from + df a + where rating>=3 + """ + + sql1 = """ + select a.* + from + df a + join + (SELECT movieId FROM df group by movieId having count(*)>=1000)b + on a.movieId=b.movieId + """ + + sql2 = """ + select a.* + from + df2 a + join + (SELECT userId FROM df2 group by userId having count(*)>=30 and count(*)<=100)b + on a.userId=b.userId + """ + + sql3 = """ + select userId as sessionid, group_concat(movieId) as items + from + df3 + group by userId + order by timestamp asc + """ + df = pysqldf(sql0) + + df2 = pysqldf(sql1) + + df3 = pysqldf(sql2) + + df4 = pysqldf(sql3) + + print('items num.', df3['movieId'].value_counts().count()) + print('max item id', df3['movieId'].max()) + print('sessionid num.', df3['userId'].value_counts().count()) + + df4.to_csv(dataset_dir + '/movielens.csv', sep=' ', header=True, index=False, encoding='utf-8') + +if 'rl4rs' in dataset_file: + # RL4RS + data = open(dataset_dir + '/rl4rs_dataset_a.csv', 'r').read().split('\n')[:-1] + tmp = ['sessionid items'] + for x in data: + session_id = x.split('@')[1] + sequence_id = list(map(int, x.split('@')[5].split(','))) + items = list(map(int, x.split('@')[3].split(','))) + if len(sequence_id) >= 16: + tmp.append(session_id + ' ' + ','.join(list(map(str, sequence_id[-16:] + items[:5])))) + + print('items num.', 283) + print('max item id', 283) + print('sessionid num.', len(tmp)) + + with open(dataset_dir + '/rl4rs.csv', 'w') as f: + f.write('\n'.join(tmp)) diff --git a/script/modelfree_train.py b/script/modelfree_train.py index cfeb319..583bac6 100644 --- a/script/modelfree_train.py +++ b/script/modelfree_train.py @@ -2,6 +2,7 @@ import numpy as np import gym import ray +from copy import deepcopy from ray.rllib.models import ModelCatalog from ray.tune.registry import register_env from rl4rs.env.slate import SlateRecEnv, SlateState @@ -36,17 +37,24 @@ 
"remote_base": 'http://127.0.0.1:16773', 'env': "SlateRecEnv-v0"} config = dict(config, **extra_config) +eval_config = deepcopy(config) if config['env'] == 'SeqSlateRecEnv-v0': - config['max_steps'] = 36 - config['batch_size'] = config['batch_size'] // 4 + eval_config['max_steps'] = config['max_steps'] = 36 + eval_config['batch_size'] = config['batch_size'] = config['batch_size'] // 4 -if algo == "DDPG" or 'conti' in algo: - config['support_conti_env'] = True - config['support_rllib_mask'] = False +if "DDPG" in algo or "TD3" in algo or 'conti' in algo: + eval_config['support_conti_env'] = config['support_conti_env'] = True + eval_config['support_rllib_mask'] = config['support_rllib_mask'] = False + +if 'RAINBOW' in algo: + eval_config['support_rllib_mask'] = config['support_rllib_mask'] = False + eval_config['support_onehot_action'] = True + eval_config['support_conti_env'] = True if 'rawstate' in algo: - config['rawstate_as_obs'] = True + eval_config['rawstate_as_obs'] = config['rawstate_as_obs'] = True + print(extra_config, config) @@ -81,6 +89,20 @@ "model": { "custom_model": "model_rawstate", }}) +if algo == "TD3" or algo == "TD3_rawstate": + assert config['support_conti_env'] == True + cfg = { + "exploration_config": { + "type": "OrnsteinUhlenbeckNoise", + "random_timesteps":10000 + }, + } + if 'rawstate' in algo or config.get('rawstate_as_obs', False): + cfg = dict({ + **cfg, + "model": { + "custom_model": "model_rawstate", + }}) elif algo == "DQN" or algo == "DQN_rawstate": cfg = { # TODO(ekl) we need to set these to prevent the masked values @@ -109,6 +131,51 @@ "model": { "custom_model": "mask_model_rawstate", }}) +elif algo == "SLATEQ" or algo == "SLATEQ_rawstate": + cfg = { + "model": { + "custom_model": "mask_model", + }, + } + if 'rawstate' in algo or config.get('rawstate_as_obs', False): + cfg = dict({ + **cfg, + "model": { + "custom_model": "mask_model_rawstate", + }}) +elif algo == "RAINBOW" or algo == "RAINBOW_rawstate": + # note that DistributionalQModel will make action masking not work + cfg = { + # TODO(ekl) we need to set these to prevent the masked values + # from being further processed in DistributionalQModel, which + # would mess up the masking. It is possible to support these if we + # defined a custom DistributionalQModel that is aware of masking. + "hiddens": [128], + "noisy": False, + "num_atoms":8, + # "dueling": True, + # Whether to use double dqn + # "double_q": True, + # N-step Q learning + "n_step": 3, + # "target_network_update_freq": 200, + "v_min": 0.0, + "v_max": 1000.0, + # === Replay buffer === + # Size of the replay buffer in batches (not timesteps!). 
+ "buffer_size": 100000, + # 'rollout_fragment_length': 200, + # "num_workers": 0, + # "model": { + # "custom_model": "mask_model", + # }, + } + if 'rawstate' in algo or config.get('rawstate_as_obs', False): + cfg = dict({ + **cfg, + "model": { + "custom_model": "mask_model_rawstate", + }}) elif "PPO" in algo: cfg = { "num_workers": 2, @@ -335,7 +402,7 @@ }, "num_gpus": 1 if config.get('gpu', True) else 0, "num_workers": 0, - "framework": 'tf', + "framework": 'tf' if 'SLATEQ' not in algo else 'torch', # "framework": 'tfe', "rollout_fragment_length": config['max_steps'], "batch_mode": "complete_episodes", @@ -352,10 +419,15 @@ trainer = get_rl_model(algo.split('_')[0], rllib_config) if stage == 'train': - # trainer.restore(restore_file) - # print('model restore from %s' % (restore_file)) + try: + trainer.restore(restore_file) + print('model restore from %s' % (restore_file)) + except Exception: + trainer = get_rl_model(algo.split('_')[0], rllib_config) for i in range(config["epoch"]): result = trainer.train() + if (i + 1) % 50 == 0: + print('epoch ',i) if (i + 1) % 500 == 0 or i == 0: print(pretty_print(result)) if (i + 1) % 500 == 0: @@ -387,6 +459,42 @@ print('avg reward', episode_reward / eval_config['batch_size'] / epoch) eval_env.close() + +if stage == 'eval_v2': + # eval_config["epoch"] = 1 + eval_config['is_eval'] = True + eval_config["batch_size"] = 2048 + if config['env'] == 'SeqSlateRecEnv-v0': + config['max_steps'] = 36 + sim = SeqSlateRecEnv(eval_config, state_cls=SeqSlateState) + eval_env = gym.make('SeqSlateRecEnv-v0', recsim=sim) + else: + sim = SlateRecEnv(eval_config, state_cls=SlateState) + eval_env = gym.make('SlateRecEnv-v0', recsim=sim) + # trainer.restore(checkpoint_dir + '/checkpoint_010000/checkpoint-10000') + trainer.restore(restore_file) + print('model restore from %s' % (restore_file)) + from rl4rs.policy.policy_model import policy_model + policy = policy_model(trainer, config) + episode_reward = 0 + done = False + epoch = 4 + actions = [] + for i in range(epoch): + obs = eval_env.reset() + print('test batch at ', i, 'avg reward', episode_reward / eval_config['batch_size'] / (i + 0.0001)) + for _ in range(config["max_steps"]): + if config.get('support_onehot_action', False): + action = policy.predict_with_mask(obs) + else: + action = np.array(policy.action_probs(obs)) + obs, reward, done, info = eval_env.step(action) + episode_reward += sum(reward) + actions.append(action) + print('avg reward', episode_reward / eval_config['batch_size'] / epoch) + eval_env.close() + + if stage == 'ope': dataset_dir = os.environ['rl4rs_dataset_dir'] sample_model = behavior_model(config, modelfile=dataset_dir + '/logged_policy.h5') diff --git a/script/modelfree_trainer.py b/script/modelfree_trainer.py index f59d3f7..a7cfe5b 100644 --- a/script/modelfree_trainer.py +++ b/script/modelfree_trainer.py @@ -2,17 +2,20 @@ import ray.rllib.agents.dqn as dqn import ray.rllib.agents.a3c as a3c import ray.rllib.agents.pg as pg +import ray.rllib.agents.ddpg.td3 as td3 import ray.rllib.agents.impala as impala import ray.rllib.agents.ddpg as ddpg +import ray.rllib.agents.slateq as slateq def get_rl_model(algo, rllib_config): trainer = None if algo == "PPO": trainer = ppo.PPOTrainer(config=rllib_config, env="rllibEnv-v0") - print('trainer_default_config', trainer._default_config) elif algo == "DQN": trainer = dqn.DQNTrainer(config=rllib_config, env="rllibEnv-v0") + elif algo == "RAINBOW": + trainer = dqn.DQNTrainer(config=rllib_config, env="rllibEnv-v0") elif algo == "A2C": trainer = 
a3c.A2CTrainer(config=rllib_config, env="rllibEnv-v0") elif algo == "A3C": @@ -21,9 +24,13 @@ def get_rl_model(algo, rllib_config): trainer = pg.PGTrainer(config=rllib_config, env="rllibEnv-v0") elif algo == "DDPG": trainer = ddpg.DDPGTrainer(config=rllib_config, env="rllibEnv-v0") + elif algo == "TD3": + trainer = td3.TD3Trainer(config=rllib_config, env="rllibEnv-v0") elif algo == "IMPALA": trainer = impala.ImpalaTrainer(config=rllib_config, env="rllibEnv-v0") - print('trainer_default_config', trainer._default_config) + elif algo == "SLATEQ": + trainer = slateq.SlateQTrainer(config=rllib_config, env="rllibEnv-v0") else: - assert algo in ("PPO", "DQN", "A2C", "A3C", "PG", "IMPALA") + assert algo in ("PPO", "DQN", "A2C", "A3C", "PG", "IMPALA", "TD3", "RAINBOW", "SLATEQ") + print('trainer_default_config', trainer._default_config) return trainer diff --git a/script/tool/preprocess.py b/script/tool/preprocess.py deleted file mode 100644 index 151d66e..0000000 --- a/script/tool/preprocess.py +++ /dev/null @@ -1,127 +0,0 @@ -import sys -import pandas as pd -import pandasql as ps - -dataset_file = sys.argv[1] -dataset_dir = sys.argv[2] - -pysqldf = lambda q: ps.sqldf(q, globals()) - -# recsys15-click -if 'recsys15' in dataset_file: - df = pd.read_csv(dataset_dir + '/yoochoose-clicks.dat', names=["sessionid", "timestamp", "item", "Category"]) - - sql0 = """ - select sessionid, min(timestamp) as timestamp, item, Category - from - df a - group by sessionid, item, substr(timestamp,1,12) - """ - - sql1 = """ - select a.* - from - df a - join - (SELECT item FROM df group by item having count(*)>=1000)b - on a.item=b.item - """ - - sql2 = """ - select a.* - from - df2 a - join - (SELECT sessionid FROM df2 group by sessionid having count(*)>=13)b - on a.sessionid=b.sessionid - """ - - sql3 = """ - select sessionid, group_concat(item) as items - from - df3 - group by sessionid - order by timestamp asc - """ - - df = pysqldf(sql0) - - df2 = pysqldf(sql1) - - df3 = pysqldf(sql2) - - df4 = pysqldf(sql3) - - print('items num.', df3['item'].value_counts().count()) - print('max item id', df3['item'].max()) - print('sessionid num.', df3['sessionid'].value_counts().count()) - - df4.to_csv(dataset_dir + '/' + dataset_file + '.csv', sep=' ', header=True, index=False, encoding='utf-8') - -if 'movielens' in dataset_file: - # movielens-25m - df = pd.read_csv(dataset_dir + '/ml-25m/ratings.csv') - # userId,movieId,rating,timestamp - sql0 = """ - select * - from - df a - where rating>=3 - """ - - sql1 = """ - select a.* - from - df a - join - (SELECT movieId FROM df group by movieId having count(*)>=1000)b - on a.movieId=b.movieId - """ - - sql2 = """ - select a.* - from - df2 a - join - (SELECT userId FROM df2 group by userId having count(*)>=30 and count(*)<=100)b - on a.userId=b.userId - """ - - sql3 = """ - select userId as sessionid, group_concat(movieId) as items - from - df3 - group by userId - order by timestamp asc - """ - df = pysqldf(sql0) - - df2 = pysqldf(sql1) - - df3 = pysqldf(sql2) - - df4 = pysqldf(sql3) - - print('items num.', df3['movieId'].value_counts().count()) - print('max item id', df3['movieId'].max()) - print('sessionid num.', df3['userId'].value_counts().count()) - - df4.to_csv(dataset_dir + '/movielens.csv', sep=' ', header=True, index=False, encoding='utf-8') - -if 'rl4rs' in dataset_file: - # RL4RS - data = open(dataset_dir + '/rl4rs_dataset_a.csv', 'r').read().split('\n')[:-1] - tmp = ['sessionid items'] - for x in data: - session_id = x.split('@')[1] - sequence_id = list(map(int, 
x.split('@')[5].split(','))) - items = list(map(int, x.split('@')[3].split(','))) - if len(sequence_id) >= 16: - tmp.append(session_id + ' ' + ','.join(list(map(str, sequence_id[-16:] + items[:5])))) - - print('items num.', 283) - print('max item id', 283) - print('sessionid num.', len(tmp)) - - with open(dataset_dir + '/rl4rs.csv', 'w') as f: - f.write('\n'.join(tmp))
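For readers unfamiliar with the "mask_model" referenced throughout modelfree_train.py, it names the standard RLlib parametric-action masking recipe, sketched below (assuming Ray 1.x with TensorFlow; the class and attribute names are illustrative, not the repo's exact implementation). The RAINBOW comments above explain why this pattern breaks under DistributionalQModel: the distributional head keeps transforming the logits after the mask has been added.

    import tensorflow as tf
    from ray.rllib.models.tf.tf_modelv2 import TFModelV2
    from ray.rllib.models.tf.fcnet import FullyConnectedNetwork

    class ActionMaskModel(TFModelV2):
        def __init__(self, obs_space, action_space, num_outputs, model_config, name):
            super().__init__(obs_space, action_space, num_outputs, model_config, name)
            # the inner network only sees the true observation, not the mask
            self.base = FullyConnectedNetwork(
                obs_space.original_space['obs'], action_space,
                num_outputs, model_config, name + '_base')
            self.register_variables(self.base.variables())

        def forward(self, input_dict, state, seq_lens):
            logits, _ = self.base({'obs': input_dict['obs']['obs']})
            mask = input_dict['obs']['action_mask']
            # log(0) = -inf hides masked actions; log(1) = 0 leaves allowed
            # actions untouched; clip so the sum stays finite
            inf_mask = tf.maximum(tf.math.log(mask), tf.float32.min)
            return logits + inf_mask, state

        def value_function(self):
            return self.base.value_function()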