forked from Gumtree/Echidna_scripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CifFile.py
2050 lines (1903 loc) · 97.9 KB
/
CifFile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
1.This Software copyright \u00A9 Australian Synchrotron Research Program Inc, ("ASRP").
2.Subject to ensuring that this copyright notice and licence terms
appear on all copies and all modified versions, of PyCIFRW computer
code ("this Software"), a royalty-free non-exclusive licence is hereby
given (i) to use, copy and modify this Software including the use of
reasonable portions of it in other software and (ii) to publish,
bundle and otherwise re-distribute this Software or modified versions
of this Software to third parties, provided that this copyright notice
and terms are clearly shown as applying to all parts of software
derived from this Software on each occasion it is published, bundled
or re-distributed. You are encouraged to communicate useful
modifications to ASRP for inclusion for future versions.
3.No part of this Software may be sold as a standalone package.
4.If any part of this Software is bundled with Software that is sold,
a free copy of the relevant version of this Software must be made
available through the same distribution channel (be that web server,
tape, CD or otherwise).
5.It is a term of exercise of any of the above royalty free licence
rights that ASRP gives no warranty, undertaking or representation
whatsoever whether express or implied by statute, common law, custom
or otherwise, in respect of this Software or any part of it. Without
limiting the generality of the preceding sentence, ASRP will not be
liable for any injury, loss or damage (including consequential loss or
damage) or other loss, loss of profits, costs, charges or expenses
however caused which may be suffered, incurred or arise directly or
indirectly in respect of this Software.
6. This Software is not licenced for use in medical applications.
"""
from types import *
import re
import StarFile
import sys
class CifLoopBlock(StarFile.LoopBlock):
    """A STAR loop block restricted to the CIF rule that loops may not nest.

    Any attempt to create or insert a loop whose dimension is greater than 1
    raises CifError. Items are added with maxlength=75, which is handed to
    the underlying StarFile routine (presumably the CIF data name length
    limit -- confirm against StarFile.LoopBlock.AddLoopItem).
    """

    def __init__(self, data=(), dimension=0, **kwargs):
        """Create the loop block.

        data, dimension and any further keyword arguments are forwarded
        unchanged to StarFile.LoopBlock.__init__.

        Raises CifError if dimension is greater than 1.
        """
        self.loopclass = CifLoopBlock
        if dimension > 1:
            raise CifError('Attempt to nest loops, loop level %d' % dimension)
        super(CifLoopBlock, self).__init__(data, dimension=dimension, **kwargs)

    def __iter__(self):
        """Iterate using the inherited recursive_iter()."""
        return self.recursive_iter()

    def AddLoopItem(self, data, precheck=False):
        """Add an item through the StarFile routine with maxlength fixed at 75."""
        StarFile.LoopBlock.AddLoopItem(self, data, precheck, maxlength=75)

    def insert_loop(self, newloop, **kwargs):
        """Insert newloop into this block.

        Raises CifError if newloop.dimension is greater than 1.
        """
        if newloop.dimension > 1:
            # The message previously referenced a bare, undefined name
            # `dimension`, which raised NameError instead of this CifError.
            raise CifError('Attempt to insert inner loop, loop level %d' % newloop.dimension)
        StarFile.LoopBlock.insert_loop(self, newloop, **kwargs)

    def Loopify(self, datanamelist):
        """Turn the listed data names into a loop held as a CifLoopBlock.

        Does nothing when this block is itself already inside a loop
        (dimension > 0).
        """
        if self.dimension > 0:
            return
        StarFile.LoopBlock.Loopify(self, datanamelist)
        # The StarFile routine creates a plain loop; swap it for a
        # CifLoopBlock built from the same data.
        newloop = self.GetLoop(datanamelist[0])
        cifloop = CifLoopBlock(newloop)
        self.remove_loop(newloop)
        self.insert_loop(cifloop)
class CifBlock(CifLoopBlock):
    """A CIF data block: data items and loops plus a collection of save frames.

    The save frames are held in self.saves and are also reachable through
    the pseudo-key "saves". Once a dictionary has been attached with
    assign_dictionary, item lookups consult it to derive missing items and
    to convert the type of returned values.
    """

    def __init__(self, data=(), strict=1, maxoutlength=2048, wraplength=80, overwrite=True, dimension=0):
        """Create the block.

        data         -- initial contents; when it is a StarBlock or CifBlock
                        its "saves" entry seeds this block's save frames
        strict       -- when true, checklengths() is run on the new block
        maxoutlength, wraplength, overwrite -- forwarded to the parent class
        dimension    -- accepted but not used; the parent is always
                        initialised with dimension=0
        """
        self.strict = strict
        super(CifBlock, self).__init__(data=data, dimension=0, maxoutlength=maxoutlength,
                                       wraplength=wraplength, overwrite=overwrite)
        if isinstance(data, (StarFile.StarBlock, CifBlock)):
            self.saves = StarFile.BlockCollection(datasource=data["saves"], element_class=CifBlock,
                                                  type_tag="save")
        else:
            self.saves = StarFile.BlockCollection(element_class=CifLoopBlock, type_tag="save")
        if self.strict:
            self.checklengths()
        self.dictionary = None

    def RemoveCifItem(self, itemname):
        """Remove itemname via the inherited RemoveLoopItem."""
        CifLoopBlock.RemoveLoopItem(self, itemname)

    def __getitem__(self, key):
        """Return the value stored under key, or the save frames for "saves".

        If the key is absent and a dictionary is attached, the dictionary is
        asked to derive the item; without a dictionary KeyError is raised.
        When the attached dictionary knows the key, the value is passed
        through its change_type() before being returned.
        """
        if key == "saves":
            return self.saves
        try:
            rawitem = CifLoopBlock.__getitem__(self, key)
        except KeyError:
            if not self.dictionary:
                raise KeyError('No such item: %s' % key)
            # send the dictionary the required key and a pointer to us
            rawitem = self.dictionary.derive_item(key, self)
        # we now have an item, we can try to convert it to a number if that is appropriate
        if not self.dictionary or not self.dictionary.has_key(key):
            return rawitem
        return self.dictionary.change_type(key, rawitem)

    def __setitem__(self, key, value):
        """Store value under key via AddCifItem ("saves" is special-cased)."""
        if key == "saves":
            # NOTE(review): this stores value as an entry named "saves"
            # inside the save frame collection rather than replacing the
            # collection -- confirm that this is the intent.
            self.saves[key] = value
        else:
            self.AddCifItem((key, value))

    def clear(self):
        """Empty the block and start a fresh save frame collection."""
        CifLoopBlock.clear(self)
        # NOTE(review): type_tag is "save_" here but "save" in __init__.
        self.saves = StarFile.BlockCollection(element_class=CifLoopBlock, type_tag="save_")

    def copy(self):
        """Return a copy of this block, including its save frames.

        The result is an instance of the same (sub)class as self.
        """
        newblock = CifLoopBlock.copy(self)
        newblock.saves = self.saves.copy()
        # self.__class__ is what the bound-method attribute im_class used to
        # supply; unlike im_class it also exists on Python 3.
        return self.__class__(newblock)  # catch inheritance

    def has_key(self, key):
        """Report whether key is present; "saves" is always present."""
        if key == "saves":
            return 1
        return CifLoopBlock.has_key(self, key)

    def __str__(self):
        """Return the text form: every save frame, then the block's own items."""
        pieces = []
        for sb in self.saves.keys():
            pieces.append('\nsave_%s\n\n' % sb)
            self.saves[sb].SetOutputLength(self.wraplength, self.maxoutlength)
            pieces.append(str(self.saves[sb]))
            pieces.append('\nsave_\n\n')
        return ''.join(pieces) + CifLoopBlock.__str__(self)

    # this is not appropriate for save blocks.  Instead, the save block
    # should be accessed directly for update
    def update(self, adict):
        """Copy every unlooped item and every loop of adict into this block.

        Raises TypeError if adict is not a CifBlock.
        """
        if not isinstance(adict, CifBlock):
            raise TypeError
        for key in adict.block.keys():
            self.AddCifItem((key, adict[key]))
        for aloop in adict.loops:
            self.insert_loop(aloop, audit=True)

    def AddCifItem(self, data):
        """Add one item, or several at once, from a (name(s), value(s)) pair.

        data[0] may be a single data name or a tuple/list of them, with
        data[1] shaped to match. A single name with a plain list or tuple
        value is stored as a one-column loop.

        Raises TypeError if data[0] is not a string, tuple or list.
        """
        # we accept only tuples, strings and lists!!
        if not isinstance(data[0], (StringType, TupleType, ListType)):
            raise TypeError('Cif datanames are either a string, tuple or list')
        # single items passed straight through to underlying routine
        # we catch single item loops as well...
        if isinstance(data[0], StringType):
            plain_sequence = isinstance(data[1], (TupleType, ListType)) and \
                not isinstance(data[1], (StarFile.StarList, StarFile.StarTuple))
            if plain_sequence:
                CifLoopBlock.AddLoopItem(self, ((data[0],), ((data[1],))))
            else:
                CifLoopBlock.AddLoopItem(self, data)
            return
        # otherwise, we unpack one level and send along.  This is different
        # to the StarBlock behaviour, which assumes that any tuples imply an
        # inner loop.
        for keyval in zip(data[0], data[1]):
            CifLoopBlock.AddLoopItem(self, keyval)

    def checklengths(self):
        """Raise CifError listing every data name longer than 75 characters."""
        toolong = [name for name in self.keys() if len(name) > 75]
        if toolong:
            outstring = "".join("\n" + name for name in toolong)
            raise CifError('Following data names too long:' + outstring)

    def loopnames(self):
        """Return, for each loop in the block, the list of its data names."""
        return [aloop.keys() for aloop in self.loops]

    def assign_dictionary(self, dic):
        """Attach dic for item derivation and typing; non-DDLm dictionaries are ignored."""
        if not dic.diclang == "DDLm":
            print("Warning: ignoring dictionary %s" % dic.dic_as_cif.my_uri)
            return
        self.dictionary = dic

    # match_att and rel_keys keep their list defaults for interface
    # compatibility; this method never mutates them.
    def merge(self, new_block, mode="strict", match_att=[], match_function=None, nosaves=False,
              rel_keys=[]):
        """Merge the contents of new_block into this block.

        mode      -- 'strict':  a data name present in both blocks (and not
                                in match_att) raises CifError; other items
                                and loops are copied in
                     'replace': items and loops of new_block overwrite ours,
                                except data names listed in match_att
                     'overlay': unlooped string values are copied; a loop
                                whose names all exist here has its packets
                                appended, otherwise it is inserted as new
        match_att -- data names that are never copied from new_block
        match_function -- forwarded to the save frame merge only
        nosaves   -- when true the save frames are not merged
        rel_keys  -- data names acting as loop keys in 'overlay' mode: new
                     packets whose key value already exists here are dropped
        """
        # deal with save frames
        if not nosaves:
            self["saves"].merge(new_block["saves"], mode, match_att=match_att,
                                match_function=match_function)
        if mode == 'strict':
            for key in new_block.item_order:
                if self.has_key(key) and key not in match_att:
                    raise CifError("Identical keys %s in strict merge mode" % key)
                elif key not in match_att:  # no change otherwise
                    if isinstance(key, StringType):
                        self[key] = new_block[key]
                    else:
                        self.insert_loop(key)
        elif mode == 'replace':
            for key in new_block.item_order:
                if isinstance(key, StringType):
                    # don't touch the special ones.  The original built a
                    # key list with match_att removed but never consulted
                    # it, so the protected attributes were overwritten.
                    if key in match_att:
                        continue
                    self[key] = new_block[key]
                else:
                    self.insert_loop(key)  # assume is a loop
        elif mode == 'overlay':
            for attribute in new_block.keys():
                if attribute in match_att:
                    continue  # ignore this one
                new_value = new_block[attribute]
                # non-looped items
                if isinstance(new_value, StringType):
                    self[attribute] = new_value
            these_atts = self.keys()
            for newloop in new_block.loops:
                newkeys = list(newloop.keys())
                # note that the following line determines packet item order
                overlaps = [a for a in newkeys if a in these_atts]
                if len(overlaps) < len(newloop):  # completely new loop
                    self.insert_loop(newloop)
                elif len(overlaps) == len(newloop):
                    # appending packets
                    # get key position; None means "no key for this loop".
                    # A plain truth test cannot be used because position 0
                    # is a perfectly valid key column.
                    loop_keys = [a for a in overlaps if a in rel_keys]
                    try:
                        newkeypos = newkeys.index(loop_keys[0])  # one key per loop for now
                    except (ValueError, IndexError):
                        newkeypos = None
                    overlap_data = [listify(self[a]) for a in overlaps]  # old packet data
                    new_data = [new_block[a] for a in overlaps]  # new packet data
                    packet_data = transpose(overlap_data)
                    new_p_data = transpose(new_data)
                    # remove any packets for which the keys match between old and new; we
                    # make the arbitrary choice that the old data stays
                    if newkeypos is not None:
                        # get matching values in new list
                        print("Old, new data:\n%s\n%s" % (repr(overlap_data[newkeypos]),
                                                          repr(new_data[newkeypos])))
                        key_matches = [a for a in new_data[newkeypos]
                                       if a in overlap_data[newkeypos]]
                        # filter out any new data with these key values
                        new_p_data = [a for a in new_p_data if a[newkeypos] not in key_matches]
                    # wipe out the old data and enter the new stuff
                    byebyeloop = self.GetLoop(overlaps[0])
                    # Note that if, in the original dictionary, overlaps are not
                    # looped, GetLoop will return the block itself.  So we check
                    # for this case...
                    if byebyeloop != self:
                        self.remove_loop(byebyeloop)
                    self.AddCifItem(((overlaps,), (overlap_data,)))  # adding old packets
                    for pd in new_p_data:  # adding new packets
                        if pd not in packet_data:
                            for i in range(len(overlaps)):
                                # don't do this at home; we are appending
                                # to something in place
                                self[overlaps[i]].append(pd[i])
class CifFile(StarFile.StarFile):
    """A STAR file whose data blocks are CifBlock objects."""

    def __init__(self, datasource=None, strict=1, **kwargs):
        """Create the file object.

        datasource and any extra keyword arguments are forwarded to
        StarFile.StarFile.__init__ with blocktype fixed to CifBlock.
        strict is only recorded on the instance here.
        """
        super(CifFile, self).__init__(datasource=datasource, blocktype=CifBlock, **kwargs)
        self.strict = strict
        # The first line is the CIF 1.1 magic code.  The specification
        # spells it "#\#CIF_1.1"; the underscore was previously missing.
        self.header_comment = \
"""#\\#CIF_1.1
##########################################################################
# Crystallographic Information Format file
# Produced by PyCifRW module
#
# This is a CIF file. CIF has been adopted by the International
# Union of Crystallography as the standard for data archiving and
# transmission.
#
# For information on this file format, follow the CIF links at
# http://www.iucr.org
##########################################################################
"""

    def NewBlock(self, blockname, *nkwargs, **kwargs):
        """Create a new block after checking the CIF block name length limit.

        Raises CifError if blockname is longer than 75 characters; otherwise
        all arguments are forwarded to StarFile.StarFile.NewBlock.
        """
        if len(blockname) > 75:
            raise CifError('Blockname %s is longer than 75 characters' % blockname)
        StarFile.StarFile.NewBlock(self, blockname, *nkwargs, **kwargs)
class CifError(Exception):
    """Raised when data does not conform to the CIF format rules.

    value -- the error payload (normally a message string), kept on the
             instance as self.value
    """

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        # %-formatting instead of '+' so that a non-string payload is
        # reported rather than raising TypeError while the error is printed.
        return '\nCif Format error: %s' % (self.value,)
class ValidCifError(Exception):
    """Raised when data is well-formed CIF but fails a validity check.

    value -- the error payload (normally a message string), kept on the
             instance as self.value
    """

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        # %-formatting instead of '+' so that a non-string payload is
        # reported rather than raising TypeError while the error is printed.
        return '\nCif Validity error: %s' % (self.value,)
class CifDic(StarFile.BlockCollection):
def __init__(self,dic,do_minimum=False,grammar='1.1'):
    """Load a DDL1, DDL2 or DDLm dictionary and set up the validation tables.

    dic        -- an existing CifFile, or a string which is handed to
                  CifFile (together with grammar) to obtain one
    do_minimum -- when true, the DDLm validity and dREL set-up is skipped
    grammar    -- passed to CifFile only when dic is a string
    """
    self.do_minimum = do_minimum
    self.dic_as_cif = dic
    self.template_cache = {}     # for DDLm imports
    self.ddlm_functions = {}     # for DDLm functions
    self.switch_numpy(False)     # no Numpy arrays returned
    if isinstance(dic, StringType):
        self.dic_as_cif = CifFile(dic, grammar=grammar)
    dic_details = self.dic_determine(self.dic_as_cif)
    (self.dicname, self.diclang, self.defdata) = dic_details
    super(CifDic, self).__init__(element_class=CifBlock, datasource=self.defdata)
    self.scopes_mandatory = {"dictionary": [], "category": [], "item": []}
    self.scopes_naughty = {"dictionary": [], "category": [], "item": []}
    # rename and expand out definitions using "_name" in DDL dictionaries
    language = self.diclang
    if language == "DDL1":
        self.DDL1_normalise()    # this removes any non-definition entries
        self.ddl1_cat_load()
    elif language == "DDL2":
        self.DDL2_normalise()    # iron out some DDL2 tricky bits
    elif language == "DDLm":
        self.ddlm_normalise()
        self.ddlm_import()       # recursively calls this routine
        if not self.do_minimum:
            print("Doing full dictionary initialisation")
            self.ddlm_parse_valid()   # extract validity information from data block
            self.transform_drel()     # parse the drel functions
            self.add_drel_funcs()     # put the drel functions into the namespace
    self.add_category_info()
    # initialise type information
    self.typedic = {}
    self.primdic = {}            # typecode<->primitive type translation
    self.add_type_info()
    # functions which check conformance of a single item
    self.item_validation_funs = [
        self.validate_item_type,
        self.validate_item_esd,
        self.validate_item_enum,
        self.validate_enum_range,
        self.validate_looping,
    ]
    # functions checking loop values
    self.loop_validation_funs = [
        self.validate_loop_membership,
        self.validate_loop_key,
        self.validate_loop_references,
    ]
    # where we need to look at other values
    self.global_validation_funs = [
        self.validate_exclusion,
        self.validate_parent,
        self.validate_child,
        self.validate_dependents,
        self.validate_uniqueness,
    ]
    # where only a full block will do
    self.block_validation_funs = [self.validate_mandatory_category]
    # removal is quicker with special checks
    self.global_remove_validation_funs = [self.validate_remove_parent_child]
    self.optimize = False        # default value
    self.done_parents = []
    self.done_children = []
    self.done_keys = []
    # debug
    # j = open("dic_debug","w")
    # j.write(self.__str__())
    # j.close()
def dic_determine(self,cifdic):
    """Work out which DDL flavour cifdic is written in.

    Records on self the data names that the chosen DDL uses for the
    various definition attributes, and returns a tuple
    (name + version, language, definitions) where language is one of
    "DDL1", "DDL2" or "DDLm".

    Raises CifError when the layout matches none of the known flavours.
    """
    if cifdic.has_key("on_this_dictionary"):
        # DDL1: the dictionary describes itself in a block of this name
        self.master_key = "on_this_dictionary"
        self.type_spec = "_type"
        self.enum_spec = "_enumeration"
        self.cat_spec = "_category"
        self.esd_spec = "_type_conditions"
        self.must_loop_spec = "_list"
        self.must_exist_spec = "_list_mandatory"
        self.list_ref_spec = "_list_reference"
        self.unique_spec = "_list_uniqueness"
        self.child_spec = "_list_link_child"
        self.parent_spec = "_list_link_parent"
        self.related_func = "_related_function"
        self.related_item = "_related_item"
        self.primitive_type = "_type"
        self.dep_spec = "xxx"
        self.cat_list = []   # to save searching all the time
        header = cifdic["on_this_dictionary"]
        name = header["_dictionary_name"]
        version = header["_dictionary_version"]
        return (name+version,"DDL1",cifdic)
    if len(cifdic.keys()) != 1:
        raise CifError("Unable to determine dictionary DDL version")
    # a single data block: DDL2/DDLm
    self.master_key = cifdic.keys()[0]
    name = cifdic[self.master_key]["_dictionary.title"]
    version = cifdic[self.master_key]["_dictionary.version"]
    if name != self.master_key:
        print("Warning: DDL2 blockname %s not equal to dictionary name %s" % (self.master_key,name))
    if cifdic[self.master_key].has_key("_dictionary.class"):   # DDLm
        self.unique_spec = "_category_key.generic"
        return (name+version,"DDLm",cifdic[self.master_key]["saves"])
    # otherwise DDL2
    self.type_spec = "_item_type.code"
    self.enum_spec = "_item_enumeration.value"
    self.esd_spec = "_item_type_conditions.code"
    self.cat_spec = "_item.category_id"
    self.loop_spec = "there_is_no_loop_spec!"
    self.must_loop_spec = "xxx"
    self.must_exist_spec = "_item.mandatory_code"
    self.child_spec = "_item_linked.child_name"
    self.parent_spec = "_item_linked.parent_name"
    self.related_func = "_item_related.function_code"
    self.related_item = "_item_related.related_name"
    self.unique_spec = "_category_key.name"
    self.list_ref_spec = "xxx"
    self.primitive_type = "_type"
    self.dep_spec = "_item_dependent.dependent_name"
    return (name+version,"DDL2",cifdic[self.master_key]["saves"])
def DDL1_normalise(self):
    """Rework DDL1 definitions into a DDL2-like layout, in place.

    Definitions are re-keyed by their "_name" value (one entry per name
    when "_name" is looped), defaults are filled in for "_list" and
    "_type_conditions", enumeration ranges and type constructs are
    expressed with DDL2-style data names, and any category referred to
    but not defined gets a placeholder definition.
    """
    # add default type information in DDL2 style
    # initial types and constructs
    base_types = ["char","numb","null"]
    prim_types = base_types[:]
    base_constructs = [".*",
        '(-?(([0-9]*[.][0-9]+)|([0-9]+)[.]?)([(][0-9]+[)])?([eEdD][+-]?[0-9]+)?)|\?|\.',
        "\"\" "]
    for key,value in self.dictionary.items():
        if value.has_key("_name"):
            real_name = value["_name"]
            if type(real_name) is ListType:        # looped values
                for looped_name in real_name:
                    single_defn = value.copy()
                    single_defn["_name"] = looped_name   # only looped name
                    self.dictionary[looped_name] = single_defn
            else:
                self.dictionary[real_name] = value
        # delete the old one
        del self.dictionary[key]
    # loop again to normalise the contents of each definition
    range_names = ("_item_range.maximum","_item_range.minimum")
    for key,value in self.dictionary.items():
        # deal with a missing _list, _type_conditions
        if not value.has_key("_list"):
            value["_list"] = 'no'
        if not value.has_key("_type_conditions"):
            value["_type_conditions"] = 'none'
        # deal with enumeration ranges
        if value.has_key("_enumeration_range"):
            upper,lower = self.getmaxmin(value["_enumeration_range"])
            if lower == ".":
                range_values = ((upper,upper),(upper,lower))
            elif upper == ".":
                range_values = ((upper,lower),(lower,lower))
            else:
                range_values = ((upper,upper,lower),(upper,lower,lower))
            self.dictionary[key].AddLoopItem((range_names,range_values))
        # add any type construct information
        if value.has_key("_type_construct"):
            base_types.append(value["_name"]+"_type")    # ie dataname_type
            base_constructs.append(value["_type_construct"]+"$")
            prim_types.append(value["_type"])            # keep a record
            value["_type"] = base_types[-1]              # the new type name
        # make categories conform with ddl2
        # note that we must remove everything from the last underscore
        if value["_category"] == "category_overview":
            last_under = value["_name"].rindex("_")
            catid = value["_name"][1:last_under]
            value["_category.id"] = catid   # remove square bracks
            if catid not in self.cat_list:
                self.cat_list.append(catid)
    # we now add any missing categories before filling in the rest of the
    # information
    for key,value in self.dictionary.items():
        if not self[key].has_key("_category"):
            continue
        if self[key]["_category"] in self.cat_list:
            continue
        # rogue category, add it in
        newcat = self[key]["_category"]
        fake_name = "_" + newcat + "_[]"
        newcatdata = CifBlock()
        newcatdata["_category"] = "category_overview"
        newcatdata["_category.id"] = newcat
        newcatdata["_type"] = "null"
        self[fake_name] = newcatdata
        self.cat_list.append(newcat)
    # write out the type information in DDL2 style
    type_list_names = ("_item_type_list.code","_item_type_list.construct",
                       "_item_type_list.primitive_code")
    self.dic_as_cif[self.master_key].AddLoopItem((
        type_list_names,
        (base_types,base_constructs,prim_types)
        ))
def create_pcloop(self,definition):
    """Rewrite a definition's parent/child link items as a two-column loop.

    The values of '_item_linked.parent_name' and '_item_linked.child_name'
    in self[definition] are gathered (a missing one is filled with the
    definition's own name, repeated to match the other column), the
    originals are removed and a single loop holding both is inserted.

    Nothing is done when neither item is present, or when both already
    hold more than one value.
    """
    old_children = self[definition].get('_item_linked.child_name',[])
    old_parents = self[definition].get('_item_linked.parent_name',[])
    if isinstance(old_children,StringType):
        old_children = [old_children]
    if isinstance(old_parents,StringType):
        old_parents = [old_parents]
    if (len(old_children)==0 and len(old_parents)==0) or \
       (len(old_children) > 1 and len(old_parents)>1):
        return
    if len(old_children)==0:
        old_children = [definition]*len(old_parents)
    if len(old_parents)==0:
        old_parents = [definition]*len(old_children)
    newloop = CifLoopBlock(dimension=1)
    newloop.AddLoopItem(('_item_linked.parent_name',old_parents))
    newloop.AddLoopItem(('_item_linked.child_name',old_children))
    # Remove each original item independently.  With both deletions inside
    # one try block, a missing parent_name raised KeyError before
    # child_name was deleted, leaving the stale child_name item in place
    # alongside the new loop.
    for linked_name in ('_item_linked.parent_name','_item_linked.child_name'):
        try:
            del self[definition][linked_name]
        except KeyError:
            pass
    self[definition].insert_loop(newloop)
def DDL2_normalise(self):
# Normalise a DDL2 dictionary in place.  From the visible code this:
#   1. expands definitions whose '_item.name' is a list of more than one
#      name, so that every listed name has its own definition block
#      carrying its category id and mandatory code;
#   2. regularises the '_item_linked' parent/child items through
#      create_pcloop and records each parent -> child link in both the
#      child's and the parent's definition;
#   3. flattens loops containing a single-element '_item.name' into
#      plain values;
#   4. stores a copy of each aliased definition under every alias name.
# NOTE(review): this listing has lost its indentation; confirm the block
# nesting against a correctly indented copy before changing any logic.
listed_defs = filter(lambda a:isinstance(self[a].get('_item.name'),ListType),self.keys())
# now filter out all the single element lists!
dodgy_defs = filter(lambda a:len(self[a]['_item.name']) > 1, listed_defs)
for item_def in dodgy_defs:
# print "DDL2 norm: processing %s" % item_def
thisdef = self[item_def]
# position of this definition's own entry within its '_item.name' list
packet_no = thisdef['_item.name'].index(item_def)
realcat = thisdef['_item.category_id'][packet_no]
realmand = thisdef['_item.mandatory_code'][packet_no]
# first add in all the missing categories
# we don't replace the entry in the list corresponding to the
# current item, as that would wipe out the information we want
for child_no in range(len(thisdef['_item.name'])):
if child_no == packet_no: continue
child_name = thisdef['_item.name'][child_no]
child_cat = thisdef['_item.category_id'][child_no]
child_mand = thisdef['_item.mandatory_code'][child_no]
if not self.has_key(child_name):
self[child_name] = CifBlock()
self[child_name]['_item.name'] = child_name
self[child_name]['_item.category_id'] = child_cat
self[child_name]['_item.mandatory_code'] = child_mand
# collapse this definition's own entries back to single values
self[item_def]['_item.name'] = item_def
self[item_def]['_item.category_id'] = realcat
self[item_def]['_item.mandatory_code'] = realmand
# NOTE(review): target_defs is computed here but is not referenced again
# in this method; the two loops below iterate over dodgy_defs.  Confirm
# which list was intended.
target_defs = filter(lambda a:self[a].has_key('_item_linked.child_name') or \
self[a].has_key('_item_linked.parent_name'),self.keys())
# now dodgy_defs contains all definition blocks with more than one child/parent link
for item_def in dodgy_defs: self.create_pcloop(item_def) #regularise appearance
for item_def in dodgy_defs:
print 'Processing %s' % item_def
thisdef = self[item_def]
child_list = thisdef['_item_linked.child_name']
parents = thisdef['_item_linked.parent_name']
# for each parent, find the list of children.
family = zip(parents,child_list)
notmychildren = family #We aim to remove non-children
# Loop over the parents, relocating as necessary
while len(notmychildren):
# get all children of first entry
mychildren = filter(lambda a:a[0]==notmychildren[0][0],family)
# print "Parent %s: %d children" % (notmychildren[0][0],len(mychildren))
for parent,child in mychildren: #parent is the same for all
# Make sure that we simply add in the new entry for the child, not replace it,
# otherwise we might spoil the child entry loop structure
try:
childloop = self[child].GetLoop('_item_linked.parent_name')
except KeyError:
# print 'Creating new parent entry %s for definition %s' % (parent,child)
self[child]['_item_linked.parent_name'] = [parent]
childloop = self[child].GetLoop('_item_linked.parent_name')
childloop.AddLoopItem(('_item_linked.child_name',[child]))
continue
else:
# A parent loop already exists and so will a child loop due to the
# call to create_pcloop above
pars = [a for a in childloop if getattr(a,'_item_linked.child_name','')==child]
goodpars = [a for a in pars if getattr(a,'_item_linked.parent_name','')==parent]
if len(goodpars)>0: #no need to add it
#print 'Skipping duplicated parent - child entry in %s: %s - %s' % (child,parent,child)
continue
# print 'Adding %s to %s entry' % (parent,child)
newpacket = childloop.GetPacket(0) #essentially a copy, I hope
setattr(newpacket,'_item_linked.child_name',child)
setattr(newpacket,'_item_linked.parent_name',parent)
childloop.AddPacket(newpacket)
#
# Make sure the parent also points to the children. We get
# the current entry, then add our
# new values if they are not there already
#
parent_name = mychildren[0][0]
old_children = self[parent_name].get('_item_linked.child_name',[])
old_parents = self[parent_name].get('_item_linked.parent_name',[])
oldfamily = zip(old_parents,old_children)
# NOTE(review): newfamily is assigned but not used in the visible code.
newfamily = []
# print 'Old parents -> %s' % `old_parents`
for jj, childname in mychildren:
alreadythere = filter(lambda a:a[0]==parent_name and a[1] ==childname,oldfamily)
if len(alreadythere)>0: continue
# 'Adding new child %s to parent definition at %s' % (childname,parent_name)
old_children.append(childname)
old_parents.append(parent_name)
# Now output the loop, blowing away previous definitions. If there is something
# else in this category, we are destroying it.
newloop = CifLoopBlock(dimension=1)
newloop.AddLoopItem(('_item_linked.parent_name',old_parents))
newloop.AddLoopItem(('_item_linked.child_name',old_children))
del self[parent_name]['_item_linked.parent_name']
del self[parent_name]['_item_linked.child_name']
self[parent_name].insert_loop(newloop)
# print 'New parents -> %s' % `self[parent_name]['_item_linked.parent_name']`
# now make a new,smaller list
notmychildren = filter(lambda a:a[0]!=mychildren[0][0],notmychildren)
# now flatten any single element lists
single_defs = filter(lambda a:len(self[a]['_item.name'])==1,listed_defs)
for flat_def in single_defs:
# every data name sharing the loop with '_item.name' is flattened too
flat_keys = self[flat_def].GetLoop('_item.name').keys()
for flat_key in flat_keys: self[flat_def][flat_key] = self[flat_def][flat_key][0]
# now deal with the multiple lists
# next we do aliases
all_aliases = filter(lambda a:self[a].has_key('_item_aliases.alias_name'),self.keys())
for aliased in all_aliases:
my_aliases = listify(self[aliased]['_item_aliases.alias_name'])
for alias in my_aliases:
self[alias] = self[aliased].copy() #we are going to delete stuff...
del self[alias]["_item_aliases.alias_name"]
def ddlm_normalise(self):
    """Re-key DDLm definitions as "_<category_id>.<object_id>".

    Only definitions carrying "_name.category_id" are renamed; each one
    is stored under its new name and its old entry is removed.
    """
    for old_name,definition in self.dictionary.items():
        if not definition.has_key("_name.category_id"):
            continue
        category_part = definition["_name.category_id"]
        object_part = definition["_name.object_id"]
        real_name = "_" + category_part + "." + object_part
        self[real_name] = definition
        # delete the old one
        del self[old_name]
def ddlm_parse_valid(self):
    """Collect per-scope mandatory and prohibited attribute names.

    Reads the "_dictionary_valid.scope" loop of the master data block.
    Each packet's "_dictionary_valid.attributes" string is split on
    whitespace and read as (flag, attribute) pairs: "+" adds the
    lower-cased attribute to self.scopes_mandatory for that scope, "!"
    adds it to self.scopes_naughty.  Other flags are skipped.  Does
    nothing when the master block has no such loop.
    """
    if not self.dic_as_cif[self.master_key].has_key("_dictionary_valid.scope"):
        return
    valid_loop = self.dic_as_cif[self.master_key].GetLoop("_dictionary_valid.scope")
    for scope_pack in valid_loop:
        scope = getattr(scope_pack,"_dictionary_valid.scope")
        tokens = getattr(scope_pack,"_dictionary_valid.attributes").split()
        for flag_pos in range(0,len(tokens),2):
            flag = tokens[flag_pos]
            if flag == "+":
                self.scopes_mandatory[scope.lower()].append(tokens[flag_pos+1].lower())
            elif flag == "!":
                self.scopes_naughty[scope.lower()].append(tokens[flag_pos+1].lower())
def ddlm_import(self):
    """Resolve DDLm "_import_list.id" directives.

    Two passes are made:

    * directives in the master data block (scopes "dic", "cat", "grp" and
      "itm") load the referenced dictionary and merge whole dictionaries,
      categories or single definitions into this one;
    * directives inside individual definitions (scopes "att", "sta" and
      "val") pull attributes or enumeration loops from template
      dictionaries, which are cached in self.template_cache by URI.

    A directive whose scope belongs to the other level is reported and
    skipped.  Each "_import_list.id" item is deleted once processed.

    Raises CifError for an unrecognised definition-level scope.
    """
    import urllib
    # first check the outermost datablocks.  Note we expect our dREL
    # machinery to create _import_list.id only if the individual values are available
    # For this to happen, we need the ddl.dic to have been assigned
    try:
        to_be_imported = self.dic_as_cif[self.master_key]["_import_list.id"]
    except KeyError:
        pass
    else:
        # deal with foreshortened import blocks
        for import_target in to_be_imported:
            if len(import_target)==3:     # default values have been left off
                import_target.append('Exit')
                import_target.append('Exit')
        for scope,dict_block,file_loc,on_dupl,on_miss in to_be_imported:
            scope = scope.lower()         # work around capitalisation in draft dics
            if scope == 'att' or scope == 'sta' or scope == 'val':
                # The attribute is master_key; the former 'self.master.key'
                # raised AttributeError instead of printing this warning.
                print('Improper import directive at top level in %s: ignored' % self.master_key)
                continue
            # resolve URI
            full_uri = self.resolve_path(file_loc)
            dic_as_cif = CifFile(urllib.urlopen(full_uri),grammar="DDLm")
            import_from = CifDic(dic_as_cif,do_minimum=True)  # this will recurse internal imports
            # and now merge these definitions
            if scope == "dic":
                self.get_whole_dict(import_from,on_dupl,on_miss)
            elif scope=="cat":
                self.get_one_cat(import_from,dict_block,on_dupl,on_miss)
            elif scope=="grp":
                self.get_one_cat_with_children(import_from,dict_block,on_dupl,on_miss)
            elif scope=="itm":   # not clear what to do if category is missing
                # NOTE(review): on_miss is passed as a fourth argument;
                # confirm that add_one_defn's signature accepts it.
                self.add_one_defn(import_from,dict_block,on_dupl,on_miss)
        # it will never happen again...
        del self.dic_as_cif[self.master_key]["_import_list.id"]
    # next we resolve per-definition imports
    for one_def in self.keys():
        try:
            to_be_imported = self[one_def]["_import_list.id"]
        except KeyError:
            pass
        else:
            if len(to_be_imported) == 5 and len(to_be_imported[0])!=5:
                # catch an error in earlier versions of the dictionaries where
                # the outer brackets were missing
                to_be_imported = [to_be_imported]
            # deal with foreshortened import blocks
            for import_target in to_be_imported:
                if len(import_target)==3:     # default values have been left off
                    import_target.append('Exit')
                    import_target.append('Exit')
            for scope,block,file_loc,on_dupl,on_miss in to_be_imported:
                scope = scope.lower()         # work around capitalisation in draft dics
                if scope == 'dic' or scope == 'cat' or scope == 'grp' or scope == "itm":
                    # same master_key correction as in the top-level pass
                    print('Improper import directive at definition level in %s: ignored' % self.master_key)
                    continue
                full_uri = self.resolve_path(file_loc)
                if full_uri not in self.template_cache:
                    dic_as_cif = CifFile(urllib.urlopen(full_uri),grammar="DDLm")
                    self.template_cache[full_uri] = CifDic(dic_as_cif,do_minimum=True)  # this will recurse internal imports
                    print('Added %s to cached dictionaries' % full_uri)
                import_from = self.template_cache[full_uri]
                if scope == 'att':
                    self.import_attributes(one_def,import_from,block,on_dupl,on_miss)
                elif scope == 'sta':
                    self.import_loop(one_def,import_from,block,'_enumeration_set.state',on_miss)
                elif scope == 'val':
                    self.import_loop(one_def,import_from,block,'_enumeration_default.value',on_miss)
                else:
                    raise CifError("Unrecognised import scope %s" % scope)
            # remove the import attribute
            del self[one_def]["_import_list.id"]
def resolve_path(self,file_loc):
    """Return file_loc as a full URI.

    A location that already has a URL scheme is returned untouched;
    anything else is joined onto the URI of this dictionary's own file
    (self.dic_as_cif.my_uri) and the transformation is printed.
    """
    import urlparse
    scheme = urlparse.urlparse(file_loc)[0]
    if scheme:
        return file_loc     # already full URI
    resolved = urlparse.urljoin(self.dic_as_cif.my_uri,file_loc)
    print("Transformed %s to %s for import " % (file_loc,resolved))
    return resolved
def get_whole_dict(self,source_dict,on_dupl,on_miss):
    """Import every category listed in source_dict.cat_map via get_one_cat."""
    print("Cat_map: `%s`" % source_dict.cat_map.values())
    all_categories = source_dict.cat_map.values()
    for one_category in all_categories:
        self.get_one_cat(source_dict,one_category,on_dupl,on_miss)
def get_one_cat(self,source_dict,source_cat,on_dupl,on_miss):
    """Copy category block `source_cat` and its member definitions from
    `source_dict` into this dictionary.

    on_miss: if the category is absent (or empty) in `source_dict`,
        "Ignore" returns without doing anything; any other value raises
        CifError.
    on_dupl: if this dictionary already holds a non-empty block of that
        name, "Ignore" keeps it, "Exit" raises CifError and any other
        value overwrites it.  The same setting is passed on to
        `add_one_defn` for every member definition.

    Member definitions are those blocks of `source_dict` whose
    `_name.category_id` equals `source_cat`, ignoring case.
    """
    ext_cat = source_dict.get(source_cat,"")
    this_cat = self.get(source_cat,"")
    print("Adding category %s" % source_cat)
    if not ext_cat:
        if on_miss != "Ignore":
            raise CifError("Missing category %s" % source_cat)
        return
    wanted = source_cat.lower()
    cat_list = [a for a in source_dict.keys()
                if source_dict[a].get("_name.category_id","").lower() == wanted]
    print("Items: %s" % repr(cat_list))
    # The category block itself may be duplicated
    if this_cat and on_dupl == "Exit":
        raise CifError("Duplicate category %s" % source_cat)
    if not (this_cat and on_dupl == "Ignore"):
        self[source_cat] = ext_cat
    # now do all member definitions
    for cat_defn in cat_list:
        self.add_one_defn(source_dict,cat_defn,on_dupl)
def add_one_defn(self,source_dict,cat_defn,on_dupl):
    """Copy definition `cat_defn` from `source_dict` into this dictionary.

    If the definition is already present, `on_dupl` decides the outcome:
    "Ignore" keeps the existing one, "Exit" raises CifError and any other
    value overwrites it.  The definition name is printed unless an error
    was raised.
    """
    already_present = self.has_key(cat_defn)
    if already_present and on_dupl == "Exit":
        raise CifError("Duplicate definition %s" % cat_defn)
    if not (already_present and on_dupl == "Ignore"):
        self[cat_defn] = source_dict[cat_defn]
    print(" "+cat_defn)
def get_one_cat_with_children(self,source_dict,source_cat,on_dupl,on_miss):
    """Import category `source_cat` and then each category whose
    `_category.parent_id` equals that category's `_definition.id`.

    Every import goes through `self.get_one_cat` with the same `on_dupl`
    and `on_miss` settings.  Only direct children are followed.
    """
    self.get_one_cat(source_dict,source_cat,on_dupl,on_miss)
    if not source_dict.get(source_cat,""):
        # get_one_cat accepted a missing category (on_miss == "Ignore"),
        # so there is no parent block to look children up for.
        return
    parent_id = source_dict[source_cat]["_definition.id"]
    child_cats = [a for a in source_dict.cat_map.values()
                  if source_dict[a]["_category.parent_id"] == parent_id]
    for child_cat in child_cats:
        self.get_one_cat(source_dict,child_cat,on_dupl,on_miss)
def import_attributes(self,mykey,source_dict,source_def,on_dupl,on_miss):
    """Merge the attributes of `source_dict[source_def]` into `self[mykey]`.

    If `source_def` is not in `source_dict`, an `on_miss` of 'Exit' raises
    CifError and any other value returns without doing anything.  The merge
    is done in 'replace' mode with `_definition.id`, `_name.category_id`
    and `_name.object_id` passed as `match_att`.  `on_dupl` is accepted
    but not consulted here.
    """
    # process missing
    if not source_dict.has_key(source_def):
        if on_miss == 'Exit':
            raise CifError('Missing definition for import %s' % source_def)
        return #nothing else to do
    # now do the import
    print('Adding attributes from %s to %s' % (source_def,mykey))
    self[mykey].merge(source_dict[source_def],mode='replace',
                      match_att=['_definition.id','_name.category_id','_name.object_id'])
def import_loop(self,mykey,source_dict,source_def,loop_name,on_miss):
    """Copy the loop containing `loop_name` from `source_dict[source_def]`
    into `self[mykey]`.

    If `source_def` is not in `source_dict`, an `on_miss` of 'Exit' raises
    CifError and any other value returns without doing anything.
    """
    # process missing
    if not source_dict.has_key(source_def):
        if on_miss == 'Exit':
            raise CifError('Missing definition for import %s' % source_def)
        return #nothing else to do
    print('Adding %s attributes from %s to %s' % (loop_name,source_def,mykey))
    state_loop = source_dict[source_def].GetLoop(loop_name)
    self[mykey].insert_loop(state_loop)
def ddl1_cat_load(self):
    """Record, on each DDL1 category entry, which items are mandatory and
    which form the category key.

    Every definition whose `self.must_exist_spec` attribute is 'yes' is
    listed under `_category_mandatory.name` of its category entry.  Every
    definition carrying a `self.unique_spec` attribute is listed, together
    with that attribute's value, under `_category_key.name`.  Category
    entries are located with `self.get_ddl1_entry`.
    """
    cat_mand_dic = {}
    cat_unique_dic = {}
    # An explicit loop rather than map(): the work is pure side effect and
    # must run regardless of whether map() is lazy.
    for single_def in list(self.keys()):
        defn = self[single_def]
        if defn.get(self.must_exist_spec) == 'yes':
            cat_mand_dic.setdefault(defn["_category"],[]).append(single_def)
        # now the unique items...
        # cif_core.dic throws us a curly one: the value of list_uniqueness is
        # not the same as the defined item for publ_body_label, so we have
        # to collect both together. We assume a non-listed entry, which
        # is true for all current (May 2005) ddl1 dictionaries.
        new_unique = defn.get(self.unique_spec,None)
        if new_unique is not None:
            uis = cat_unique_dic.setdefault(defn["_category"],[])
            for name in (single_def,new_unique):
                if name not in uis:
                    uis.append(name)
    for cat in cat_mand_dic.keys():
        cat_entry = self.get_ddl1_entry(cat)
        self[cat_entry]["_category_mandatory.name"] = cat_mand_dic[cat]
    for cat in cat_unique_dic.keys():
        cat_entry = self.get_ddl1_entry(cat)
        self[cat_entry]["_category_key.name"] = cat_unique_dic[cat]
# A helper function to find the entry corresponding to a given category name:
# yes, in DDL1 the actual name is different in the category block due to the
# addition of square brackets which may or may not contain stuff.
def get_ddl1_entry(self,cat_name):
    """Return the key of the DDL1 category entry for `cat_name`.

    The entry is the single key which, after its first character, starts
    with `cat_name` followed by "_[".  ValidCifError is raised when no key
    or more than one key fits.
    """
    prefix = cat_name + "_["
    # Build a real list so it can be counted whatever filter() would return.
    possibles = [a for a in self.keys() if a.startswith(prefix,1)]
    if len(possibles) != 1:
        raise ValidCifError("Category name %s can't be matched to category entry" % cat_name)
    return possibles[0]
def add_type_info(self):
    """Fill `self.typedic` and `self.primdic` from the `_item_type_list`
    loop of the master block.

    Nothing happens unless the master block has
    `_item_type_list.construct`.  Each construct gets "$" appended, is
    passed through `regex_fiddle`, and is compiled with MULTILINE|DOTALL
    into `self.typedic[code]`.  `self.primdic[code]` receives the matching
    `_item_type_list.primitive_code`.
    """
    master = self.dic_as_cif[self.master_key]
    if not master.has_key("_item_type_list.construct"):
        return
    types = master["_item_type_list.code"]
    prim_types = master["_item_type_list.primitive_code"]
    constructs = [a + "$" for a in master["_item_type_list.construct"]]

    # add in \r wherever we see \n, and change \{ to \\{
    def regex_fiddle(mm_regex):
        brack_match = r"((.*\[.+)(\\{)(.*\].*))"
        ret_match = r"((.*\[.+)(\\n)(.*\].*))"
        fixed_regexp = mm_regex
        # fix the brackets
        bm = re.match(brack_match,mm_regex)
        if bm is not None:
            fixed_regexp = bm.expand(r"\2\\\\{\4")
        # fix missing \r
        rm = re.match(ret_match,fixed_regexp)
        if rm is not None:
            fixed_regexp = rm.expand(r"\2\3\\r\4")
        return fixed_regexp

    constructs = [regex_fiddle(c) for c in constructs]
    # zip() replaces the Python-2-only map(None, ...) pairing; the three
    # lists are presumably columns of one loop and so equally long -
    # TODO confirm against the dictionary reader.
    for typecode,construct in zip(types,constructs):
        self.typedic[typecode] = re.compile(construct,re.MULTILINE|re.DOTALL)
    # now make a primitive <-> type construct mapping
    for typecode,primtype in zip(types,prim_types):
        self.primdic[typecode] = primtype
def add_category_info(self):
    """Build `self.cat_map`, mapping each category id to the key of the
    block that defines it.

    When `self.diclang` is "DDLm", category blocks are those whose
    `_definition.scope` is "Category" and the id is `_definition.id`.
    Otherwise they are the blocks that have `_category.id`, which is also
    the id.
    """
    if self.diclang == "DDLm":
        id_tag = "_definition.id"
        categories = [a for a in self.keys()
                      if self[a].get("_definition.scope","Item") == "Category"]
    else:
        id_tag = "_category.id"
        categories = [a for a in self.keys() if self[a].has_key("_category.id")]
    # match ids and entries in the dictionary; assemble the map fully
    # before publishing it so a failed lookup leaves self.cat_map untouched
    cat_map = {}
    for cat in categories:
        cat_map[self[cat][id_tag]] = cat
    self.cat_map = cat_map
def names_in_cat(self,cat):
    """Return the data names belonging to category `cat`.

    A block belongs when its `_name.category_id` equals `cat`, ignoring
    case.  Each name is built as "_" + category id + "." + object id,
    using the block's own spelling of the category id.
    """
    found = []
    for blockname in self.keys():
        defn = self[blockname]
        if defn.get("_name.category_id","").lower() != cat.lower():
            continue
        found.append("_" + defn["_name.category_id"] + "." + defn["_name.object_id"])
    return found
def get_key_pack(self,category,value,data):
    """Return `data.GetPackKey(keyname, value)`, where `keyname` is the
    `self.unique_spec` attribute of this dictionary's `category` entry.
    """
    return data.GetPackKey(self[category][self.unique_spec],value)
def get_number_with_esd(numstring):
    """Split a number written as e.g. "1.234(5)e2" into (value, esd).

    Returns a pair of floats.  The second element is None when no
    bracketed esd is present.  (None, None) is returned when `numstring`
    starts with "?" or ".", or does not start with a number at all.
    Exponents written with "d" or "D" are accepted.  The esd is scaled by
    the number of digits after the decimal point and by the exponent.
    Only the start of the string is examined; trailing text is ignored.
    """
    numb_re = r'((-?(([0-9]*[.]([0-9]+))|([0-9]+)[.]?))([(][0-9]+[)])?([eEdD][+-]?[0-9]+)?)|(\?)|(\.)'
    our_match = re.match(numb_re,numstring)
    if our_match is None:
        return None,None
    # groups: 2 = mantissa, 5 = digits after the point, 7 = "(esd)",
    #         8 = exponent, 9 = "?", 10 = "."
    base_num,dad,esd,exp,q,dot = our_match.group(2,5,7,8,9,10)
    if dot or q:
        return None,None #a dot or question mark
    if exp: #has exponent
        # mop up old fashioned numbers
        exp = exp.replace("d","e").replace("D","e")
        base_num = base_num + exp
    base_num = float(base_num)
    # work out esd, if present.
    if esd:
        esd = float(esd[1:-1]) # no brackets
        if dad: # decimal point + digits
            esd = esd * (10 ** (-1* len(dad)))
        if exp:
            esd = esd * (10 ** (float(exp[1:])))
    return base_num,esd
def getmaxmin(self,rangeexp):
    """Parse a "min:max" range expression and return (maximum, minimum).

    Each limit is returned as a float, or as the string "." when that side
    of the colon is empty.  Exponents written with "d" or "D" are accepted.
    If `rangeexp` cannot be matched at all, a message is printed and both
    limits are returned as ".".
    """
    number = '(-?(([0-9]*[.]([0-9]+))|([0-9]+)[.]?)([eEdD][+-]?[0-9]+)?)*'
    matched = re.match(number + ":" + number,rangeexp)
    if matched is None:
        # Previously this path printed the message and then failed on the
        # unassigned limits; treat an unparsable range as having no limits.
        print("Can't match %s" % (rangeexp,))
        return ".","."

    def as_limit(text):
        if text is None:
            return "."
        # float() does not understand d/D exponents
        return float(text.replace("d","e").replace("D","e"))

    # each number pattern holds six groups, so the second starts at group 7
    minimum = as_limit(matched.group(1))
    maximum = as_limit(matched.group(7))
    return maximum,minimum
def transform_drel(self):
    """Replace each dREL `_method.expression` with generated Python source.

    The `drel_yacc` parser is primed with the dictionary's block names, the
    ids of categories whose `_definition.class` is "List", and the items
    whose `_type.dimension` contains "*".  Every definition that has a
    `_method.expression`, is not of scope 'Category' and is not in the
    "function" category is then parsed.  Its `_loop_categories` is set from
    the keys of the second element of the parse result, and its
    `_method.expression` is overwritten with the output of
    `drel_yacc.make_func`.  Progress is printed as it goes.
    """
    import drel_yacc
    parser = drel_yacc.parser
    all_names = self.keys()
    # every block name maps to itself in the parser's visible namespace
    my_namespace = dict(zip(all_names,all_names))
    parser.loopable_cats = [self[a]["_definition.id"] for a in self.keys()
                            if self[a].get("_definition.class","Datum") == "List"]
    parser.listable_items = [a for a in self.keys()
                             if "*" in self[a].get("_type.dimension","")]
    derivable_list = [a for a in self.keys()
                      if self[a].has_key("_method.expression")
                      and self[a].get("_definition.scope","") != 'Category'
                      and self[a].get("_name.category_id","") != "function"]
    for derivable in derivable_list:
        parser.target_id = derivable
        # reset the list of visible names for parser
        parser.special_id = [my_namespace]
        # reset list of looped with statements
        parser.withtable = {}
        print("Target id: %s" % derivable)
        drel_expr = self[derivable]["_method.expression"]
        if isinstance(drel_expr,ListType):
            drel_expr = drel_expr[0]
        print("Transforming %s" % drel_expr)
        # List categories are treated differently...
        pyth_meth = parser.parse(drel_expr,debug=True)
        # store a real list so the result can be indexed
        self[derivable]["_loop_categories"] = list(pyth_meth[1].keys())
        self[derivable]["_method.expression"] = drel_yacc.make_func(pyth_meth,"pyfunc",None)
        print("Final result:\n " + self[derivable]["_method.expression"])
def add_drel_funcs(self):
    """Turn the dictionary's dREL library functions into Python and
    execute the result into this module's global namespace.

    Library functions are the definitions whose `_name.category_id` is
    'function'.  Each one is parsed with `drel_yacc.parser`, and the
    generated source is printed and then exec'd.
    """
    import drel_yacc
    funclist = filter(lambda a:self[a].get("_name.category_id","")=='function',self.keys())
    funcnames = map(lambda a:self[a]["_name.object_id"],funclist)
    funcbodys = map(lambda a:self[a]["_method.expression"],funclist)
    # create executable python code...
    parser = drel_yacc.parser
    for funcname,funcbody in zip(funcnames,funcbodys):
        # reset the parser state for each function
        parser.target_id = funcname
        parser.special_id = [{}] #first element is always global namespace of dictionary
        parser.withtable = {}
        # only the first element of the method expression is parsed
        res,ww = parser.parse(funcbody[0])
        print 'dREL library function ->\n' + res
        # make the functions collected so far visible to the new code
        global_table = globals()
        global_table.update(self.ddlm_functions)
        # NOTE(review): this executes code generated from dictionary text,
        # so dictionaries must come from a trusted source.
        # NOTE(review): the new definitions land in the module globals;
        # whether self.ddlm_functions also sees them depends on how that
        # attribute is initialised elsewhere - confirm.
        exec res in global_table #add to namespace
    print "All functions -> " + `self.ddlm_functions`
def switch_numpy(self,to_val):
    """Point `self.recursive_numerify` at `self.numpy_numerify` when
    `to_val` is true, and at `self.normal_numerify` otherwise.
    """
    self.recursive_numerify = self.numpy_numerify if to_val else self.normal_numerify
def derive_item(self,key,cifdata,store_value = False):
# store any default value in case we have a problem
def_val = self[key].get("_enumeration.default","")
def_index_val = self[key].get("_enumeration.def_index_id","")
the_func = self[key].get("_method.expression","")
if def_val and not the_func : return def_val
if def_index_val and not the_func: #derive a default value
index_vals = self[key]["_enumeration_default.index"]
val_to_index = cifdata[def_index_val] #what we are keying on
# Handle loops
if isinstance(val_to_index,ListType):
keypos = map(lambda a:index_vals.index(a),val_to_index)
result = map(lambda a:self[key]["_enumeration_default.value"][a] ,keypos)
else:
keypos = index_vals.index(val_to_index) #value error if no such value available
result = self[key]["_enumeration_default.value"][keypos]
print "Indexed on %s to get %s for %s" % (def_index_val,`result`,`val_to_index`)
return result
# read it in
the_category = self[key]["_name.category_id"]
the_type = self[the_category]["_definition.class"]
global_table = globals()
global_table.update(self.ddlm_functions)
exec the_func in global_table,locals() #will access dREL functions, puts "pyfunc" in scope
print 'Executing following function'
print the_func
print 'With following loop categories:' + `self[key].get("_loop_categories","")`
# print 'in following global environment: ' + `global_table`
if self[key].get("_loop_categories",""):
loop_category = self[key]["_loop_categories"][0]