[feat]: add rtp fg to support negative sampler and batch sequence feature op #296

Open · wants to merge 22 commits into base: master (changes shown from all commits)

Commits (22):
9168265  add ssr_fg (neg_sampler && batch_seq_op) (wwxxzz, Oct 18, 2022)
fe2b84f  fix bugs (wwxxzz, Oct 18, 2022)
66ac49b  fix bugs (wwxxzz, Oct 18, 2022)
188c7ea  Merge branch 'master' of github.com:alibaba/EasyRec into fg_seq (wwxxzz, Oct 18, 2022)
e0e921b  add local op file & bug fix (wwxxzz, Oct 18, 2022)
2089576  [feat]: add dnn fg test & delete multi tower recall model (wwxxzz, Oct 19, 2022)
db052ab  [bugfix]: modify samples/rtp_fg/fg_test_extensions_final.config (wwxxzz, Oct 19, 2022)
bbcdab1  [bugfix]: add op file to ops/ (wwxxzz, Oct 19, 2022)
443084b  [bugfix]: fix model config (wwxxzz, Oct 19, 2022)
9a3535b  [bugfix]: modify total steps for test case (wwxxzz, Oct 19, 2022)
6c3137e  [bugfix]: add tf1.15 op (wwxxzz, Oct 20, 2022)
0f799a4  [bugfix]: dict_values not support index in py3 (wwxxzz, Oct 20, 2022)
39365d2  [bugfix]: modify dict_values to list (wwxxzz, Oct 20, 2022)
1cb867b  [bugfix] fix some bugs on fg op (wwxxzz, Oct 27, 2022)
37c748e  Merge branch 'fg_seq' of github.com:alibaba/EasyRec into fg_seq (wwxxzz, Oct 27, 2022)
9bc1803  [bugfix] fix bug in main.py about fg_json_path (wwxxzz, Oct 27, 2022)
e210145  [bugfix] fix bug on sequence feature and negative sampler in DSSM (wwxxzz, Oct 27, 2022)
aed3778  [bugfix] fix bug on sequence feature and negative sampler in DSSM (wwxxzz, Oct 27, 2022)
3f57ea5  [bugfix] fix bug on sequence feature and negative sampler in DSSM (wwxxzz, Oct 27, 2022)
75d57ac  [bugfix] remove test case of dssm+negative+sequence (wwxxzz, Oct 28, 2022)
06d56df  fix bugs (wwxxzz, Nov 14, 2022)
e529951  Merge branch 'master' of github.com:alibaba/EasyRec into fg_seq (wwxxzz, Nov 14, 2022)
69 changes: 69 additions & 0 deletions docs/source/models/dnn_fg.md
@@ -0,0 +1,69 @@
# DNNFG

### Introduction

A DNN model designed for training with negative sampling and sequence features when integrating with RTP FG.

### Configuration

```protobuf
fg_json_path: "!samples/model_config/fg_fusion_train_seq.json"
model_config {
  model_class: "DNNFG"
  feature_groups {
    group_name: "all"
    feature_names: 'adgroup_id'
    feature_names: 'cate_id'
    feature_names: 'campaign_id'
    feature_names: 'customer'
    feature_names: 'brand'
    feature_names: 'user_id'
    feature_names: 'cms_segid'
    feature_names: 'cms_group_id'
    feature_names: 'final_gender_code'
    feature_names: 'age_level'
    feature_names: 'pvalue_level'
    feature_names: 'shopping_level'
    feature_names: 'occupation'
    feature_names: 'new_user_class_level'
    wide_deep: DEEP
    sequence_features: {
      group_name: "seq_fea"
      tf_summary: false
      seq_att_map: {
        key: "cate_id"
        key: "brand"
        hist_seq: "click_seq__cate_id"
        hist_seq: "click_seq__brand"
      }
    }
  }
  dnnfg {
    dnn {
      hidden_units: 256
      hidden_units: 128
      hidden_units: 64
    }
    l2_regularization: 1e-6
  }
  embedding_regularization: 5e-6
}
```

- fg_json_path: path of the fg json file; a leading '!' is stripped when the file is loaded (a structural sketch follows this list)
- model_class: 'DNNFG', do not modify
- feature_groups: requires a single feature_group named all; **the group name must not be changed**
- dnnfg: parameters related to dnnfg
  - dnn: parameter configuration of the deep part
    - hidden_units: number of units (neurons) in each dnn layer
- embedding_regularization: regularization applied to the embedding part to prevent overfitting
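
For orientation, here is a minimal sketch of the fg json structure the input layer can parse (see `set_fg_path` in `easy_rec/python/input/input.py` in this diff). The feature names and expressions are hypothetical illustrations, not the contents of `samples/model_config/fg_fusion_train_seq.json`:

```python
# Hypothetical fg json content, rendered as a Python dict.
# Only the keys consumed by set_fg_path are shown.
fg_config = {
    'features': [
        # id_feature / raw_feature: a single 'expression'; the raw input
        # column is the last ':'-separated segment (here 'brand').
        {'feature_type': 'id_feature', 'feature_name': 'brand',
         'expression': 'item:brand'},
        # sequence features: grouped under 'sequence_name'; the fg feature
        # name becomes '<sequence_name>__<sub_feature_name>', e.g.
        # 'click_seq__brand' as referenced by hist_seq above.
        {'sequence_name': 'click_seq',
         'features': [
             {'feature_name': 'brand', 'expression': 'item:brand'},
             {'feature_name': 'cate_id', 'expression': 'item:cate_id'},
         ]},
    ]
}
```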

Supported metric_set options include:

- auc
- gauc
- recall_at_topK

### Example Config

See: samples/model_config/fg_fusion_train_neg_seq_on_dnn.config
2 changes: 1 addition & 1 deletion easy_rec/python/core/sampler.py
@@ -282,7 +282,7 @@ def get(self, ids):
    sampled_values = tf.py_func(self._get_impl, [ids], self._attr_tf_types)
    result_dict = {}
    for k, t, v in zip(self._attr_names, self._attr_tf_types, sampled_values):
-     v.set_shape([self._num_sample])
+     v.set_shape([None])
      result_dict[k] = v
    return result_dict
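
Context for this one-line change: once sampling is batch-dependent, the number of returned negatives is no longer a compile-time constant, and tf.py_func outputs carry no static shape to begin with. A minimal, self-contained sketch of the pattern (TF 1.x graph mode; the sampler here is hypothetical):

```python
import numpy as np
import tensorflow as tf  # TF 1.x, as used by EasyRec


def _sample_impl(ids):
  # Hypothetical sampler: the number of negatives depends on the
  # incoming batch, so no fixed size can be promised statically.
  return np.arange(len(ids) * 2, dtype=np.int64)


ids = tf.placeholder(tf.int64, [None])
sampled = tf.py_func(_sample_impl, [ids], tf.int64)
# py_func outputs have unknown shape; declare only the rank. Pinning a
# fixed length (the old set_shape([num_sample])) breaks as soon as the
# sample count varies from batch to batch.
sampled.set_shape([None])
```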

218 changes: 187 additions & 31 deletions easy_rec/python/input/input.py
@@ -1,16 +1,21 @@
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import json
import logging
import os
from abc import abstractmethod
from collections import OrderedDict

import six
import tensorflow as tf

import easy_rec
from easy_rec.python.core import sampler as sampler_lib
from easy_rec.python.protos.dataset_pb2 import DatasetConfig
from easy_rec.python.utils import config_util
from easy_rec.python.utils import constant
from easy_rec.python.utils import fg_util
from easy_rec.python.utils.check_utils import check_split
from easy_rec.python.utils.check_utils import check_string_to_number
from easy_rec.python.utils.expr_util import get_expression
@@ -78,44 +83,97 @@ def __init__(self,

self._input_path = input_path

self._fg_json_path = None
self._fg_config = None
self._fg_module = None
self._fg_input_map = dict()
self._effective_fg_features = set()

# if self._fg_json_path is not None and self._fg_json_path != '':
# if self._fg_json_path.startswith('!'):
# self._fg_json_path = self._fg_json_path[1:]
# with tf.gfile.GFile(self._fg_json_path, 'r') as f:
# self._fg_config = json.load(f)
# for feature_config in self._fg_config['features']:
# if 'sequence_name' in feature_config:
# sequence_name = feature_config['sequence_name']
# for sub_feature_config in feature_config['features']:
# sub_feature_name = sub_feature_config['feature_name']
# feature_name = sequence_name + '__' + sub_feature_name
# self._fg_input_map[feature_name] = [
# sequence_name + '__' +
# sub_feature_config['expression'].split(':')[-1]
# ]
# else:
# feature_type = feature_config['feature_type']
# feature_name = feature_config['feature_name']
# if feature_type in ['id_feature', 'raw_feature']:
# self._fg_input_map[feature_name] = [
# feature_config['expression'].split(':')[-1]
# ]
# elif feature_type == 'combo_feature':
# self._fg_input_map[feature_name] = [
# k.split(':')[-1] for k in feature_config['expression']
# ]
# elif feature_type == 'lookup_feature':
# self._fg_input_map[feature_name] = [
# feature_config['map'].split(':')[-1],
# feature_config['key'].split(':')[-1]
# ]
# elif feature_type == 'match_feature':
# self._fg_input_map[feature_name] = [
# feature_config['user'].split(':')[-1],
# feature_config['category'].split(':')[-1],
# feature_config['item'].split(':')[-1],
# ]
# else:
# raise ValueError('Unknown feature type: %s' % feature_type)
# fg_op_path = os.path.join(easy_rec.ops_dir, 'libfg_op.so')
# self._fg_module = tf.load_op_library(fg_op_path)

# findout effective fields
self._effective_fields = []

# for multi value inputs, the types maybe different
# from the types defined in input_fields
# it is used in create_multi_placeholders
self._multi_value_types = {}

for fc in self._feature_configs:
for input_name in fc.input_names:
assert input_name in self._input_fields, 'invalid input_name in %s' % str(
fc)
if input_name not in self._effective_fields:
self._effective_fields.append(input_name)

if fc.feature_type in [fc.TagFeature, fc.SequenceFeature]:
if fc.hash_bucket_size > 0:
self._multi_value_types[fc.input_names[0]] = tf.string
else:
self._multi_value_types[fc.input_names[0]] = tf.int64
if len(fc.input_names) > 1:
self._multi_value_types[fc.input_names[1]] = tf.float32

if fc.feature_type == fc.RawFeature:
self._multi_value_types[fc.input_names[0]] = tf.float32

# add sample weight to effective fields
if self._data_config.HasField('sample_weight'):
self._effective_fields.append(self._data_config.sample_weight)

self._effective_fids = [
self._input_fields.index(x) for x in self._effective_fields
]
# sort fids from small to large
self._effective_fids = list(set(self._effective_fids))
self._effective_fields = [
self._input_fields[x] for x in self._effective_fids
]
# for fc in self._feature_configs:
# for input_name in fc.input_names:
# if self._fg_config is not None and input_name in self._fg_input_map:
# self._effective_fg_features.add(input_name)
# true_input_names = self._fg_input_map[input_name]
# else:
# true_input_names = [input_name]
# for true_input_name in true_input_names:
# assert true_input_name in self._input_fields, 'invalid input_name in %s' % str(
# fc)
# if true_input_name not in self._effective_fields:
# self._effective_fields.append(true_input_name)

# if fc.feature_type in [fc.TagFeature, fc.SequenceFeature]:
# if fc.hash_bucket_size > 0:
# self._multi_value_types[fc.input_names[0]] = tf.string
# else:
# self._multi_value_types[fc.input_names[0]] = tf.int64
# if len(fc.input_names) > 1:
# self._multi_value_types[fc.input_names[1]] = tf.float32

# if fc.feature_type == fc.RawFeature:
# self._multi_value_types[fc.input_names[0]] = tf.float32

# # add sample weight to effective fields
# if self._data_config.HasField('sample_weight'):
# self._effective_fields.append(self._data_config.sample_weight)

# self._effective_fids = [
# self._input_fields.index(x) for x in self._effective_fields
# ]
# # sort fids from small to large
# self._effective_fids = list(set(self._effective_fids))
# self._effective_fields = [
# self._input_fields[x] for x in self._effective_fids
# ]

self._label_fids = [self._input_fields.index(x) for x in self._label_fields]

@@ -284,6 +342,9 @@ def _preprocess(self, field_dict):
sampler_type = self._data_config.WhichOneof('sampler')
sampler_config = getattr(self._data_config, sampler_type)
item_ids = field_dict[sampler_config.item_id_field]

parsed_dict['__batch_size__'] = tf.shape(item_ids)[0]
parsed_dict['__sampler_type__'] = sampler_type
if sampler_type in ['negative_sampler', 'negative_sampler_in_memory']:
sampled = self._sampler.get(item_ids)
elif sampler_type == 'negative_sampler_v2':
@@ -294,6 +355,11 @@
sampled = self._sampler.get(user_ids, item_ids)
else:
raise ValueError('Unknown sampler %s' % sampler_type)

parsed_dict['__num_neg_sample__'] = tf.shape(list(sampled.values())[0])[0]
self._appended_fields.append('__num_neg_sample__')
self._appended_fields.append('__sampler_type__')

for k, v in sampled.items():
if k in field_dict:
field_dict[k] = tf.concat([field_dict[k], v], axis=0)
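
The count of sampled negatives is recovered here from the first sampled attribute, and together with `__batch_size__` and `__sampler_type__` above it travels in parsed_dict so later stages can distinguish original batch rows from appended negatives. Note the explicit list() around dict.values(): dict_values does not support indexing under Python 3 (commits 0f799a4 and 39365d2). A reduced sketch with hypothetical attributes:

```python
import tensorflow as tf

# Hypothetical result of self._sampler.get(...): one 1-D tensor per
# sampled item attribute, all sharing the same leading dimension.
sampled = {
    'item_id': tf.placeholder(tf.int64, [None]),
    'brand': tf.placeholder(tf.string, [None]),
}
# dict.values() is a view in py3 and cannot be indexed directly,
# hence list(...)[0] before reading the dynamic length.
num_neg_sample = tf.shape(list(sampled.values())[0])[0]
```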
@@ -302,6 +368,13 @@
parsed_dict[k] = v
self._appended_fields.append(k)

if self._fg_config is not None:
if self._mode != tf.estimator.ModeKeys.PREDICT and self._fg_module is not None:
parsed_dict['_fg_cfg'] = True
self._appended_fields.append('_fg_cfg')
field_dict = fg_util._fg(self._fg_config, self._effective_fg_features,
self._fg_module, field_dict, parsed_dict)

for fc in self._feature_configs:
feature_name = fc.feature_name
feature_type = fc.feature_type
@@ -793,7 +866,46 @@ def _safe_shard(self, dataset):
else:
return dataset.shard(self._task_num, self._task_index)

def _set_effective_fields(self):
for fc in self._feature_configs:
for input_name in fc.input_names:
if self._fg_config is not None and input_name in self._fg_input_map:
self._effective_fg_features.add(input_name)
true_input_names = self._fg_input_map[input_name]
else:
true_input_names = [input_name]
for true_input_name in true_input_names:
assert true_input_name in self._input_fields, 'invalid input_name in %s' % str(
fc)
if true_input_name not in self._effective_fields:
self._effective_fields.append(true_input_name)

if fc.feature_type in [fc.TagFeature, fc.SequenceFeature]:
if fc.hash_bucket_size > 0:
self._multi_value_types[fc.input_names[0]] = tf.string
else:
self._multi_value_types[fc.input_names[0]] = tf.int64
if len(fc.input_names) > 1:
self._multi_value_types[fc.input_names[1]] = tf.float32

if fc.feature_type == fc.RawFeature:
self._multi_value_types[fc.input_names[0]] = tf.float32

# add sample weight to effective fields
if self._data_config.HasField('sample_weight'):
self._effective_fields.append(self._data_config.sample_weight)

self._effective_fids = [
self._input_fields.index(x) for x in self._effective_fields
]
# sort fids from small to large
self._effective_fids = list(set(self._effective_fids))
self._effective_fields = [
self._input_fields[x] for x in self._effective_fids
]
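
A condensed sketch of the resolution this method performs when an fg config is active: each feature name referenced by feature_configs is mapped through _fg_input_map to the raw input columns it actually consumes (all names below are hypothetical):

```python
# fg feature name -> raw input columns, as produced by set_fg_path below.
fg_input_map = {'brand': ['brand_id'],
                'click_seq__brand': ['click_seq__brand_id']}
input_fields = ['user_id', 'brand_id', 'click_seq__brand_id', 'label']

effective_fields = []
for input_name in ['brand', 'user_id']:  # names used by feature_configs
  for true_name in fg_input_map.get(input_name, [input_name]):
    assert true_name in input_fields, 'invalid input_name %s' % true_name
    if true_name not in effective_fields:
      effective_fields.append(true_name)

# De-duplicate through field indices. Note that list(set(...)), as in the
# method above, does not guarantee ascending order despite the "sort"
# comment; sorted() makes the intent explicit.
effective_fids = sorted(set(input_fields.index(x) for x in effective_fields))
effective_fields = [input_fields[i] for i in effective_fids]
print(effective_fields)  # ['user_id', 'brand_id']
```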

def create_input(self, export_config=None):
self._set_effective_fields()

def _input_fn(mode=None, params=None, config=None):
"""Build input_fn for estimator.
@@ -829,3 +941,47 @@

_input_fn.input_creator = self
return _input_fn
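
For orientation, a rough sketch of the calling order: set_fg_path (below) must run before create_input for fg to take effect, since _set_effective_fields consults the fg input map. The estimator wiring shown is generic tf.estimator API, illustrative only, not EasyRec's own train loop:

```python
# input_obj: an instance of a concrete Input subclass (e.g. CSVInput).
input_obj.set_fg_path('!samples/model_config/fg_fusion_train_seq.json')
train_input_fn = input_obj.create_input()

# Generic TF 1.x consumption of the produced input_fn:
# estimator = tf.estimator.Estimator(model_fn=my_model_fn)
# estimator.train(input_fn=train_input_fn, max_steps=1000)
```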

def set_fg_path(self, fg_json_path=None):
self._fg_json_path = fg_json_path
if self._fg_json_path is not None and self._fg_json_path != '':
if self._fg_json_path.startswith('!'):
self._fg_json_path = self._fg_json_path[1:]
with tf.gfile.GFile(self._fg_json_path, 'r') as f:
self._fg_config = json.load(f)
for feature_config in self._fg_config['features']:
if 'sequence_name' in feature_config:
sequence_name = feature_config['sequence_name']
for sub_feature_config in feature_config['features']:
sub_feature_name = sub_feature_config['feature_name']
feature_name = sequence_name + '__' + sub_feature_name
self._fg_input_map[feature_name] = [
sequence_name + '__' +
sub_feature_config['expression'].split(':')[-1]
]
else:
feature_type = feature_config['feature_type']
feature_name = feature_config['feature_name']
if feature_type in ['id_feature', 'raw_feature']:
self._fg_input_map[feature_name] = [
feature_config['expression'].split(':')[-1]
]
elif feature_type == 'combo_feature':
self._fg_input_map[feature_name] = [
k.split(':')[-1] for k in feature_config['expression']
]
elif feature_type == 'lookup_feature':
self._fg_input_map[feature_name] = [
feature_config['map'].split(':')[-1],
feature_config['key'].split(':')[-1]
]
elif feature_type == 'match_feature':
self._fg_input_map[feature_name] = [
feature_config['user'].split(':')[-1],
feature_config['category'].split(':')[-1],
feature_config['item'].split(':')[-1],
]
else:
raise ValueError('Unknown feature type: %s' % feature_type)
fg_op_path = os.path.join(easy_rec.ops_dir, 'libfg_op.so')
self._fg_module = tf.load_op_library(fg_op_path)
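
To make the mapping concrete, a hedged example of what this parser derives for two of the supported feature types; the fg json fragment is hypothetical:

```python
# Input: a hypothetical fg json fragment.
fg_config = {
    'features': [
        {'feature_type': 'id_feature', 'feature_name': 'brand',
         'expression': 'item:brand_id'},
        {'feature_type': 'lookup_feature', 'feature_name': 'price_level',
         'map': 'item:price_map', 'key': 'user:city'},
    ]
}
# Output: each fg feature maps to the raw input columns it consumes,
# taking the last ':'-separated segment of each source expression:
# _fg_input_map == {'brand': ['brand_id'],
#                   'price_level': ['price_map', 'city']}
```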
7 changes: 5 additions & 2 deletions easy_rec/python/input/odps_input_v2.py
@@ -8,7 +8,10 @@
from easy_rec.python.utils import odps_util

 try:
-  import pai
+  if tf.__version__.startswith('1.15'):
+    from tensorflow.python.ops.work_queue import WorkQueue
+  else:
+    from pai.data import WorkQueue
 except Exception:
   pass

@@ -50,7 +53,7 @@ def _build(self, mode, params):
        mode == tf.estimator.ModeKeys.TRAIN:
      logging.info('pai_worker_slice_num = %d' %
                   self._data_config.pai_worker_slice_num)
-     work_queue = pai.data.WorkQueue(
+     work_queue = WorkQueue(
          self._input_path,
          num_epochs=self.num_epochs,
          shuffle=self._data_config.shuffle,