Skip to content

Commit

Permalink
Add support for pulling split apks, Fixes #2271 (#2446)
Browse files Browse the repository at this point in the history
* Add support for pulling split apks from device, Fixes #2271
* Replace Quark with Behaviour analysis using quark rules
  • Loading branch information
ajinabraham authored Nov 8, 2024
1 parent e5af3a8 commit ee2cb73
Show file tree
Hide file tree
Showing 16 changed files with 3,140 additions and 261 deletions.
11 changes: 7 additions & 4 deletions mobsf/DynamicAnalyzer/views/android/dynamic_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,19 @@ def trigger_static_analysis(request, checksum):
err = 'Cannot connect to Android Runtime'
return print_n_send_error_response(request, err)
env = Environment(identifier)
apk_file = env.get_apk(checksum, package)
if not apk_file:
scan_type = env.get_apk(checksum, package)
if not scan_type:
err = 'Failed to download APK file'
return print_n_send_error_response(request, err)
file_name = f'{package}.apk'
if scan_type == 'apks':
file_name = f'{file_name}s'
data = {
'analyzer': 'static_analyzer',
'status': 'success',
'hash': checksum,
'scan_type': 'apk',
'file_name': f'{package}.apk',
'scan_type': scan_type,
'file_name': file_name,
}
add_to_recent_scan(data)
return HttpResponseRedirect(f'/static_analyzer/{checksum}/')
Expand Down
83 changes: 62 additions & 21 deletions mobsf/DynamicAnalyzer/views/android/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
import threading
import time
from pathlib import Path
from base64 import b64encode
from hashlib import md5

Expand Down Expand Up @@ -436,31 +437,71 @@ def get_device_packages(self):
device_packages[md5] = (pkg, apk)
return device_packages

def get_apk(self, checksum, package):
"""Download APK from device."""
try:
out_dir = os.path.join(settings.UPLD_DIR, checksum + '/')
if not os.path.exists(out_dir):
os.makedirs(out_dir)
out_file = os.path.join(out_dir, f'{checksum}.apk')
if is_file_exists(out_file):
return out_file
def download_apk_packages(self, pkg_path, out_file):
"""Download APK package(s)."""
with tempfile.TemporaryDirectory() as temp_dir:
# Download APK package(s)
# Can be single or multiple packages
out = self.adb_command([
'pm',
'path',
package], True)
out = out.decode('utf-8').rstrip()
path = out.split('package:', 1)[1].strip()
logger.info('Downloading APK')
self.adb_command([
'pull',
path,
out_file,
pkg_path.as_posix(),
temp_dir,
])
if is_file_exists(out_file):
return out_file
fmt = out.decode('utf-8').strip()
logger.info('ADB Pull Output: %s', fmt)
# Filter for APK files in the directory
apk_files = []
for f in Path(temp_dir).glob('*.apk'):
if f.is_file():
apk_files.append(f)
# Check if there is exactly one APK file
if len(apk_files) == 1:
shutil.move(apk_files[0], out_file)
return 'apk'
else:
# If there are multiple APK files, zip them
shutil.make_archive(out_file, 'zip', root_dir=temp_dir, base_dir='.')
# Rename the zip file to APK
apks_file = out_file.with_suffix('.apk')
os.rename(out_file.as_posix() + '.zip', apks_file)
return 'apks'

def get_apk_packages(self, package):
"""Get all APK packages from device."""
out = self.adb_command([
'pm',
'path',
package], True)
return out.decode('utf-8').strip()

def get_apk_parent_directory(self, package):
"""Get parent directory of APK packages."""
package_out = self.get_apk_packages(package)
package_out = package_out.split()
if ('package:' in package_out[0]
and package_out[0].endswith('.apk')):
path = package_out[0].split('package:', 1)[1].strip()
return Path(path).parent
return False

def get_apk(self, checksum, package):
"""Download APK from device."""
try:
# Do not download if already exists
out_dir = Path(settings.UPLD_DIR) / checksum
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f'{checksum}.apk'
if out_file.exists():
return 'apk'
# Get APK package parent directory
pkg_path = self.get_apk_parent_directory(package)
if pkg_path:
# Download APK package(s)
logger.info('Downloading APK')
return self.download_apk_packages(pkg_path, out_file)
except Exception:
return False
logger.exception('Failed to download APK')
return False

def system_check(self, runtime):
"""Check if /system is writable."""
Expand Down
45 changes: 45 additions & 0 deletions mobsf/MalwareAnalyzer/views/android/behaviour_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf_8 -*-
"""Perform behaviour analysis."""
import logging
from pathlib import Path

from django.conf import settings

from mobsf.MobSF.utils import (
append_scan_status,
get_android_src_dir,
)
from mobsf.StaticAnalyzer.views.sast_engine import (
scan,
)

logger = logging.getLogger(__name__)


def analyze(checksum, app_dir, typ):
"""Perform behaviour analysis."""
try:
root = Path(settings.BASE_DIR) / 'MalwareAnalyzer' / 'views'
rules = root / 'android' / 'rules' / 'behaviour_rules.yaml'
app_dir = Path(app_dir)
src = get_android_src_dir(app_dir, typ)
skp = settings.SKIP_CLASS_PATH
msg = 'Android Behaviour Analysis Started'
logger.info(msg)
append_scan_status(checksum, msg)
# Behaviour Analysis
findings = scan(
checksum,
rules.as_posix(),
{'.java', '.kt'},
[src.as_posix() + '/'],
skp)
msg = 'Android Behaviour Analysis Completed'
logger.info(msg)
append_scan_status(checksum, msg)
return findings
except Exception as exp:
msg = 'Failed to perform behaviour analysis'
logger.exception(msg)
append_scan_status(checksum, msg, repr(exp))
return {}
144 changes: 0 additions & 144 deletions mobsf/MalwareAnalyzer/views/android/quark.py

This file was deleted.

Loading

0 comments on commit ee2cb73

Please sign in to comment.