diff --git a/runner/run_query.py b/runner/run_query.py index 5f32132..8356d95 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -119,15 +119,14 @@ def insert_into(query): create_as(QUERY_3c_SQL)), '4': (QUERY_4_HQL, None, None)} -# Turn a given query into a version using cached tables +# Turn a given query into a version using cached tables. Only the tables +# that we create need to be named as X_cached; we cache the existing tables +# using Shark's explicit "CACHE" query. def make_input_cached(query): - return query.replace("uservisits", "uservisits_cached") \ - .replace("rankings", "rankings_cached") \ - .replace("url_counts_partial", "url_counts_partial_cached") \ - .replace("url_counts_total", "url_counts_total_cached") \ - .replace("documents", "documents_cached") + return query.replace("url_counts_partial", "url_counts_partial_cached") \ + .replace("url_counts_total", "url_counts_total_cached") -# Turn a given query into one that creats cached tables +# Turn a given query into one that creates cached tables def make_output_cached(query): return query.replace(TMP_TABLE, TMP_TABLE_CACHED) @@ -255,11 +254,11 @@ def ssh_shark(command): slaves = map(str.strip, open(local_slaves_file).readlines()) print "Restarting standalone scheduler..." - ssh_shark("/root/spark/bin/stop-all.sh") + ssh_shark("/root/spark/sbin/stop-all.sh") ensure_spark_stopped_on_slaves(slaves) time.sleep(30) - ssh_shark("/root/spark/bin/stop-all.sh") - ssh_shark("/root/spark/bin/start-all.sh") + ssh_shark("/root/spark/sbin/stop-all.sh") + ssh_shark("/root/spark/sbin/start-all.sh") time.sleep(10) # Two modes here: Shark Mem and Shark Disk. If using Shark disk clear buffer @@ -283,15 +282,12 @@ def convert_to_cached(query): if '4' in opts.query_num: # Query 4 uses entirely different tables query_list += """ - DROP TABLE IF EXISTS documents_cached; - CREATE TABLE documents_cached AS SELECT * FROM documents; + CACHE documents; """ else: query_list += """ - DROP TABLE IF EXISTS uservisits_cached; - DROP TABLE IF EXISTS rankings_cached; - CREATE TABLE uservisits_cached AS SELECT * FROM uservisits; - CREATE TABLE rankings_cached AS SELECT * FROM rankings; + CACHE uservisits; + CACHE rankings; """ if '4' not in opts.query_num: @@ -307,7 +303,7 @@ def convert_to_cached(query): query_file.write("python /root/shark/bin/dev/clear-buffer-cache.py\n") query_file.write( - "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file)) + "%s -skipRddReload -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file)) query_file.write( "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % ( @@ -331,6 +327,7 @@ def convert_to_cached(query): print "Stopping Executors on Slaves....." ensure_spark_stopped_on_slaves(slaves) print "Query %s : Trial %i" % (opts.query_num, i+1) + print "Log output on master in ", remote_tmp_file ssh_shark("%s" % remote_query_file) local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) scp_from(opts.shark_host, opts.shark_identity_file, "root", @@ -478,7 +475,7 @@ def ensure_spark_stopped_on_slaves(slaves): while not stop: cmd = "jps | grep ExecutorBackend" ret_vals = map(lambda s: ssh_ret_code(s, "root", opts.shark_identity_file, cmd), slaves) - print ret_vals + print "ExecutorBackend stopped on slaves?", ret_vals if 0 in ret_vals: print "Spark is still running on some slaves... sleeping" time.sleep(10) @@ -512,7 +509,11 @@ def prettylist(lst): return ",".join([str(k) for k in lst]) output = StringIO() - outfile = open('results/%s_%s_%s' % (fname, opts.query_num, datetime.datetime.now()), 'w') + out_filename = 'results/%s_%s/%s' % (fname, opts.query_num, datetime.datetime.now()) + dir_name = os.path.dirname(out_filename) + if not os.path.exists(dir_name): + os.makedirs(dir_name) + outfile = open(out_filename, 'w') print >> output, "==================================" print >> output, "Results: %s" % prettylist(results)