Skip to content

Commit

Permalink
Clean up the dependency detection code
Browse files Browse the repository at this point in the history
This just extracts out some of the common logic and makes the configuration for
different systems more declarative. This should make it much easier to add
support for other package managers.

Merges #39
  • Loading branch information
sangaline authored Mar 7, 2018
2 parents 99b5c0f + b6dd7fd commit 6d26fc6
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 87 deletions.
177 changes: 91 additions & 86 deletions src/exodus_bundler/dependency_detection.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,110 @@
import os
import re
import subprocess

from exodus_bundler.launchers import find_executable


def detect_dependencies(path):
# We'll go through the supported systems one by one.
dependencies = detect_arch_dependencies(path)
if dependencies:
return dependencies

dependencies = detect_debian_dependencies(path)
if dependencies:
return dependencies
class PackageManager(object):
"""Base class representing a package manager.
The class level attributes can be overwritten in derived classes to customize the behavior.
Attributes:
cache_directory (str): The location of the system's package cache.
list_command (:obj:`list` of :obj:`str`): The command and arguments to list the
dependencies of a package .
list_regex (str): A regex to extract the file path from a single line of the output of the
list command.
owner_command (:obj:`owner` of :obj:`str`): The command and arguments to determine the
package that owns a specific file.
owner_regex (str): A regex to extract the package name from the output of the owner command.
"""
cache_directory = None
list_command = None
list_regex = '(.*)'
owner_command = None
owner_regex = '(.*)'

def find_dependencies(self, path):
"""Finds a list of all of the files contained with the package containing a file."""
owner = self.find_owner(path)
if not owner:
return None

args = self.list_command + [owner]
process = subprocess.Popen(args, stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
dependencies = []
for line in stdout.decode('utf-8').split('\n'):
match = re.search(self.list_regex, line.strip())
if match:
dependency_path = match.groups()[0]
if os.path.exists(dependency_path) and not os.path.isdir(dependency_path):
dependencies.append(dependency_path)

dependencies = detect_redhat_dependencies(path)
if dependencies:
return dependencies

return None


def detect_arch_dependencies(path):
cache_directory = '/var/cache/pacman'
if not (os.path.exists(cache_directory) and os.path.isdir(cache_directory)):
return None

pacman = find_executable('pacman')
if not pacman:
return None

process = subprocess.Popen([pacman, '-Qo', path], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
parts = stdout.decode('utf-8').split(' is owned by ')
if len(parts) != 2:
return None

package_name = parts[1].split(' ')[0]
process = subprocess.Popen([pacman, '-Ql', package_name], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
dependencies = []
for line in stdout.decode('utf-8').split('\n'):
prefix = '%s ' % package_name
if not line.startswith(prefix):
continue
dependency_path = line[len(prefix):]
if os.path.exists(dependency_path) and not os.path.isdir(dependency_path):
dependencies.append(dependency_path)

return dependencies


def detect_debian_dependencies(path):
def find_owner(self, path):
"""Finds the package that owns the specified file path."""
if not self.cache_exists or not self.commands_exist:
return None
args = self.owner_command + [path]
process = subprocess.Popen(args, stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
output = stdout.decode('utf-8').strip()
match = re.search(self.owner_regex, output)
if match:
return match.groups()[0].strip()

@property
def cache_exists(self):
"""Whether or not the expected package cache directory exists."""
return os.path.exists(self.cache_directory) and os.path.isdir(self.cache_directory)

@property
def commands_exist(self):
"""Whether or not the list and owner package manager commands can be resolved."""
commands = {self.list_command[0], self.owner_command[0]}
return all(find_executable(command) is not None for command in commands)


class Apt(PackageManager):
cache_directory = '/var/cache/apt'
if not (os.path.exists(cache_directory) and os.path.isdir(cache_directory)):
return None
list_command = ['dpkg-query', '-L']
list_regex = '(.+)'
owner_command = ['dpkg', '-S']
owner_regex = '(.+): '

dpkg = find_executable('dpkg')
if not dpkg:
return None

process = subprocess.Popen([dpkg, '-S', path], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
parts = stdout.decode('utf-8').split(': ')
if len(parts) != 2:
return None

package_name = parts[0]
dpkg_query = find_executable('dpkg-query')
if not dpkg_query:
return None

package_name = parts[0]
process = subprocess.Popen([dpkg_query, '-L', package_name], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
dependencies = []
for dependency_path in stdout.decode('utf-8').split('\n'):
if os.path.exists(dependency_path) and not os.path.isdir(dependency_path):
dependencies.append(dependency_path)

return dependencies
class Pacman(PackageManager):
cache_directory = '/var/cache/pacman'
list_command = ['pacman', '-Ql']
list_regex = '.*\s+(\/.+)'
owner_command = ['pacman', '-Qo']
owner_regex = ' is owned by (.*)\s+.*'


def detect_redhat_dependencies(path):
class Yum(PackageManager):
cache_directory = '/var/cache/yum'
if not (os.path.exists(cache_directory) and os.path.isdir(cache_directory)):
return None
list_command = ['rpm', '-ql']
list_regex = '(.+)'
owner_command = ['rpm', '-qf']
owner_regex = '(.+)'

rpm = find_executable('rpm')
if not rpm:
return None

process = subprocess.Popen([rpm, '-qf', path], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
package_name = stdout.decode('utf-8').strip()
package_managers = [
Apt(),
Pacman(),
Yum(),
]

process = subprocess.Popen([rpm, '-ql', package_name], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
dependencies = []
for dependency_path in stdout.decode('utf-8').split('\n'):
if os.path.exists(dependency_path) and not os.path.isdir(dependency_path):
dependencies.append(dependency_path)

return dependencies
def detect_dependencies(path):
# We'll go through the supported systems one by one.
for package_manager in package_managers:
dependencies = package_manager.find_dependencies(path)
if dependencies:
return dependencies

return None
1 change: 0 additions & 1 deletion tests/test_dependency_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ def test_detect_dependencies():
assert os.path.exists(ls), 'This test assumes that `ls` is installed on the system.'

dependencies = detect_dependencies(ls)
print(dependencies)
assert any(ls in dependency for dependency in dependencies), \
'`%s` should have been detected as a dependency for `ls`.' % ls

0 comments on commit 6d26fc6

Please sign in to comment.