-
Notifications
You must be signed in to change notification settings - Fork 8
/
generator.py
1006 lines (834 loc) · 36.6 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#-*- coding: utf-8 -*-
# vim: set bg=dark noet ts=4 sw=4 fdm=indent :
""" Generator of Chinese Poem (宋词)"""
__author__ = 'linpingta'
import os
import sys
reload(sys)
sys.setdefaultencoding('utf8')
try:
import ConfigParser
except ImportError:
import configparser as ConfigParser
import logging
import re
import simplejson as json
import jieba
import jieba.posseg as pseg
from gensim import models
import random
import operator
from title_rhythm import TitleRhythmDict
basepath = os.path.abspath(os.path.dirname(__file__))
def my_unicode(lst):
    # Render an object (typically a list of unicode strings) as a readable
    # unicode string for logging: repr() escapes non-ASCII characters and
    # 'unicode-escape' undoes that escaping.
    # NOTE: Python 2 only -- str.decode does not exist on Python 3 str.
    return repr(lst).decode('unicode-escape')
def my_unicode_sd(d):
    """Log-format the words of a (word, count) sequence, dropping the counts."""
    words = [word for word, _count in d]
    return my_unicode(words)
def my_unicode_d(d):
    # Log-format the keys of a {word: count} dict (counts are ignored).
    # NOTE: dict.iteritems is Python 2 only.
    lst = [ word for word, count in d.iteritems() ]
    return my_unicode(lst)
class Generator(object):
    """ Generator of Chinese Poem (Song Ci).

    Precomputes pingze/rhythm/word lookup tables and a word2vec model
    from the corpus files (see _init_data_build), then generates
    sentences that follow the tonal pattern of the configured title.
    """
def __init__(self, basepath, conf):
    """ Read data-file locations from conf and initialise all state.

    Args:
        basepath: project root directory; data files resolve relative to it.
        conf: ConfigParser-style object with a [ci] section.
    """
    self.basepath = basepath
    # corpus / rhythm-book / output file locations from the [ci] section
    self._ci_words_file = os.path.join(self.basepath, conf.get('ci', 'ci_words_file'))
    self._ci_rhythm_file = os.path.join(self.basepath, conf.get('ci', 'ci_rhythm_file'))
    self._ci_result_file = os.path.join(self.basepath, conf.get('ci', 'ci_result_file'))
    self._support_titles = conf.get('ci', 'support_titles')
    # user input
    self._important_words = []
    self._title = ""
    self._force_data_build = False
    # load from data file
    self._title_pingze_dict = {}
    self._title_delimiter_dict = {}
    self._pingze_words_dict = {}
    self._pingze_rhythm_dict = {}
    self._rhythm_word_dict = {}
    self._reverse_rhythm_word_dict = {}
    self._reverse_pingze_word_dict = {}
    self._sentences = []
    # split related data
    self._split_sentences = []
    self._word_model = None
    # word count related
    self._word_count_dict = {}
    self._rhythm_count_dict = {}
    self._bigram_word_to_start_dict = {}
    self._bigram_word_to_end_dict = {}
    self._bigram_count_dict = {}
    # storage of related precalculated data: attribute names (minus the
    # leading underscore) persisted as JSON by _init_data_build and
    # restored by _load_data_build
    self._data_files = [
        "title_pingze_dict", "title_delimiter_dict", "pingze_words_dict", "pingze_rhythm_dict", "rhythm_word_dict", "reverse_rhythm_word_dict", "reverse_pingze_word_dict", "word_count_dict", "rhythm_count_dict", "split_sentences", "bigram_word_to_start_dict", "bigram_word_to_end_dict", "bigram_count_dict", "sentences"
    ]
    # store generated poem
    self._result = ""
    # store error reason if no poem generated
    self._error_info = ""
    # probability of reusing a whole corpus sentence during generation
    # (compared against random.random() in _generate)
    self._search_ratio = 0
# --- simple accessors; the setters let callers configure generation ---
@property
def search_ratio(self):
    # probability threshold for reusing whole corpus sentences
    return self._search_ratio
@property
def important_words(self):
    # user-supplied theme words (presumably fed to the word2vec similarity
    # search -- not referenced directly in this chunk)
    return self._important_words
@property
def title(self):
    # cipai title that selects the tonal pattern and delimiters
    return self._title
@property
def force_data_build(self):
    # presumably forces a rebuild of precalculated data instead of loading
    # it from disk -- not referenced in this chunk, confirm in caller
    return self._force_data_build
@search_ratio.setter
def search_ratio(self, value):
    self._search_ratio = value
@important_words.setter
def important_words(self, value):
    self._important_words = value
@title.setter
def title(self, value):
    self._title = value
@force_data_build.setter
def force_data_build(self, value):
    self._force_data_build = value
def _get_top_words_with_count(self, word_count_dict, topN=1):
words = []
if not word_count_dict:
return u""
word_count_dict = sorted(word_count_dict.items(), key=operator.itemgetter(1), reverse=True)
for i, (word, count) in enumerate(word_count_dict):
if i >= topN:
break
words.append((word, count))
return words
def _get_top_word_uniform_random(self, word_count_dict, topN=1):
words_with_count = self._get_top_words_with_count(word_count_dict, topN)
words = []
[ words.append(word) for (word, count) in words_with_count ]
idx = random.randint(0, len(words)-1)
return words[idx]
def _get_top_word_weight_random(self, word_count_dict, topN=1):
words_with_count = self._get_top_words_with_count(word_count_dict, topN)
return self._weighted_choice(words_with_count)
def _show_word_sentence(self, format_sentence, word_sentence, logger, comment="omg"):
    """ Log the pingze format next to the partially filled sentence.

    Positions not yet present in word_sentence are rendered as "X".
    """
    logger.info("%s: format_sentence %s" % (comment, my_unicode(format_sentence)))
    rendered = [word_sentence[pos] if pos in word_sentence else "X"
                for pos in range(len(format_sentence))]
    logger.info("%s: word_sentence %s" % (comment, my_unicode(rendered)))
def _show_word_sentences(self, format_sentences, word_sentences, logger, comment="omg"):
    """ Log every (format, word) sentence pair via _show_word_sentence."""
    # plain loop instead of a list comprehension used only for side effects
    for format_sentence, word_sentence in zip(format_sentences, word_sentences):
        self._show_word_sentence(format_sentence, word_sentence, logger, comment)
def _build_title_pingze_dict(self, logger):
    """ Map each title to its tonal pattern and punctuation delimiters.

    TitleRhythmDict values encode each sentence as a run of digits;
    they are normalized here so that '0' marks a free-tone position,
    '1' an odd digit and '2' an even digit ('1'/'2' are consumed as
    ping/ze marks by the generation code).
    NOTE: dict.iteritems and str.decode are Python 2 only.
    """
    for title, content_rhythm in TitleRhythmDict.iteritems():
        # each run of digits is one sentence of the pattern
        sentences = re.findall(r"[0-9]+", content_rhythm)
        new_sentences = []
        for sentence in sentences:
            new_sentence = ""
            for word in sentence:
                if not int(word):
                    new_sentence += "0"  # 0 -> free tone
                elif not (int(word) % 2):
                    new_sentence += "2"  # even digit
                else:
                    new_sentence += "1"  # odd digit
            new_sentences.append(new_sentence)
        self._title_pingze_dict[title.decode()] = new_sentences
        # keep the punctuation between sentences so the generated poem can
        # be reassembled with the title's original delimiters
        delimiters = []
        for word in content_rhythm:
            if word in [",", ".", "`", "|"]:
                delimiters.append(word)
        self._title_delimiter_dict[title.decode()] = delimiters
def _build_pingze_rhythm_words_dict(self, logger):
    """ Parse the rhythm book into pingze/rhythm/word lookup tables.

    The file alternates a title line (contains a Chinese colon; its
    second-to-last char is the rhythm word, and the presence of
    u"\u5e73" marks it as ping, otherwise ze) with a line listing the
    words of that rhythm group.  Spans wrapped in "[...]" on the word
    line are excluded.  Populates _pingze_rhythm_dict,
    _rhythm_word_dict, _pingze_words_dict and the two reverse dicts.
    NOTE: str.decode is Python 2 only.
    """
    with open(self._ci_rhythm_file, 'r') as fp_r:
        count = 1  # leftover debug counter, unused
        while 1:
            line = fp_r.readline()
            line = line.strip().decode("utf-8")
            if not line:
                # NOTE(review): readline() also returns "" at EOF, so a
                # file missing the "END" sentinel loops forever here --
                # confirm the data file always ends with END.
                continue
            if line == "END":
                break
            if u":" in line: # Chinese title part
                next_line = fp_r.readline().strip().decode("utf-8")
                rhythm_word = line[-2]
                is_ping = True
                if u"平" in line: # ping related
                    self._pingze_rhythm_dict.setdefault('1', []).append(rhythm_word)
                    is_ping = True
                else: # ze related
                    self._pingze_rhythm_dict.setdefault('2', []).append(rhythm_word)
                    is_ping = False
                # build reverse dict for count later
                invalid_flag = False
                invalid_value = []
                words = []
                for word in next_line:
                    if word == u"[":
                        invalid_flag = True
                    if invalid_flag:
                        # inside a bracketed span: collect and skip
                        invalid_value.append(word)
                        if word == u"]":
                            invalid_flag = False
                        continue
                    self._reverse_rhythm_word_dict[word] = rhythm_word
                    if is_ping: # ping related
                        self._reverse_pingze_word_dict[word] = '1'
                    else: # ze related
                        self._reverse_pingze_word_dict[word] = '2'
                    words.append(word)
                self._rhythm_word_dict[rhythm_word] = words
                if u"平" in line: # ping related
                    self._pingze_words_dict.setdefault('1', []).extend(words)
                else: # ze related
                    self._pingze_words_dict.setdefault('2', []).extend(words)
def _count_general_rhythm_words(self, logger):
    """ Collect corpus statistics used during generation.

    For every content sentence of the corpus: stores the sentence in
    _sentences, counts its final (rhyme) word and that word's rhythm
    group, and -- for sentences whose final word has a known rhythm --
    builds character-bigram tables (_bigram_count_dict plus the
    start/end index dicts keyed "first__second").
    NOTE: str.decode is Python 2 only.
    """
    with open(self._ci_words_file, 'r') as fp_r:
        count = 1  # leftover debug counter, unused
        while 1:
            line = fp_r.readline()
            line = line.strip().decode("utf-8")
            if not line:
                # NOTE(review): EOF also yields "" here, so a file
                # missing the "END" sentinel loops forever -- confirm.
                continue
            if line == "END":
                break
            if (u"," not in line) and (u"。" not in line): # only use content part for stats
                continue
            sentences = re.split(u"[,。]", line)
            for sentence in sentences:
                if sentence:
                    self._sentences.append(sentence)
                    final_word = sentence[-1]
                    if final_word not in self._reverse_rhythm_word_dict:
                        # final word has no rhythm info: skip stats (and bigrams)
                        continue
                    rhythm_word = self._reverse_rhythm_word_dict[final_word]
                    # count occurrences of the final word itself
                    if final_word not in self._word_count_dict:
                        self._word_count_dict[final_word] = 1
                    else:
                        self._word_count_dict[final_word] += 1
                    # and of its rhythm group
                    if rhythm_word not in self._rhythm_count_dict:
                        self._rhythm_count_dict[rhythm_word] = 1
                    else:
                        self._rhythm_count_dict[rhythm_word] += 1
                    # build 2-gram
                    for idx, word in enumerate(sentence):
                        if idx >= len(sentence) - 1:
                            break
                        first_word = word
                        second_word = sentence[idx+1]
                        if (first_word == u'、') or (second_word == u'、'):
                            continue
                        bigram_key = '__'.join([first_word, second_word])
                        if bigram_key not in self._bigram_count_dict:
                            self._bigram_count_dict[bigram_key] = 1
                        else:
                            self._bigram_count_dict[bigram_key] += 1
                        self._bigram_word_to_start_dict.setdefault(first_word, []).append(bigram_key)
                        self._bigram_word_to_end_dict.setdefault(second_word, []).append(bigram_key)
def _split_words(self, logger):
    """ Split every content line of the corpus into words with jieba.

    Appends each tokenized line (a list of words) to
    self._split_sentences, later used to train the word2vec model.
    NOTE: str.decode is Python 2 only.

    Fix: the original tested the *stripped* line for emptiness, so the
    empty string readline() returns at EOF was treated as a blank line
    and `continue`d -- an infinite loop whenever the file lacks the
    "END" sentinel.  Test the raw readline() result for EOF first.
    """
    with open(self._ci_words_file, 'r') as fp_r:
        while True:
            raw_line = fp_r.readline()
            if not raw_line:  # readline() returns "" only at true EOF
                break
            line = raw_line.strip().decode("utf-8")
            if not line:  # blank line in the file
                continue
            if line == "END":
                break
            if (u"," not in line) and (u"。" not in line): # only use content part for stats
                continue
            self._split_sentences.append(list(jieba.cut(line)))
def _build_word2vec(self, logger):
""" build word2vec for words"""
if not self._split_words:
logger.error("no split words, skip")
else:
self._word_model = models.Word2Vec(self._split_sentences, min_count=5)
self._word_model.save(os.path.join(self.basepath, "data", "word_model"))
def _init_data_build(self, logger):
    """ Build all lookup tables from the raw corpus files, then persist
    each table named in self._data_files as JSON under <basepath>/data/.
    """
    # mapping title to ping&ze
    self._build_title_pingze_dict(logger)
    # mapping pingze, rhythm to words
    self._build_pingze_rhythm_words_dict(logger)
    # mapping rhythm_end to words
    self._count_general_rhythm_words(logger)
    # split words
    self._split_words(logger)
    # build word2vec
    self._build_word2vec(logger)
    # save related data
    data_dir = os.path.join(self.basepath, "data")
    for name in self._data_files:
        with open(os.path.join(data_dir, name), "w") as fp_w:
            json.dump(getattr(self, "_" + name), fp_w)
def _load_data_build(self, logger):
    """ Restore all precalculated tables and the word2vec model from disk."""
    data_dir = os.path.join(self.basepath, "data")
    for name in self._data_files:
        with open(os.path.join(data_dir, name), "r") as fp_r:
            setattr(self, "_" + name, json.load(fp_r))
    self._word_model = models.Word2Vec.load(os.path.join(data_dir, "word_model"))
def _get_format_with_title(self, title, logger):
if title not in self._title_pingze_dict:
return -1
return self._title_pingze_dict[title]
def _check_position_by_sentence_length(self, sentence_length, logger):
if sentence_length == 7:
return [0,2,4,5]
elif sentence_length == 6:
return [0,2,4]
elif sentence_length == 5:
return [0,2,4]
elif sentence_length == 4:
return [0,2]
elif sentence_length == 3:
return [0]
else:
return []
def _weighted_choice(self, choices, already_check_choices=[]):
sub_choices = []
for (c,w) in choices:
if c not in already_check_choices:
sub_choices.append((c,w))
total = sum(w for (c, w) in sub_choices)
r = random.uniform(0, total)
upto = 0
for c, w in sub_choices:
if upto + w >= r:
return c
upto += w
def _compare_words(self, format_words, input_words):
for (format_word, input_word) in zip(format_words, input_words):
if format_word == '0': # no check needed
continue
if format_word != input_word:
return False
return True
def _combine_candidate_word_with_single_sentence(self, format_sentence, candidate_words, already_used_words, logger):
    """
    In each sentence, put one candidate word in it
    with consideration of pingze as well as postion and already used condition

    Args:
        format_sentence: pingze pattern string ('0'/'1'/'2' per position).
        candidate_words: (word, similarity) pairs from the word2vec search.
        already_used_words: words placed in earlier sentences; MUTATED --
            the chosen word is appended so later sentences avoid it.
    Returns:
        dict position -> character for the placed word (empty when no
        candidate fits after 10 random attempts).
    """
    position_word_dict = {}
    logger.info('single sentence: format_sentence %s' % my_unicode(format_sentence))
    logger.debug('single sentence: already_used_words %s' % my_unicode(already_used_words))
    # remove already used words
    logger.debug('single sentence: origin_candidate_words %s' % my_unicode(candidate_words))
    new_candidate_words = [ word for word in candidate_words if word[0] not in already_used_words ]
    logger.debug('single sentence: new_candidate_words %s' % my_unicode(new_candidate_words))
    if not new_candidate_words:
        logger.warning("use all words, that should not happen")
        new_candidate_words = candidate_words
    sentence_length = len(format_sentence)
    # check delimiter for sentence
    positions = self._check_position_by_sentence_length(sentence_length, logger)
    if not positions: # don't consider position, only consider pingze
        logger.info("sentence_length[%d] dont check position, as not defined" % sentence_length)
    logger.debug("single sentence: positions %s" % str(positions))
    # random fill first: up to 10 weighted random draws
    random_already_check_words = []
    is_word_found = False
    for i in range(10):
        # randomly select one candidate word (excluding ones tried already)
        candidate_word = self._weighted_choice(new_candidate_words, random_already_check_words)
        if not candidate_word:
            raise ValueError("candidate_word %s not exist in %s" % (candidate_word, my_unicode(new_candidate_words)))
        random_already_check_words.append(candidate_word)
        logger.debug("single sentence: iter[%d] candidate_word %s" % (i, candidate_word))
        # get pingze marks for every character of the candidate word
        word_pingze = []
        word_pingze_flag = True
        for candidate_word_elem in candidate_word:
            if candidate_word_elem not in self._reverse_pingze_word_dict:
                word_pingze_flag = False
                break
            word_pingze.append(self._reverse_pingze_word_dict[candidate_word_elem])
        logger.debug("single sentence: iter[%d] candidate_word %s, word_pingze %s" % (i, candidate_word, my_unicode(word_pingze)))
        if (not word_pingze_flag) or (len(word_pingze) != len(candidate_word)):
            logger.warning("word_pingze len[%d] not equal to word len[%d]" % (len(word_pingze), len(candidate_word)))
            continue
        # try to slot the word between consecutive boundary positions
        for j in range(len(positions) - 1): # dont check rhythm part
            pos_start = positions[j]
            pos_end = positions[j+1]
            tmp_word = format_sentence[pos_start:pos_end]
            logger.debug('iter[%d] pos_iter[%d] word_pingze %s, tmp_word %s' % (i, j, word_pingze, tmp_word))
            if (len(tmp_word) == len(word_pingze)) and (self._compare_words(tmp_word, word_pingze)):
                # write word with position
                for p, m in enumerate(range(pos_start, pos_end)):
                    position_word_dict[m] = candidate_word[p]
                is_word_found = True
                break
        if is_word_found:
            already_used_words.append(candidate_word)
            logger.info("single sentence: add candidate_word %s to word_sentence" % candidate_word)
            break
    return position_word_dict
def _filter_simliar_words(self, whole_similar_words, logger):
    """ Drop similar-word candidates that are both single-character and
    not POS-tagged as noun/place/name/time.

    A candidate survives when it is at least two characters long, or when
    jieba tags any of its parts with one of n/ns/nr/t.
    """
    kept = []
    for word, similarity in whole_similar_words:
        logger.debug("word[%s] len[%d]" % (word, len(word)))
        has_valid_flag = False
        for word_elem, flag in pseg.cut(word):
            logger.debug("word[%s] word_elem[%s] flag[%s]" % (word, word_elem, flag))
            if flag in ['n', 'ns', 'nr', 't']:
                has_valid_flag = True
                break
        # De Morgan of the original skip condition: len < 2 and not valid
        if len(word) >= 2 or has_valid_flag:
            kept.append((word, similarity))
    return kept
def _combine_important_word_with_sentence(self, important_words, format_sentences, logger):
    """
    make every sentence has one related importanct word
    and promise pingze order as well as position order
    we try to use whole word to find similar words first,
    if not, then use each word to find

    Returns:
        list of position->character dicts, one per format sentence.
    """
    word_sentences = []
    sentence_length = len(format_sentences)
    candidate_length = 5 * sentence_length
    # if put all words in word2vec.most_similar function, and any one of words not exist will lead to call fail
    # so try to check all words and get most common valid words, ugly but seems no official func given
    useful_important_words = []
    for important_word in important_words:
        try:
            similar_words = self._word_model.most_similar(positive=[ important_word ], topn=candidate_length)
        except KeyError as e1:
            # word not in the model vocabulary: drop it
            pass
        else:
            useful_important_words.append(important_word)
    # trick here if no useful word given
    if not useful_important_words:
        logger.warning("no valid tags %s in user input, trick" % my_unicode(useful_important_words))
        useful_important_words = [u"菊花"]
    # cut useful words, it seems too many useful words not ok than simple one
    max_useful_words_len = 3
    if len(useful_important_words) > max_useful_words_len:
        useful_important_words = useful_important_words[:max_useful_words_len]
    whole_similar_words = []
    try:
        whole_similar_words = self._word_model.most_similar(positive=useful_important_words, topn=candidate_length)
        logger.info("get whole_similar_words %s based on useful_important_words %s as whole" % (my_unicode(whole_similar_words), my_unicode(useful_important_words)))
    except KeyError as e:
        logger.exception(e)
    # Oops, we don't know what user want, create one randomly
    if not whole_similar_words:
        # NOTE(review): `important_word` is the leaked loop variable from
        # above -- this raises NameError when important_words is empty.
        logger.warning("Oops, no similar word generated based on important_word[%s] seperately" % str(important_word))
    else:
        # filter word type and word length
        whole_similar_words = self._filter_simliar_words(whole_similar_words, logger)
        logger.info("filtered whole_similar_words %s based on important_words %s as whole" % (my_unicode(whole_similar_words), my_unicode(important_words)))
    # order list of tuple, and fetch the first candidate_length of candidates
    whole_similar_words = sorted(whole_similar_words, key=operator.itemgetter(1), reverse=True)
    candidate_words = whole_similar_words[:candidate_length]
    logger.info("get candidate_words %s based on important_words %s" % (my_unicode(candidate_words), my_unicode(important_words)))
    # at now, we promise whole_similar_words have enough data
    # now, combine them with sentences
    already_used_words = []
    for format_sentence in format_sentences:
        word_sentence = self._combine_candidate_word_with_single_sentence(format_sentence, candidate_words, already_used_words, logger)
        word_sentences.append(word_sentence)
    return word_sentences
def _generate_common_rhythm(self, is_ping=True):
""" generate common rhythm"""
candidate_rhythms = self._pingze_rhythm_dict["1"] if is_ping else self._pingze_rhythm_dict["2"]
#print 'rhythm_count', self._rhythm_count_dict
candidate_rhythm_count_dict = {}
for candidate_rhythm in candidate_rhythms:
if candidate_rhythm in self._rhythm_count_dict:
candidate_rhythm_count_dict[candidate_rhythm] = self._rhythm_count_dict[candidate_rhythm]
candidate_rhythm_count_dict = sorted(candidate_rhythm_count_dict.items(), key=operator.itemgetter(1), reverse=True)
count = 0
narrow_candidate_rhythms = []
for (rhythm, rhythm_count) in candidate_rhythm_count_dict:
narrow_candidate_rhythms.append((rhythm, rhythm_count))
count = count + 1
if count > 5:
break
selected_rhythm = self._weighted_choice(narrow_candidate_rhythms)
return selected_rhythm
def _generate_common_words(self, rhythm, is_ping=True):
""" generate common words"""
candidate_words = self._rhythm_word_dict[rhythm]
candidate_word_count_dict = {}
for candidate_word in candidate_words:
if candidate_word in self._word_count_dict:
candidate_word_count_dict[candidate_word] = self._word_count_dict[candidate_word]
candidate_word_count_dict = sorted(candidate_word_count_dict.items(), key=operator.itemgetter(1), reverse=True)
return candidate_word_count_dict
def _generate_common_rhythm_words(self, is_ping, logger):
    """ generate rhythm words:
    first pick a common rhythm for the tone class, then return that
    rhythm's candidate (word, count) list ordered by frequency.
    """
    logger.info("generate_rhythm: generate common rhythm for isping[%d]" % int(is_ping))
    selected_rhythm = self._generate_common_rhythm(is_ping)
    logger.info("generate_rhythm: use rhythm[%s] for is_ping[%d] generatoin" % (selected_rhythm, int(is_ping)))
    logger.info("generate_rhythm: generate common words for isping[%d]" % int(is_ping))
    candidate_word_counts = self._generate_common_words(selected_rhythm, is_ping)
    logger.info("generate_rhythm: word_count_dict %s for isping[%d]" % (my_unicode_sd(candidate_word_counts), int(is_ping)))
    return candidate_word_counts
def _generate_rhythm(self, format_sentences, word_sentences, logger):
    """ Choose and place the rhyme word at the end of every sentence.

    A '1' ending draws from ping candidates, '2' from ze candidates,
    '0' from both; each chosen rhyme word is recorded so it is not
    reused by a later sentence.  word_sentences dicts are mutated.
    """
    logger.info("generate_rhythm: format_sentences")
    # candidate (word, count) lists for each tone class
    ping_candidates = self._generate_common_rhythm_words(True, logger)
    ze_candidates = self._generate_common_rhythm_words(False, logger)
    used_rhythm_words = []
    for format_sentence, word_sentence in zip(format_sentences, word_sentences):
        logger.info("generate_rhythm: format_sentence %s, word_sentence %s" % (my_unicode(format_sentence), my_unicode(word_sentence)))
        tone_mark = format_sentence[-1]
        rhythm_word = ""
        if tone_mark == '1':
            rhythm_word = self._weighted_choice(ping_candidates, used_rhythm_words)
        elif tone_mark == '2':
            rhythm_word = self._weighted_choice(ze_candidates, used_rhythm_words)
        elif tone_mark == '0':
            rhythm_word = self._weighted_choice(ping_candidates + ze_candidates, used_rhythm_words)
        else:
            logger.error("rhythm_type[%s] illegal" % tone_mark)
        used_rhythm_words.append(rhythm_word)
        logger.debug("generate_rhythm: use rhythm_word %s" % rhythm_word)
        word_sentence[len(format_sentence) - 1] = rhythm_word
def _fill_word(self, direction, tofill_position, format_sentence, word_sentence, global_repeat_words, current_repeat_dict, level, logger):
    """ fill word by related word, and position

    Picks the character for tofill_position from the bigram tables,
    seeded by the adjacent already-filled character: direction > 0
    extends forward (seed is the previous char, bigram start dict),
    direction < 0 extends backward (seed is the next char, bigram end
    dict).  Candidates violating pingze, globally repeated, or repeated
    more than twice in this sentence are skipped.  Mutates word_sentence
    and current_repeat_dict.
    """
    logger.debug("fill_word: level[%d] fill word" % level)
    seed_word = word_sentence[tofill_position - direction]
    logger.debug("fill_word: level[%d] tofill_position[%d] seed_word %s" % (level, tofill_position, seed_word))
    # check 2-gram dict and pingze order
    if direction > 0:
        bigram_word_dict = self._bigram_word_to_start_dict
        verb_position = -1  # bigram keys are "first__second"; take the second char
    else:
        bigram_word_dict = self._bigram_word_to_end_dict
        verb_position = 0   # take the first char
    logger.debug("fill_word: level[%d] verb_position[%d]" % (level, verb_position))
    if seed_word in bigram_word_dict:
        candidate_words = bigram_word_dict[seed_word]
        candidate_verb_count_dict = {}
        for candidate_word in candidate_words:
            candidate_verb = candidate_word[verb_position]
            if candidate_verb not in self._reverse_pingze_word_dict:
                # no pingze info for this char, skip
                continue
            # not use repeated word
            if candidate_verb in global_repeat_words:
                continue
            # not use too many repeated word in one sentence
            if candidate_verb in current_repeat_dict:
                if current_repeat_dict[candidate_verb] > 2:
                    logger.debug("fill_word: level[%d] candidate_verb %s in current repeat words, skip" % (level, candidate_verb))
                    continue
            # check pingze order first ('0' in the format matches anything)
            format_tofill_position = format_sentence[tofill_position]
            candidate_verb_position = self._reverse_pingze_word_dict[candidate_verb]
            if (format_tofill_position != '0') and (candidate_verb_position != format_tofill_position):
                continue
            # set initial, protect not exists
            candidate_verb_count_dict[candidate_verb] = 1
            if candidate_word in self._bigram_count_dict:
                candidate_verb_count_dict[candidate_verb] = self._bigram_count_dict[candidate_word]
        if candidate_verb_count_dict: # there exists some valid verbs
            # random select word among the top-5 by bigram count
            topN = 5
            selected_word = self._get_top_word_weight_random(candidate_verb_count_dict, topN)
            logger.debug("fill_word: level[%d] select_word %s with random topN %d" % (level, selected_word, topN))
        else:
            logger.error("fill_word: level[%d] no candidate word" % (level))
            if candidate_words: # no pingze satisfy, random select one
                idx = random.randint(0, len(candidate_words) - 1)
                selected_word = candidate_words[idx][verb_position]
                logger.debug("fill_word: level[%d] select_word %s with idx %d" % (level, selected_word, idx))
            else:
                raise ValueError("word exist in bigram_word_dict, but it's empty")
    else: # word not exists in 2-gram
        # NOTE(review): this branch only logs -- selected_word is then
        # unbound below, raising NameError.  Confirm whether seeds are
        # guaranteed to be in the bigram tables.
        logger.error("fill_word: level[%d] seed_word %s not exist in 2-gram" % (level, seed_word))
    # select and fill
    word_sentence[tofill_position] = selected_word
    if selected_word not in current_repeat_dict:
        current_repeat_dict[selected_word] = 1
    else:
        current_repeat_dict[selected_word] += 1
    logger.info("fill_word: level[%d] tofill_position[%d] seed_word %s, fill_word %s" % (level, tofill_position, seed_word, selected_word))
def _up_fill_direction(self, tofill_position, sentence_length, logger):
""" some words are connected tight than other"""
format_positions = self._check_position_by_sentence_length(sentence_length, logger)
# we dont know, use down-fill
if not format_positions:
return False
if tofill_position in format_positions:
return False
else:
return True
def _search_generate(self, format_sentence, word_sentence, global_repeat_words, current_repeat_dict, already_used_sentences, already_used_rhythm_words, logger):
    """ try to search already exist word

    Look for a corpus sentence of the right length that contains one of
    the already-placed words and whose final word rhymes with the
    current rhyme word.  Each match is accepted with probability 0.2
    (the `u < 0.8: continue` test), to add variety.
    Returns a position->char dict for the reused sentence, or False.
    """
    # no search for only rhythm sentence
    if len(word_sentence) <= 1:
        return False
    sentence_length = len(format_sentence)
    if sentence_length <= 2:
        return False
    for sentence in self._sentences:
        if sentence_length != len(sentence):
            continue
        for word in word_sentence.values():
            if word not in sentence:
                continue
            # now, check rhythm
            current_rhythm_word = word_sentence[len(format_sentence) - 1]
            if current_rhythm_word not in self._reverse_rhythm_word_dict:
                continue
            current_rhythm = self._reverse_rhythm_word_dict[current_rhythm_word]
            sentence_word = sentence[-1]
            if sentence_word not in self._reverse_rhythm_word_dict:
                continue
            if sentence_word in already_used_rhythm_words:
                continue
            sentence_rhythm = self._reverse_rhythm_word_dict[sentence_word]
            if sentence_rhythm == current_rhythm:
                # candidate found: expose it as a position->char dict
                sentence_dict = {}
                for i, word in enumerate(sentence):
                    sentence_dict[i] = word
                if sentence_dict in already_used_sentences:
                    continue
                # accept only ~20% of matches
                u = random.random()
                if u < 0.8:
                    continue
                return sentence_dict
    return False
def _sub_generate(self, format_sentence, word_sentence, global_repeat_words, current_repeat_dict, logger, level=0):
    """ recursion generate single sentence

    Repeatedly picks an unfilled position adjacent to a filled one and
    fills it via _fill_word, recursing (one level per filled char)
    until every position of format_sentence is present in word_sentence.
    Mutates word_sentence in place.
    """
    sentence_length = len(format_sentence)
    word_sentence_length = len(word_sentence.keys())
    logger.info("sub_generate: level[%d]" % level)
    logger.debug("sub_generate: level[%d] sentence_len %d, word_filled_len %d" % (level, sentence_length, word_sentence_length))
    # all word position filled, return
    if word_sentence_length == sentence_length:
        logger.info("sub_generate: recursion finish")
        return
    # collect unfilled positions that touch a filled neighbour
    candidate_positions = []
    for i in range(sentence_length):
        if i in word_sentence:
            continue
        if (i-1) in word_sentence or (i+1) in word_sentence:
            candidate_positions.append(i)
    logger.debug("sub_generate: level[%d] candidate_positions %s" % (level, str(candidate_positions)))
    if not candidate_positions:
        raise ValueError("candidate_positions len zero, illegal")
    if len(candidate_positions) == 1: # no choice but use this
        tofill_position = candidate_positions[0]
    else: # random choose one in choices
        idx = random.randint(0, len(candidate_positions) - 1)
        tofill_position = candidate_positions[idx]
    logger.debug("sub_generate: level[%d] tofill_position %d" % (level, tofill_position))
    # which neighbours are already filled?
    up_fill_direction = (tofill_position - 1) in word_sentence
    down_fill_direction = (tofill_position + 1) in word_sentence
    both_fill_direction = up_fill_direction and down_fill_direction
    if both_fill_direction: # consider format, choose only one, consider later
        if self._up_fill_direction(tofill_position, sentence_length, logger):
            up_fill_direction = True
            down_fill_direction = False
        else:
            up_fill_direction = False
            down_fill_direction = True
    logger.debug("sub_generate: level[%d] up_fill_direction[%d] down_fill_direction[%d]" % (level, up_fill_direction, down_fill_direction))
    # fill word one by one
    if up_fill_direction:
        logger.debug("sub_generate: level[%d] use up_fill method" % (level))
        self._fill_word(1, tofill_position, format_sentence, word_sentence, global_repeat_words, current_repeat_dict, level, logger)
    else:
        logger.debug("sub_generate: level[%d] use down_fill method" % (level))
        self._fill_word(-1, tofill_position, format_sentence, word_sentence, global_repeat_words, current_repeat_dict, level, logger)
    level = level + 1
    self._sub_generate(format_sentence, word_sentence, global_repeat_words, current_repeat_dict, logger, level)
def _fill_result_with_format(self, result_sentence_list):
""" fill result with format"""
result = ""
delimiters = self._title_delimiter_dict[self._title]
idx_delimiter = 0
for result_sentence in result_sentence_list:
result += result_sentence
result += delimiters[idx_delimiter]
if (idx_delimiter+1 < len(delimiters)) and (delimiters[idx_delimiter+1] == "|"):
result += " | "
idx_delimiter += 1
idx_delimiter += 1
return result
def _generate(self, format_sentences, word_sentences, logger):
""" generate poem based on important words and rhythm word"""
result_sentence_list = []
# generate each sentence
# avoid words between sentences
global_repeat_words = []
already_used_rhythm_words = []
already_used_sentences = []
for i, (format_sentence, word_sentence) in enumerate(zip(format_sentences, word_sentences)):
result_sub_sentence = ""
# avoid too many same word in one sentence
current_repeat_dict = {}
for word in word_sentence.values():
if word not in current_repeat_dict:
current_repeat_dict[word] = 1
else:
current_repeat_dict[word] += 1
self._show_word_sentence(format_sentence, word_sentence, logger, "omg origin:s %d" % (i+1))
u = random.random()
if u < self._search_ratio:
search_sentence = self._search_generate(format_sentence, word_sentence, global_repeat_words, current_repeat_dict, already_used_sentences, already_used_rhythm_words, logger)
if not search_sentence:
self._sub_generate(format_sentence, word_sentence, global_repeat_words, current_repeat_dict, logger)
else:
logger.info("[%d] use search generate for word sentence" % i)
word_sentence = search_sentence
already_used_sentences.append(search_sentence)
else:
self._sub_generate(format_sentence, word_sentence, global_repeat_words, current_repeat_dict, logger)
self._show_word_sentence(format_sentence, word_sentence, logger, "omg final:s %d" % (i+1))
for word in word_sentence.values():
result_sub_sentence += word
global_repeat_words.append(word)
already_used_rhythm_words.append(word_sentence[len(format_sentence) - 1])
result_sentence_list.append(result_sub_sentence)
# fill with delimiter
if self._title not in self._title_delimiter_dict:
print 'here'
return u','.join(result_sentence_list)
elif len(self._title_delimiter_dict[self._title]) != (len(result_sentence_list)+1):
print 'here2'
raise
return u','.join(result_sentence_list)
else:
return self._fill_result_with_format(result_sentence_list)
def init(self, logger):
    """Prepare the generator's data, rebuilding from scratch when forced.

    Without the force flag, try to load the prebuilt data first and
    fall back to a full rebuild if loading fails for any reason.
    """
    if self._force_data_build:
        self._init_data_build(logger)
        return
    try:
        self._load_data_build(logger)
    except Exception as load_error:
        # loading is best-effort: log the failure and rebuild instead
        logger.exception(load_error)
        self._init_data_build(logger)
def check(self, input_param_dict, logger):
    """Validate user input; return an error message, or "" when valid.

    Currently only verifies that a requested ci-title is among the
    supported titles.  Callers treat any falsy return value as success.
    """
    # NOTE(review): the original had an unconditional `return ""` here,
    # leaving the title check below as unreachable dead code (possibly a
    # temporary disable — confirm).  The check now actually runs, and the
    # success path returns "" explicitly instead of an implicit None.
    if ('title' in input_param_dict) and (input_param_dict['title'] not in self._support_titles):
        return "%s 不是候选的词牌名" % input_param_dict['title']
    return ""
def generate(self, logger):
    """Main entry point: generate a ci poem for the configured title.

    Pipeline: look up the title's format sentences, seed them with the
    important words, assign rhythm words, then generate and format the
    final poem text.

    Returns the generated poem string.

    Raises:
        ValueError: if the title has no usable format definition.
    """
    # get title related sentences
    format_sentences = self._get_format_with_title(self._title, logger)
    # BUG FIX: the original `format_sentences < 0` compared a list to an
    # int (always False on py2, TypeError on py3), so the unknown-title
    # guard never fired.  Handle both a negative int sentinel and an
    # empty/None result.
    if (isinstance(format_sentences, int) and format_sentences < 0) or not format_sentences:
        raise ValueError("title[%s] not defined in dict" % self._title)
    # combine important words with format sentences
    word_sentences = self._combine_important_word_with_sentence(self._important_words, format_sentences, logger)
    self._show_word_sentences(format_sentences, word_sentences, logger)
    # decide rhythm and related words
    self._generate_rhythm(format_sentences, word_sentences, logger)
    self._show_word_sentences(format_sentences, word_sentences, logger)
    # now, generate poem
    result_sentences = self._generate(format_sentences, word_sentences, logger)
    # (typo "titile" fixed in the log message)
    logger.info("title[%s] generate ci %s" % (self._title, result_sentences))
    return result_sentences
if __name__ == '__main__':
confpath = os.path.join(basepath, 'conf/poem.conf')
conf = ConfigParser.RawConfigParser()
conf.read(confpath)
logging.basicConfig(filename=os.path.join(basepath, 'logs/chinese_poem.log'), level=logging.DEBUG,
format = '[%(filename)s:%(lineno)s - %(funcName)s %(asctime)s;%(levelname)s] %(message)s',
datefmt = '%a, %d %b %Y %H:%M:%S'
)
logger = logging.getLogger('ChinesePoem')
generator = Generator(basepath, conf)
try:
# special case test
user_input_dict = dict(title=u"南乡子", important_words=[], force_data_build=False)
user_input_dict = dict(title=u"南乡子", important_words=[u"计算机"], force_data_build=False)
#user_input_dict = dict(title=u"水调歌头", important_words=[u"菊花", u"院子"], force_data_build=False)
# As user input, for theme of poem, and title
#user_input_dict = dict(title=u"浣溪沙", important_words=[u"菊花", u"庭院"], force_data_build=False)
#user_input_dict = dict(title=u"蝶恋花", important_words=[u"菊花", u"院子"], force_data_build=False)
#user_input_dict = dict(title=u"南乡子", important_words=[u"菊花", u"院子"], force_data_build=False)
#user_input_dict = dict(title=u"浣溪沙", important_words=[u"山川", u"流水"], force_data_build=False)
#user_input_dict = dict(title=u"浣溪沙", important_words=[u"菊花", u"院子"], force_data_build=False)
#user_input_dict = dict(title=u"浣溪沙", important_words=[u"菊", u"院子"], force_data_build=False)
#if True:
for title in TitleRhythmDict.keys():
#title = u"浣溪沙"
#title = u"水调歌头"
title = title.decode()
print title
mock_tags = {"天空":{"text":"天空","confidence":99},"草":{"text":"草","confidence":99},"户外":{"text":"户外","confidence":99},"山":{"text":"山","confidence":99},"田地":{"text":"田地","confidence":98},"绿色":{"text":"绿色","confidence":93},"自然":{"text":"自然","confidence":93},"动物":{"text":"动物","confidence":81},"绿色的":{"text":"绿色的","confidence":70},"放牧":{"text":"放牧","confidence":70},"打开":{"text":"打开","confidence":65},"牧场":{"text":"牧场","confidence":64},"青葱的":{"text":"青葱的","confidence":56},"高地":{"text":"高地","confidence":48},"黄牛":{"text":"黄牛","confidence":42},"平原":{"text":"平原","confidence":27},"距离":{"text":"距离","confidence":13}}
important_words = []
for mock_tag in mock_tags.keys():
important_words.append(mock_tag.decode())
user_input_dict = dict(title=title, important_words=important_words, force_data_build=False)
# Init
u = random.random() * 0.8
print 'ratio', u
generator.search_ratio = u
generator.force_data_build = user_input_dict["force_data_build"]
generator.init(logger)
# Generate poem
print 'title', title
error_info = generator.check(user_input_dict, logger)
if not error_info:
generator.important_words = user_input_dict["important_words"]
generator.title = user_input_dict["title"]
logger.info("generate poem for title %s, with important words %s" % (generator.title, my_unicode(generator.important_words)))
print generator.generate(logger)
else:
logger.error("dont generate poem because of %s" % error_info)
print error_info
except ValueError as e: