LDA
是一种概率主题模型:隐式狄利克雷分布(Latent Dirichlet Allocation
,简称LDA
)。LDA
是2003年提出的一种主题模型,它可以将文档集中每篇文档的主题以概率分布的形式给出。
通过分析一些文档,我们可以抽取出它们的主题(分布),根据主题(分布)进行主题聚类或文本分类。同时,它是一种典型的词袋模型,即一篇文档是由一组词构成,词与词之间没有先后顺序的关系。一篇文档可以包含多个主题,文档中每一个词都由其中的一个主题生成。
举一个简单的例子,比如假设事先给定了这几个主题:Arts、Budgets、Children、Education
,然后通过学习的方式,获取每个主题Topic
对应的词语,如下图所示:
然后以一定的概率选取上述某个主题,再以一定的概率选取那个主题下的某个单词,不断的重复这两步,最终生成如下图所示的一篇文章(不同颜色的词语分别表示不同主题)。
我们看到一篇文章后,往往会推测这篇文章是如何生成的,我们通常认为作者会先确定几个主题,然后围绕这几个主题遣词造句写成全文。LDA
要干的事情就是根据给定的文档,判断它的主题分布。在LDA
模型中,生成文档的过程有如下几步:
-
从狄利克雷分布$\alpha$中生成文档i的主题分布$\theta_{i}$ ;
-
从主题的多项式分布$\theta_{i}$中取样生成文档i第j个词的主题$Z_{i,j}$ ;
-
从狄利克雷分布$\eta$中取样生成主题$Z_{i,j}$对应的词语分布$\beta_{i,j}$ ;
-
从词语的多项式分布$\beta_{i,j}$中采样最终生成词语$W_{i,j}$ .
LDA
的图模型结构如下图所示:
LDA
会涉及很多数学知识,后面的章节我会首先介绍LDA
涉及的数学知识,然后在这些数学知识的基础上详细讲解LDA
的原理。
在高等数学中,有一个长相奇特的Gamma
函数
通过分部积分,可以推导gamma
函数有如下递归性质
通过该递归性质,我们可以很容易证明,gamma
函数可以被当成阶乘在实数集上的延拓,具有如下性质
如下函数被称为Digamma
函数,它是Gamma
函数对数的一阶导数
这是一个很重要的函数,在涉及Dirichlet
分布相关的参数的极大似然估计时,往往需要用到这个函数。Digamma
函数具有如下一个漂亮的性质
二项分布是由伯努利分布推出的。伯努利分布,又称两点分布或0-1
分布,是一个离散型的随机分布,其中的随机变量只有两类取值,即0或者1。二项分布是重复n
次的伯努利试验。简言之,只做一次实验,是伯努利分布,重复做了n次,是二项分布。二项分布的概率密度函数为:
对于k=1,2,...,n,其中C(n,k)
是二项式系数(这就是二项分布的名称的由来)
多项分布是二项分布扩展到多维的情况。多项分布是指单次试验中的随机变量的取值不再是0-1
,而是有多种离散值可能(1,2,3...,k)
。比如投掷6个面的骰子实验,N
次实验结果服从K=6
的多项分布。其中:
多项分布的概率密度函数为:
首先看下面的问题1(问题1到问题4都取自于文献【1】)。
问题1:
为解决这个问题,可以尝试计算$x_{(k)}$落在区间[x,x+delta x]
的概率。首先,把[0,1]
区间分成三段[0,x)
,[x,x+delta x]
,(x+delta x,1]
,然后考虑下简单的情形:即假设n个数中只有1个落在了区间[x,x+delta x]
内,由于这个区间内的数X(k)
是第k大的,所以[0,x)
中应该有k−1个数,(x+delta x,1]
这个区间中应该有n−k个数。
如下图所示:
上述问题可以转换为下述事件E
:
对于上述事件E
,有:
其中,o(delta x)
表示delta x
的高阶无穷小。显然,由于不同的排列组合,即n个数中有一个落在[x,x+delta x]
区间的有n种取法,余下n−1个数中有k−1个落在[0,x)
的有C(n-1,k-1)
种组合。所以和事件E等价的事件一共有nC(n-1,k-1)
个。
文献【1】中证明,只要落在[x,x+delta x]
内的数字超过一个,则对应的事件的概率就是o(delta x)
。所以$x_{(k)}$的概率密度函数为:
利用Gamma
函数,我们可以将f(x)
表示成如下形式:
在上式中,我们用alpha=k
,beta=n-k+1
替换,可以得到beta
分布的概率密度函数
什么是共轭呢?轭的意思是束缚、控制。共轭从字面上理解,则是共同约束,或互相约束。在贝叶斯概率理论中,如果后验概率P(z|x)
和先验概率p(z)
满足同样的分布,那么,先验分布和后验分布被叫做共轭分布,同时,先验分布叫做似然函数的共轭先验分布。
我们在问题1的基础上增加一些观测数据,变成问题2:
第2步的条件可以用另外一句话来表述,即“Yi
中有m1
个比X(k
)小,m2
个比X(k)
大”,所以X(k)
是$X_{(1)},X_{(2)},...,X_{(n)};Y_{(1)},Y_{(2)},...,Y_{(m)}$中k+m1大的数。
根据1.5.1的介绍,我们知道事件p服从beta
分布,它的概率密度函数为:
按照贝叶斯推理的逻辑,把以上过程整理如下:
-
1、p是我们要猜测的参数,我们推导出
p
的分布为f(p)=Beta(p|k,n-k+1)
,称为p
的先验分布 -
2、根据
Yi
中有m1
个比p
小,有m2
个比p
大,Yi
相当是做了m
次伯努利实验,所以m1
服从二项分布B(m,p)
-
3、在给定了来自数据提供
(m1,m2)
知识后,p的后验分布变为f(p|m1,m2)=Beta(p|k+m1,n-k+1+m2)
贝叶斯估计的基本过程是:
先验分布 + 数据的知识 = 后验分布
以上贝叶斯分析过程的简单直观的表示就是:
Beta(p|k,n-k+1) + BinomCount(m1,m2) = Beta(p|k+m1,n-k+1+m2)
更一般的,对于非负实数alpha和beta,我们有如下关系
Beta(p|alpha,beta) + BinomCount(m1,m2) = Beta(p|alpha+m1,beta+m2)
针对于这种观测到的数据符合二项分布,参数的先验分布和后验分布都是Beta
分布的情况,就是Beta-Binomial
共轭。换言之,Beta
分布是二项式分布的共轭先验概率分布。二项分布和Beta分布是共轭分布意味着,如果我们为二项分布的参数p选取的先验分布是Beta
分布,那么以p为参数的二项分布用贝叶斯估计得到的后验分布仍然服从Beta
分布。
Dirichlet
分布,是beta
分布在高维度上的推广。Dirichlet
分布的的密度函数形式跟beta
分布的密度函数类似:
其中
至此,我们可以看到二项分布和多项分布很相似,Beta
分布和Dirichlet
分布很相似。并且Beta
分布是二项式分布的共轭先验概率分布。那么Dirichlet
分布呢?Dirichlet
分布是多项式分布的共轭先验概率分布。下文来论证这点。
在1.5.3章问题2的基础上,我们更进一步引入问题3:
类似于问题1的推导,我们可以容易推导联合分布。为了简化计算,我们取x3
满足x1+x2+x3=1
,x1
和x2
是变量。如下图所示。
概率计算如下:
于是我们得到联合分布为:
观察上述式子的最终结果,可以看出上面这个分布其实就是3维形式的Dirichlet
分布。令alpha1=k1,alpha2=k2,alpha3=n-k1-k2+1
,分布密度函数可以写为:
为了论证Dirichlet
分布是多项式分布的共轭先验概率分布,在上述问题3的基础上再进一步,提出问题4。
为了方便计算,我们记
根据问题中的信息,我们可以推理得到p1,p2
在X;Y
这m+n
个数中分别成为了第k1+m1,k1+k2+m1+m2
大的数。后验分布p应该为
同样的,按照贝叶斯推理的逻辑,可将上述过程整理如下:
-
1 我们要猜测参数
P=(p1,p2,p3)
,其先验分布为Dir(p|k)
; -
2 数据
Yi
落到三个区间[0,p1)
,[p1,p2]
,(p2,1]
的个数分别是m1,m2,m3
,所以m=(m1,m2,m3)
服从多项分布Mult(m|p)
; -
3 在给定了来自数据提供的知识
m
后,p
的后验分布变为Dir(P|k+m)
上述贝叶斯分析过程的直观表述为:
Dir(p|k) + Multcount(m) = Dir(p|k+m)
针对于这种观测到的数据符合多项分布,参数的先验分布和后验分布都是Dirichlet
分布的情况,就是Dirichlet-Multinomial
共轭。这意味着,如果我们为多项分布的参数p选取的先验分布是Dirichlet
分布,那么以p为参数的多项分布用贝叶斯估计得到的后验分布仍然服从Dirichlet
分布。
如果p=Beta(t|alpha,beta)
,那么
上式右边的积分对应到概率分布Beta(t|alpha+1,beta)
,对于这个分布,我们有
把上式带人E(p)
的计算式,可以得到:
这说明,对于Beta
分布的随机变量,其期望可以用上式来估计。Dirichlet
分布也有类似的结论。对于p=Dir(t|alpha)
,有
这个结论在后文的推导中会用到。
LDA
涉及的数学知识较多,需要认真体会,以上大部分的知识来源于文献【1,2,3】,如有不清楚的地方,参见这些文献以了解更多。
在介绍LDA之前,我们先介绍几个基础模型:Unigram model
、mixture of unigrams model
、pLSA model
。为了方便描述,首先定义一些变量:
- 1
w
表示词,V
表示所有词的个数 - 2
z
表示主题,k
表示主题的个数 - 3
$D=(W_{1},W_{2},...,W_{M})$ 表示语料库,M
表示语料库中的文档数。 - 4
$W=(w_{1},w_{2},...,w_{N})$ 表示文档,N
表示文档中词的个数。
对于文档$W=(w_{1},w_{2},...,w_{N})$ ,用$p(w_{n})$表示$w_{n}$的先验概率,生成文档W的概率为:
其图模型为(图中被涂色的w
表示可观测变量,N
表示一篇文档中总共N
个单词,M
表示M
篇文档):
该模型的生成过程是:给某个文档先选择一个主题Z
,再根据该主题生成文档,该文档中的所有词都来自一个主题。生成文档的概率为:
其图模型为(图中被涂色的w
表示可观测变量,未被涂色的z
表示未知的隐变量,N
表示一篇文档中总共N
个单词,M
表示M
篇文档):
在混合一元模型中,假定一篇文档只由一个主题生成,可实际中,一篇文章往往有多个主题,只是这多个主题各自在文档中出现的概率大小不一样。在pLSA
中,假设文档由多个主题生成。下面通过一个投色子的游戏(取自文献【2】的例子)说明pLSA
生成文档的过程。
首先,假定你一共有K
个可选的主题,有V
个可选的词。假设你每写一篇文档会制作一颗K
面的“文档-主题”骰子(扔此骰子能得到K
个主题中的任意一个),和K
个V
面的“主题-词项”骰子(每个骰子对应一个主题,K
个骰子对应之前的K
个主题,且骰子的每一面对应要选择的词项,V
个面对应着V
个可选的词)。
比如可令K=3
,即制作1个含有3个主题的“文档-主题”骰子,这3个主题可以是:教育、经济、交通。然后令V = 3
,制作3个有着3面的“主题-词项”骰子,其中,教育主题骰子的3个面上的词可以是:大学、老师、课程,经济主题骰子的3个面上的词可以是:市场、企业、金融,交通主题骰子的3个面上的词可以是:高铁、汽车、飞机。
其次,每写一个词,先扔该“文档-主题”骰子选择主题,得到主题的结果后,使用和主题结果对应的那颗“主题-词项”骰子,扔该骰子选择要写的词。先扔“文档-主题”的骰子,假设以一定的概率得到的主题是:教育,所以下一步便是扔教育主题筛子,以一定的概率得到教育主题筛子对应的某个词大学。
-
上面这个投骰子产生词的过程简化一下便是:“先以一定的概率选取主题,再以一定的概率选取词”。事实上,一开始可供选择的主题有3个:教育、经济、交通,那为何偏偏选取教育这个主题呢?其实是随机选取的,只是这个随机遵循一定的概率分布。比如可能选取教育主题的概率是0.5,选取经济主题的概率是0.3,选取交通主题的概率是0.2,那么这3个主题的概率分布便是
{教育:0.5,经济:0.3,交通:0.2}
,我们把各个主题z
在文档d
中出现的概率分布称之为主题分布,且是一个多项分布。 -
同样的,从主题分布中随机抽取出教育主题后,依然面对着3个词:大学、老师、课程,这3个词都可能被选中,但它们被选中的概率也是不一样的。比如大学这个词被选中的概率是0.5,老师这个词被选中的概率是0.3,课程被选中的概率是0.2,那么这3个词的概率分布便是
{大学:0.5,老师:0.3,课程:0.2}
,我们把各个词语w在主题z下出现的概率分布称之为词分布,这个词分布也是一个多项分布。 -
所以,选主题和选词都是两个随机的过程,先从主题分布
{教育:0.5,经济:0.3,交通:0.2}
中抽取出主题:教育,然后从该主题对应的词分布{大学:0.5,老师:0.3,课程:0.2}
中抽取出词:大学。
最后,你不停的重复扔“文档-主题”骰子和”主题-词项“骰子,重复N
次(产生N
个词),完成一篇文档,重复这产生一篇文档的方法M
次,则完成M
篇文档。
上述过程抽象出来即是pLSA
的文档生成模型。在这个过程中,我们并未关注词和词之间的出现顺序,所以pLSA
是一种词袋模型。定义如下变量:
-
$(z_{1},z_{2},...,z_{k})$ 表示隐藏的主题; -
$P(d_{i})$ 表示海量文档中某篇文档被选中的概率; -
$P(w_{j}|d_{i})$ 表示词$w_{j}$在文档$d_{i}$中出现的概率;针对海量文档,对所有文档进行分词后,得到一个词汇列表,这样每篇文档就是一个词语的集合。对于每个词语,用它在文档中出现的次数除以文档中词语总的数目便是它在文档中出现的概率; -
$P(z_{k}|d_{i})$ 表示主题$z_{k}$在文档$d_{i}$中出现的概率; -
$P(w_{j}|z_{k})$ 表示词$w_{j}$在主题$z_{k}$中出现的概率。与主题关系越密切的词其条件概率越大。
我们可以按照如下的步骤得到“文档-词项”的生成模型:
-
1 按照$P(d_{i})$选择一篇文档$d_{i}$ ;
-
2 选定文档$d_{i}$之后,从主题分布中按照概率$P(z_{k}|d_{i})$选择主题;
-
3 选定主题后,从词分布中按照概率$P(w_{j}|z_{k})$选择一个词。
利用看到的文档推断其隐藏的主题(分布)的过程,就是主题建模的目的:自动地发现文档集中的主题(分布)。文档d
和单词w
是可被观察到的,但主题z
却是隐藏的。如下图所示(图中被涂色的d、w
表示可观测变量,未被涂色的z
表示未知的隐变量,N
表示一篇文档中总共N
个单词,M
表示M
篇文档)。
上图中,文档d
和词w
是我们得到的样本,可观测得到,所以对于任意一篇文档,其$P(w_{j}|d_{i})$是已知的。根据这个概率可以训练得到文档-主题
概率以及主题-词项
概率。即:
故得到文档中每个词的生成概率为:
$P(d_{i})$可以直接得出,而$P(z_{k}|d_{i})$和$P(w_{j}|z_{k})$未知,所以
z
,所以我们可以用EM
算法来估计这个参数。
LDA
的不同之处在于,pLSA
的主题的概率分布P(c|d)
是一个确定的概率分布,也就是虽然主题c
不确定,但是c
符合的概率分布是确定的,比如符合某个多项分布,这个多项分布的各参数是确定的。
但是在LDA
中,这个多项分布都是不确定的,高斯分布又服从一个狄利克雷先验分布(Dirichlet prior)
。即LDA
就是pLSA
的贝叶斯版本,正因为LDA
被贝叶斯化了,所以才会加的两个先验参数。
LDA
模型中一篇文档生成的方式如下所示:
-
1 按照$P(d_{i})$选择一篇文档$d_{i}$ ;
-
2 从狄利克雷分布$\alpha$中生成文档i的主题分布$\theta_{i}$ ;
-
3 从主题的多项式分布$\theta_{i}$中取样生成文档i第j个词的主题$Z_{i,j}$ ;
-
4 从狄利克雷分布$\eta$中取样生成主题$Z_{i,j}$对应的词语分布$\beta_{i,j}$ ;
-
5 从词语的多项式分布$\beta_{i,j}$中采样最终生成词语$W_{i,j}$
从上面的过程可以看出,LDA
在pLSA
的基础上,为主题分布和词分布分别加了两个Dirichlet
先验。
拿之前讲解pLSA
的例子进行具体说明。如前所述,在pLSA
中,选主题和选词都是两个随机的过程,先从主题分布{教育:0.5,经济:0.3,交通:0.2}
中抽取出主题:教育,然后从该主题对应的词分布{大学:0.5,老师:0.3,课程:0.2}
中抽取出词:大学。
在LDA
中,选主题和选词依然都是两个随机的过程。但在LDA
中,主题分布和词分布不再唯一确定不变,即无法确切给出。例如主题分布可能是{教育:0.5,经济:0.3,交通:0.2}
,也可能是{教育:0.6,经济:0.2,交通:0.2}
,到底是哪个我们不能确定,因为它是随机的可变化的。
但再怎么变化,也依然服从一定的分布,主题分布和词分布由Dirichlet
先验确定。
举个文档d
产生主题z
的例子。
在pLSA
中,给定一篇文档d,主题分布是一定的,比如{ P(zi|d), i = 1,2,3 }
可能就是{0.4,0.5,0.1}
,表示z1、z2、z3
这3个主题被文档d
选中的概率都是个固定的值:P(z1|d) = 0.4、P(z2|d) = 0.5、P(z3|d) = 0.1
,如下图所示:
在LDA
中,主题分布(各个主题在文档中出现的概率分布)和词分布(各个词语在某个主题下出现的概率分布)是唯一确定的。LDA
为提供了两个Dirichlet
先验参数,Dirichlet
先验为某篇文档随机抽取出主题分布和词分布。
给定一篇文档d
,现在有多个主题z1、z2、z3
,它们的主题分布{ P(zi|d), i = 1,2,3 }
可能是{0.4,0.5,0.1}
,也可能是{0.2,0.2,0.6}
,即这些主题被d
选中的概率都不再是确定的值,可能是P(z1|d) = 0.4、P(z2|d) = 0.5、P(z3|d) = 0.1
,也有可能是P(z1|d) = 0.2、P(z2|d) = 0.2、P(z3|d) = 0.6
,而主题分布到底是哪个取值集合我们不确定,但其先验分布是dirichlet
分布,所以可以从无穷多个主题分布中按照dirichlet
先验随机抽取出某个主题分布出来。如下图所示
LDA
在pLSA
的基础上给两参数$P(z_{k}|d_{i})$和$P(w_{j}|z_{k})$加了两个先验分布的参数。这两个分布都是Dirichlet
分布。
下面是LDA
的图模型结构:
在spark
中,提供了两种方法来估计参数,分别是变分EM
(期望最大)算法(见文献【3】【4】)和在线学习算法(见文献【5】)。下面将分别介绍这两种算法以及其源码实现。
变分贝叶斯算法的详细信息可以参考文献【9】。
在上文中,我们知道LDA
将变量theta
和phi
(为了方便起见,我们将上文LDA
图模型中的beta
改为了phi
)看做随机变量,并且为theta
添加一个超参数为alpha
的Dirichlet
先验,为phi
添加一个超参数为eta
的Dirichlet
先验来估计theta
和beta
的最大后验(MAP
)。
可以通过最优化最大后验估计来估计参数。我们首先来定义几个变量:
- 下式的
gamma
表示词为w
,文档为j
时,主题为k
的概率,如公式**(3.1.1)**
-
$N_{wj}$ 表示词w
在文档j
中出现的次数; -
$N_{wk}$ 表示词w
在主题k
中出现的次数,如公式**(3.1.2)**
-
$N_{kj}$ 表示主题k
在文档j
中出现的次数,如公式**(3.1.3)**
-
$N_{k}$ 表示主题k
中包含的词出现的总次数,如公式**(3.1.4)**
-
$N_{j}$ 表示文档j
中包含的主题出现的总次数,如公式**(3.1.5)**
根据文献【4】中2.2
章节的介绍,我们可以推导出如下更新公式**(3.1.6)**,其中alpha
和eta
均大于1:
收敛之后,最大后验估计可以得到公式**(3.1.7)**:
变分EM
算法的流程如下:
-
1 初始化状态,即随机初始化$N_{wk}$和$N_{kj}$
-
2 E-步,对每一个
(文档,词汇)
对i
,计算$P(z_{i}|w_{i},d_{i})$,更新gamma
值 -
3 M-步,计算隐藏变量
phi
和theta
。即计算$N_{wk}$和$N_{kj}$ -
4 重复以上2、3两步,直到满足最大迭代数
第4.2
章会从代码层面说明该算法的实现流程。
在变分贝叶斯推导(VB
)中,根据文献【3】,使用一种更简单的分布q(z,theta,beta)
来估计真正的后验分布,这个简单的分布使用一组自由变量(free parameters
)来定义。
通过最大化对数似然的一个下界(Evidence Lower Bound (ELBO)
)来最优化这些参数,如下公式**(3.2.1)**
最大化ELBO
就是最小化q(z,theta,beta)
和p(z,theta,beta|w,alpha,eta)
的KL
距离。根据文献【3】,我们将q
因式分解为如下**(3.2.2)**的形式:
后验z
通过phi
来参数化,后验theta
通过gamma
来参数化,后验beta
通过lambda
来参数化。为了简单描述,我们把lambda
当作“主题”来看待。公式**(3.2.2)分解为如下(3.2.3)**形式:
我们现在将上面的期望扩展为变分参数的函数形式。这反映了变分目标只依赖于$n_{dw}$ ,即词w
出现在文档d
中的次数。当使用VB
算法时,文档可以通过它们的词频来汇总(summarized
),如公式**(3.2.4)**
上面的公式中,W
表示词的数量,D
表示文档的数量。l
表示文档d
对ELBO
所做的贡献。L
可以通过坐标上升法来最优化,它的更新公式如**(3.2.5)**:
log(theta)
和log(beta)
的期望通过下面的公式**(3.2.6)**计算:
通过EM
算法,我们可以将这些更新分解成E-步
和M-步
。E-步
固定lambda
来更新gamma
和phi
;M-步
通过给定phi
来更新lambda
。批VB
算法的过程如下**(算法1)**所示:
批量变分贝叶斯算法需要固定的内存,并且比吉布斯采样更快。但是它仍然需要在每次迭代时处理所有的文档,这在处理大规模文档时,速度会很慢,并且也不适合流式数据的处理。
文献【5】提出了一种在线变分推导算法。设定gamma(n_d,lambda)
和phi(n_d,lambda)
分别表示gamma_d
和phi_d
的值,我们的目的就是设定phi
来最大化下面的公式**(3.2.7)**
我们在算法2中介绍了在线VB
算法。因为词频的第t个向量$n_{t}$是可观察的,我们在E-步
通过固定lambda
来找到gamma_t
和phi_t
的局部最优解。
然后,我们计算lambda_cap
。如果整个语料库由单个文档重复D
次组成,那么这样的lambda_cap
设置是最优的。之后,我们通过lambda
之前的值以及lambda_cap
来更新lambda
。我们给lambda_cap
设置的权重如公式**(3.2.8)**所示:
在线VB
算法的实现流程如下算法2所示
那么在在线VB
算法中,alpha
和eta
是如何更新的呢?参考文献【8】提供了计算方法。给定数据集,dirichlet
参数的可以通过最大化下面的对数似然来估计
其中,
有多种方法可以最大化这个目标函数,如梯度上升,Newton-Raphson
等。Spark
使用Newton-Raphson
方法估计参数,更新alpha
。Newton-Raphson
提供了一种参数二次收敛的方法,
它一般的更新规则如下公式**(3.3.3)**:
其中,H
表示海森矩阵。对于这个特别的对数似然函数,可以应用Newton-Raphson
去解决高维数据,因为它可以在线性时间求出海森矩阵的逆矩阵。一般情况下,海森矩阵可以用一个对角矩阵和一个元素都一样的矩阵的和来表示。
如下公式**(3.3.4)**,Q
是对角矩阵,C11
是元素相同的一个矩阵。
为了计算海森矩阵的逆矩阵,我们观察到,对任意的可逆矩阵Q
和非负标量c
,有下列式子**(3.3.5)**:
因为Q
是对角矩阵,所以Q
的逆矩阵可以很容易的计算出来。所以Newton-Raphson
的更新规则可以重写为如下**(3.3.6)**的形式
其中b
如下公式**(3.3.7)**,
我们从官方文档【6】给出的使用代码为起始点来详细分析LDA
的实现。
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors
// 加载和处理数据
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// 为文档编号,编号唯一。List((id,vector)....)
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// 将文档聚类为3类
val ldaModel = new LDA().setK(3).run(corpus)
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
println()
}
以上代码主要做了两件事:加载和切分数据、训练模型。在样本数据中,每一行代表一篇文档,经过处理后,corpus
的类型为List((id,vector)*)
,一个(id,vector)
代表一篇文档。将处理后的数据传给org.apache.spark.mllib.clustering.LDA
类的run
方法,
就可以开始训练模型。run
方法的代码如下所示:
def run(documents: RDD[(Long, Vector)]): LDAModel = {
val state = ldaOptimizer.initialize(documents, this)
var iter = 0
val iterationTimes = Array.fill[Double](maxIterations)(0)
while (iter < maxIterations) {
val start = System.nanoTime()
state.next()
val elapsedSeconds = (System.nanoTime() - start) / 1e9
iterationTimes(iter) = elapsedSeconds
iter += 1
}
state.getLDAModel(iterationTimes)
}
这段代码首先调用initialize
方法初始化状态信息,然后循环迭代调用next
方法直到满足最大的迭代次数。在我们没有指定的情况下,迭代次数默认为20。需要注意的是,
ldaOptimizer
有两个具体的实现类EMLDAOptimizer
和OnlineLDAOptimizer
,它们分别表示使用EM
算法和在线学习算法实现参数估计。在未指定的情况下,默认使用EMLDAOptimizer
。
在spark
中,使用GraphX
来实现EMLDAOptimizer
,这个图是有两种类型的顶点的二分图。这两类顶点分别是文档顶点(Document vertices
)和词顶点(Term vertices
)。
-
文档顶点使用大于0的唯一的指标来索引,保存长度为
k
(主题个数)的向量 -
词顶点使用
{-1, -2, ..., -vocabSize}
来索引,保存长度为k
(主题个数)的向量 -
边(
edges
)对应词出现在文档中的情况。边的方向是document -> term
,并且根据document
进行分区
我们可以根据3.1节中介绍的算法流程来解析源代码。
spark
在EMLDAOptimizer
的initialize
方法中实现初始化功能。包括初始化Dirichlet
参数alpha
和eta
、初始化边、初始化顶点以及初始化图。
//对应超参数alpha
val docConcentration = lda.getDocConcentration
//对应超参数eta
val topicConcentration = lda.getTopicConcentration
this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
上面的代码初始化了超参数alpha
和eta
,根据文献【4】,当alpha
未指定时,初始化其为(50.0 / k) + 1.0
,其中k
表示主题个数。当eta
未指定时,初始化其为1.1。
//对于每个文档,为每一个唯一的Term创建一个(Document->Term)的边
val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
// Add edges for terms with non-zero counts.
termCounts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
//文档id,termindex,词频
Edge(docID, term2index(term), cnt)
}
}
//term2index将term转为{-1, -2, ..., -vocabSize}索引
private[clustering] def term2index(term: Int): Long = -(1 + term.toLong)
上面的这段代码处理每个文档,对文档中每个唯一的Term
(词)创建一个边,边的格式为(文档id,词索引,词频)
。词索引为{-1, -2, ..., -vocabSize}
。
//创建顶点
val docTermVertices: RDD[(VertexId, TopicCounts)] = {
val verticesTMP: RDD[(VertexId, TopicCounts)] =
edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
val random = new Random(partIndex + randomSeed)
partEdges.flatMap { edge =>
val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
//此处的sum是DenseVector,gamma*N_wj
val sum = gamma * edge.attr
//srcId表示文献id,dstId表是词索引
Seq((edge.srcId, sum), (edge.dstId, sum))
}
}
verticesTMP.reduceByKey(_ + _)
}
上面的代码创建顶点。我们为每个主题随机初始化一个值,即gamma
是随机的。sum
为gamma * edge.attr
,这里的edge.attr
即N_wj
,所以sum
用gamma * N_wj
作为顶点的初始值。
this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
上面的代码初始化Graph
并通过文档分区。
val eta = topicConcentration
val W = vocabSize
val alpha = docConcentration
val N_k = globalTopicTotals
val sendMsg: EdgeContext[TopicCounts, TokenCount, (Boolean, TopicCounts)] => Unit =
(edgeContext) => {
// 计算 N_{wj} gamma_{wjk}
val N_wj = edgeContext.attr
// E-STEP: 计算 gamma_{wjk} 通过N_{wj}来计算
//此处的edgeContext.srcAttr为当前迭代的N_kj , edgeContext.dstAttr为当前迭代的N_wk,
//后面通过M-步,会更新这两个值,作为下一次迭代的当前值
val scaledTopicDistribution: TopicCounts =
computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha) *= N_wj
edgeContext.sendToDst((false, scaledTopicDistribution))
edgeContext.sendToSrc((false, scaledTopicDistribution))
}
上述代码中,W
表示词数,N_k
表示所有文档中,出现在主题k
中的词的词频总数,后续的实现会使用方法computeGlobalTopicTotals
来更新这个值。N_wj
表示词w
出现在文档j
中的词频数,为已知数。E-步
就是利用公式**(3.1.6)**去更新gamma
。
代码中使用computePTopic
方法来实现这个更新。edgeContext
通过方法sendToDst
将scaledTopicDistribution
发送到目标顶点,
通过方法sendToSrc
发送到源顶点以便于后续的M-步
更新的N_kj
和N_wk
。下面我们看看computePTopic
方法。
private[clustering] def computePTopic(
docTopicCounts: TopicCounts,
termTopicCounts: TopicCounts,
totalTopicCounts: TopicCounts,
vocabSize: Int,
eta: Double,
alpha: Double): TopicCounts = {
val K = docTopicCounts.length
val N_j = docTopicCounts.data
val N_w = termTopicCounts.data
val N = totalTopicCounts.data
val eta1 = eta - 1.0
val alpha1 = alpha - 1.0
val Weta1 = vocabSize * eta1
var sum = 0.0
val gamma_wj = new Array[Double](K)
var k = 0
while (k < K) {
val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1)
gamma_wj(k) = gamma_wjk
sum += gamma_wjk
k += 1
}
// normalize
BDV(gamma_wj) /= sum
}
这段代码比较简单,完全按照公式**(3.1.6)**表示的样子来实现。val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1)
就是实现的更新逻辑。
// M-STEP: 聚合计算新的 N_{kj}, N_{wk} counts.
val docTopicDistributions: VertexRDD[TopicCounts] =
graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg).mapValues(_._2)
我们由公式**(3.1.7)**可知,更新隐藏变量phi
和theta
就是更新相应的N_kj
和N_wk
。聚合更新使用aggregateMessages
方法来实现。请参考文献【7】来了解该方法的作用。
在线学习算法首先使用方法initialize
方法初始化参数值
override private[clustering] def initialize(
docs: RDD[(Long, Vector)],
lda: LDA): OnlineLDAOptimizer = {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
this.alpha = if (lda.getAsymmetricDocConcentration.size == 1) {
if (lda.getAsymmetricDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
else {
require(lda.getAsymmetricDocConcentration(0) >= 0,
s"all entries in alpha must be >=0, got: $alpha")
Vectors.dense(Array.fill(k)(lda.getAsymmetricDocConcentration(0)))
}
} else {
require(lda.getAsymmetricDocConcentration.size == k,
s"alpha must have length k, got: $alpha")
lda.getAsymmetricDocConcentration.foreachActive { case (_, x) =>
require(x >= 0, s"all entries in alpha must be >= 0, got: $alpha")
}
lda.getAsymmetricDocConcentration
}
this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
this.randomGenerator = new Random(lda.getSeed)
this.docs = docs
// 初始化变分分布 q(beta|lambda)
this.lambda = getGammaMatrix(k, vocabSize)
this.iteration = 0
this
}
根据文献【5】,alpha
和eta
的值大于等于0,并且默认为1.0/k
。上文使用getGammaMatrix
方法来初始化变分分布q(beta|lambda)
。
private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(
randomGenerator.nextLong()))
//初始化一个gamma分布
val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)(randBasis)
val temp = gammaRandomGenerator.sample(row * col).toArray
new BDM[Double](col, row, temp).t
}
getGammaMatrix
方法使用gamma
分布初始化一个随机矩阵。
override private[clustering] def next(): OnlineLDAOptimizer = {
//返回文档集中采样的子集
//默认情况下,文档可以被采样多次,且采样比例是0.05
val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
randomGenerator.nextLong())
if (batch.isEmpty()) return this
submitMiniBatch(batch)
}
以上的next
方法首先对文档进行采样,然后调用submitMiniBatch
对采样的文档子集进行处理。下面我们详细分解submitMiniBatch
方法。
- 1 计算
log(beta)
的期望,并将其作为广播变量广播到集群中
val expElogbeta = exp(LDAUtils.dirichletExpectation(lambda)).t
//广播变量
val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
//参数alpha是dirichlet参数
private[clustering] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
val rowSum = sum(alpha(breeze.linalg.*, ::))
val digAlpha = digamma(alpha)
val digRowSum = digamma(rowSum)
val result = digAlpha(::, breeze.linalg.*) - digRowSum
result
}
上述代码调用exp(LDAUtils.dirichletExpectation(lambda))
方法实现参数为lambda
的log beta
的期望。实现原理参见公式**(3.2.6)**。
- 2 计算
phi
以及gamma
,即算法2中的E-步
//对采样文档进行分区处理
val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
//
val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
val stat = BDM.zeros[Double](k, vocabSize)
var gammaPart = List[BDV[Double]]()
nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
val ids: List[Int] = termCounts match {
case v: DenseVector => (0 until v.size).toList
case v: SparseVector => v.indices.toList
}
val (gammad, sstats) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, expElogbetaBc.value, alpha, gammaShape, k)
stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
gammaPart = gammad :: gammaPart
}
Iterator((stat, gammaPart))
}
上面的代码调用OnlineLDAOptimizer.variationalTopicInference
实现算法2中的E-步
,迭代计算phi
和gamma
。
private[clustering] def variationalTopicInference(
termCounts: Vector,
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int): (BDV[Double], BDM[Double]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)
case v: SparseVector => (v.indices.toList, v.values)
}
// 初始化变分分布 q(theta|gamma)
val gammad: BDV[Double] = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k) // K
//根据公式(3.2.6)计算 E(log theta)
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K
//根据公式(3.2.5)计算phi,这里加1e-100表示并非严格等于
val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids
var meanGammaChange = 1D
val ctsVector = new BDV[Double](cts) // ids
// 迭代直至收敛
while (meanGammaChange > 1e-3) {
val lastgamma = gammad.copy
//依据公式(3.2.5)计算gamma
gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
//根据更新的gamma,计算E(log theta)
expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
// 更新phi
phiNorm := expElogbetad * expElogthetad :+ 1e-100
//计算两次gamma的差值
meanGammaChange = sum(abs(gammad - lastgamma)) / k
}
val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
(gammad, sstatsd)
}
- 3 更新
lambda
val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
val batchResult = statsSum :* expElogbeta.t
// 更新lambda和alpha
updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
updateLambda
方法实现算法2中的M-步
,更新lambda
。实现代码如下:
private def updateLambda(stat: BDM[Double], batchSize: Int): Unit = {
// 根据公式(3.2.8)计算权重
val weight = rho()
// 更新lambda,其中stat * (corpusSize.toDouble / batchSize.toDouble)+eta表示rho_cap
lambda := (1 - weight) * lambda +
weight * (stat * (corpusSize.toDouble / batchSize.toDouble) + eta)
}
// 根据公式(3.2.8)计算rho
private def rho(): Double = {
math.pow(getTau0 + this.iteration, -getKappa)
}
- 4 更新
alpha
private def updateAlpha(gammat: BDM[Double]): Unit = {
//计算rho
val weight = rho()
val N = gammat.rows.toDouble
val alpha = this.alpha.toBreeze.toDenseVector
//计算log p_hat
val logphat: BDM[Double] = sum(LDAUtils.dirichletExpectation(gammat)(::, breeze.linalg.*)) / N
//计算梯度为N(-phi(alpha)+log p_hat)
val gradf = N * (-LDAUtils.dirichletExpectation(alpha) + logphat.toDenseVector)
//计算公式(3.3.4)中的c,trigamma表示gamma函数的二阶导数
val c = N * trigamma(sum(alpha))
//计算公式(3.3.4)中的q
val q = -N * trigamma(alpha)
//根据公式(3.3.7)计算b
val b = sum(gradf / q) / (1D / c + sum(1D / q))
val dalpha = -(gradf - b) / q
if (all((weight * dalpha + alpha) :> 0D)) {
alpha :+= weight * dalpha
this.alpha = Vectors.dense(alpha.toArray)
}
}
【1】LDA数学八卦
【2】通俗理解LDA主题模型
【3】[Latent Dirichlet Allocation](docs/Latent Dirichlet Allocation.pdf)
【4】[On Smoothing and Inference for Topic Models](docs/On Smoothing and Inference for Topic Models.pdf)
【5】[Online Learning for Latent Dirichlet Allocation](docs/Online Learning for Latent Dirichlet Allocation.pdf)
【6】Spark官方文档
【8】Maximum Likelihood Estimation of Dirichlet Distribution Parameters