-
Notifications
You must be signed in to change notification settings - Fork 0
/
NDP-h.py
2289 lines (1916 loc) · 97.1 KB
/
NDP-h.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# NDP-h.py
#
# Non-Deterministic Processor (NDP) - efficient parallel SAT-solver
# In honor of Youcef Hamadache.
# Copyright (c) 2024 GridSAT Stiftung
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# GridSAT Stiftung - Georgstr. 11 - 30159 Hannover - Germany - ipfs: gridsat.eth/ - info@gridsat.io
#
"""
Non-Deterministic Processor (NDP-h)
====================================================
The Non-Deterministic Processor (NDP-h) is a tribute to Youcef Hamadache, devised as an
efficient parallel SAT-solver. This tool is exclusively tailored for efficiently solving
CNFs generated for Paul Purdom and Amr Sabry's Factoring Problems.
Parallel Breadth-First Search and Preparation for Parallel Iterative Processing:
--------------------------------------------------------------------------------
NDP-h initiates its resolution flow with a parallel CPU breadth-first search, dissecting the SAT problem
into smaller, independently solvable sub-formulas. This strategic decomposition into distinct sub-formulas
is pivotal for the NDP's efficiency, as it prepares the groundwork for further leveraging distributed
computing to tackle each sub-formula concurrently.
Parallel Processing with Ray:
------------------------------
Following the specific organization and prioritization of clauses, NDP-h employs Ray, a distributed
computing framework, for the parallel processing of the identified sub-formulas. This integration
enables NDP-h to distribute the computational workload across multiple CPU cores in a cluster,
significantly enhancing the NDP's capacity to address more complex SAT problems efficiently.
Method:
-------
At the core of NDP-h's approach is an exclusive implementation non-conform with the theory coded in
NDP-blueprint-thief available at https://github.com/GridSAT/NDP-blueprint-thief
on IPFS ipfs://QmUezbkQV2bPh3HzYbLe8YsgNHgaRC3a7cWKh2kiVYrdAQin and at http://gridsat.io.
The NDP-h method only organizes clauses by their length and their initial index, giving precedence
to unit clauses.
Unit Clauses:
-------------
Unit clauses comprising a single variable represent the bits of the product. The sorting criteria ensures unit clauses
to be processed first, thereby streamlining the resolution process by focusing on critical variables at the beginning
of the computation.
First Assignment:
-----------------
Unlike the comprehensive NDP-blueprint-thief that outputs all solutions by default (except opted otherwise), NDP-h
always terminates upon finding the first satisfying assignment (provided the input is not prime) with no BDD output,
keeping the memory footprint low and almost constant during the whole resolution process.
Operational Dynamics:
---------------------
The compelling aspect of NDP-h emerges with the relationship between the overall problem size and the proportion
of new variables (VARs) required. While the absolute memory size and the number of variables tend to increase with
the complexity of the factorization challenges addressed, a nuanced efficiency is observed in the relative decrease
of the percentage of new VARs compared to the overall problem size. This efficiency is intrinsically linked to the
specific input format derived from Paul Purdom and Amr Sabry's CNF Generator for Factoring Problems, particularly
its asymmetric 3CNF transformation approach.
Conclusion:
-----------
The NDP-h approach to solving SAT problems pertaining to factorization challenges, demonstrates a significant
application of parallel processing techniques in computational number theory and cryptography. By integrating
a methodical decomposition of problems, employing unit clause prioritization and leveraging parallel computing
for resolution, NDP-h exemplifies an effective strategy for tackling complex SAT problems.
Its focus on terminating upon the first satisfying assignment underscores the solver's practical orientation
towards obtaining solutions efficiently, without delving into the exhaustive exploration of the potential solution
space with BDD generation.
Key Features:
------------
- Parses and validates DIMACS formatted files for Paul Purdom and Amr Sabry's CNF factorization challenges.
- Extracts essential metadata and prepares data for processing.
- Outputs processed data for use in further computation or analysis in .JASON, .txt, and .csv with input-, ID,
as well as stopping-size percentage in file name for streamlined result organization.
Installation and Usage Guide:
============================
Prerequisites:
-------------
- Python 3.x
- pip
- virtualenv (optional for creating isolated Python environments)
- Ray (for distributed computing)
requirements.txt
----------------
ray[default] # make a .txt file and copy into script directory
Installation Steps:
------------------
1. Start a screen session, e.g., screen -S NDP-h
2. Make a directory, e.g., mkdir NDP-h
3. Go to the directory, e.g., cd NDP-h
4. Clone the Repository: git clone https://github.com/GridSAT/NDP-h.git (if published)
or copy NDP-h.py and requirements.txt into the directory
5. Install Python 3 package manager (pip) and sysstat: sudo apt install python3-pip sysstat
6. Set Up a Virtual Environment (Optional): pip install virtualenv.
7. Create and activate a virtual environment: virtualenv NDP-h then source NDP-h/bin/activate
8. Install dependencies: pip install -r requirements.txt
9. Make a directory for the inputs, e.g., mkdir NDP-h/inputs
10. Generate DIMACS files at Paul Purdom and Amr Sabry's CNF Generator at:
https://cgi.luddy.indiana.edu/~sabry/cnf.html
For bit-wise input generation, use e.g.: https://bigprimes.org/RSA-challenge
or
generate DIMACS locally with: https://github.com/GridSAT/CNF_FACT-MULT
or on IPFS ipfs://QmYuzG46RnjhVXQj7sxficdRX2tUbzcTkSjZAKENMF5jba
11. Copy the generated DIMACS files into the inputs directory.
Using Ray for Distributed Computing:
-----------------------------------
- Follow Ray documentation at https://docs.ray.io/ for setup instructions.
- Example commands for starting the head node (without ray dashboard) in a cluster with 4 worker nodes:
Start head node without Ray Dashboard - example initialization with 4 CPUs as system reserves:
export RAY_DISABLE_IMPORT_WARNING=1
CPUS=$(( $(lscpu --online --parse=CPU | egrep -v '^#' | wc -l) - 4 ))
ray start --head --include-dashboard=false --disable-usage-stats --num-cpus=$CPUS
Start worker nodes - example initialization with 1 CPUs system reserves:
export RAY_DISABLE_IMPORT_WARNING=1
CPUS=$(( $(lscpu --online --parse=CPU | egrep -v '^#' | wc -l) - 1 ))
ray start --address='MASTER-IP:6379' --redis-password='MASTER-PASSWORT' --num-cpus=$CPUS
Running NDP:
-----------
Execute from the command line, specifying the path to the DIMACS formatted input file and optionally the output directory, e.g.:
python3 NDP-h.py inputs/rsaFACT-64bit.dimacs -d
CLI options: --input_file_path="inputs/INPUT.dimacs" --output_dir="outputs/"
Execution options
=================
-b Breath-First (BF) only outputting a result JSON
-r Resume from BF only choosing the respective result JSON from a list
-s Save BF JSON along the full NDP execution for later re-use
BF-options
==========
-q define a Queue Size for BF
-p any digit from 0 - 99% to specify % of VARs for BF
-a absolute #VARs for BF
-d default Queue Size setting next lower power of 2 of #CPUs (recommended)
If BF-options are not specified via CLI you will be prompted to enter respective values
Additional Resources:
--------------------
Paul Purdom and Amr Sabry's CNF Generator for Factoring Problems required.
visit https://cgi.luddy.indiana.edu/~sabry/cnf.html to generate a respective problem
or visit https://github.com/GridSAT/CNF_FACT-MULT for the source code
or visit ipfs://QmYuzG46RnjhVXQj7sxficdRX2tUbzcTkSjZAKENMF5jba for the source code on IPFS
File stats:
----------
1700 net code lines
216 comment lines
99 KB file size
"""
import os
import time
import random
import logging
import argparse
import csv
import hashlib
import re # Regular Expression module for string searching and manipulation
from collections import deque
import json
# Suppress deprecation warnings to keep the console output clean
os.environ['RAY_DEDUP_LOGS'] = '0'
# Disable Ray's log deduplication feature if needed
os.environ['PYTHONWARNINGS'] = "ignore::DeprecationWarning"
# Basic configuration for logging:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
import ray # Ray is a framework for distributed computing
import argparse
import sys
import math
import subprocess
### Version of NDP code
NDP_h_VERSION = "3.2.2"
#Revision History
"""
1.0.0 - NDP-c with single CPU Breath-First (BF) and single CPU Satisfy Iterative
1.1.0 - NDP-c with single CPU BF and Parallel CPU Satisfy Iterative
1.2.0 - NDP-c with single CPU BF and Parallel CPU Satisfy Iterative with Ray
2.0.0 - NDP-g with GPU BF and CPU Satisfy Iterative
2.1.0 - NDP-g with GPU BF and Parallel CPU Satisfy Iterative
2.1.0 - NDP-g with GPU BF and Parallel CPU Satisfy Iterative with Ray
3.0.0 - NDP-h with CPU Parallel BF and Parallel CPU Satisfy Iterative
3.1.0 - NDP-h with Ray Parallel BF and Ray Parallel Satisfy Iterative
3.1.1 - added -q CLI options for setting Queue Size for BF
3.1.2 - refined CLI logic with combinations of -q and with -a and -p and added default -d
3.1.3 - refined -q CLI logic with fall backs to respect power of 2 inputs
3.1.4 - added -s CLI to optionally save BF results
3.1.5 - enhanced CLI logic and removed combinations of -q and with -a and -p
3.2.0 - BF JSON filename includes #bits and queue-size
3.2.1 - BF JSONs are read-in with -r via choice from list
3.2.2 - default -d rounding down queue size to next lower power of 2 of #CPUs
"""
# Print NDP-h version information
print(f'\n\nNDP-h Version {NDP_h_VERSION}')
def round_down_to_power_of_two(number):
"""
Ensure number is a power of two, rounding down if necessary.
If the number is already a power of two, it's returned as is.
Otherwise, it's rounded down to the nearest lower power of two.
"""
# Ensure the number is at least 1
number = max(1, number)
# Check if the number is already a power of two
if is_power_of_two(number):
return number
else:
return 2 ** math.floor(math.log2(number))
def is_power_of_two(n):
"""
Check if `n` is a power of two.
"""
return n > 0 and (n & (n - 1)) == 0
def convert_memory(memory_bytes):
"""
Convert memory size from bytes to appropriate units (TB, GB, MB, bytes).
"""
if memory_bytes == 'N/A':
return memory_bytes
if isinstance(memory_bytes, str):
memory_bytes = int(memory_bytes)
if memory_bytes >= 1099511627776:
return f"{memory_bytes / 1099511627776:.1f} TB"
elif memory_bytes >= 1073741824:
return f"{memory_bytes / 1073741824:.1f} GB"
elif memory_bytes >= 1048576:
return f"{memory_bytes / 1048576:.1f} MB"
else:
return f"{memory_bytes} bytes"
def initialize_and_print_cluster_resources():
"""
Initialize Ray and print cluster and node resources information.
"""
if not ray.is_initialized():
current_dir = os.path.dirname(os.path.abspath(__file__))
# Set Ray's logging level to only show errors + exclude potentially large files
ray.init(runtime_env={"working_dir": ".", "excludes": ["*.log", "__pycache__/", ".git/", "*.js", "*.json"]}, logging_level=logging.ERROR)
print("Ray initialized.\n")
cluster_resources = ray.cluster_resources()
num_cpus = int(cluster_resources.get("CPU", 1)) # Defaults to 1 if not available
num_gpus = int(cluster_resources.get("GPU", 0)) # Defaults to 0 if not available
# Convert memory resources to human-readable format
for resource, value in cluster_resources.items():
if resource.endswith('memory'):
cluster_resources[resource] = convert_memory(int(value))
# Get information about all nodes in the cluster
all_nodes = ray.nodes()
# Create a string to store cluster resources information
cluster_resources_info = "Cluster Resources:\n\n"
for resource, value in cluster_resources.items():
# Convert CPU count to integer if it's CPU resource
if resource == 'CPU':
value = int(value)
else:
value = int(value) if isinstance(value, float) else value
cluster_resources_info += f"{resource}: {value}\n"
# Iterate over each node and append its resources information to cluster_resources_info
cluster_resources_info += "\n\nNode Resources:\n\n"
for node in all_nodes:
node_id = node['NodeID']
cluster_resources_info += f"Node: {node_id}\n"
# Handle CPU resource
cpu_value = node['Resources'].get('CPU', 'N/A')
if cpu_value != 'N/A':
cpu_info = int(cpu_value)
else:
cpu_info = cpu_value
cluster_resources_info += f"CPU: {cpu_info}\n"
# Handle memory resource
memory_bytes = node['Resources'].get('memory', 'N/A')
memory_readable = convert_memory(memory_bytes) if memory_bytes != 'N/A' else 'N/A'
cluster_resources_info += f"Memory: {memory_readable}\n"
if 'GPU' in node['Resources']:
cluster_resources_info += f"GPU: {int(node['Resources']['GPU'])}\n"
else:
cluster_resources_info += "No GPUs available on this node.\n"
cluster_resources_info += "=" * 30 + "\n"
# Iterate over each node and print its resources
for node in all_nodes:
print(f"Node: {node['NodeID']}")
cpu_value = node['Resources'].get('CPU', 'N/A')
if cpu_value != 'N/A':
try:
cpu_info = int(cpu_value)
except ValueError:
cpu_info = cpu_value
else:
cpu_info = cpu_value
print(f"CPU: {cpu_info}")
# Handle memory resource
memory_bytes = node['Resources'].get('memory', 'N/A')
if memory_bytes != 'N/A':
memory_readable = convert_memory(int(memory_bytes))
else:
memory_readable = memory_bytes
print(f"Memory: {memory_readable}")
if 'GPU' in node['Resources']:
print(f"GPU: {int(node['Resources']['GPU'])}")
else:
print("No GPUs available on this node.")
print("=" * 30)
return cluster_resources, num_cpus, num_gpus, cluster_resources_info, memory_readable
def generate_problem_id(problem_data, start_timestamp, cluster_resources):
"""
Generates a unique problem ID based on problem data, cluster resources, and the start timestamp.
Parameters:
problem_data (dict): A dictionary containing problem-specific data.
cluster_resources (dict): A dictionary of the resources available in the Ray cluster.
start_timestamp (str): The start timestamp of the problem-solving session.
Returns:
str: A unique MD5 hash serving as the problem ID.
"""
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
problem_data.update({
"Start Timestamp": start_timestamp,
"Generation Timestamp": current_time,
"Cluster Resources": cluster_resources
})
data_str = ""
for key, value in sorted(problem_data.items()):
if isinstance(value, dict):
value_str = ",".join([f"{k}:{v}" for k, v in sorted(value.items())])
data_str += f"{key}:{value_str},"
else:
data_str += f"{key}:{value},"
problem_id = hashlib.md5(data_str.encode()).hexdigest()
return problem_id
def read_input_files(file_path, output_dir=None):
"""
Processes the input file specified by the file_path, extracting important SAT problem-related information.
Parameters:
file_path (str): The path to the input file containing the DIMACS CNF formula and other metadata.
output_dir (str, optional): The directory where output files will be saved. If not provided, uses the current directory.
Returns:
tuple: A tuple containing the DIMACS formula, input digit, variables for both inputs, number of clauses, file name, and output file names.
Raises:
ValueError: If required header information is missing or the input file is in an incorrect format.
"""
# Initialize variables to store header information
dimacs_formula = None
input_digit = None
first_input_vars = None
second_input_vars = None
num_clauses = 0 # Variable to count the number of clauses
with open(file_path, 'r') as file:
try:
file_name = os.path.basename(file_path) # Get the file name
lines = file.readlines()
output_file_name_txt = None
output_file_name_csv = None
print("\nreading input file:", file_name)
# Check if the file contains the required header information
for line in lines:
if line.startswith("c Circuit for product"):
input_digit = int(re.search(r'\d+', line).group())
elif line.startswith("c Variables for first input"):
first_input_vars = [int(x) for x in re.findall(r'\d+', line)]
elif line.startswith("c Variables for second input"):
second_input_vars = [int(x) for x in re.findall(r'\d+', line)]
elif line.startswith("p cnf"):
dimacs_formula = "".join(lines[lines.index(line):])
num_clauses = sum(1 for line in lines if not line.startswith("c") and not line.startswith("p"))
break
# Check if all required information is extracted
if dimacs_formula is not None and input_digit is not None and first_input_vars is not None and second_input_vars is not None:
break
except ValueError as e:
# Print only the error message without the traceback
print(e)
return
# Check if all required information is extracted
if dimacs_formula is None or input_digit is None or first_input_vars is None or second_input_vars is None:
print(f"\n")
raise ValueError("\n\nPaul Purdom and Amr Sabry header information missing or incorrect format in the input file.\n\nCNF Generator for Factoring Problems required.\n\nvisit https://cgi.luddy.indiana.edu/~sabry/cnf.html to generate a respective problem\nor\nvisit https://github.com/GridSAT/CNF_FACT-MULT for the source code\nor\nvisit ipfs://QmeeDsP4WnHG5F8dYazDnyUGhdnF1t8J4i4YAMgXCa6wB4 for the source code on IPFS\n\n")
print(f" input file OK\n")
return dimacs_formula, input_digit, first_input_vars, second_input_vars, num_clauses, file_name, output_file_name_txt, output_file_name_csv
"""
This section includes utility functions for processing SAT problem instances and their solutions. These functions are essential for the analysis and transformation of the problem data, aiding in the broader task of finding satisfying assignments or demonstrating unsatisfiability.
- non_zero_subarrays_recursive: Recursively navigates through a nested list structure to extract all non-zero subarrays. This function is critical for filtering out irrelevant data and focusing on meaningful results, particularly after applying resolution steps or when analyzing the final output of the SAT solving process.
- replace_elements: Matches elements between two lists based on the absolute value of their integers, replacing elements in the target lists with those found in a source array. This function is vital for reconciling the SAT solver's output with specific variable assignments, ensuring the results are correctly interpreted in the context of the original problem.
- flatten_list_of_lists: Converts a nested list structure into a flat list. This simplification is necessary for various processing stages, such as when preparing data for output or further analysis, ensuring that complex nested results are accessible and manageable.
- convert_to_binary_array_and_integer: Transforms a list of integers into a binary representation and then computes its corresponding integer value. This function is crucial for interpreting variable assignments as binary numbers, facilitating the evaluation of SAT solutions in numerical contexts, such as verifying the factors of an integer in cryptographic applications.
"""
def parse_dimacs(dimacs_formula):
# Split the formula into lines and remove empty lines
lines = [line.strip() for line in dimacs_formula.split('\n') if line.strip()]
# Extract the number of variables and clauses from the 'p cnf' line
_, _, num_vars, num_clauses = lines[0].split()
num_vars, num_clauses = int(num_vars), int(num_clauses)
# Initialize the formula array
formula = []
# Fill in the formula based on the clauses
for line in lines[1:]:
literals = [int(lit) for lit in line.split()[:-1]]
formula.append(literals)
# For all clauses which have less than 3 literals: Add zeros
for clause in formula:
clause.extend([0] * (3 - len(clause)))
return formula
def ResolutionStep(A, i):
"""
Performs the resolution step on a given formula A with respect to a chosen literal i.
This function applies resolution to simplify the formula based on the selected literal i.
It generates two sets of clauses (sub-formulas): one assuming the literal i is true (Left Array),
and the other assuming it is false (Right Array).
Parameters:
- A: The current set of clauses (formula) represented as a list of lists where each sub-list is a clause.
- i: The chosen literal for the resolution step.
Returns:
- LA: The modified set of clauses assuming literal i is true.
- RA: The modified set of clauses assuming literal i is false.
"""
LA = []
RA = []
for subarray in A:
modified_subarray = [x + i for x in subarray]
if 2*i in modified_subarray:
continue
LA.append(modified_subarray)
LA = [[x - i if x != 0 else x for x in subarray] for subarray in LA]
for subarray in A:
modified_subarray = [x - i for x in subarray]
if (-1)*2*i in modified_subarray:
continue
RA.append(modified_subarray)
RA = [[x + i if x != 0 else x for x in subarray] for subarray in RA]
return LA, RA
def choice(A):
"""
Selects a variable from the given 2D array 'A' for the next resolution step based on specific criteria.
The selection criteria are as follows:
1. If a subarray contains exactly two zeros, the function returns the absolute value of the non-zero integer.
2. If no subarray meets the first criteria, and there's a subarray with one zero and two integers,
it returns the absolute value of the first non-zero integer.
3. If neither condition is met, it returns the absolute value of the first integer in the first subarray.
Parameters:
A (list of list of int): The 2D array of integers from which to select the next variable.
Returns:
int: The absolute value of the selected integer for the next resolution step.
"""
# Check for subarray with two zeros
for subarray in A:
if subarray.count(0) == 2:
# Choose the non-zero integer in the subarray and return its absolute value
return abs(next(x for x in subarray if x != 0))
# Check for subarray with one zero and two integers
for subarray in A:
if subarray.count(0) == 1 and len(subarray) == 3:
# Choose the first occurring non-zero integer in the subarray and return its absolute value
return abs(next(x for x in subarray if x != 0))
# If no subarrays with zeros, choose the first integer in the first subarray and return its absolute value
return abs(A[0][0]) if A and A[0] else None
def Satisfy_iterative_two_arguments(A, assignment):
"""
Wrapper function that calls the Satisfy_iterative function with the given 2D array 'A'
and returns its result along with the original 'assignment'.
This function is designed to integrate Satisfy_iterative into workflows that require maintaining
the original assignment alongside the result of the satisfiability check.
Parameters:
A (list of list of int): The 2D array representing clauses in a SAT problem.
assignment (list of int): The original assignment (contextual data) that is passed through unchanged.
Returns:
tuple: A tuple where the first element is the result of Satisfy_iterative function and
the second element is the original 'assignment'.
"""
# Returns the result of Satisfy_iterative and the unchanged assignment
result = Satisfy_iterative(A)
return result, assignment
def Satisfy_iterative(A):
"""
Iteratively attempts to find a satisfying assignment for the formula A.
It uses a stack to manage branching with chosen literals and applies resolution steps
to simplify the formula. It explores both possibilities for each chosen literal (true and false)
until a satisfying assignment is found or all options are exhausted.
Parameters:
- A: The initial set of clauses (formula) to be satisfied, represented as a list of lists.
Returns:
- A list of satisfying assignments, if any, or an indication of unsatisfiability.
"""
# Iteratively finds satisfying assignments
stack = [(A, [])] # Initialize stack with the initial formula and empty choices
results = [] # Store the results
result_LA = [] # Store the left assignment
result_RA = [] # Store the right assignment
while stack: # Iterate until stack is empty
current_A, choices = stack.pop() # Pop the current formula and choices
i = choice(current_A) # Choose a variable index from the current formula
if i is None: # If no more variables are available
results.append(choices) # Append the current choices to results
continue
LA, RA = ResolutionStep(current_A, i) # Apply resolution step
if not LA or any(subarray == [0, 0, 0] for subarray in LA): # Check for unsatisfiability in left assignment
if not LA:
result_LA = choices + [i] # Set left assignment result if LA is empty
else:
result_LA = [0] # Set left assignment result to unsatisfiable
else:
stack.append((LA, choices + [i])) # Add new formula and updated choices to stack
if not RA or any(subarray == [0, 0, 0] for subarray in RA): # Check for unsatisfiability in right assignment
if not RA:
result_RA = choices + [-i] # Set right assignment result if RA is empty
else:
result_RA = [0] # Set right assignment result to unsatisfiable
else:
stack.append((RA, choices + [-i])) # Add new formula and updated choices to stack
results.append([result_LA, result_RA]) # Append left and right assignment results to results list
return results # Return the satisfying assignments
@ray.remote
def process_task_ray(task):
"""
Process a task in a Ray remote function, involving resolution steps and comparison of 2D arrays.
"""
current_A, current_choices, length = task
i = choice(current_A)
if i is None:
return None, None, length
LA, RA = ResolutionStep(current_A, i)
new_tasks = []
if not contains_zero_array(LA):
new_tasks.append((LA, current_choices + [i], len(LA)))
if not contains_zero_array(RA):
diff, equal_cond = compare_2D_arrays(LA, RA)
if not equal_cond:
new_tasks.append((RA, current_choices + [-i], len(RA)))
return new_tasks
def satisfy_breadth_first_parallel_ray(A, sizeOfCNF, iterations, num_vars, percentage=None, absolute=None, queue_size=None, default_size=False):
"""Parallel breadth-first search algorithm using Ray.
This function conducts a parallel breadth-first search using Ray for efficient processing. It iteratively explores various configurations
to satisfy given constraints or until a stopping condition is met.
Parameters:
A: Initial array configuration.
sizeOfCNF: Size of the CNF formula.
iterations: Maximum number of iterations.
num_vars: Number of variables.
percentage: Percentage of variables to consider as a stopping criterion.
absolute: Absolute number of variables to consider as a stopping criterion.
queue_size: Maximum size of the processing queue.
default_size: Flag indicating whether to use default queue size.
Returns:
queue: The final processing queue.
queue_length: Length of the final processing queue.
clause_sets_list: List of clause sets explored.
assignments_list: List of variable assignments explored.
factor: The stopping size or factor.
factor_source: Source of the stopping size (percentage, absolute, queue_size).
queue_size: The specified queue size.
iteration_count: Total number of iterations processed.
default_size: Flag indicating whether default queue size was used.
"""
cluster_resources = ray.cluster_resources()
num_cpus = int(cluster_resources.get("CPU", 1))
queue = deque([(A, [], sizeOfCNF)])
print("\nBreadth-first parallel processing initiated..\n")
print(f" CPUs: {int(num_cpus)}")
print(f" VARs: {num_vars}\n")
factor = None
factor_source = None
iterations = 0
i = 0
iteration_count = 0 # Initialize iteration_count at the start
if percentage is not None:
factor = round(percentage * num_vars / 100)
factor_source = "percentage"
# i = factor
if 0 <= percentage <= 99.00:
print(f" Stopping size: {factor} ({percentage:.2f}%)\n")
while i > 0 and queue:
task_refs = [process_task_ray.remote(task) for task in queue]
queue.clear()
# Asynchronously gather results from Ray tasks
for new_tasks in ray.get(task_refs):
if new_tasks:
queue.extend(new_tasks)
i -= 1
print(f" Remaining VARs: {i}, queue size: {len(queue)}")
else:
print("\nMax percentage value is 99% and must be positive - try again.\n")
sys.exit(1)
if absolute is not None:
factor_source = "absolute"
factor = absolute
# i = factor
print(f" Stopping size: {factor}\n")
if absolute > 0 and absolute == int(absolute) and absolute < num_vars:
while i > 0 and queue:
task_refs = [process_task_ray.remote(task) for task in queue]
queue.clear()
# Asynchronously gather results from Ray tasks
for new_tasks in ray.get(task_refs):
if new_tasks:
queue.extend(new_tasks)
i -= 1
print(f" Remaining VARs: {i}, queue size: {len(queue)}")
else:
# If absolute is not a positive whole number, has decimals, or is not less than num_vars
if absolute >= num_vars:
print(f"\n Absolute number must be smaller (num_vars) - retry.\n")
else:
print(" Value must be a positive, whole number without decimals - retry.\n")
sys.exit(1)
if queue_size is not None:
try:
factor_source = "queue_size"
print(f" Queue size: {queue_size}\n")
if queue_size > num_vars:
print(f" Queue size cannot exceed {num_vars} VARs - retry.\n")
sys.exit(1)
elif not is_power_of_two(queue_size): # Check if queue_size is NOT a power of two
print(f" Adjust queue size to power of 2 - example: {round_down_to_power_of_two(num_cpus)}\n")
sys.exit(1)
while len(queue) <= queue_size:
task_refs = [process_task_ray.remote(task) for task in queue]
queue.clear()
# Asynchronously gather results from Ray tasks
for new_tasks in ray.get(task_refs):
if new_tasks:
queue.extend(new_tasks)
iteration_count += 1
print(f" Processed VARs: {iteration_count}, queue size: {len(queue)}")
if len(queue) == queue_size:
print(f"\n Queue size limit reached ({queue_size}). Stopping..")
break
if iteration_count == num_vars -1:
print(f"\n #VARs cannot exceed {num_vars} VARs. Stopping..")
break
except ValueError:
print(f"\nQueue size must be positive number and to the power of 2 - example: {round_down_to_power_of_two(num_cpus)}\n")
sys.exit(1)
if default_size is not False:
# Set the queue_size to the next power of 2 of the number of CPUs
queue_size = round_down_to_power_of_two(num_cpus)
print(f" Next lower power of 2 of {int(num_cpus)}: CPUs\n")
print(f" Queue size: {queue_size}\n")
while len(queue) <= queue_size:
task_refs = [process_task_ray.remote(task) for task in queue]
queue.clear()
# Asynchronously gather results from Ray tasks
for new_tasks in ray.get(task_refs):
if new_tasks:
queue.extend(new_tasks)
iteration_count += 1
print(f" Processed VARs: {iteration_count}, queue size: {len(queue)}")
if len(queue) >= queue_size:
factor_source = "queue_size"
print(f"\n Queue size limit reached ({queue_size}). Stopping..")
break
if iteration_count == num_vars - 1:
print(f"\n #VARs cannot exceed {num_vars} VARs. Stopping..")
break
print(f"\n Final queue size: {len(queue)}\n")
# Extract clause sets and assignments
clause_sets_list = [task[0] for task in queue]
assignments_list = [task[1] for task in queue]
# Calculate percentage_stats_q
if queue_size == 0:
percentage_stats_q = 0
else:
percentage_stats_q = (iteration_count / num_vars) * 100
# Calculate percentage_stats_a
if absolute is not None:
percentage_stats_a = (int(absolute) / num_vars) * 100
else:
percentage_stats_a = 0 # Or any default value you prefer
return queue, len(queue), clause_sets_list, assignments_list, factor, factor_source, queue_size, iteration_count, default_size, percentage_stats_q, percentage_stats_a
def save_bf_results(clause_sets_list, assignments_list, output_file_name_json):
"""
Save breadth-first search results to a JSON file.
Parameters:
- clause_sets_list: List of clause sets explored.
- assignments_list: List of variable assignments explored.
- output_path: Path to save the results.
Raises:
- ValueError: If output_path is None.
"""
if output_file_name_json is None:
raise ValueError("\nOutput path is None. Please specify a valid path.\n")
bfs_data = {
"clauseSets": clause_sets_list,
"assignments": assignments_list
}
with open(output_file_name_json, 'w') as file:
json.dump(bfs_data, file, indent=4)
def list_available_json_files(output_dir):
"""
List available JSON files in the specified directory.
Parameters:
- directory: Directory path to search for JSON files.
Returns:
- List of available JSON file names.
"""
json_files = [f for f in os.listdir(output_dir) if f.endswith('.json')]
return json_files
def select_json_file(json_files):
"""
Prompt the user to select a JSON file from a list.
Parameters:
- json_files: List of available JSON files.
Returns:
- selected_json_file: Filename of the selected JSON file, or None if no valid selection is made.
"""
print(f"\n\n +++ satisfy parallel only +++\n")
print("\nAvailable JSON files:\n")
for i, json_file in enumerate(json_files, start=1):
print(f"{i}. {json_file}")
selection = input("\nEnter the number of the JSON file you want to load: ")
try:
selection_index = int(selection) - 1
if 0 <= selection_index < len(json_files):
return json_files[selection_index]
else:
print("\nInvalid selection. Please enter a number corresponding to a JSON file.\n")
return None
except ValueError:
print("\nInvalid input. Please enter a number.\n")
return None
def load_bf_results(output_dir, num_cores):
"""
Load breadth-first search results from a selected JSON file.
Parameters:
- directory: Directory path to search for JSON files.
- num_cores: Number of CPU cores for parallel processing.
Returns:
- clause_sets: List of clause sets loaded.
- assignments: List of variable assignments loaded.
Raises:
- FileNotFoundError: If no JSON files are found in the directory.
"""
json_files = list_available_json_files(output_dir)
if not json_files:
raise FileNotFoundError("\nNo JSON files found in the specified directory.\n")
selected_json_file = select_json_file(json_files)
if selected_json_file is None:
print("\nNo JSON file selected. Exiting.\n")
sys.exit(1)
input_path = os.path.join(output_dir, selected_json_file)
try:
with open(input_path, 'r') as file:
bfs_data = json.load(file)
print("\n loaded.")
print(f"\n\nevaluating {int(num_cores)} CPU cores for parallel processing..\n")
return bfs_data["clauseSets"], bfs_data["assignments"]
except FileNotFoundError:
print("\nSelected BF results file not found. Please run with -b first.\n")
sys.exit(1)
@ray.remote
def satisfy_parallel(clauseSet, assignment):
"""
Parallelized function using Ray.
This function takes a clause set and an initial assignment, attempts to find a
satisfying assignment for the clause set, and returns the result. It is designed
to run in parallel across multiple CPU cores using Ray, a framework for distributed computing.
Parameters:
- clauseSet: A set of clauses representing a SAT problem to solve.
- assignment: An initial assignment of values to variables for the SAT problem.
Returns:
- nonZeroResult + oldAssignment: The satisfying assignment if one is found, otherwise an empty list.
The result combines the new satisfying assignment (nonZeroResult) with the initial assignment (oldAssignment)
provided as input.
Note:
- The function prints 'processing' at the beginning to indicate the start of a task and 'completed' when
a satisfying assignment is found or if the task concludes without finding one.
- It utilizes 'Satisfy_iterative_two_arguments' to find satisfying assignments and 'non_zero_subarrays_recursive'
to filter out non-zero subarrays, which are crucial for determining the SAT problem's solution.
- This task is specifically allocated a single CPU core (`num_cpus=1`), indicating its execution in a separate
process within Ray's distributed system.
"""
print(f"processing")
task_id = ray.get_runtime_context().get_task_id()
# Compute resultAssignment and oldAssignment using Satisfy_iterative_two_arguments
resultAssignment, oldAssignment = Satisfy_iterative_two_arguments(clauseSet, assignment)
# Check if resultAssignment is not None
if resultAssignment is not None:
# Find non-zero subarrays recursively
nonZeroResult = non_zero_subarrays_recursive(resultAssignment)
# If nonZeroResult is not empty, return concatenated with oldAssignment
if nonZeroResult != []:
print(f"completed.")
return nonZeroResult + oldAssignment
return [] # Return empty list if no non-zero subarrays found
def parallel_satisfy_ray(clauseSets, assignments):
"""
Executes the SAT problem solving in parallel using Ray.
This function takes a list of clause sets and their corresponding assignments,
submits each pair as a separate task to be processed in parallel, and collects
the results. It leverages Ray's distributed computing capabilities to efficiently
manage task execution across available CPU cores.
Parameters:
- clauseSets: A list of clause sets, where each clause set represents a distinct SAT problem.
- assignments: A list of initial assignments corresponding to each clause set.
Returns:
- endResult: A list of non-null results from the SAT problem solving tasks, indicating successful computations.
- tasks_submitted: The total number of tasks submitted for execution, representing the workload size.
- tasks_completed: The number of tasks that successfully completed and returned a non-null result.
"""
# Submit each problem instance (clause set and assignment pair) to Ray for parallel processing.
# Each 'satisfy_parallel' task will attempt to find a satisfying assignment for its clause set.
futures = [satisfy_parallel.remote(clauseSet, assignment) for clauseSet, assignment in zip(clauseSets, assignments)]
# Assuming each task could use 1 CPU, the number of active workers is the number of submitted tasks
active_workers = len(futures)
print(f"parallel processing using {active_workers} workers:\n")
# Wait for the tasks to complete and collect the results
results = ray.get(futures)
endResult = [result for result in results if result]
print(f" all done.")
for result in results:
if result:
endResult.extend(result)
break
ray.shutdown() # Shutdown Ray
return endResult, len(futures)
def discardFalse(B):
"""
Filters out clause sets containing the clause [0, 0, 0] from a collection of clause sets.
The presence of [0, 0, 0] in a clause set indicates a contradiction or an unsatisfiable
clause, making the entire clause set unsatisfiable. This function removes such clause sets
to streamline the SAT solving process.
Parameters:
- B: A list of clause sets, where each clause set is a list of clauses.
Returns: