Commit 7f21a78

V1.1 20220828

asdqsczser committed Sep 2, 2022
1 parent 63ecd3d commit 7f21a78
Showing 13 changed files with 811 additions and 234 deletions.
4 changes: 2 additions & 2 deletions rl4rs/env/base.py
@@ -207,10 +207,10 @@ def __init__(self, recsim: RecSimBase):
action_feature_size = len(self.obs[0]['action_mask'])
self.observation_space = gym.spaces.Dict({
"action_mask": gym.spaces.Box(0, 1, shape=(action_feature_size,)),
"obs": gym.spaces.Box(-1000.0, 1000.0, shape=(len(self.obs[0]["obs"]),))
"obs": gym.spaces.Box(-100000.0, 100000.0, shape=(len(self.obs[0]["obs"]),))
})
else:
self.observation_space = gym.spaces.Box(-1000.0, 1000.0, shape=(len(self.obs[0]),))
self.observation_space = gym.spaces.Box(-100000.0, 100000.0, shape=(len(self.obs[0]),))
if self.config.get("support_conti_env", False):
self.action_space = gym.spaces.Box(-1, 1, shape=(self.config['action_emb_size'],))
else:
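Illustration (not from the commit): the base.py change only widens the observation-space bounds from +/-1000 to +/-100000. A minimal sketch of why the bounds matter, assuming gym's standard Box.contains semantics; obs_dim and the sample value are hypothetical:

    # sketch: an observation feature larger than the old bound fails containment checks
    import gym
    import numpy as np

    obs_dim = 432                                                   # hypothetical feature size
    old_space = gym.spaces.Box(-1000.0, 1000.0, shape=(obs_dim,))
    new_space = gym.spaces.Box(-100000.0, 100000.0, shape=(obs_dim,))

    obs = np.zeros(obs_dim, dtype=np.float32)
    obs[0] = 5000.0                  # e.g. a raw price or count feature
    print(old_space.contains(obs))   # False: out of range under the old bounds
    print(new_space.contains(obs))   # True:  valid under the widened bounds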
19 changes: 15 additions & 4 deletions rl4rs/env/slate.py
@@ -19,6 +19,10 @@ def __init__(self, config, records):
self.cur_steps = 0
iteminfo_file = config["iteminfo_file"]
self.item_info_d, self.action_emb = self.get_iteminfo_from_file(iteminfo_file, self.action_size)
if config.get('support_onehot_action', False):
config['action_emb_size'] = self.action_size
self.action_emb_size = self.action_size
self.action_emb = np.eye(self.action_size)
self.location_mask, self.special_items = self.get_mask_from_file(iteminfo_file, self.action_size)

@staticmethod
@@ -146,9 +150,15 @@ def get_violation(self):
def offline_action(self):
cur_step = self.cur_steps
if cur_step < self.max_steps:
action = [int(x.split('@')[3].split(',')[cur_step]) for x in self.records]
if self.config.get("support_conti_env", False):
action = [self.action_emb[int(x.split('@')[3].split(',')[cur_step])] for x in self.records]
else:
action = [int(x.split('@')[3].split(',')[cur_step]) for x in self.records]
else:
action = [0, ] * self.batch_size
if self.config.get("support_conti_env", False):
action = [self.action_emb[0], ] * self.batch_size
else:
action = [0, ] * self.batch_size
return action

@property
@@ -290,8 +300,9 @@ def forward(self, model, samples):
[samples.info[i].update({'click_p': probs[i]})
for i in range(len(probs))]
reward = np.sum(price * probs, axis=1)
if self.config.get("support_rllib_mask", False) or \
self.config.get("support_d3rl_mask", False):
if 1:
# if self.config.get("support_rllib_mask", False) or \
# self.config.get("support_d3rl_mask", False):
violation = samples.get_violation()
reward[violation < 0.5] = 0
return reward.tolist()
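Illustration (not from the commit): the slate.py changes add an optional one-hot action representation — with support_onehot_action the action embedding table becomes an identity matrix, and offline_action returns embedded action vectors whenever support_conti_env is set; the forward() change additionally zeroes the reward for slates that violate placement constraints regardless of the mask config flags. A small sketch of the one-hot mapping (the action id below is hypothetical):

    # sketch: discrete action id -> one-hot "continuous" action vector
    import numpy as np

    action_size = 284                    # action_size used in this repo's configs
    action_emb = np.eye(action_size)     # support_onehot_action: identity rows

    logged_action_id = 42                # hypothetical logged action
    conti_action = action_emb[logged_action_id]
    assert conti_action.shape == (action_size,)
    assert conti_action[logged_action_id] == 1.0 and conti_action.sum() == 1.0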
File renamed without changes.
53 changes: 39 additions & 14 deletions rl4rs/tool/decoder.py → rl4rs/mdpchecker/decoder.py
@@ -1,5 +1,6 @@
import numpy as np

import time
import bottleneck

def token_probs(model,
batch_inputs,
@@ -12,19 +13,41 @@ def decode_step(model,
batch_outputs,
candidates=None,
beam_size=1):
predicts = model.predict([np.array(batch_inputs), np.array(batch_outputs)])
tmp = []
for i in range(len(predicts)):
probs = [(prob, j) for j, prob in enumerate(predicts[i][-1])]
if candidates is not None:
probs = [x if x[1] in candidates[i] else (0, x[1]) for x in probs]
probs.sort(reverse=True)
probs = probs[:beam_size]
tmp.append(probs)
return np.array(tmp)
a = time.time()
# predicts (batch_size, token_size)
predicts = model.predict([np.array(batch_inputs), np.array(batch_outputs)])[:,-1]
batch_size, token_size = predicts.shape
print('decode_step', time.time()-a)
# print('decode_step', time.time()-a)
# tmp = []
# for i in range(len(predicts)):
# probs = [(prob, j) for j, prob in enumerate(predicts[i])]
# if candidates is not None:
# probs = [x if x[1] in candidates[i] else (0, x[1]) for x in probs]
# probs.sort(reverse=True)
# probs = probs[:beam_size]
# tmp.append(probs)
if candidates is not None:
mask = np.zeros(predicts.shape)
inds = np.array([[i,]*len(candidates[i]) for i in range(len(candidates))]).flatten()
mask[inds, candidates.flatten().astype(int)] = 1
predicts = predicts * mask
# index = np.argpartition(-predicts, beam_size, axis=1)[:, :beam_size]
# probs = -np.partition(-predicts, beam_size, axis=1)[:, :beam_size]
index = bottleneck.argpartition(-predicts, beam_size, axis=1)[:, :beam_size]
probs = -bottleneck.partition(-predicts, beam_size, axis=1)[:, :beam_size]
inds = np.array([[i,]*len(probs[i]) for i in range(len(probs))])
inds_sorted = np.argsort(-probs, axis=1)[:,:beam_size]
index = index[inds, inds_sorted]
probs = probs[inds, inds_sorted]
# print('decode_step', time.time()-a)
tmp2 = np.array(list(zip(probs.flatten(),index.flatten()))).reshape((batch_size, beam_size, 2))
# tmp (batch_size, beam_size, 2)
# print(np.min(tmp==tmp2))
return tmp2


def beam_search(model, encode_input, beam_size, target_len, use_candidates=False):
def beam_search(model, encode_input, beam_size, target_len, use_candidates=False, candidates_size = None):
batch_size = len(encode_input)
output_topk = np.zeros((batch_size, beam_size, target_len + 1), dtype=np.int)
beam_score = np.ones((batch_size, beam_size))
@@ -33,11 +56,13 @@ def beam_search(model, encode_input, beam_size, target_len, use_candidates=False
candidates = None
prob = decode_step(model, encode_input, output_topk[:, 0, :1], candidates=candidates, beam_size=beam_size)
if use_candidates:
candidates = prob[:, :, 1]
probs_first_step = decode_step(model, encode_input, output_topk[:, 0, :1], candidates=candidates, beam_size=candidates_size)
candidates = probs_first_step[:, :, 1]
output_topk[:, :, 1] = prob[:, :, 1]
beam_score[:, :] = prob[:, :, 0]
for i in range(1, target_len):
# print(i)
a = time.time()
print('beam_search at target_len_', i)
probs = []
for j in range(beam_size):
# batch_size,k,2
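Illustration (not from the commit): the decode_step rewrite replaces the per-row Python sort with a vectorised partial top-k over the whole batch. A sketch of the same top-k-then-sort pattern using numpy.argpartition (the commit uses bottleneck.argpartition with the same calling convention); shapes and values below are made up for illustration:

    # sketch: keep the beam_size largest probabilities per row, then order only those
    import numpy as np

    batch_size, token_size, beam_size = 4, 10, 3
    predicts = np.random.rand(batch_size, token_size)

    # partial selection: the first beam_size columns hold the largest entries, unordered
    index = np.argpartition(-predicts, beam_size, axis=1)[:, :beam_size]
    probs = -np.partition(-predicts, beam_size, axis=1)[:, :beam_size]

    # sort only the beam_size selected entries per row, largest first
    rows = np.arange(batch_size)[:, None]
    order = np.argsort(-probs, axis=1)
    index, probs = index[rows, order], probs[rows, order]

    # same (prob, token_id) layout decode_step returns: (batch_size, beam_size, 2)
    out = np.stack([probs, index.astype(float)], axis=-1)
    print(out.shape)   # (4, 3, 2)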
5 changes: 4 additions & 1 deletion rl4rs/policy/policy_model.py
@@ -8,13 +8,16 @@
class policy_model(object):
def __init__(self, model, config = {}):
self.policy = model
self.config = config
self.page_items = int(config.get('page_items', 9))
self.mask_size = self.page_items+1
self.location_mask = config.get('location_mask', None)
self.special_items = config.get('special_items', None)

def predict_with_mask(self, obs):
if isinstance(self.policy, d3rlpy.algos.AlgoBase):
if self.config.get("support_conti_env",False):
return self.predict(obs)
elif isinstance(self.policy, d3rlpy.algos.AlgoBase):
obs = np.array(obs)
action_probs = np.array(self.action_probs(obs))
batch_size = len(obs)
13 changes: 13 additions & 0 deletions rl4rs/server/gymHttpServer.py
@@ -5,6 +5,7 @@
import gym
import numpy as np
import six
import time
import argparse
import sys
import json
@@ -37,6 +38,7 @@ class Envs(object):
def __init__(self):
self.envs = {}
self.id_len = 8
self.env_lasttime = {}

def _lookup_env(self, instance_id):
try:
@@ -51,6 +53,13 @@ def _remove_env(self, instance_id):
raise InvalidUsage('Instance_id {} unknown'.format(instance_id))

def create(self, env_id, config={}, seed=None):
instance_ids = list(self.list_all().keys())
print('list all envs ', instance_ids)
for instance_id in instance_ids[:-2]:
active_time = self.env_lasttime.get(instance_id, 0)
if abs(time.time()-active_time)>=300:
self.env_close(instance_id)
print('env_close envs ', instance_id)
try:
if env_id == 'SlateRecEnv-v0':
config['gpu'] = False
@@ -64,11 +73,14 @@
env = gym.make(env_id)
if seed:
env.seed(seed)

except gym.error.Error:
raise InvalidUsage("Attempted to look up malformed environment ID '{}'".format(env_id))

instance_id = str(uuid.uuid4().hex)[:self.id_len]
self.envs[instance_id] = env
self.env_lasttime[instance_id] = time.time()
# assert len(list(self.list_all().keys())) <= 3
return instance_id

def list_all(self):
@@ -80,6 +92,7 @@ def reset(self, instance_id):
return env.observation_space.to_jsonable(obs)

def step(self, instance_id, action, render):
self.env_lasttime[instance_id] = time.time()
env = self._lookup_env(instance_id)
if isinstance(action, six.integer_types):
nice_action = action
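Illustration (not from the commit): the gymHttpServer.py change adds a simple idle-environment reaper — on every create() call, all but the two most recently created instances are checked against their last-activity timestamp and closed after 300 seconds of inactivity (step() refreshes the timestamp). A standalone sketch of that policy; the helper name and its arguments are hypothetical:

    # sketch: close environments idle longer than `timeout`, keeping the newest `keep_last`
    import time

    def reap_idle(envs, last_active, keep_last=2, timeout=300.0):
        """envs: instance_id -> env, last_active: instance_id -> unix timestamp."""
        closed = []
        for instance_id in list(envs.keys())[:-keep_last]:
            if time.time() - last_active.get(instance_id, 0) >= timeout:
                envs.pop(instance_id).close()
                last_active.pop(instance_id, None)
                closed.append(instance_id)
        return closed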
119 changes: 56 additions & 63 deletions script/batchrl_train.py
@@ -8,11 +8,13 @@
from rl4rs.env.seqslate import SeqSlateRecEnv, SeqSlateState
from script import batchrl_trainer
from d3rlpy.dataset import MDPDataset
from rl4rs.nets.cql.encoder import CustomVectorEncoderFactory
from rl4rs.nets.cql.q_function import CustomMeanQFunctionFactory
from script.offline_evaluation import ope_eval
from rl4rs.policy.behavior_model import behavior_model
from rl4rs.policy.policy_model import policy_model
from rl4rs.nets.cql.encoder import CustomVectorEncoderFactory
from d3rlpy.metrics.scorer import dynamics_observation_prediction_error_scorer
from d3rlpy.metrics.scorer import dynamics_reward_prediction_error_scorer
from d3rlpy.metrics.scorer import dynamics_prediction_variance_scorer

algo = sys.argv[1]
stage = sys.argv[2]
@@ -21,7 +23,7 @@
config = {"epoch": 4, "maxlen": 64, "batch_size": 2048, "action_size": 284, "class_num": 2, "dense_feature_num": 432,
"category_feature_num": 21, "category_hash_size": 100000, "seq_num": 2, "emb_size": 128,
"hidden_units": 128, "max_steps": 9, "sample_file": '../dataset/rl4rs_dataset_a_shuf.csv',
"model_file": "../output/rl4rs_dataset_a_dnn/model", 'gpu': True, "page_items": 9,
"model_file": "../output/rl4rs_dataset_a_dnn/model", 'gpu': True, "page_items": 9, 'action_emb_size':32,
"iteminfo_file": '../dataset/item_info.csv', "support_d3rl_mask": True, "is_eval": True,
"CQL_alpha": 1, 'env': 'SlateRecEnv-v0', 'trial_name': 'a_all'}

@@ -39,78 +41,73 @@
else:
assert config['env'] in ('SlateRecEnv-v0', 'SeqSlateRecEnv-v0')

if algo in ('MOPO', 'COMBO') or 'conti' in algo:
config["support_conti_env"] = True

if not config.get('gpu', True):
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
torch.cuda.is_available = lambda: False
print('CUDA_VISIBLE_DEVICES', torch.cuda.is_available())

trail_name = config['env'] + '_' + config['trial_name'] + '.h5'
if not config.get("support_conti_env",False):
trail_name = config['env'] + '_' + config['trial_name'] + '.h5'
elif config.get("support_onehot_action", False):
config['action_emb_size'] = config["action_size"]
trail_name = config['env'] + '_' + config['trial_name'] + '_onehot.h5'
else:
trail_name = config['env'] + '_' + config['trial_name'] + '_conti.h5'
dataset_dir = os.environ['rl4rs_dataset_dir']
output_dir = os.environ['rl4rs_output_dir']
dataset_save_path = dataset_dir + '/' + trail_name
dynamics_save_path = output_dir + '/' + 'dynamics' + '_' + trail_name
model_save_path = output_dir + '/' + algo + '_' + trail_name
scaler = None

print(trail_name, config)

if algo == 'BC':
encoder_factory = CustomVectorEncoderFactory(
config,
action_size=config["action_size"],
mask_size=int(config['page_items'])+1,
with_q=True,
hidden_units=[256]
)
model = d3rlpy.algos.DiscreteBC(
batch_size=256,
beta=0,
encoder_factory=encoder_factory,
use_gpu=config['gpu']
)
elif algo == 'BCQ':
encoder_factory = CustomVectorEncoderFactory(
config,
action_size=config["action_size"],
mask_size=int(config['page_items'])+1,
with_q=True,
hidden_units=[256]
)
model = d3rlpy.algos.DiscreteBCQ(
batch_size=256,
encoder_factory=encoder_factory,
use_gpu=config['gpu']
)
elif algo == 'CQL':
encoder_factory = CustomVectorEncoderFactory(
config,
action_size=config["action_size"],
mask_size=int(config['page_items'])+1,
with_q=True,
hidden_units=[256]
)
model = d3rlpy.algos.DiscreteCQL(
batch_size=256,
q_func_factory=CustomMeanQFunctionFactory(share_encoder=True),
encoder_factory=encoder_factory,
gamma=1.0,
alpha=config["CQL_alpha"],
reward_scaler='standard',
use_gpu=config['gpu']
)
else:
assert algo in ('BC', 'BCQ', 'CQL')
try:
dataset = MDPDataset.load(dataset_save_path)
except Exception:
dataset = None

try:
dynamics = batchrl_trainer.get_model(config, 'dynamics')
dynamics = batchrl_trainer.build_with_dataset(dynamics, dataset)
dynamics.load_model(dynamics_save_path)
except Exception:
dynamics = None

if stage == 'dataset_generate':
if config['env'] == 'SlateRecEnv-v0':
batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path)
if not config.get("support_conti_env",False):
batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path)
else:
batchrl_trainer.data_generate_rl4rs_a_conti(config, dataset_save_path)
elif config['env'] == 'SeqSlateRecEnv-v0':
batchrl_trainer.data_generate_rl4rs_b(config, dataset_save_path)
if not config.get("support_conti_env",False):
batchrl_trainer.data_generate_rl4rs_b(config, dataset_save_path)
else:
batchrl_trainer.data_generate_rl4rs_b_conti(config, dataset_save_path)
else:
batchrl_trainer.data_generate_rl4rs_a(config, dataset_save_path)
assert config['env'] in ('SlateRecEnv-v0', 'SeqSlateRecEnv-v0')

if stage == 'train_dynamics' or (stage == 'train' and algo == 'dynamics'):
dynamics = batchrl_trainer.get_model(config, 'dynamics')
print('get_action_size', dataset.episodes[0].get_action_size())
dynamics.fit(dataset,
eval_episodes=dataset.episodes[-3000:],
n_epochs=10,
show_progress=False,
scorers={
'observation_error': dynamics_observation_prediction_error_scorer,
'reward_error': dynamics_reward_prediction_error_scorer,
'variance': dynamics_prediction_variance_scorer,
}
)
dynamics.save_model(dynamics_save_path)

if stage == 'train':
dataset = MDPDataset.load(dataset_save_path)
model = batchrl_trainer.get_model(config, algo, dynamics)
model.fit(dataset,
eval_episodes=dataset.episodes[-3000:],
n_epochs=config['epoch'],
@@ -122,24 +119,20 @@
if config['env'] == 'SlateRecEnv-v0' \
else 90 * 2
soft_opc_score = config.get('soft_opc_score', default_soft_opc_score)
dataset = MDPDataset.load(dataset_save_path)
model.build_with_dataset(dataset)
model = batchrl_trainer.get_model(config, algo, dynamics)
model = batchrl_trainer.build_with_dataset(model, dataset)
model.load_model(model_save_path)
eval_episodes = random.sample(dataset.episodes, 2048 * 4)
policy = policy_model(model, config=config)
batchrl_trainer.d3rlpy_eval(eval_episodes, policy, soft_opc_score)
# batchrl_trainer.d3rlpy_eval(eval_episodes, policy, soft_opc_score)
batchrl_trainer.evaluate(config, policy)

if stage == 'ope':
dataset_dir = os.environ['rl4rs_dataset_dir']
sample_model = behavior_model(config, modelfile=dataset_dir + '/logged_policy.h5')
dataset = MDPDataset.load(dataset_save_path)
model.build_with_dataset(dataset)
model = batchrl_trainer.get_model(config, algo, dynamics)
model = batchrl_trainer.build_with_dataset(model, dataset)
model.load_model(model_save_path)
if model.reward_scaler is not None:
model.reward_scaler.fit(dataset)
print('reward_scaler_mean', model.reward_scaler._mean)
print('reward_scaler_std', model.reward_scaler._std)
eval_config = config.copy()
eval_config["is_eval"] = True
eval_config["batch_size"] = 2048
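Illustration (not from the commit): batchrl_train.py now builds models through batchrl_trainer.get_model / build_with_dataset instead of constructing BC/BCQ/CQL inline, pre-loads the dataset and a trained dynamics model when available, and keys the saved MDPDataset file off the action representation. A compact restatement of that naming rule (the helper function is hypothetical):

    # sketch: the dataset file name encodes the action representation
    def dataset_file_name(config):
        base = config['env'] + '_' + config['trial_name']
        if not config.get('support_conti_env', False):
            return base + '.h5'           # discrete action ids
        if config.get('support_onehot_action', False):
            return base + '_onehot.h5'    # one-hot vectors (action_emb_size = action_size)
        return base + '_conti.h5'         # learned continuous embeddings (action_emb_size)

    # e.g. {'env': 'SlateRecEnv-v0', 'trial_name': 'a_all', 'support_conti_env': True}
    #   -> 'SlateRecEnv-v0_a_all_conti.h5'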