Skip to content

Commit

Permalink
Merge pull request #802 from egbertbouman/next_to_devel
Browse files Browse the repository at this point in the history
Merge back next to devel
  • Loading branch information
whirm committed Sep 1, 2014
2 parents 6eb0b9c + 312da72 commit 9307f54
Show file tree
Hide file tree
Showing 29 changed files with 463 additions and 310 deletions.
6 changes: 3 additions & 3 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ def register(self, session, sesslock):
self.ltmgr = None
self.torrent_checking = None

if not self.initComplete:
self.init()

def init(self):
if self.dispersy:
from Tribler.dispersy.community import HardKilledCommunity
Expand Down Expand Up @@ -750,9 +753,6 @@ def run(self):
if prctlimported:
prctl.set_name("Tribler" + currentThread().getName())

if not self.initComplete:
self.init()

if PROFILE:
fname = "profile-%s" % self.getName()
import cProfile
Expand Down
4 changes: 3 additions & 1 deletion Tribler/Core/CacheDB/SqliteCacheDBHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1894,9 +1894,11 @@ def on_votes_from_dispersy(self, votes):
insert_vote = "INSERT OR REPLACE INTO _ChannelVotes (channel_id, voter_id, dispersy_id, vote, time_stamp) VALUES (?,?,?,?,?)"
self._db.executemany(insert_vote, votes)

for channel_id, voter_id, _, _, _ in votes:
for channel_id, voter_id, _, vote, _ in votes:
if voter_id == None:
self.notifier.notify(NTFY_VOTECAST, NTFY_UPDATE, channel_id, voter_id == None)
if self.my_votes != None:
self.my_votes[channel_id] = vote
self._scheduleUpdateChannelVotes(channel_id)

def on_remove_votes_from_dispersy(self, votes, contains_my_vote):
Expand Down
2 changes: 1 addition & 1 deletion Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ def network_tracker_status(self):

result = self.tracker_status.copy()
result['[DHT]'] = [dht_peers, 'Working' if session.is_dht_running() and public else 'Disabled']
result['[PeX]'] = [pex_peers, 'Working']
result['[PeX]'] = [pex_peers, 'Working' if not self.get_anon_mode() else 'Disabled']
return result

def set_state_callback(self, usercallback, getpeerlist=False, delay=0.0):
Expand Down
5 changes: 4 additions & 1 deletion Tribler/Core/Libtorrent/LibtorrentMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ def create_anonymous_session(self):
settings.enable_outgoing_tcp = False
settings.enable_incoming_tcp = False
settings.anonymous_mode = True
ltsession = lt.session(flags=1)
# No PEX for anonymous session
ltsession = lt.session(flags=0)
ltsession.add_extension(lt.create_ut_metadata_plugin)
ltsession.add_extension(lt.create_smart_ban_plugin)
ltsession.set_settings(settings)
ltsession.set_alert_mask(lt.alert.category_t.stats_notification |
lt.alert.category_t.error_notification |
Expand Down
2 changes: 1 addition & 1 deletion Tribler/Core/RemoteTorrentHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def notify_possible_torrent_infohash(self, infohash, actualTorrentFileName=None)
self.scheduletask(handle_lambda)

def _handleCallback(self, key, actualTorrentFileName=None):
self._logger.debug('rtorrent: got torrent for: %s', key)
self._logger.debug('rtorrent: got torrent for: %s', (hexlify(key) if isinstance(key, basestring) else key))

if key in self.callbacks:
for usercallback in self.callbacks[key]:
Expand Down
10 changes: 6 additions & 4 deletions Tribler/Core/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ def set_and_create_dir(dirname, setter, default_dir):
scfg.set_nickname(socket.gethostname())

# SWIFTPROC
if scfg.get_swift_path() is None:
if scfg.get_swift_path() is None or not os.path.exists(scfg.get_swift_path()):
if sys.platform == "win32":
scfg.set_swift_path(os.path.join(scfg.get_install_dir(), "swift.exe"))
swift_path = os.path.join(scfg.get_install_dir(), "swift.exe")
elif is_android(strict=True):
scfg.set_swift_path(os.path.join(os.environ['ANDROID_PRIVATE'], 'swift'))
swift_path = os.path.join(os.environ['ANDROID_PRIVATE'], 'swift')
else:
scfg.set_swift_path(os.path.join(scfg.get_install_dir(), "swift"))
swift_path = os.path.join(scfg.get_install_dir(), "swift")
self._logger.info("Changing swift_path config var from '%s' to '%s'", str(scfg.get_swift_path()), swift_path)
scfg.set_swift_path(swift_path)

if GOTM2CRYPTO:
permidmod.init()
Expand Down
206 changes: 125 additions & 81 deletions Tribler/Core/Swift/SwiftProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# see LICENSE.txt for license information

import sys
import time
import subprocess
import random
import binascii
Expand Down Expand Up @@ -30,7 +31,7 @@ class SwiftProcess(object):
""" Representation of an operating-system process running the C++ swift engine.
A swift engine can participate in one or more swarms."""

def __init__(self, binpath, workdir, zerostatedir, listenport, httpgwport, cmdgwport, spmgr):
def __init__(self, binpath, workdir, zerostatedir, listenport, httpgwport, cmdgwport, spmgr, extra_subprocess_flags):
self._logger = logging.getLogger(self.__class__.__name__)

# Called by any thread, assume sessionlock is held
Expand All @@ -39,6 +40,7 @@ def __init__(self, binpath, workdir, zerostatedir, listenport, httpgwport, cmdgw
self.workdir = workdir
self.zerostatedir = zerostatedir
self.spmgr = spmgr
self.extra_subprocess_flags = extra_subprocess_flags

# Main UDP listen socket
if listenport is None:
Expand All @@ -56,101 +58,143 @@ def __init__(self, binpath, workdir, zerostatedir, listenport, httpgwport, cmdgw
else:
self.httpport = httpgwport

# Security: only accept commands from localhost, enable HTTP gw,
# no stats/webUI web server
args = []
# Arno, 2012-07-09: Unicode problems with popen
args.append(self.binpath.encode(sys.getfilesystemencoding()))

# Arno, 2012-05-29: Hack. Win32 getopt code eats first arg when Windows app
# instead of CONSOLE app.
args.append("-j")
args.append("-l") # listen port
args.append("0.0.0.0:" + str(self.listenport))
args.append("-c") # command port
args.append("127.0.0.1:" + str(self.cmdport))
args.append("-g") # HTTP gateway port
args.append("127.0.0.1:" + str(self.httpport))

if zerostatedir is not None:
if sys.platform == "win32":
# Swift on Windows expects command line arguments as UTF-16.
# popen doesn't allow us to pass params in UTF-16, hence workaround.
# Format = hex encoded UTF-8
args.append("-3")
zssafe = binascii.hexlify(zerostatedir.encode("UTF-8"))
args.append(zssafe) # encoding that swift expects
else:
args.append("-e")
args.append(zerostatedir)
args.append("-T") # zero state connection timeout
args.append("180") # seconds
# args.append("-B") # Enable debugging on swift
self.popen = None
self.popen_outputthreads = None

# make swift quiet
args.append("-q")

self._logger.debug("SwiftProcess: __init__: Running %s workdir %s", args, workdir)
# callbacks for when swift detect a channel close
self._channel_close_callbacks = defaultdict(list)

if sys.platform == "win32":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
else:
creationflags = 0

# See also SwiftDef::finalize popen
# We would really like to get the stdout and stderr without creating a new thread for them.
# However, windows does not support non-files in the select command, hence we cannot integrate
# these streams into the FastI2I thread
# A proper solution would be to switch to twisted for the communication with the swift binary
self.popen = subprocess.Popen(args, cwd=workdir, creationflags=creationflags, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

class ReadAndPrintThread(Thread):
def __init__(self, sp, name, socket):
super(ReadAndPrintThread, self).__init__(name=name)
self.sp = sp
self.socket = socket
self.last_line = ''

def run(self):
prefix = currentThread().getName() + ":"
while True:
self.last_line = self.socket.readline()
if not self.last_line:
self.sp._logger.info("%s readline returned nothing quitting", prefix)
break
self.sp._logger.debug("%s %s", prefix, self.last_line.rstrip())

def get_last_line(self):
return self.last_line

self.popen_outputthreads = [ReadAndPrintThread(self, "SwiftProcess_%d_stdout" % self.listenport, self.popen.stdout), ReadAndPrintThread(self, "SwiftProcess_%d_stderr" % self.listenport, self.popen.stderr)]
[thread.start() for thread in self.popen_outputthreads]

self.roothash2dl = {}
self.roothash2dl = dict()
self.donestate = DONE_STATE_WORKING # shutting down
self.fastconn = None
self.tunnels = {}

# callbacks for when swift detect a channel close
self._channel_close_callbacks = defaultdict(list)
self.tunnels = dict()

# Only warn once when TUNNELRECV messages are received without us having a Dispersy endpoint. This occurs after
# Dispersy shutdown
self._warn_missing_endpoint = True

def start_process(self):
with self.splock:
# Security: only accept commands from localhost, enable HTTP gw,
# no stats/webUI web server
args = list()
# Arno, 2012-07-09: Unicode problems with popen
args.append(self.binpath.encode(sys.getfilesystemencoding()))

# Arno, 2012-05-29: Hack. Win32 getopt code eats first arg when Windows app
# instead of CONSOLE app.
args.append("-j")
args.append("-l") # listen port
args.append("0.0.0.0:" + str(self.listenport))
args.append("-c") # command port
args.append("127.0.0.1:" + str(self.cmdport))
args.append("-g") # HTTP gateway port
args.append("127.0.0.1:" + str(self.httpport))

if self.zerostatedir is not None:
if sys.platform == "win32":
# Swift on Windows expects command line arguments as UTF-16.
# popen doesn't allow us to pass params in UTF-16, hence workaround.
# Format = hex encoded UTF-8
args.append("-3")
zssafe = binascii.hexlify(self.zerostatedir.encode("UTF-8"))
args.append(zssafe) # encoding that swift expects
else:
args.append("-e")
args.append(self.zerostatedir)
args.append("-T") # zero state connection timeout
args.append("180") # seconds
# args.append("-B") # Enable debugging on swift

# make swift quiet
args.append("-q")

if sys.platform == "win32":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
else:
creationflags = 0
creationflags |= self.extra_subprocess_flags

# See also SwiftDef::finalize popen
# We would really like to get the stdout and stderr without creating a new thread for them.
# However, windows does not support non-files in the select command, hence we cannot integrate
# these streams into the FastI2I thread
# A proper solution would be to switch to twisted for the communication with the swift binary
self.popen = subprocess.Popen(args, cwd=self.workdir, creationflags=creationflags,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)

class ReadAndPrintThread(Thread):
def __init__(self, sp, name, socket):
super(ReadAndPrintThread, self).__init__(name=name)
self.sp = sp
self.socket = socket
self.last_line = ''

def run(self):
prefix = currentThread().getName() + ":"
while True:
self.last_line = self.socket.readline()
if not self.last_line:
self.sp._logger.info("%s readline returned nothing quitting", prefix)
break
self.sp._logger.debug("%s %s", prefix, self.last_line.rstrip())

def get_last_line(self):
return self.last_line

self.popen_outputthreads = [ReadAndPrintThread(self, "SwiftProcess_%d_stdout" % self.listenport, self.popen.stdout), ReadAndPrintThread(self, "SwiftProcess_%d_stderr" % self.listenport, self.popen.stderr)]
[thread.start() for thread in self.popen_outputthreads]

#
# Instance2Instance
#
def start_cmd_connection(self):
# Called by any thread, assume sessionlock is held
with self.splock:
if self.is_alive():
self.fastconn = FastI2IConnection(self.cmdport, self.i2ithread_readlinecallback, self.connection_lost)

if self.is_alive():
self.fastconn = FastI2IConnection(self.cmdport, self.i2ithread_readlinecallback, self.connection_lost)
else:
self._logger.error("sp: start_cmd_connection: Process dead? returncode %s pid %s",
self.popen.returncode, self.popen.pid)
for thread in self.popen_outputthreads:
self._logger.error("sp popenthread %s last line %s", thread.getName(), thread.get_last_line())
# restart process
self.start_process()

self.donestate = DONE_STATE_WORKING # shutting down

old_tunnels = self.tunnels
self.tunnels = dict()

else:
self._logger.error("sp: start_cmd_connection: Process dead? returncode %s pid %s", self.popen.returncode, self.popen.pid)
for thread in self.popen_outputthreads:
self._logger.error("sp popenthread %s last line %s", thread.getName(), thread.get_last_line())

# Only warn once when TUNNELRECV messages are received without us having a Dispersy endpoint. This occurs after
# Dispersy shutdown
self._warn_missing_endpoint = True

# Arno, 2011-10-13: On Linux swift is slow to start and
# allocate the cmd listen socket?!
# 2012-05-23: connection_lost() will attempt another
# connect when the first fails, so not timing dependent,
# just ensures no send_()s get lost. Executed by NetworkThread.
# 2014-06-16: Having the same issues on Windows with multiple
# swift processes. Now always sleep, no matter which
# platform we're using.
self._logger.warn("spm: Need to sleep 1 second for swift to start?! FIXME")
time.sleep(1)

self.fastconn = FastI2IConnection(self.cmdport, self.i2ithread_readlinecallback, self.connection_lost)

# start the swift downloads again
for _, swift_download in self.roothash2dl.items():
self.start_download(swift_download)


# In case swift died and we are recovering from that, reregister all
# the tunnels that existed in the previous session.
for tunnel, callback in old_tunnels.iteritems():
self._logger.info("Reregistering tunnel from crashed swfit: %s %s", tunnel, callback)
self.register_tunnel(tunnel, callback)

def i2ithread_readlinecallback(self, cmd_buffer):
if self.donestate != DONE_STATE_WORKING:
Expand Down Expand Up @@ -439,7 +483,6 @@ def network_shutdown(self):
if self.fastconn:
self.fastconn.stop()


#
# Internal methods
#
Expand Down Expand Up @@ -530,4 +573,5 @@ def get_cmdport(self):
return self.cmdport

def connection_lost(self, port):
self._logger.warn("Connection lost for port %s", port)
self.spmgr.connection_lost(port)
21 changes: 14 additions & 7 deletions Tribler/Core/Swift/SwiftProcessMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,19 @@ def __init__(self, binpath, i2iport, dlsperproc, tunnellistenport, sesslock):

self.sps = []

self.extra_subprocess_flags = 0
if sys.platform == 'win32' and not __debug__:
import ctypes
SEM_NOGPFAULTERRORBOX = 0x0002 # from MSDN
ctypes.windll.kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
CREATE_NO_WINDOW = 0x08000000
self.extra_subprocess_flags = CREATE_NO_WINDOW

def get_or_create_sp(self, workdir, zerostatedir, listenport, httpgwport, cmdgwport):
""" Download needs a process """
self.sesslock.acquire()
if not self.done:
# print >>sys.stderr,"spm: get_or_create_sp"
try:
with self.sesslock:
if not self.done:
# print >>sys.stderr,"spm: get_or_create_sp"
self.clean_sps()

sp = None
Expand All @@ -54,7 +61,9 @@ def get_or_create_sp(self, workdir, zerostatedir, listenport, httpgwport, cmdgwp

if sp is None:
# Create new process
sp = SwiftProcess(self.binpath, workdir, zerostatedir, listenport, httpgwport, cmdgwport, self)
sp = SwiftProcess(self.binpath, workdir, zerostatedir, listenport, httpgwport, cmdgwport, self,
self.extra_subprocess_flags)
sp.start_process()
self._logger.debug("spm: get_or_create_sp: Creating new %s", sp.get_pid())
self.sps.append(sp)

Expand All @@ -72,8 +81,6 @@ def get_or_create_sp(self, workdir, zerostatedir, listenport, httpgwport, cmdgwp
sp.start_cmd_connection()

return sp
finally:
self.sesslock.release()

def release_sp(self, sp):
""" Download no longer needs process. Apply process-cleanup policy """
Expand Down
Loading

0 comments on commit 9307f54

Please sign in to comment.