ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

DeepCTR

2022-01-27 11:01:22  阅读:207  来源: 互联网

标签:name list feature embedding input DeepCTR columns


DeepCTR框架,作者是阿里大佬浅梦  【官方文档:https://deepctr-doc.readthedocs.io/en/latest/】 主要是对目前的一些基于深度学习的点击率预测算法进行了实现,如PNN,WDL,DeepFM,MLR,DeepCross,AFM,NFM,DIN,DIEN,xDeepFM,AutoInt等,并且对外提供了一致的调用接口。

基于temsorflow的deepctr目录

 

 

一般步骤
  • 安装,导包
  • 数据预处理
  • 生成特征列
  • 生成训练样本和模型
主要模块包括:
  • feature_column-构造特征列
  • inputs-处理输入数据
  • models-实现常见ctr算法
  • layers-继承tf.keras.layers.Layer

feature_column

 

 

 

包含3个类,对应三种特征类型
  • 类别特征-SparseFeat
  • 数值特征-DenseFeat
  • 序列特征-VarLenSparseFeat
以及4个方法
  • get_feature_names
  • build_input_features
  • get_linear_logit
  • input_from_feature_columns

特征类型

SparseFeat(稀疏)

SparseFeat用于处理类别特征,如性别、国籍等类别特征,将类别特征转为固定维度的稠密特征。

SparseFeat(name, vocabulary_size, embedding_dim, use_hash, dtype,embedding_name, group_name)

"""
name:生成的特征列的名字
vocabulary_size:不同特征值的个数或当use_hash=True时的哈希空间
embedding_dim:嵌入向量的维度
use_hash:是否使用哈希编码,默认False;如果为True,则输入将散列到大小为0到vocabulary_size的空间
dtype:默认int32
embeddings_initializer:嵌入矩阵初始化方式,默认随机初始化
embedding_name:默认None,其名字与name保持一致
group_name:特征列所属的组
traninable:嵌入矩阵是否可训练,默认True
"""
 

DenseFeat(密集)

DenseFeat将稠密特征转为向量的形式,并使用transform_fn 函数对其做归一化操作或者其它的线性或非线性变换。

DenseFeat(name, dimension, dtype, transform_fn)
"""
name:特征列名字
dimension:嵌入特征维度,默认为1
dtype:特征类型,默认为float32
transform_fn:转换函数,归一化函数或线性变换函数,返回张量
"""

VarLenSparseFeat

处理类似文本序列的可变长度类型特征。

VarLenSparseFeat(sparsefeat, maxlen, combiner, length_name)
"""
sparseFeat:属于SparseFeat的实例
maxlen:该特征列的最大特征值
combiner:池化方法,mean、sum、max,默认为mean
length_name:特征长度名称,如果为None,用0填充
weight_name:默认None,如果不为空,序列特征会与weight_name的权重进行相乘
weight_norm:是否对权重分数进行归一化,默认True
"""

 

相关方法

get_feature_names

'''
作用:获取所有特征列的名字,以列表形式返回
'''
def get_feature_names(feature_columns):
    features = build_input_features(feature_columns) #为特征列构造keras tensor
    return list(features.keys()) #返回特征列的names

build_input_features

'''
作用:为所有的特征列构造keras tensor,结果以OrderDict形式返回
'''
def build_input_features(feature_columns, prefix=''):
    input_features = OrderedDict()
    for fc in feature_columns:
        if isinstance(fc, SparseFeat): #判断fc是否属于SparseFeat实例
            input_features[fc.name] = Input(
                shape=(1,), name=prefix + fc.name, dtype=fc.dtype) #Input()函数用于构造keras tensor
        elif isinstance(fc, DenseFeat):
            input_features[fc.name] = Input(
                shape=(fc.dimension,), name=prefix + fc.name, dtype=fc.dtype)
        elif isinstance(fc, VarLenSparseFeat):
            input_features[fc.name] = Input(shape=(fc.maxlen,), name=prefix + fc.name,
                                            dtype=fc.dtype)
            if fc.weight_name is not None:
                input_features[fc.weight_name] = Input(shape=(fc.maxlen, 1), name=prefix + fc.weight_name,
                                                       dtype="float32")
            if fc.length_name is not None:
                input_features[fc.length_name] = Input((1,), name=prefix + fc.length_name, dtype='int32')

        else:
            raise TypeError("Invalid feature column type,got", type(fc))

    return input_features

get_linear_logit

'''
作用:获取linear_logit(线性变换)的结果
'''
def get_linear_logit(features, feature_columns, units=1, use_bias=False, seed=1024, prefix='linear',
                     l2_reg=0):
    linear_feature_columns = copy(feature_columns)
    #将SparseFeat和VarLenSparseFeat的embedding_dim强制置换为1
    for i in range(len(linear_feature_columns)):
        if isinstance(linear_feature_columns[i], SparseFeat):
            linear_feature_columns[i] = linear_feature_columns[i]._replace(embedding_dim=1,
                                                                           embeddings_initializer=Zeros())
        if isinstance(linear_feature_columns[i], VarLenSparseFeat):
            linear_feature_columns[i] = linear_feature_columns[i]._replace(
                sparsefeat=linear_feature_columns[i].sparsefeat._replace(embedding_dim=1,
                                                                         embeddings_initializer=Zeros()))
    #获取用于线性变换的embedding list
    linear_emb_list = [input_from_feature_columns(features, linear_feature_columns, l2_reg, seed,
                                                  prefix=prefix + str(i))[0] for i in range(units)]
    #获取DenseFeat的数值特征
    _, dense_input_list = input_from_feature_columns(features, linear_feature_columns, l2_reg, seed, prefix=prefix)

    linear_logit_list = []
    for i in range(units):

        if len(linear_emb_list[i]) > 0 and len(dense_input_list) > 0: #既有稀疏类别特征也有稠密特征的情况
            sparse_input = concat_func(linear_emb_list[i]) #将所有稀疏特征列的嵌入向量进行拼接
            dense_input = concat_func(dense_input_list) #将所有稠密特征列的数值特征进行拼接
            linear_logit = Linear(l2_reg, mode=2, use_bias=use_bias, seed=seed)([sparse_input, dense_input]) #将sparse_input和dense_input拼接后进行线性变换
        elif len(linear_emb_list[i]) > 0:  #仅有稀疏类别特征的情况
            sparse_input = concat_func(linear_emb_list[i])
            linear_logit = Linear(l2_reg, mode=0, use_bias=use_bias, seed=seed)(sparse_input)
        elif len(dense_input_list) > 0: #仅有稠密数值特征的情况
            dense_input = concat_func(dense_input_list)
            linear_logit = Linear(l2_reg, mode=1, use_bias=use_bias, seed=seed)(dense_input)
        else:
            # raise NotImplementedError
            return add_func([])
        linear_logit_list.append(linear_logit)

    return concat_func(linear_logit_list) #将所有logit结果拼接后返回

input_from_feature_columns

'''
为所有特征列创建嵌入矩阵,并分别返回包含SparseFeat和VarLenSparseFeat的嵌入矩阵的字典,以及包含DenseFeat的数值特征的字典
具体实现是通过调用inputs中的create_embedding_matrix、embedding_lookup、varlen_embedding_lookup等函数完成
'''
def input_from_feature_columns(features, feature_columns, l2_reg, seed, prefix='', seq_mask_zero=True,
                               support_dense=True, support_group=False):
    sparse_feature_columns = list(
        filter(lambda x: isinstance(x, SparseFeat), feature_columns)) if feature_columns else []
    varlen_sparse_feature_columns = list(
        filter(lambda x: isinstance(x, VarLenSparseFeat), feature_columns)) if feature_columns else []

    embedding_matrix_dict = create_embedding_matrix(feature_columns, l2_reg, seed, prefix=prefix,
                                                    seq_mask_zero=seq_mask_zero)
    group_sparse_embedding_dict = embedding_lookup(embedding_matrix_dict, features, sparse_feature_columns)
    dense_value_list = get_dense_input(features, feature_columns)
    if not support_dense and len(dense_value_list) > 0:
        raise ValueError("DenseFeat is not supported in dnn_feature_columns")

    sequence_embed_dict = varlen_embedding_lookup(embedding_matrix_dict, features, varlen_sparse_feature_columns)
    group_varlen_sparse_embedding_dict = get_varlen_pooling_list(sequence_embed_dict, features,
                                                                 varlen_sparse_feature_columns)
    group_embedding_dict = mergeDict(group_sparse_embedding_dict, group_varlen_sparse_embedding_dict)
    if not support_group:
        group_embedding_dict = list(chain.from_iterable(group_embedding_dict.values()))
    return group_embedding_dict, dense_value_list

inputs

SparseFeat和VarLenSparseFeat对象需要创建嵌入矩阵,嵌入矩阵的构造和查表等操作都是通过inputs.py模块实现的,该模块包含9个方法。

get_inputs_list

'''
作用:过滤输入中的空值并返回列表形式的输入
'''
def get_inputs_list(inputs):
    return list(chain(*list(map(lambda x: x.values(), filter(lambda x: x is not None, inputs)))))

通过filter函数过滤输入中的空值,map函数是取每个元素x的value,chain构建了一个迭代器,循环处理输入中的每条样本,最后返回一个list

create_embedding_dict

'''
作用:为每个稀疏特征创建可训练的嵌入矩阵,使用字典存储所有特征列的嵌入矩阵,并返回该字典
'''
def create_embedding_dict(sparse_feature_columns, varlen_sparse_feature_columns, seed, l2_reg,
                          prefix='sparse_', seq_mask_zero=True):
    sparse_embedding = {}
    #处理稀疏特征
    for feat in sparse_feature_columns:
        # 为每个稀疏特征初始化一个vocabulary_size x embedding_dim 大小的嵌入矩阵
        emb = Embedding(feat.vocabulary_size, feat.embedding_dim,
                        embeddings_initializer=feat.embeddings_initializer,
                        embeddings_regularizer=l2(l2_reg),
                        name=prefix + '_emb_' + feat.embedding_name) 
        # 令该嵌入矩阵可训练
        emb.trainable = feat.trainable
        #添加到字典中
        sparse_embedding[feat.embedding_name] = emb
    #处理可变长度稀疏特征,处理方法同上
    if varlen_sparse_feature_columns and len(varlen_sparse_feature_columns) > 0:
        for feat in varlen_sparse_feature_columns:
            # if feat.name not in sparse_embedding:
            emb = Embedding(feat.vocabulary_size, feat.embedding_dim,
                            embeddings_initializer=feat.embeddings_initializer,
                            embeddings_regularizer=l2(
                                l2_reg),
                            name=prefix + '_seq_emb_' + feat.name,
                            mask_zero=seq_mask_zero)
            emb.trainable = feat.trainable
            sparse_embedding[feat.embedding_name] = emb
    return sparse_embedding

get_embedding_vec_dict

'''
作用:从所有稀疏特征列中查询指定稀疏特征列(参数return_feat_list)的嵌入矩阵,以列表形式返回查询结果
关键参数:
    embedding_dict:type:dict;存储着所有特征列的嵌入矩阵的字典
    input_dict:type:dict;存储着特征列和对应的嵌入矩阵索引的字典,在没有使用hash查询时使用 
    sparse_feature_columns:type:list;所有稀疏特征列
    return_feat_list:需要查询的特征列,默认为空,为空则返回所有稀疏特征列的嵌入矩阵,不为空则仅返回该元组中的特征列的嵌入矩阵
'''
def get_embedding_vec_list(embedding_dict, input_dict, sparse_feature_columns, return_feat_list=(), mask_feat_list=()):
    embedding_vec_list = []
    for fg in sparse_feature_columns:
        feat_name = fg.name
        if len(return_feat_list) == 0 or feat_name in return_feat_list:
            if fg.use_hash:
                lookup_idx = Hash(fg.vocabulary_size, mask_zero=(feat_name in mask_feat_list))(input_dict[feat_name])
            else:
                lookup_idx = input_dict[feat_name]

            embedding_vec_list.append(embedding_dict[feat_name](lookup_idx))

    return embedding_vec_list

create_embedding_matrix

'''
作用:从所有特征列中筛选出SparseFeat和VarLenSparseFeat,然后调用函数create_embedding_dict为筛选的特征列创建嵌入矩阵
'''
def create_embedding_matrix(feature_columns, l2_reg, seed, prefix="", seq_mask_zero=True):
    from . import feature_column as fc_lib

    sparse_feature_columns = list(
        filter(lambda x: isinstance(x, fc_lib.SparseFeat), feature_columns)) if feature_columns else []
    varlen_sparse_feature_columns = list(
        filter(lambda x: isinstance(x, fc_lib.VarLenSparseFeat), feature_columns)) if feature_columns else []
    sparse_emb_dict = create_embedding_dict(sparse_feature_columns, varlen_sparse_feature_columns, seed,
                                            l2_reg, prefix=prefix + 'sparse', seq_mask_zero=seq_mask_zero)
    return sparse_emb_dict

embedding_lookup

'''
作用:从所有稀疏特征列中查询指定稀疏特征列(参数return_feat_list)的嵌入矩阵,以字典形式返回查询结果
参数:
    sparse_embedding_dict:存储稀疏特征列的嵌入矩阵的字典
    sparse_input_dict:存储稀疏特征列的名字和索引的字典
    sparse_feature_columns:稀疏特征列列表,元素为SparseFeat
    return_feat_list:需要查询的稀疏特征列,如果元组为空,默认返回所有特征列的嵌入矩阵
    mask_feat_list:用于哈希查询
    to_list:是否以列表形式返回查询结果,默认是False
'''
def embedding_lookup(sparse_embedding_dict, sparse_input_dict, sparse_feature_columns, return_feat_list=(),
                     mask_feat_list=(), to_list=False):
    group_embedding_dict = defaultdict(list) #存储结果的列表
    for fc in sparse_feature_columns: # 遍历查找
        feature_name = fc.name
        embedding_name = fc.embedding_name
        if (len(return_feat_list) == 0 or feature_name in return_feat_list):
            if fc.use_hash: #获取哈希查询的索引
                lookup_idx = Hash(fc.vocabulary_size, mask_zero=(feature_name in mask_feat_list))(
                    sparse_input_dict[feature_name])
            else: #从sparse_input_dict中获取该特征列的索引
                lookup_idx = sparse_input_dict[feature_name]

            group_embedding_dict[fc.group_name].append(sparse_embedding_dict[embedding_name](lookup_idx))
    if to_list: # 如果为真,则将结果转为列表形式返回
        return list(chain.from_iterable(group_embedding_dict.values()))
    return group_embedding_dict

varlen_embedding_kookup

'''
作用:获取varlen_sparse_feature_columns的嵌入矩阵
'''
def varlen_embedding_lookup(embedding_dict, sequence_input_dict, varlen_sparse_feature_columns):
    varlen_embedding_vec_dict = {}
    for fc in varlen_sparse_feature_columns:
        feature_name = fc.name
        embedding_name = fc.embedding_name
        if fc.use_hash:
            lookup_idx = Hash(fc.vocabulary_size, mask_zero=True)(sequence_input_dict[feature_name])
        else:
            lookup_idx = sequence_input_dict[feature_name]
        varlen_embedding_vec_dict[feature_name] = embedding_dict[embedding_name](lookup_idx)
    return varlen_embedding_vec_dict

get_varlen_pooling_list

'''
作用:获取varlen_sparse_feature_columns池化后的嵌入向量
'''
def get_varlen_pooling_list(embedding_dict, features, varlen_sparse_feature_columns, to_list=False):
    pooling_vec_list = defaultdict(list)
    for fc in varlen_sparse_feature_columns:
        feature_name = fc.name
        combiner = fc.combiner
        feature_length_name = fc.length_name
        if feature_length_name is not None: # length_name不为空,说明该特征列不存在用0填充的情况
            if fc.weight_name is not None: # weight_name不为空,说明序列需要进行权重化操作
                seq_input = WeightedSequenceLayer(weight_normalization=fc.weight_norm)(
                    [embedding_dict[feature_name], features[feature_length_name], features[fc.weight_name]]) #需要对查找结果做权重化操作再得到seq_input
            else: # weight_name为空,说明序列不需要进行权重化操作
                seq_input = embedding_dict[feature_name] #直接从嵌入矩阵里找到对应结果,赋值给seq_input
            vec = SequencePoolingLayer(combiner, supports_masking=False)(
                [seq_input, features[feature_length_name]]) #池化操作,因为没有填充,所以supports_masking=False,即池化时不需要mask掉填充的部分
        else:  #length_name为空,说明该特征列存在用0填充的情况,因此在权重化操作和池化操作时都要令supports_masking=True,即mask掉填充的部分
            if fc.weight_name is not None:
                seq_input = WeightedSequenceLayer(weight_normalization=fc.weight_norm, supports_masking=True)(
                    [embedding_dict[feature_name], features[fc.weight_name]])
            else:
                seq_input = embedding_dict[feature_name]
            vec = SequencePoolingLayer(combiner, supports_masking=True)(
                seq_input)
        pooling_vec_list[fc.group_name].append(vec)
    if to_list:
        return chain.from_iterable(pooling_vec_list.values())
    return pooling_vec_list

get_dense_input

'''
作用:从所有特征列中选出DenseFeat,并以列表形式返回结果
'''
def get_dense_input(features, feature_columns):
    from . import feature_column as fc_lib
    #筛选出DenseFeat元素
    dense_feature_columns = list(
        filter(lambda x: isinstance(x, fc_lib.DenseFeat), feature_columns)) if feature_columns else []
    dense_input_list = []
    #循环对各个DenseFeat元素执行transform_fn函数
    for fc in dense_feature_columns:
        if fc.transform_fn is None:
            dense_input_list.append(features[fc.name])
        else:
            transform_result = Lambda(fc.transform_fn)(features[fc.name])
            dense_input_list.append(transform_result)
    return dense_input_list

def mergeDict

'''
    作用:将a、b两个字典合并
'''
def mergeDict(a, b):
    c = defaultdict(list)
    for k, v in a.items():
        c[k].extend(v)
    for k, v in b.items():
        c[k].extend(v)
    return c

 

 

 

 

 

   

标签:name,list,feature,embedding,input,DeepCTR,columns
来源: https://www.cnblogs.com/bubblebeee/p/15838842.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有