From 29872ad410f218861099d61946c62419bed1a2ac Mon Sep 17 00:00:00 2001 From: gnarzilla Date: Sun, 17 Nov 2024 09:03:35 -0700 Subject: [PATCH] Proxy for API interception and credential handling --- pyproject.toml | 43 +- src/guardian/auth/handlers.py | 155 +++++++ src/guardian/cli/__init__.py | 30 +- src/guardian/cli/commands/__init__.py | 7 +- src/guardian/cli/commands/auth.py | 70 ++- src/guardian/cli/commands/deps.py | 74 ++++ src/guardian/cli/commands/docs.py | 54 +++ src/guardian/cli/commands/keys.py | 206 +++++++++ src/guardian/cli/commands/proxy.py | 63 +++ src/guardian/cli/commands/repo.py | 184 +++++++- src/guardian/core/auth.py | 18 +- src/guardian/proxy/certs.py | 68 +++ src/guardian/proxy/config/default.yml | 23 + src/guardian/proxy/launcher.py | 106 +++++ src/guardian/proxy/logging.py | 75 ++++ src/guardian/proxy/server.py | 58 +++ src/guardian/proxy/server_backup.py | 149 +++++++ src/guardian/services/alerts.py | 209 +++++++++ src/guardian/services/git.py | 183 ++++++++ src/guardian/services/key_management.py | 241 ++++++++++ src/guardian/services/key_tracking.py | 534 +++++++++++++++++++++++ src/guardian/services/migration.py | 156 +++++++ src/guardian/services/platform/base.py | 63 +++ src/guardian/services/platform/github.py | 40 ++ src/guardian/services/ssh.py | 116 ++--- src/guardian/utils/tree.py | 120 +++++ 26 files changed, 2920 insertions(+), 125 deletions(-) create mode 100644 src/guardian/auth/handlers.py create mode 100644 src/guardian/cli/commands/deps.py create mode 100644 src/guardian/cli/commands/docs.py create mode 100644 src/guardian/cli/commands/keys.py create mode 100644 src/guardian/cli/commands/proxy.py create mode 100644 src/guardian/proxy/certs.py create mode 100644 src/guardian/proxy/config/default.yml create mode 100644 src/guardian/proxy/launcher.py create mode 100644 src/guardian/proxy/logging.py create mode 100644 src/guardian/proxy/server.py create mode 100644 src/guardian/proxy/server_backup.py create mode 100644 src/guardian/services/alerts.py create mode 100644 src/guardian/services/git.py create mode 100644 src/guardian/services/key_management.py create mode 100644 src/guardian/services/key_tracking.py create mode 100644 src/guardian/services/migration.py create mode 100644 src/guardian/services/platform/base.py create mode 100644 src/guardian/services/platform/github.py create mode 100644 src/guardian/utils/tree.py diff --git a/pyproject.toml b/pyproject.toml index 6c67751..46c9502 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,37 +3,26 @@ name = "guardian" version = "1.0.0" description = "Git Authentication & Development Assistant" requires-python = ">=3.8" -dependencies = [ - "click>=8.0.0", - "rich>=13.0.0", - "keyring>=24.0.0", - "PyYAML>=6.0", -] +dependencies = [ "markdown>=3.7", "markupsafe>=3.0.2", "beautifulsoup4>=4.12.0","aiohttp>=3.9.0", "pyyaml>=6.0.2", "secretstorage>=3.3.3", "astroid>=3.3.5", "autocommand>=2.2.2", "babel>=2.16.0", "backports.tarfile>=1.2.0", "certifi>=2024.8.30", "cffi>=1.17.1", "charset-normalizer>=3.4.0", "click>=8.1.7", "colorama>=0.4.6", "coverage>=7.6.4", "cryptography>=43.0.3", "dill>=0.3.9", "ghp-import>=2.1.0", "github-cli>=1.0.0", "idna>=3.10", "importlib-metadata>=8.5.0", "inflect>=7.4.0", "iniconfig>=2.0.0", "jaraco.classes>=3.4.0", "jaraco.collections>=5.1.0", "jaraco.context>=6.0.1", "jaraco.functools>=4.1.0", "jaraco.text>=4.0.0", "jeepney>=0.8.0", "jinja2>=3.1.4", "jwt>=1.3.1", "keyring>=25.5.0", "markdown-it-py>=3.0.0", "mccabe>=0.7.0", "mdurl>=0.1.2", "mergedeep>=1.3.4", "mitmproxy>=10.0.0","mkdocs>=1.6.1", "mkdocs-get-deps>=0.2.0", "mkdocs-material>=9.5.44", "mkdocs-material-extensions>=1.3.1", "more-itertools>=10.5.0", "mypy-extensions>=1.0.0", "packaging>=24.2", "paginate>=0.5.7", "pathspec>=0.12.1", "pip>=24.2", "platformdirs>=4.3.6", "pluggy>=1.5.0", "pycparser>=2.22", "pygments>=2.18.0", "pylint>=3.3.1", "pymdown-extensions>=10.12", "pytest-cov>=6.0.0", "python-dateutil>=2.9.0.post0", "pyyaml-env-tag>=0.1", "pyyaml>=6.0.1", "regex>=2024.11.6", "requests>=2.32.3", "rich>=13.9.4", "setuptools>=75.4.0", "simplejson>=3.19.3", "six>=1.16.0", "toml>=0.10.2", "tomli>=2.1.0", "tomlkit>=0.13.2", "typeguard>=4.4.1", "typing-extensions>=4.12.2", "urllib3>=2.2.3", "watchdog>=6.0.0", "wheel>=0.45.0", "zipp>=3.21.0",] + +[build-system] +requires = [ "hatchling",] +build-backend = "hatchling.build" [project.optional-dependencies] -test = [ - "pytest>=7.0.0", - "pytest-cov>=4.0.0", - "black>=23.0.0", - "isort>=5.0.0", - "mypy>=1.0.0", -] +dev = [ "black>=24.10.0", "isort>=5.13.2", "mypy>=1.13.0", "pytest>=8.3.3",] [project.scripts] guardian = "guardian.cli:cli" -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.pytest.ini_options] -testpaths = ["tests"] -python_files = ["test_*.py"] -addopts = "--cov=guardian --cov-report=xml --cov-report=term-missing" +[project.urls] +Homepage = "https://github.com/None/-home-thatch-dev-guardian" +Documentation = "https://-home-thatch-dev-guardian.readthedocs.io/" +Repository = "https://github.com/None/-home-thatch-dev-guardian.git" [tool.black] line-length = 88 -target-version = ['py38'] +target-version = [ "py38",] [tool.isort] profile = "black" @@ -45,10 +34,10 @@ warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true -[project.urls] -Homepage = "https://github.com/None/-home-thatch-dev-guardian" -Documentation = "https://-home-thatch-dev-guardian.readthedocs.io/" -Repository = "https://github.com/None/-home-thatch-dev-guardian.git" +[tool.pytest.ini_options] +testpaths = [ "tests",] +python_files = [ "test_*.py",] +addopts = "--cov=guardian --cov-report=xml --cov-report=term-missing" [tool.hatch.build.targets.wheel] -packages = ["src/guardian"] +packages = [ "src/guardian",] diff --git a/src/guardian/auth/handlers.py b/src/guardian/auth/handlers.py new file mode 100644 index 0000000..cc93374 --- /dev/null +++ b/src/guardian/auth/handlers.py @@ -0,0 +1,155 @@ +# src/guardian/auth/handlers.py +from typing import Dict, Optional, Type +from dataclasses import dataclass +from datetime import datetime, timedelta +import logging +import requests +from bs4 import BeautifulSoup + +@dataclass +class AuthResult: + """Authentication result with session data""" + success: bool + session_data: Optional[Dict] = None + expires_at: Optional[datetime] = None + refresh_token: Optional[str] = None + error: Optional[str] = None + +class BaseAuthHandler: + """Base handler with common functionality""" + + def __init__(self, site_config: Dict): + self.config = site_config + self.session = requests.Session() + self.logger = logging.getLogger(self.__class__.__name__) + + async def authenticate(self) -> AuthResult: + """Main authentication method""" + raise NotImplementedError + + def _extract_form_data(self, html: str, form_id: str) -> Dict: + """Extract form fields including hidden ones""" + soup = BeautifulSoup(html, 'html.parser') + form = soup.find('form', id=form_id) + if not form: + return {} + + data = {} + for input_field in form.find_all('input'): + if 'name' in input_field.attrs: + data[input_field['name']] = input_field.get('value', '') + return data + +class OAuthHandler(BaseAuthHandler): + """Generic OAuth 2.0 handler with site-specific extensions""" + + SITE_SPECIFICS = { + 'google.com': { + 'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth', + 'token_url': 'https://oauth2.googleapis.com/token', + 'extra_params': {'prompt': 'consent'}, + }, + 'github.com': { + 'auth_url': 'https://github.com/login/oauth/authorize', + 'token_url': 'https://github.com/login/oauth/access_token', + 'headers': {'Accept': 'application/json'}, + } + } + + async def authenticate(self) -> AuthResult: + site = self.config['domain'] + specifics = self.SITE_SPECIFICS.get(site, {}) + + try: + token = await self._oauth_flow( + auth_url=specifics.get('auth_url', self.config['auth_url']), + token_url=specifics.get('token_url', self.config['token_url']), + extra_params=specifics.get('extra_params', {}), + headers=specifics.get('headers', {}) + ) + + return AuthResult( + success=True, + session_data={'token': token['access_token']}, + expires_at=datetime.now() + timedelta(seconds=token['expires_in']), + refresh_token=token.get('refresh_token') + ) + + except Exception as e: + self.logger.error(f"OAuth authentication failed for {site}: {e}") + return AuthResult(success=False, error=str(e)) + +class SessionHandler(BaseAuthHandler): + """Form-based session handler with site-specific adjustments""" + + SITE_SPECIFICS = { + 'facebook.com': { + 'form_id': 'login_form', + 'success_cookies': ['c_user'], + 'extra_headers': {'User-Agent': 'Mozilla/5.0...'}, + }, + 'twitter.com': { + 'form_id': 'signin-form', + 'success_cookies': ['auth_token'], + 'csrf_token': True, + } + } + + async def authenticate(self) -> AuthResult: + site = self.config['domain'] + specifics = self.SITE_SPECIFICS.get(site, {}) + + try: + # Get login page + response = await self._get_login_page( + headers=specifics.get('extra_headers', {}) + ) + + # Extract form data + form_data = self._extract_form_data( + response.text, + specifics.get('form_id', 'login_form') + ) + + # Add credentials + form_data.update(self.config['credentials']) + + # Handle CSRF if needed + if specifics.get('csrf_token'): + form_data['csrf_token'] = self._extract_csrf_token(response.text) + + # Submit login + login_response = await self._submit_login(form_data) + + # Check success based on site-specific cookies + success_cookies = specifics.get('success_cookies', ['sessionid']) + if any(c in self.session.cookies for c in success_cookies): + return AuthResult( + success=True, + session_data={'cookies': self.session.cookies.get_dict()} + ) + + return AuthResult( + success=False, + error="Login failed - required cookies not found" + ) + + except Exception as e: + self.logger.error(f"Session authentication failed for {site}: {e}") + return AuthResult(success=False, error=str(e)) + +class HandlerRegistry: + """Registry of authentication handlers""" + + _handlers: Dict[str, Type[BaseAuthHandler]] = { + 'oauth': OAuthHandler, + 'session': SessionHandler + } + + @classmethod + def get_handler(cls, auth_type: str, site_config: Dict) -> BaseAuthHandler: + """Get appropriate handler for site""" + handler_class = cls._handlers.get(auth_type) + if not handler_class: + raise ValueError(f"Unsupported auth type: {auth_type}") + return handler_class(site_config) diff --git a/src/guardian/cli/__init__.py b/src/guardian/cli/__init__.py index 9c352d9..769e8d5 100644 --- a/src/guardian/cli/__init__.py +++ b/src/guardian/cli/__init__.py @@ -1,7 +1,6 @@ # src/guardian/cli/__init__.py """ Guardian CLI Entry Point - This module initializes the Guardian CLI application and registers all available commands. It provides the main CLI group and context management. """ @@ -27,6 +26,7 @@ def __init__(self): self.config = ConfigService() self.repo = RepoService() self.security = SecurityService() + self.cli = cli @click.group() @click.version_option( @@ -36,25 +36,21 @@ def __init__(self): ) @click.pass_context def cli(ctx): - """Guardian: Git Authentication & Development Assistant - - A comprehensive tool for managing Git authentication, security, - and development workflows. - """ - # Initialize context with our services + """Guardian: Git Authentication & Development Assistant""" ctx.obj = Context() -# Import command groups +# Import all command groups from guardian.cli.commands import ( auth, config, format_cmd, hooks, init, + docs, + deps, + proxy ) - -# Add repo separately until it's fully implemented -from guardian.cli.commands.repo import repo +from guardian.cli.commands.repo import repo # Import repo separately # Define available commands with descriptions COMMANDS = [ @@ -63,15 +59,19 @@ def cli(ctx): (hooks, "Git hooks management"), (format_cmd, "Code formatting"), (init, "Initialize Guardian"), - (repo, "Repository management"), + (docs, "Documentation generation"), + (deps, "Dependencies management"), + (proxy, "Proxy server management") ] -# Register each command +# Register core commands for command, description in COMMANDS: - # Set the help text if not already set - if not command.help and description: + if not getattr(command, 'help', None) and description: command.help = description cli.add_command(command) +# Register repo command separately +cli.add_command(repo) + if __name__ == "__main__": cli() diff --git a/src/guardian/cli/commands/__init__.py b/src/guardian/cli/commands/__init__.py index f59ef7f..f1f8e6a 100644 --- a/src/guardian/cli/commands/__init__.py +++ b/src/guardian/cli/commands/__init__.py @@ -5,12 +5,14 @@ This module exports all available CLI commands. Add new commands here after implementing them in their respective modules. """ - from guardian.cli.commands.auth import auth from guardian.cli.commands.config import config from guardian.cli.commands.format import format_cmd from guardian.cli.commands.hooks import hooks from guardian.cli.commands.init import init +from guardian.cli.commands.docs import docs # New! +from guardian.cli.commands.deps import deps # New! +from guardian.cli.commands.proxy import proxy __all__ = [ "auth", @@ -18,6 +20,9 @@ "format_cmd", "hooks", "init", + "docs", # New! + "deps", + "proxy" ] # Note: repo command is imported separately in cli/__init__.py until fully implemented diff --git a/src/guardian/cli/commands/auth.py b/src/guardian/cli/commands/auth.py index 0cba89e..db61900 100644 --- a/src/guardian/cli/commands/auth.py +++ b/src/guardian/cli/commands/auth.py @@ -211,6 +211,7 @@ def validate_github(ctx): console.print("1. Run: guardian auth setup-github") console.print("2. Visit: https://github.com/settings/tokens") + @auth.command() @click.pass_context def list(ctx): @@ -220,7 +221,13 @@ def list(ctx): if ssh_result.success and ssh_result.data.get('keys'): console.print("\n[bold]SSH Keys:[/bold]") for key in ssh_result.data['keys']: - console.print(f" • {key}") + console.print(f"\n• Type: {key['type']}") + console.print(f" Path: {key['path']}") + console.print(Panel( + key['content'], + title="Public Key Content", + expand=False + )) else: console.print("\n[dim]No SSH keys found[/dim]") @@ -324,3 +331,64 @@ def setup_signing(ctx, name, email): console.print("\nTo remove incomplete setup:") console.print(" guardian config unset user.signingkey") console.print(" guardian config unset commit.gpgsign") + +@auth.command() +@click.option('--token', help='GitLab Personal Access Token') +@click.pass_context +def setup_gitlab(ctx, token): + """Configure GitLab Personal Access Token""" + if not token: + console.print(Panel( + Markdown(""" + # Creating a GitLab Personal Access Token (PAT) + + 1. Visit [GitLab Token Settings](https://gitlab.com/-/profile/personal_access_tokens) + 2. Create a token with scopes: + - `api` (API access) + - `read_repository` (Read repository) + - `write_repository` (Write repository) + 3. Copy the generated token + + [Create new token now →](https://gitlab.com/-/profile/personal_access_tokens/new) + """), + title="GitLab Token Instructions" + )) + + token = click.prompt('Enter your GitLab Personal Access Token', + hide_input=True, confirmation_prompt=True) + + result = ctx.obj.auth.setup_git_token(token, name='gitlab') + if result.success: + console.print(f"[green]✓ {result.message}[/green]") + else: + console.print(f"[red]✗ {result.message}[/red]") + +@auth.command() +@click.option('--token', help='Bitbucket App Password') +@click.pass_context +def setup_bitbucket(ctx, token): + """Configure Bitbucket App Password""" + if not token: + console.print(Panel( + Markdown(""" + # Creating a Bitbucket App Password + + 1. Visit [Bitbucket App Passwords](https://bitbucket.org/account/settings/app-passwords/) + 2. Create a password with permissions: + - Repository: Read, Write + - Pull requests: Read, Write + 3. Copy the generated password + + [Create new app password →](https://bitbucket.org/account/settings/app-passwords/new) + """), + title="Bitbucket Token Instructions" + )) + + token = click.prompt('Enter your Bitbucket App Password', + hide_input=True, confirmation_prompt=True) + + result = ctx.obj.auth.setup_git_token(token, name='bitbucket') + if result.success: + console.print(f"[green]✓ {result.message}[/green]") + else: + console.print(f"[red]✗ {result.message}[/red]") diff --git a/src/guardian/cli/commands/deps.py b/src/guardian/cli/commands/deps.py new file mode 100644 index 0000000..20ff584 --- /dev/null +++ b/src/guardian/cli/commands/deps.py @@ -0,0 +1,74 @@ +# src/guardian/cli/commands/deps.py + +import click +import subprocess +import toml +from rich.console import Console +from rich.panel import Panel +from rich.markdown import Markdown +from guardian.services.status import StatusChecker +from guardian.core.auth import AuthService + +console = Console() + + +@click.group() +def deps(): + """Dependencies management commands""" + pass + +@deps.command() +@click.option('--update-toml', is_flag=True, help='Update pyproject.toml') +def sync(update_toml): + """Sync dependencies with requirements/pyproject.toml""" + try: + import pkg_resources + import toml + from packaging.requirements import Requirement + + # Get installed packages + installed = {pkg.key: pkg for pkg in pkg_resources.working_set} + + # Parse existing pyproject.toml + with open('pyproject.toml') as f: + project_data = toml.load(f) + + # Organize dependencies + deps = {} + dev_deps = {} + + for pkg_name, pkg in installed.items(): + if pkg_name == 'guardian': # Skip our own package + continue + + req = f"{pkg.key}>={pkg.version}" + + # Determine if it's a dev dependency + if pkg_name in ['pytest', 'black', 'mypy', 'isort']: + dev_deps[pkg_name] = req + else: + deps[pkg_name] = req + + if update_toml: + # Update pyproject.toml + project_data['project']['dependencies'] = list(deps.values()) + project_data['project']['optional-dependencies'] = { + 'dev': list(dev_deps.values()) + } + + with open('pyproject.toml', 'w') as f: + toml.dump(project_data, f) + + console.print("[green]✓[/green] Updated pyproject.toml") + + # Show current dependencies + console.print("\n[bold]Dependencies:[/bold]") + for name, req in deps.items(): + console.print(f" • {req}") + + console.print("\n[bold]Development Dependencies:[/bold]") + for name, req in dev_deps.items(): + console.print(f" • {req}") + + except Exception as e: + console.print(f"[red]✗[/red] Failed to sync dependencies: {str(e)}") diff --git a/src/guardian/cli/commands/docs.py b/src/guardian/cli/commands/docs.py new file mode 100644 index 0000000..dfba035 --- /dev/null +++ b/src/guardian/cli/commands/docs.py @@ -0,0 +1,54 @@ +# src/guardian/cli/commands/docs.py +import click +from rich.console import Console +from rich.markdown import Markdown +from rich.tree import Tree +from pathlib import Path +from guardian.utils.tree import CommandTreeGenerator, ProjectTreeGenerator + +console = Console() + +@click.group() +def docs(): + """Documentation management commands""" + pass + +@docs.command() +@click.option('--format', type=click.Choice(['markdown', 'tree']), + default='tree', help='Output format') +@click.option('--output', type=click.Path(), help='Output file for markdown') +@click.pass_context +def commands(ctx, format, output): + """Generate command tree documentation""" + generator = CommandTreeGenerator(ctx.obj.cli) + + if format == 'tree': + generator.print_tree() + else: + content = generator.generate_markdown() + if output: + Path(output).write_text(content) + console.print(f"[green]✓[/green] Command tree written to {output}") + else: + console.print(Markdown(content)) + +@docs.command() +@click.argument('path', type=click.Path(exists=True), default='.') +@click.option('--format', type=click.Choice(['markdown', 'tree']), + default='tree', help='Output format') +@click.option('--output', type=click.Path(), help='Output file for markdown') +@click.option('--ignore', multiple=True, help='Patterns to ignore') +def project(path, format, output, ignore): + """Generate project structure documentation""" + generator = ProjectTreeGenerator(Path(path)) + ignore_patterns = list(ignore) if ignore else None + + if format == 'tree': + generator.print_tree(ignore_patterns) + else: + content = generator.generate_markdown(ignore_patterns) + if output: + Path(output).write_text(content) + console.print(f"[green]✓[/green] Project tree written to {output}") + else: + console.print(Markdown(content)) diff --git a/src/guardian/cli/commands/keys.py b/src/guardian/cli/commands/keys.py new file mode 100644 index 0000000..0c4cb2a --- /dev/null +++ b/src/guardian/cli/commands/keys.py @@ -0,0 +1,206 @@ +# src/guardian/cli/commands/keys.py +import click +import json +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from pathlib import Path +from guardian.services.key_management import KeyManager +from guardian.services.key_tracking import KeyTracker +from guardian.services.alerts import KeyAlertSystem +console = Console() + +@click.group() +def keys(): + """SSH key management commands""" + pass + +@keys.command() +@click.argument('key_path', type=click.Path(exists=True)) +@click.pass_context +def health(ctx, key_path): + """Check health of SSH key""" + manager = KeyManager() + result = manager.check_key_health(Path(key_path)) + + if result.success: + health = result.data['health'] + + console.print(Panel( + "\n".join([ + f"Age: [cyan]{health['age_days']}[/cyan] days", + f"Algorithm: [cyan]{health['algorithm']}[/cyan]", + f"Key Size: [cyan]{health['key_size']}[/cyan] bits", + f"Permissions OK: {'[green]Yes[/green]' if health['permissions_ok'] else '[red]No[/red]'}", + "", + *([f"[yellow]Recommendations:[/yellow]"] + + [f"• {r}" for r in health['recommendations']] + if health['recommendations'] else + ["[green]No issues found[/green]"]) + ]), + title="Key Health Check" + )) + else: + console.print(f"[red]✗[/red] {result.message}") + +@keys.command() +@click.option('--email', prompt='Email for new key') +@click.option('--no-backup', is_flag=True, help='Skip backup of existing keys') +@click.pass_context +def rotate(ctx, email, no_backup): + """Rotate SSH keys""" + if not no_backup: + console.print("[yellow]Will backup existing keys before rotation[/yellow]") + + if click.confirm("Continue with key rotation?"): + manager = KeyManager() + result = manager.rotate_keys(email, backup=not no_backup) + + if result.success: + console.print(f"[green]✓[/green] {result.message}") + if result.data.get('backup'): + console.print(f"Backup created at: {result.data['backup']['backup_path']}") + console.print(f"New key generated at: {result.data['new_key']}") + else: + console.print(f"[red]✗[/red] {result.message}") + +@keys.command() +@click.option('--password', prompt=True, hide_input=True, + confirmation_prompt=True, help='Password for encryption') +@click.pass_context +def backup(ctx, password): + """Create encrypted backup of all keys""" + manager = KeyManager() + result = manager.create_recovery_bundle(password) + + if result.success: + console.print(f"[green]✓[/green] {result.message}") + console.print(f"Bundle created at: {result.data['bundle_path']}") + console.print("\n[yellow]Store this bundle and password securely![/yellow]") + else: + console.print(f"[red]✗[/red] {result.message}") + +@keys.command() +@click.argument('key_path', type=click.Path(exists=True)) +@click.pass_context +def track(ctx, key_path): + """Start tracking key usage""" + tracker = KeyTracker() + result = tracker.register_key(Path(key_path)) + + if result.success: + console.print(f"[green]✓[/green] {result.message}") + console.print(f"Key ID: {result.data['key_id']}") + else: + console.print(f"[red]✗[/red] {result.message}") + +@keys.command() +@click.argument('key_path', type=click.Path(exists=True)) +@click.pass_context +def usage(ctx, key_path): + """Show key usage statistics""" + tracker = KeyTracker() + result = tracker.get_key_usage(Path(key_path)) + + if result.success: + usage = result.data['usage'] + + console.print(Panel( + "\n".join([ + f"Last Used: [cyan]{usage['last_used']}[/cyan]", + f"Total Uses: [cyan]{usage['usage_count']}[/cyan]", + f"Success Rate: [cyan]{usage['success_rate']*100:.1f}%[/cyan]", + "", + "[bold]Recent Hosts:[/bold]", + *[f"• {host}" for host in usage['hosts']], + "", + "[bold]Platforms:[/bold]", + *[f"• {platform}" for platform in usage['platforms']] + ]), + title="Key Usage Statistics" + )) + + # Check for patterns + pattern_result = tracker.analyze_usage_patterns(Path(key_path)) + if pattern_result.success and pattern_result.data['patterns']['unusual_activity']: + console.print("\n[yellow]Unusual Activity Detected:[/yellow]") + for activity in pattern_result.data['patterns']['unusual_activity']: + console.print(f"• {activity}") + else: + console.print(f"[red]✗[/red] {result.message}") +# src/guardian/cli/commands/keys.py (add these commands) + +@keys.command() +@click.pass_context +def alerts(ctx): + """Show recent security alerts""" + alert_system = KeyAlertSystem() + alert_history = Path(alert_system.alert_history) + + if not alert_history.exists(): + console.print("No alerts recorded") + return + + try: + alerts = json.loads(alert_history.read_text()) + + if not alerts: + console.print("No alerts recorded") + return + + # Group alerts by level + grouped = {'critical': [], 'warning': [], 'info': []} + for alert in alerts: + grouped[alert['level']].append(alert) + + # Show alerts by severity + for level in ['critical', 'warning', 'info']: + if grouped[level]: + console.print(f"\n[bold]{level.upper()} Alerts:[/bold]") + for alert in grouped[level]: + console.print(Panel( + "\n".join([ + f"Time: {alert['timestamp']}", + f"Message: {alert['message']}", + "", + "[bold]Details:[/bold]", + *[f"• {k}: {v}" for k, v in alert['details'].items()], + "", + "[bold]Recommendations:[/bold]", + *[f"• {r}" for r in alert['recommendations']] + ]), + style="red" if level == 'critical' else + "yellow" if level == 'warning' else "blue" + )) + except Exception as e: + console.print(f"[red]Error reading alerts: {e}[/red]") + +@keys.command() +@click.option('--level', + type=click.Choice(['critical', 'warning', 'info', 'all']), + default='all', + help='Alert level to clear') +@click.pass_context +def clear_alerts(ctx, level): + """Clear stored alerts""" + if not click.confirm("Are you sure you want to clear alerts?"): + return + + alert_system = KeyAlertSystem() + alert_history = Path(alert_system.alert_history) + + if not alert_history.exists(): + console.print("No alerts to clear") + return + + try: + if level == 'all': + alert_history.write_text('[]') + console.print("[green]✓[/green] All alerts cleared") + else: + alerts = json.loads(alert_history.read_text()) + alerts = [a for a in alerts if a['level'] != level] + alert_history.write_text(json.dumps(alerts, indent=2)) + console.print(f"[green]✓[/green] {level.title()} alerts cleared") + except Exception as e: + console.print(f"[red]Error clearing alerts: {e}[/red]") diff --git a/src/guardian/cli/commands/proxy.py b/src/guardian/cli/commands/proxy.py new file mode 100644 index 0000000..085be06 --- /dev/null +++ b/src/guardian/cli/commands/proxy.py @@ -0,0 +1,63 @@ +# src/guardian/cli/commands/proxy.py +import click +from rich.console import Console +from guardian.proxy.launcher import ProxyLauncher, load_config +from guardian.proxy.certs import CertificateHelper +from pathlib import Path + +console = Console() + +@click.group() +def proxy(): + """Proxy server management""" + pass + +@proxy.command() +@click.option('--web/--no-web', default=False, help='Start with web interface') +@click.option('--setup-cert', is_flag=True, help='Setup certificates before starting') +@click.pass_context +def start(ctx, web, setup_cert): + """Start the authentication proxy""" + try: + config = load_config() + cert_dir = Path(config['proxy']['cert_path']).expanduser() + + if setup_cert: + helper = CertificateHelper(cert_dir) + console.print(helper.get_browser_instructions()) + if click.confirm("Would you like to install system certificate?"): + if helper.install_system_cert(): + console.print("[green]✓[/green] System certificate installed") + else: + console.print("[yellow]![/yellow] Failed to install system certificate") + if not click.confirm("Continue anyway?"): + return + + launcher = ProxyLauncher(config) + + console.print(f"\nStarting Guardian proxy...") + console.print(f"Configure your browser/system to use proxy: " + f"{config['proxy']['host']}:{config['proxy']['port']}") + + if web: + console.print(f"Web interface available at: " + f"http://{config['proxy']['host']}:8081") + + # Start the proxy + import asyncio + asyncio.run(launcher.start(web_interface=web)) + + except Exception as e: + console.print(f"[red]Failed to start proxy: {str(e)}[/red]") + +@proxy.command() +def cert(): + """Show certificate installation instructions""" + try: + config = load_config() + cert_dir = Path(config['proxy']['cert_path']).expanduser() + helper = CertificateHelper(cert_dir) + console.print(helper.get_browser_instructions()) + + except Exception as e: + console.print(f"[red]Error: {str(e)}[/red]") diff --git a/src/guardian/cli/commands/repo.py b/src/guardian/cli/commands/repo.py index 88758da..0c288fa 100644 --- a/src/guardian/cli/commands/repo.py +++ b/src/guardian/cli/commands/repo.py @@ -3,8 +3,7 @@ from rich.console import Console from rich.panel import Panel from pathlib import Path -import subprocess -from typing import Optional +from guardian.services.git import GitService console = Console() @@ -195,3 +194,184 @@ def apply_sync(ctx): except Exception as e: console.print(f"[red]✗[/red] Failed to apply configuration: {str(e)}") + +@repo.command() +@click.argument('path', type=click.Path(exists=True)) +@click.option('--remote', default='origin', help='Remote name') +@click.option('--branch', help='Branch to push') +@click.pass_context +def push(ctx, path, remote, branch): + """Push changes to remote repository""" + try: + path = Path(path) + if not (path / '.git').exists(): + console.print("[red]✗[/red] Not a git repository") + return + + cmd = ['git', 'push', remote] + if branch: + cmd.append(branch) + + # Check auth before pushing + auth_status = ctx.obj.auth.check_auth_status() + if not auth_status.success: + console.print("[yellow]![/yellow] Authentication issues detected:") + for issue in auth_status.data['issues']: + console.print(f" • {issue}") + if not click.confirm("Continue anyway?"): + return + + result = subprocess.run(cmd, cwd=path, capture_output=True, text=True) + if result.returncode == 0: + console.print("[green]✓[/green] Push successful") + else: + console.print(f"[red]✗[/red] Push failed: {result.stderr}") + + except Exception as e: + console.print(f"[red]✗[/red] Error: {str(e)}") + +@repo.command() +@click.argument('url') +@click.option('--path', type=click.Path(), help='Local path to clone into') +@click.pass_context +def pull(ctx, url, path): + """Pull from remote repository""" + try: + if path: + path = Path(path) + else: + path = Path('.') + + cmd = ['git', 'pull'] + if url: + cmd.extend(['origin', url]) + + result = subprocess.run(cmd, cwd=path, capture_output=True, text=True) + if result.returncode == 0: + console.print("[green]✓[/green] Pull successful") + else: + console.print(f"[red]✗[/red] Pull failed: {result.stderr}") + + except Exception as e: + console.print(f"[red]✗[/red] Error: {str(e)}") + +# Update the status command to handle all platforms +@repo.command() +@click.pass_context +def status(ctx): + """Check repository status and remote connection""" + git_service = GitService() + + # Check current branch + branch_result = git_service.get_current_branch() + if branch_result.success: + console.print(Panel( + f"Current branch: [green]{branch_result.data['branch']}[/green]", + title="Local Status" + )) + + # Check remote connection + remote_result = git_service.check_remote() + if remote_result.success: + platform = remote_result.data['platform'] + # Get appropriate token + token = ctx.obj.auth.keyring.get_credential(f'{platform}_token_default') + if token: + # Verify repository + verify_result = git_service.verify_repo( + platform, + token, + remote_result.data['owner'], + remote_result.data['repo'] + ) + + if verify_result.success: + console.print(Panel( + "\n".join([ + f"Platform: [cyan]{platform.title()}[/cyan]", + f"Owner: [cyan]{remote_result.data['owner']}[/cyan]", + f"Repository: [cyan]{remote_result.data['repo']}[/cyan]", + f"Default branch: [cyan]{verify_result.data['default_branch']}[/cyan]", + f"Private: [cyan]{verify_result.data['private']}[/cyan]", + "", + "[bold]Permissions:[/bold]", + *[f"• {k}: [green]Yes[/green]" if v else f"• {k}: [red]No[/red]" + for k, v in verify_result.data['permissions'].items()] + ]), + title=f"{platform.title()} Status" + )) + + # Check if on default branch + if branch_result.data['branch'] != verify_result.data['default_branch']: + console.print(f"[yellow]Note: You are not on the default branch ({verify_result.data['default_branch']})[/yellow]") + else: + console.print(f"[yellow]{verify_result.message}[/yellow]") + else: + console.print(f"[yellow]No {platform.title()} token configured[/yellow]") + console.print(f"Run: guardian auth setup-{platform}") + else: + console.print("[yellow]Not connected to a remote repository[/yellow]") + else: + console.print("[red]Not a git repository[/red]") + +@repo.group() +def migrate(): + """Repository migration commands""" + pass + +@migrate.command() +@click.argument('source_repo') +@click.argument('target_platform') +@click.option('--target-repo', help='Target repository name (default: same as source)') +@click.pass_context +def plan(ctx, source_repo, target_platform, target_repo): + """Plan repository migration""" + git_service = GitService() + + # Get source platform + remote_result = git_service.check_remote(Path(source_repo)) + if not remote_result.success: + console.print("[red]✗[/red] Could not determine source platform") + return + + source_platform = remote_result.data['platform'] + target_repo = target_repo or remote_result.data['repo'] + + # Create migration planner + try: + source = create_platform(source_platform, ctx.obj.auth) + target = create_platform(target_platform, ctx.obj.auth) + + migration = PlatformMigration(source, target) + plan = migration.create_migration_plan(source_repo, target_repo) + + # Show migration plan + console.print(Panel( + "\n".join([ + f"Source: [cyan]{plan.source_platform}[/cyan] ({plan.source_repo})", + f"Target: [cyan]{plan.target_platform}[/cyan] ({plan.target_repo})", + "", + "[bold]Items to migrate:[/bold]", + *[f"• {item}: {'[green]Yes[/green]' if enabled else '[red]No[/red]'}" + for item, enabled in plan.items.items()], + "", + f"Estimated time: [yellow]{plan.estimated_time}[/yellow] minutes" + ]), + title="Migration Plan" + )) + + except Exception as e: + console.print(f"[red]✗[/red] Failed to create migration plan: {str(e)}") + +@migrate.command() +@click.argument('source_repo') +@click.argument('target_platform') +@click.option('--target-repo', help='Target repository name') +@click.option('--skip', multiple=True, + type=click.Choice(['issues', 'pull_requests', 'wiki', 'actions']), + help='Items to skip during migration') +@click.pass_context +def execute(ctx, source_repo, target_platform, target_repo, skip): + """Execute repository migration""" + # Similar to plan, but executes the migration + pass diff --git a/src/guardian/core/auth.py b/src/guardian/core/auth.py index 300f134..da607f0 100644 --- a/src/guardian/core/auth.py +++ b/src/guardian/core/auth.py @@ -121,14 +121,15 @@ def validate_github_token(self, name: str = "default") -> Result: def list_tokens(self) -> Result: """List stored GitHub tokens""" try: + # Get auth configuration + auth_config = self.config.get('auth', {}) + github_tokens = auth_config.get('github_tokens', []) + tokens = [] - config_tokens = self.config.get('auth', {}.get('github_tokens'), []) - - for name in config_tokens: - key = f"github_token_{name}" - if self.keyring.get_credential(key): - tokens.append(name) - + for token_name in github_tokens: + if self.keyring.get_credential(f"github_token_{token_name}"): + tokens.append(token_name) + return self.create_result( True, f"Found {len(tokens)} GitHub tokens", @@ -142,3 +143,6 @@ def list_tokens(self) -> Result: error=e ) + def list_ssh_keys(self) -> Result: + """List SSH keys""" + return self.ssh.list_ssh_keys() diff --git a/src/guardian/proxy/certs.py b/src/guardian/proxy/certs.py new file mode 100644 index 0000000..c8ee0a1 --- /dev/null +++ b/src/guardian/proxy/certs.py @@ -0,0 +1,68 @@ +# src/guardian/proxy/certs.py +import os +import shutil +from pathlib import Path +import platform +import subprocess +from typing import Optional + +class CertificateHelper: + """Helper for managing proxy certificates""" + + def __init__(self, cert_dir: Path): + self.cert_dir = cert_dir + self.cert_dir.mkdir(parents=True, exist_ok=True) + + def install_system_cert(self) -> bool: + """Install certificate at system level""" + system = platform.system().lower() + + if system == 'linux': + return self._install_linux() + elif system == 'darwin': + return self._install_macos() + elif system == 'windows': + return self._install_windows() + else: + raise NotImplementedError(f"System {system} not supported") + + def _install_linux(self) -> bool: + """Install certificate on Linux""" + try: + cert_file = self.cert_dir / 'mitmproxy-ca.pem' + system_cert_dir = Path('/usr/local/share/ca-certificates') + + # Copy certificate + shutil.copy(cert_file, system_cert_dir / 'guardian-proxy.crt') + + # Update certificates + subprocess.run(['sudo', 'update-ca-certificates'], check=True) + return True + + except Exception as e: + print(f"Failed to install system certificate: {e}") + return False + + def get_browser_instructions(self) -> str: + """Get browser-specific installation instructions""" + return """ +Certificate Installation Instructions: + +Chrome/Chromium: +1. Go to Settings → Privacy and security → Security +2. Click on "Manage certificates" +3. Go to "Authorities" tab +4. Click "Import" and select the certificate file +5. Check "Trust this certificate for identifying websites" + +Firefox: +1. Go to Preferences → Privacy & Security +2. Scroll down to Certificates +3. Click "View Certificates" +4. Go to "Authorities" tab +5. Click "Import" and select the certificate file +6. Check "Trust this CA to identify websites" + +Certificate Location: +{} +""".format(self.cert_dir / 'mitmproxy-ca.pem') diff --git a/src/guardian/proxy/config/default.yml b/src/guardian/proxy/config/default.yml new file mode 100644 index 0000000..5410249 --- /dev/null +++ b/src/guardian/proxy/config/default.yml @@ -0,0 +1,23 @@ +# src/guardian/proxy/config/default.yml +proxy: + host: "127.0.0.1" + port: 8080 + cert_path: "~/.guardian/certs" + +auth: + session_timeout: 3600 + refresh_before: 300 + + sites: + "facebook.com": + auth_type: "session" + login_url: "https://www.facebook.com/login" + success_indicators: + - cookie: "c_user" + + "github.com": + auth_type: "oauth" + auth_url: "https://github.com/login/oauth/authorize" + token_url: "https://github.com/login/oauth/access_token" + success_indicators: + - header: "Authorization" diff --git a/src/guardian/proxy/launcher.py b/src/guardian/proxy/launcher.py new file mode 100644 index 0000000..0949ba8 --- /dev/null +++ b/src/guardian/proxy/launcher.py @@ -0,0 +1,106 @@ +# src/guardian/proxy/launcher.py +import os +import yaml +from pathlib import Path +import asyncio +from mitmproxy.options import Options +from mitmproxy.tools.dump import DumpMaster +from mitmproxy.tools.web.master import WebMaster +from rich.console import Console +from rich.panel import Panel +from pathlib import Path +from .server import GuardianAuthProxy +import logging + +console = Console() +logger = logging.getLogger(__name__) + +def show_startup_message(host: str, port: int, web: bool = False): + """Show clean startup message""" + console.print(Panel( + "\n".join([ + f"[green]Guardian Proxy Starting[/green]", + "", + "[bold]Proxy Configuration:[/bold]", + f"Host: {host}", + f"Port: {port}", + "", + "[bold]Browser Setup:[/bold]", + "1. Configure proxy settings:", + f" • Host: {host}", + f" • Port: {port}", + "2. Visit any website to test connection", + "", + *([ + "[bold]Web Interface:[/bold]", + f"http://{host}:8081" + ] if web else []) + ]), + title="Guardian Proxy", + border_style="blue" + )) + + +def load_config() -> dict: + """Load proxy configuration""" + config_path = Path(__file__).parent / "config" / "default.yml" + with open(config_path) as f: + return yaml.safe_load(f) + +class ProxyLauncher: + def __init__(self, config: dict): + self.config = config + self.proxy = None + + def setup_certificates(self): + """Setup MITM certificates""" + cert_path = Path(self.config['proxy']['cert_path']).expanduser() + cert_path.mkdir(parents=True, exist_ok=True) + + if not (cert_path / "mitmproxy-ca.pem").exists(): + logger.info("Generating new certificates...") + # mitmproxy will generate certs automatically + + return str(cert_path) + + async def start(self, web_interface: bool = False): + """Start the proxy server""" + try: + opts = Options( + listen_host=self.config['proxy']['host'], + listen_port=self.config['proxy']['port'], + confdir=self.setup_certificates() + ) + + # Create master + master_class = WebMaster if web_interface else DumpMaster + self.proxy = master_class(opts) + + # Add our authentication addon + guardian = GuardianAuthProxy(self.config) + self.proxy.addons.add(guardian) + + logger.info(f"Starting proxy on {opts.listen_host}:{opts.listen_port}") + await self.proxy.run() + + except Exception as e: + logger.error(f"Failed to start proxy: {e}") + raise + + async def stop(self): + """Stop the proxy server""" + if self.proxy: + await self.proxy.shutdown() + +def main(): + """Main entry point for the proxy""" + logging.basicConfig(level=logging.INFO) + + config = load_config() + launcher = ProxyLauncher(config) + + try: + asyncio.run(launcher.start()) + except KeyboardInterrupt: + logger.info("Shutting down proxy...") + asyncio.run(launcher.stop()) diff --git a/src/guardian/proxy/logging.py b/src/guardian/proxy/logging.py new file mode 100644 index 0000000..763bbac --- /dev/null +++ b/src/guardian/proxy/logging.py @@ -0,0 +1,75 @@ +# src/guardian/proxy/logging.py +import logging +from rich.console import Console +from rich.logging import RichHandler +from typing import Optional +from urllib.parse import urlparse, parse_qs + +console = Console() + +class GuardianLogger: + """Custom logger for Guardian proxy""" + + def __init__(self, name: str = "guardian.proxy"): + self.logger = logging.getLogger(name) + self._setup_logger() + + def _setup_logger(self): + """Configure logging format and handlers""" + # Remove existing handlers + self.logger.handlers.clear() + + # Create rich handler + handler = RichHandler( + console=console, + show_time=False, + show_path=False, + rich_tracebacks=True, + tracebacks_show_locals=False + ) + + # Set format + handler.setFormatter(logging.Formatter("%(message)s")) + + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def _format_url(self, url: str, max_length: int = 100) -> str: + """Format URL for display""" + parsed = urlparse(url) + + # Hide long query parameters + if parsed.query: + query_params = parse_qs(parsed.query) + short_query = '&'.join(f"{k}=..." for k in query_params.keys()) + url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{short_query}" + + if len(url) > max_length: + return f"{url[:max_length-3]}..." + return url + + def request(self, method: str, url: str, status: Optional[int] = None): + """Log request""" + formatted_url = self._format_url(url) + if status: + self.logger.info(f"[cyan]{method}[/cyan] {formatted_url} → [green]{status}[/green]") + else: + self.logger.info(f"[cyan]{method}[/cyan] {formatted_url}") + + def response(self, status: int, url: str): + """Log response""" + formatted_url = self._format_url(url) + color = "green" if 200 <= status < 300 else "yellow" if status < 400 else "red" + self.logger.info(f"[{color}]{status}[/{color}] {formatted_url}") + + def auth(self, message: str): + """Log authentication events""" + self.logger.info(f"[magenta]Auth:[/magenta] {message}") + + def error(self, message: str): + """Log errors""" + self.logger.error(f"[red]Error:[/red] {message}") + + def info(self, message: str): + """Log general information""" + self.logger.info(message) diff --git a/src/guardian/proxy/server.py b/src/guardian/proxy/server.py new file mode 100644 index 0000000..c6b61ff --- /dev/null +++ b/src/guardian/proxy/server.py @@ -0,0 +1,58 @@ +# src/guardian/proxy/server.py +from mitmproxy import ctx, http +from typing import Optional +from .logging import GuardianLogger + +class GuardianAuthProxy: + """Guardian authentication proxy addon for mitmproxy""" + + def __init__(self, config: Optional[dict] = None): + self.config = config or {} + self.logger = GuardianLogger() + + def request(self, flow: http.HTTPFlow) -> None: + """Handle incoming requests""" + try: + # Handle direct requests to proxy + if flow.request.pretty_host == "127.0.0.1" and \ + str(flow.request.port) in ["8080", "8081"]: + flow.response = http.Response.make( + 200, + b"Guardian Proxy Running", + {"Content-Type": "text/plain"} + ) + return + + # Log request + self.logger.request( + flow.request.method, + flow.request.url + ) + + # Check for auth requirements + domain = flow.request.pretty_host + if self.needs_auth(domain): + self.handle_auth(flow, domain) + + except Exception as e: + self.logger.error(str(e)) + + def response(self, flow: http.HTTPFlow) -> None: + """Handle responses""" + try: + # Log response + self.logger.request( + flow.request.method, + flow.request.url, + flow.response.status_code + ) + except Exception as e: + self.logger.error(str(e)) + + def needs_auth(self, domain: str) -> bool: + """Check if domain requires authentication""" + return domain in self.config.get('auth', {}).get('sites', {}) + + def handle_auth(self, flow: http.HTTPFlow, domain: str) -> None: + """Handle authentication for domain""" + self.logger.auth(f"Authentication needed for {domain}") diff --git a/src/guardian/proxy/server_backup.py b/src/guardian/proxy/server_backup.py new file mode 100644 index 0000000..87ab9f8 --- /dev/null +++ b/src/guardian/proxy/server_backup.py @@ -0,0 +1,149 @@ +# src/guardian/proxy/server.py +from mitmproxy import ctx, http +from mitmproxy.http import HTTPFlow +import sqlite3 +import jwt +from datetime import datetime +from typing import Dict, Optional +from pathlib import Path +import logging + +class GuardianAuthProxy: + """Guardian authentication proxy addon for mitmproxy""" + + def __init__(self, config: Optional[Dict] = None): + self.config = config or {} + self.db_path = Path.home() / '.guardian' / 'proxy.db' + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.db = sqlite3.connect(str(self.db_path)) + self.sessions: Dict[str, Dict] = {} + self.logger = logging.getLogger(__name__) + self.setup_logging() + self._init_db() + + def _init_db(self): + """Initialize SQLite database""" + with self.db: + self.db.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + domain TEXT PRIMARY KEY, + session_data TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + expires_at DATETIME, + last_used DATETIME + ) + """) + + def setup_logging(self): + """Setup logging configuration""" + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + handler = logging.StreamHandler() + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def request(self, flow: http.HTTPFlow) -> None: + """Handle incoming requests""" + try: + # Handle direct requests to proxy + if flow.request.pretty_host == "127.0.0.1" and \ + str(flow.request.port) in ["8080", "8081"]: + flow.response = http.Response.make( + 200, + b"Guardian Proxy Running", + {"Content-Type": "text/plain"} + ) + return + + # Log the request + self.logger.info(f"Request: {flow.request.method} {flow.request.url}") + + # Continue with normal proxy behavior for other requests + domain = flow.request.pretty_host + if self.needs_auth(domain): + self.handle_auth(flow, domain) + + except Exception as e: + self.logger.error(f"Error handling request: {e}") + + def response(self, flow: http.HTTPFlow) -> None: + """Handle responses""" + try: + # Log the response + self.logger.info( + f"Response: {flow.response.status_code} {flow.request.url}" + ) + except Exception as e: + self.logger.error(f"Error handling response: {e}") + + def needs_auth(self, domain: str) -> bool: + """Check if domain requires authentication""" + return domain in self.config.get('auth', {}).get('sites', {}) + + def get_valid_session(self, domain: str) -> Optional[Dict]: + """Get valid session for domain if exists""" + try: + cur = self.db.execute( + """ + SELECT session_data, expires_at + FROM sessions + WHERE domain = ? + AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP) + """, + (domain,) + ) + row = cur.fetchone() + + if row: + import json + session_data = json.loads(row[0]) + self.db.execute( + "UPDATE sessions SET last_used = CURRENT_TIMESTAMP WHERE domain = ?", + (domain,) + ) + return session_data + + return None + + except Exception as e: + self.logger.error(f"Error getting session: {e}") + return None + + def handle_auth(self, flow: HTTPFlow, domain: str) -> None: + """Handle authentication for domain""" + try: + site_config = self.config['auth']['sites'].get(domain) + if not site_config: + return + + # Log authentication attempt + self.logger.info(f"Authentication needed for {domain}") + + except Exception as e: + self.logger.error(f"Error in auth handling: {e}") + + def inject_session(self, flow: HTTPFlow, session: Dict) -> None: + """Inject session data into request""" + try: + if 'cookies' in session: + # Session cookie-based auth + cookie_header = '; '.join( + f"{k}={v}" for k, v in session['cookies'].items() + ) + flow.request.headers['Cookie'] = cookie_header + + elif 'token' in session: + # Token-based auth + token_type = session.get('token_type', 'Bearer') + flow.request.headers['Authorization'] = \ + f"{token_type} {session['token']}" + + except Exception as e: + self.logger.error(f"Error injecting session: {e}") + + def done(self): + """Clean up when proxy stops""" + if hasattr(self, 'db'): + self.db.close() diff --git a/src/guardian/services/alerts.py b/src/guardian/services/alerts.py new file mode 100644 index 0000000..af11de7 --- /dev/null +++ b/src/guardian/services/alerts.py @@ -0,0 +1,209 @@ +# src/guardian/services/alerts.py +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import List, Dict, Optional +from pathlib import Path +import json +from guardian.core import Service, Result + +@dataclass +class Alert: + """Security alert information""" + level: str # 'info', 'warning', 'critical' + message: str + timestamp: datetime + details: Dict + recommendations: List[str] + +class KeyAlertSystem(Service): + """Alert system for SSH key usage""" + + def __init__(self): + super().__init__() + self.alert_history = self.config_dir / 'alerts.json' + self._init_alert_store() + + def _init_alert_store(self): + """Initialize alert storage""" + if not self.alert_history.exists(): + self.alert_history.write_text('[]') + + def check_key_usage(self, key_path: Path, usage_data: Dict) -> Result: + """Check for suspicious key usage patterns""" + alerts = [] + + # Check usage timing + if timing_alert := self._check_timing(usage_data): + alerts.append(timing_alert) + + # Check access patterns + if pattern_alert := self._check_patterns(usage_data): + alerts.append(pattern_alert) + + # Check locations + if location_alert := self._check_locations(usage_data): + alerts.append(location_alert) + + # Store alerts if any found + if alerts: + self._store_alerts(alerts) + return self.create_result( + True, + "Security alerts detected", + {'alerts': [a.__dict__ for a in alerts]} + ) + + return self.create_result( + True, + "No security concerns detected" + ) + + def _check_timing(self, usage_data: Dict) -> Optional[Alert]: + """Check for suspicious timing patterns""" + current_hour = datetime.now().hour + + # Night time usage (configurable) + if 0 <= current_hour <= 5: + return Alert( + level='warning', + message="Key usage during unusual hours", + timestamp=datetime.now(), + details={ + 'hour': current_hour, + 'normal_hours': '6:00-23:00' + }, + recommendations=[ + "Verify if this access was intended", + "Consider restricting key usage hours if unexpected" + ] + ) + + # Rapid successive uses + recent_uses = [ + u for u in usage_data.get('recent_uses', []) + if datetime.now() - u['timestamp'] < timedelta(minutes=5) + ] + if len(recent_uses) > 5: # More than 5 uses in 5 minutes + return Alert( + level='critical', + message="Unusually rapid key usage detected", + timestamp=datetime.now(), + details={ + 'uses_count': len(recent_uses), + 'timeframe': '5 minutes' + }, + recommendations=[ + "Check for automated processes", + "Verify no unauthorized access attempts", + "Consider implementing rate limiting" + ] + ) + + return None + + def _check_patterns(self, usage_data: Dict) -> Optional[Alert]: + """Check for suspicious usage patterns""" + # Check for failed attempts + recent_failures = [ + u for u in usage_data.get('recent_uses', []) + if not u.get('success', True) + ] + if len(recent_failures) >= 3: # 3 or more failures + return Alert( + level='critical', + message="Multiple authentication failures detected", + timestamp=datetime.now(), + details={ + 'failure_count': len(recent_failures), + 'hosts': [f['host'] for f in recent_failures] + }, + recommendations=[ + "Check for potential brute force attempts", + "Verify key permissions", + "Consider temporarily disabling access if suspicious" + ] + ) + + return None + + def _check_locations(self, usage_data: Dict) -> Optional[Alert]: + """Check for suspicious access locations""" + known_hosts = set(usage_data.get('known_hosts', [])) + current_host = usage_data.get('current_host') + + if current_host and current_host not in known_hosts: + return Alert( + level='warning', + message="Access from new location detected", + timestamp=datetime.now(), + details={ + 'new_host': current_host, + 'known_hosts': list(known_hosts) + }, + recommendations=[ + "Verify if this new access point is legitimate", + "Add to known hosts if authorized", + "Update access controls if unauthorized" + ] + ) + + return None + + def _store_alerts(self, alerts: List[Alert]): + """Store alerts in history""" + try: + existing = json.loads(self.alert_history.read_text()) + except: + existing = [] + + # Add new alerts + existing.extend([ + { + 'level': alert.level, + 'message': alert.message, + 'timestamp': alert.timestamp.isoformat(), + 'details': alert.details, + 'recommendations': alert.recommendations + } + for alert in alerts + ]) + + # Keep last 100 alerts + existing = existing[-100:] + self.alert_history.write_text(json.dumps(existing, indent=2)) + +class AlertNotifier(Service): + """Handle alert notifications""" + + def notify(self, alert: Alert) -> Result: + """Send alert notification""" + # Terminal notification + self.console.print(Panel( + "\n".join([ + f"[bold red]Security Alert ({alert.level.upper()})[/bold red]", + f"Message: {alert.message}", + f"Time: {alert.timestamp}", + "", + "[bold]Details:[/bold]", + *[f"• {k}: {v}" for k, v in alert.details.items()], + "", + "[bold]Recommendations:[/bold]", + *[f"• {r}" for r in alert.recommendations] + ]), + title="Guardian Security Alert", + style="red" + )) + + # System notification (if available) + try: + import notify2 + notify2.init('Guardian') + notification = notify2.Notification( + "Guardian Security Alert", + f"{alert.level.upper()}: {alert.message}" + ) + notification.show() + except ImportError: + pass + + return self.create_result(True, "Alert notification sent") diff --git a/src/guardian/services/git.py b/src/guardian/services/git.py new file mode 100644 index 0000000..e090b76 --- /dev/null +++ b/src/guardian/services/git.py @@ -0,0 +1,183 @@ +# src/guardian/services/git.py +import subprocess +from pathlib import Path +from typing import Optional, Dict, Literal +import re +import requests +from guardian.core import Service, Result + +PlatformType = Literal['github', 'gitlab', 'bitbucket'] + +class GitService(Service): + """Git operations and status checking""" + + PLATFORM_PATTERNS = { + 'github': ( + r'(?:https://github\.com/|git@github\.com:)([^/]+)/([^/.]+)(?:\.git)?' + ), + 'gitlab': ( + r'(?:https://gitlab\.com/|git@gitlab\.com:)([^/]+)/([^/.]+)(?:\.git)?' + ), + 'bitbucket': ( + r'(?:https://bitbucket\.org/|git@bitbucket\.org:)([^/]+)/([^/.]+)(?:\.git)?' + ) + } + + API_URLS = { + 'github': 'https://api.github.com', + 'gitlab': 'https://gitlab.com/api/v4', + 'bitbucket': 'https://api.bitbucket.org/2.0' + } + + def detect_platform(self, remote_url: str) -> Optional[tuple[str, str, str]]: + """Detect git platform and extract owner/repo""" + for platform, pattern in self.PLATFORM_PATTERNS.items(): + match = re.match(pattern, remote_url) + if match: + return platform, match.group(1), match.group(2) + return None + + def get_current_branch(self, path: Path = Path('.')) -> Result: + """Get current branch name""" + try: + result = subprocess.run( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + cwd=path, + capture_output=True, + text=True, + check=True + ) + return self.create_result( + True, + "Branch found", + {'branch': result.stdout.strip()} + ) + except subprocess.CalledProcessError: + return self.create_result( + False, + "Not a git repository or no branch found" + ) + + def check_remote(self, path: Path = Path('.')) -> Result: + """Check remote repository details""" + try: + remote_url = subprocess.run( + ['git', 'remote', 'get-url', 'origin'], + cwd=path, + capture_output=True, + text=True, + check=True + ).stdout.strip() + + platform_info = self.detect_platform(remote_url) + if platform_info: + platform, owner, repo = platform_info + return self.create_result( + True, + f"{platform.title()} repository found", + { + 'platform': platform, + 'owner': owner, + 'repo': repo, + 'url': remote_url + } + ) + else: + return self.create_result( + False, + "Unknown repository platform" + ) + + except subprocess.CalledProcessError: + return self.create_result( + False, + "No remote origin found" + ) + + def verify_repo(self, platform: str, token: str, owner: str, repo: str) -> Result: + """Verify repository exists and user has access""" + try: + if platform == 'github': + headers = { + 'Authorization': f'Bearer {token}', + 'Accept': 'application/vnd.github.v3+json' + } + url = f"{self.API_URLS[platform]}/repos/{owner}/{repo}" + + elif platform == 'gitlab': + headers = { + 'Authorization': f'Bearer {token}', + } + url = f"{self.API_URLS[platform]}/projects/{owner}%2F{repo}" + + elif platform == 'bitbucket': + # Bitbucket uses Basic Auth with app passwords + headers = { + 'Authorization': f'Bearer {token}', + } + url = f"{self.API_URLS[platform]}/repositories/{owner}/{repo}" + + else: + return self.create_result( + False, + f"Unsupported platform: {platform}" + ) + + response = requests.get(url, headers=headers) + + if response.status_code == 200: + data = response.json() + + # Platform-specific data parsing + if platform == 'github': + return self.create_result( + True, + "Repository verified", + { + 'default_branch': data['default_branch'], + 'permissions': data['permissions'], + 'private': data['private'] + } + ) + + elif platform == 'gitlab': + return self.create_result( + True, + "Repository verified", + { + 'default_branch': data['default_branch'], + 'permissions': { + 'admin': data['permissions']['project_access']['access_level'] >= 40, + 'push': data['permissions']['project_access']['access_level'] >= 30, + 'pull': data['permissions']['project_access']['access_level'] >= 20, + }, + 'private': not data['public'] + } + ) + + elif platform == 'bitbucket': + return self.create_result( + True, + "Repository verified", + { + 'default_branch': data['mainbranch']['name'], + 'permissions': { + 'admin': 'admin' in data['privileges'], + 'push': 'write' in data['privileges'], + 'pull': 'read' in data['privileges'], + }, + 'private': not data['is_private'] + } + ) + else: + return self.create_result( + False, + f"Repository not found or no access ({response.status_code})" + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to verify repository: {str(e)}", + error=e + ) diff --git a/src/guardian/services/key_management.py b/src/guardian/services/key_management.py new file mode 100644 index 0000000..14b7a78 --- /dev/null +++ b/src/guardian/services/key_management.py @@ -0,0 +1,241 @@ +# src/guardian/services/key_management.py +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Optional +import subprocess +import json +import shutil +from guardian.core import Service, Result + +@dataclass +class KeyHealth: + """Key health status information""" + age_days: int + algorithm: str + key_size: int + last_used: Optional[datetime] + permissions_ok: bool + recommendations: List[str] + +class KeyManager(Service): + """Advanced key management functionality""" + + WEAK_ALGORITHMS = ['rsa1024', 'dsa', 'ecdsa-sha1'] + MAX_KEY_AGE_DAYS = 180 # 6 months + + def __init__(self): + super().__init__() + self.backup_dir = self.config_dir / 'key_backups' + self.backup_dir.mkdir(parents=True, exist_ok=True) + self.recovery_dir = self.config_dir / 'recovery' + self.recovery_dir.mkdir(parents=True, exist_ok=True) + + def check_key_health(self, key_path: Path) -> Result: + """Check health of an SSH key""" + try: + if not key_path.exists(): + return self.create_result( + False, + f"Key not found: {key_path}" + ) + + recommendations = [] + + # Check key age + age_days = (datetime.now() - datetime.fromtimestamp(key_path.stat().st_mtime)).days + if age_days > self.MAX_KEY_AGE_DAYS: + recommendations.append(f"Key is {age_days} days old. Consider rotation.") + + # Check algorithm and size + key_info = self._get_key_info(key_path) + if key_info['algorithm'] in self.WEAK_ALGORITHMS: + recommendations.append(f"Using weak algorithm: {key_info['algorithm']}") + if key_info['algorithm'] == 'rsa' and key_info['size'] < 3072: + recommendations.append("RSA key size should be at least 3072 bits") + + # Check permissions + permissions = oct(key_path.stat().st_mode)[-3:] + permissions_ok = permissions == '600' if key_path.name.endswith('.pub') else permissions == '644' + if not permissions_ok: + recommendations.append(f"Incorrect permissions: {permissions}") + + # Get last used time (if available) + last_used = self._get_last_used(key_path) + + health = KeyHealth( + age_days=age_days, + algorithm=key_info['algorithm'], + key_size=key_info['size'], + last_used=last_used, + permissions_ok=permissions_ok, + recommendations=recommendations + ) + + return self.create_result( + True, + "Key health check complete", + {'health': health.__dict__} + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to check key health: {str(e)}", + error=e + ) + + def rotate_keys(self, email: str, backup: bool = True) -> Result: + """Rotate SSH keys with backup""" + try: + # Backup existing keys if requested + if backup: + backup_result = self._backup_current_keys() + if not backup_result.success: + return backup_result + + # Generate new keys + from guardian.services.ssh import SSHManager + ssh = SSHManager() + result = ssh.generate_key(email, force=True) + + if not result.success: + return result + + # Verify new keys + verify_result = self._verify_new_keys(result.data['key_path']) + if not verify_result.success: + # Rollback if verification fails + if backup: + self._restore_from_backup(backup_result.data['backup_path']) + return verify_result + + return self.create_result( + True, + "Keys rotated successfully", + { + 'new_key': result.data['key_path'], + 'backup': backup_result.data if backup else None + } + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to rotate keys: {str(e)}", + error=e + ) + + def create_recovery_bundle(self, password: str) -> Result: + """Create encrypted recovery bundle""" + try: + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + bundle_dir = self.recovery_dir / timestamp + bundle_dir.mkdir(parents=True) + + # Collect keys and configs + self._collect_recovery_items(bundle_dir) + + # Create recovery instructions + self._create_recovery_instructions(bundle_dir) + + # Encrypt the bundle + bundle_file = self.recovery_dir / f"recovery_{timestamp}.tar.gz.gpg" + self._encrypt_bundle(bundle_dir, bundle_file, password) + + # Cleanup + shutil.rmtree(bundle_dir) + + return self.create_result( + True, + "Recovery bundle created successfully", + {'bundle_path': str(bundle_file)} + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to create recovery bundle: {str(e)}", + error=e + ) + + def _get_key_info(self, key_path: Path) -> Dict: + """Get key algorithm and size""" + result = subprocess.run( + ['ssh-keygen', '-l', '-f', str(key_path)], + capture_output=True, + text=True + ) + # Parse output like: "3072 SHA256:... user@host (RSA)" + parts = result.stdout.split() + return { + 'size': int(parts[0]), + 'algorithm': parts[-1].strip('()').lower() + } + + def _get_last_used(self, key_path: Path) -> Optional[datetime]: + """Get last usage time of key""" + # This would need integration with system logs + # For now, return None + return None + + def _backup_current_keys(self) -> Result: + """Backup current SSH keys""" + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_path = self.backup_dir / timestamp + backup_path.mkdir(parents=True) + + try: + ssh_dir = Path.home() / '.ssh' + if not ssh_dir.exists(): + return self.create_result( + False, + "No SSH directory found" + ) + + # Copy all key files + for file in ssh_dir.glob('id_*'): + shutil.copy2(file, backup_path) + + return self.create_result( + True, + "Keys backed up successfully", + {'backup_path': str(backup_path)} + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to backup keys: {str(e)}", + error=e + ) + + def _verify_new_keys(self, key_path: Path) -> Result: + """Verify newly generated keys""" + try: + # Test key permissions + if key_path.stat().st_mode & 0o777 != 0o600: + return self.create_result( + False, + "Incorrect key permissions" + ) + + # Test key validity + result = subprocess.run( + ['ssh-keygen', '-l', '-f', str(key_path)], + capture_output=True + ) + if result.returncode != 0: + return self.create_result( + False, + "Invalid key format" + ) + + return self.create_result(True, "Keys verified successfully") + + except Exception as e: + return self.create_result( + False, + f"Failed to verify keys: {str(e)}", + error=e + ) diff --git a/src/guardian/services/key_tracking.py b/src/guardian/services/key_tracking.py new file mode 100644 index 0000000..31706d3 --- /dev/null +++ b/src/guardian/services/key_tracking.py @@ -0,0 +1,534 @@ +# src/guardian/services/key_tracking.py +import sqlite3 +import subprocess +from dataclasses import dataclass +from datetime import datetime, timedelta +import sqlite3 +from pathlib import Path +import json +from typing import List, Dict, Optional, Tuple +from guardian.core import Service, Result + +@dataclass +class KeyUsage: + """Key usage information""" + key_id: str + last_used: datetime + usage_count: int + hosts: List[str] + platforms: List[str] + success_rate: float + +@dataclass +class UsagePattern: + """Key usage pattern information""" + common_times: List[str] + common_hosts: List[str] + usage_frequency: Dict[str, int] # day -> count + average_daily_uses: float + unusual_patterns: List[Dict] + +class KeyTracker(Service): + def __init__(self): + super().__init__() + self.db_path = self.config_dir / 'keytracking.db' + self._init_db() + + def _init_db(self): + """Initialize SQLite database with all necessary tables""" + with sqlite3.connect(self.db_path) as conn: + # Basic key and usage tracking + conn.execute(""" + CREATE TABLE IF NOT EXISTS keys ( + key_id TEXT PRIMARY KEY, + path TEXT NOT NULL, + created_at DATETIME NOT NULL, + last_used DATETIME, + total_uses INTEGER DEFAULT 0, + algorithm TEXT, + key_size INTEGER + ) + """) + + conn.execute(""" + CREATE TABLE IF NOT EXISTS key_usage ( + id INTEGER PRIMARY KEY, + key_id TEXT NOT NULL, + timestamp DATETIME NOT NULL, + host TEXT NOT NULL, + platform TEXT, + success BOOLEAN NOT NULL, + details TEXT, + FOREIGN KEY (key_id) REFERENCES keys(key_id) + ) + """) + + # Alert tracking + conn.execute(""" + CREATE TABLE IF NOT EXISTS alerts ( + id INTEGER PRIMARY KEY, + key_id TEXT NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + timestamp DATETIME NOT NULL, + details TEXT, + recommendations TEXT, + acknowledged BOOLEAN DEFAULT FALSE, + FOREIGN KEY (key_id) REFERENCES keys(key_id) + ) + """) + + # Known hosts tracking + conn.execute(""" + CREATE TABLE IF NOT EXISTS known_hosts ( + key_id TEXT NOT NULL, + host TEXT NOT NULL, + first_seen DATETIME NOT NULL, + last_seen DATETIME NOT NULL, + access_count INTEGER DEFAULT 1, + PRIMARY KEY (key_id, host), + FOREIGN KEY (key_id) REFERENCES keys(key_id) + ) + """) + + def _generate_key_id(self, key_path: Path) -> str: + """Generate unique ID for a key""" + import hashlib + + # Use public key fingerprint as ID + result = subprocess.run( + ['ssh-keygen', '-l', '-f', str(key_path)], + capture_output=True, + text=True + ) + + fingerprint = result.stdout.split()[1] + return fingerprint + + def register_key(self, path: Path) -> Result: + """Register a key for tracking""" + try: + # Generate unique key ID from public key + key_id = self._generate_key_id(path) + + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + INSERT OR REPLACE INTO keys + (key_id, path, created_at) + VALUES (?, ?, ?) + """, (key_id, str(path), datetime.now())) + + return self.create_result( + True, + "Key registered for tracking", + {'key_id': key_id} + ) + except Exception as e: + return self.create_result( + False, + f"Failed to register key: {str(e)}", + error=e + ) + + def analyze_usage_patterns(self, key_path: Path, + days: int = 30) -> Result: + """Analyze key usage patterns""" + try: + key_id = self._generate_key_id(key_path) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + # Get usage data for specified period + usage_data = conn.execute(""" + SELECT + strftime('%H', timestamp) as hour, + strftime('%w', timestamp) as day_of_week, + host, + success + FROM key_usage + WHERE key_id = ? + AND timestamp > datetime('now', ?) + """, (key_id, f'-{days} days')).fetchall() + + if not usage_data: + return self.create_result( + False, + "No usage data found for analysis" + ) + + # Analyze timing patterns + hour_counts = {} + day_counts = {} + host_counts = {} + + for usage in usage_data: + # Count by hour + hour = int(usage['hour']) + hour_counts[hour] = hour_counts.get(hour, 0) + 1 + + # Count by day + day = int(usage['day_of_week']) + day_counts[day] = day_counts.get(day, 0) + 1 + + # Count by host + host = usage['host'] + host_counts[host] = host_counts.get(host, 0) + 1 + + # Find common patterns + common_hours = [ + hour for hour, count in hour_counts.items() + if count > len(usage_data) * 0.1 # More than 10% of uses + ] + + common_hosts = [ + host for host, count in host_counts.items() + if count > len(usage_data) * 0.1 + ] + + # Calculate average daily usage + total_days = min(days, + (datetime.now() - datetime.fromtimestamp( + key_path.stat().st_mtime + )).days) + avg_daily = len(usage_data) / total_days if total_days > 0 else 0 + + # Detect unusual patterns + unusual = [] + + # Check for odd hours + odd_hours = [ + hour for hour, count in hour_counts.items() + if 0 <= hour <= 5 and count > 0 + ] + if odd_hours: + unusual.append({ + 'type': 'odd_hours', + 'details': { + 'hours': odd_hours, + 'count': sum(hour_counts[h] for h in odd_hours) + } + }) + + # Check for sudden spikes + for day in range(days): + date = datetime.now() - timedelta(days=day) + date_str = date.strftime('%Y-%m-%d') + + daily_count = conn.execute(""" + SELECT COUNT(*) as count + FROM key_usage + WHERE key_id = ? + AND date(timestamp) = date(?) + """, (key_id, date_str)).fetchone()['count'] + + if daily_count > avg_daily * 3: # 3x normal usage + unusual.append({ + 'type': 'usage_spike', + 'details': { + 'date': date_str, + 'count': daily_count, + 'average': avg_daily + } + }) + + pattern = UsagePattern( + common_times=[f"{h:02d}:00" for h in sorted(common_hours)], + common_hosts=common_hosts, + usage_frequency=day_counts, + average_daily_uses=avg_daily, + unusual_patterns=unusual + ) + + return self.create_result( + True, + "Usage patterns analyzed", + {'patterns': pattern.__dict__} + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to analyze usage patterns: {str(e)}", + error=e + ) + + def get_key_usage(self, key_path: Path) -> Result: + """Get comprehensive key usage statistics""" + try: + key_id = self._generate_key_id(key_path) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + # Get basic key info + key_info = conn.execute(""" + SELECT * FROM keys WHERE key_id = ? + """, (key_id,)).fetchone() + + if not key_info: + return self.create_result( + False, + "Key not found in tracking database" + ) + + # Get usage statistics + stats = conn.execute(""" + SELECT + COUNT(*) as total_uses, + COUNT(DISTINCT host) as unique_hosts, + COUNT(DISTINCT platform) as unique_platforms, + SUM(CASE WHEN success THEN 1 ELSE 0 END) as successes, + MAX(timestamp) as last_used + FROM key_usage + WHERE key_id = ? + """, (key_id,)).fetchone() + + # Get recent usage + recent_uses = conn.execute(""" + SELECT * + FROM key_usage + WHERE key_id = ? + ORDER BY timestamp DESC + LIMIT 10 + """, (key_id,)).fetchall() + + # Get known hosts + known_hosts = conn.execute(""" + SELECT * + FROM known_hosts + WHERE key_id = ? + ORDER BY last_seen DESC + """, (key_id,)).fetchall() + + return self.create_result( + True, + "Usage statistics retrieved", + { + 'key_info': dict(key_info), + 'stats': dict(stats), + 'recent_uses': [dict(u) for u in recent_uses], + 'known_hosts': [dict(h) for h in known_hosts] + } + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to get usage statistics: {str(e)}", + error=e + ) + + # Previous methods (record_usage, _check_for_alerts, etc.) remain the same + + def record_usage(self, key_path: Path, host: str, + platform: Optional[str] = None, + success: bool = True, + details: Optional[Dict] = None) -> Result: + """Record key usage and check for alerts""" + try: + key_id = self._generate_key_id(key_path) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + # Record usage + conn.execute(""" + INSERT INTO key_usage + (key_id, timestamp, host, platform, success, details) + VALUES (?, ?, ?, ?, ?, ?) + """, ( + key_id, + datetime.now(), + host, + platform, + success, + json.dumps(details) if details else None + )) + + # Update key stats + conn.execute(""" + UPDATE keys + SET last_used = ?, total_uses = total_uses + 1 + WHERE key_id = ? + """, (datetime.now(), key_id)) + + # Check for suspicious patterns + alerts = self._check_for_alerts(conn, key_id, host) + + # Store any alerts + for alert in alerts: + conn.execute(""" + INSERT INTO alerts + (key_id, level, message, timestamp, details, recommendations) + VALUES (?, ?, ?, ?, ?, ?) + """, ( + key_id, + alert['level'], + alert['message'], + datetime.now(), + json.dumps(alert['details']), + json.dumps(alert['recommendations']) + )) + + if alerts: + self._notify_alerts(alerts) + + return self.create_result(True, "Usage recorded successfully") + except Exception as e: + return self.create_result( + False, + f"Failed to record usage: {str(e)}", + error=e + ) + + def _check_for_alerts(self, conn, key_id: str, current_host: str) -> List[Dict]: + """Check for suspicious patterns""" + alerts = [] + + # Check for unusual hours + hour = datetime.now().hour + if 0 <= hour <= 5: + alerts.append({ + 'level': 'warning', + 'message': 'Key usage during unusual hours', + 'details': { + 'hour': hour, + 'host': current_host + }, + 'recommendations': [ + 'Verify if this access was intended', + 'Consider restricting key usage hours' + ] + }) + + # Check for rapid usage + recent_uses = conn.execute(""" + SELECT COUNT(*) as count + FROM key_usage + WHERE key_id = ? + AND timestamp > datetime('now', '-5 minutes') + """, (key_id,)).fetchone()['count'] + + if recent_uses > 5: + alerts.append({ + 'level': 'critical', + 'message': 'Unusually rapid key usage', + 'details': { + 'uses_count': recent_uses, + 'timeframe': '5 minutes' + }, + 'recommendations': [ + 'Check for automated processes', + 'Verify no unauthorized access' + ] + }) + + # Check for new hosts + known_hosts = set(row['host'] for row in conn.execute(""" + SELECT DISTINCT host + FROM key_usage + WHERE key_id = ? + AND timestamp < datetime('now', '-1 hour') + """, (key_id,))) + + if current_host not in known_hosts: + alerts.append({ + 'level': 'warning', + 'message': 'Access from new location', + 'details': { + 'new_host': current_host, + 'known_hosts': list(known_hosts) + }, + 'recommendations': [ + 'Verify if this new access point is legitimate', + 'Add to known hosts if authorized' + ] + }) + + return alerts + + def _notify_alerts(self, alerts: List[Dict]): + """Send notifications for alerts""" + try: + # Terminal notification + console = Console() + for alert in alerts: + console.print(Panel( + "\n".join([ + f"[bold red]Security Alert ({alert['level'].upper()})[/bold red]", + f"Message: {alert['message']}", + "", + "[bold]Details:[/bold]", + *[f"• {k}: {v}" for k, v in alert['details'].items()], + "", + "[bold]Recommendations:[/bold]", + *[f"• {r}" for r in alert['recommendations']] + ]), + title="Guardian Security Alert", + style="red" + )) + + # System notification if available + try: + import notify2 + notify2.init('Guardian') + for alert in alerts: + notification = notify2.Notification( + "Guardian Security Alert", + f"{alert['level'].upper()}: {alert['message']}" + ) + notification.show() + except ImportError: + pass + + except Exception as e: + self.logger.error(f"Failed to send notifications: {e}") + + def get_alerts(self, key_path: Path, + level: Optional[str] = None, + limit: int = 100) -> Result: + """Get recent alerts for a key""" + try: + key_id = self._generate_key_id(key_path) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + query = """ + SELECT * + FROM alerts + WHERE key_id = ? + """ + params = [key_id] + + if level: + query += " AND level = ?" + params.append(level) + + query += " ORDER BY timestamp DESC LIMIT ?" + params.append(limit) + + alerts = [] + for row in conn.execute(query, params): + alerts.append({ + 'level': row['level'], + 'message': row['message'], + 'timestamp': row['timestamp'], + 'details': json.loads(row['details']), + 'recommendations': json.loads(row['recommendations']), + 'acknowledged': bool(row['acknowledged']) + }) + + return self.create_result( + True, + f"Found {len(alerts)} alerts", + {'alerts': alerts} + ) + + except Exception as e: + return self.create_result( + False, + f"Failed to get alerts: {str(e)}", + error=e + ) diff --git a/src/guardian/services/migration.py b/src/guardian/services/migration.py new file mode 100644 index 0000000..d649b6f --- /dev/null +++ b/src/guardian/services/migration.py @@ -0,0 +1,156 @@ +# src/guardian/services/migration.py +from dataclasses import dataclass +from typing import List, Dict, Any, Optional +import tempfile +import subprocess +from pathlib import Path +import shutil + +@dataclass +class MigrationPlan: + """Plan for repository migration""" + source_platform: str + target_platform: str + source_repo: str + target_repo: str + items: Dict[str, bool] # What to migrate (code, issues, PRs, etc.) + estimated_time: int # Estimated minutes + +@dataclass +class MigrationResult: + """Results of migration attempt""" + success: bool + items_migrated: Dict[str, int] + errors: List[str] + warnings: List[str] + +class PlatformMigration: + """Handles repository migration between platforms""" + + def __init__(self, source_platform: GitPlatform, target_platform: GitPlatform): + self.source = source_platform + self.target = target_platform + + def create_migration_plan(self, + source_repo: str, + target_repo: str) -> MigrationPlan: + """Create a migration plan""" + # Check what can be migrated + items = { + 'code': True, # Base repository + 'branches': True, # All branches + 'tags': True, # Version tags + 'releases': True, # Release information + 'issues': self._can_migrate_issues(), + 'pull_requests': self._can_migrate_prs(), + 'wiki': self._has_wiki(source_repo), + 'actions': self._has_actions(source_repo) + } + + # Estimate time based on repo size and features + estimated_time = self._estimate_migration_time(source_repo, items) + + return MigrationPlan( + source_platform=self.source.__class__.__name__, + target_platform=self.target.__class__.__name__, + source_repo=source_repo, + target_repo=target_repo, + items=items, + estimated_time=estimated_time + ) + + def execute_migration(self, plan: MigrationPlan) -> MigrationResult: + """Execute migration plan""" + results = MigrationResult( + success=True, + items_migrated={}, + errors=[], + warnings=[] + ) + + with tempfile.TemporaryDirectory() as temp_dir: + try: + # Clone source repository + self._clone_repository(plan.source_repo, temp_dir) + + # Migrate basics (code, branches, tags) + self._migrate_code(temp_dir, plan.target_repo, results) + + # Migrate additional items if requested + if plan.items.get('issues'): + self._migrate_issues(plan, results) + + if plan.items.get('pull_requests'): + self._migrate_pull_requests(plan, results) + + if plan.items.get('wiki'): + self._migrate_wiki(plan, results) + + if plan.items.get('actions'): + self._migrate_actions(plan, results) + + except Exception as e: + results.success = False + results.errors.append(str(e)) + + return results + + def _migrate_issues(self, plan: MigrationPlan, results: MigrationResult): + """Migrate issues between platforms""" + try: + # Get source issues + source_issues = self.source.get_issues( + *self._parse_repo_string(plan.source_repo) + ) + + migrated = 0 + for issue in source_issues: + try: + # Convert to target platform format + converted_issue = self._convert_issue_format( + issue, + self.source.__class__.__name__, + self.target.__class__.__name__ + ) + + # Create on target platform + self.target.create_issue( + *self._parse_repo_string(plan.target_repo), + converted_issue + ) + migrated += 1 + + except Exception as e: + results.warnings.append( + f"Failed to migrate issue {issue.id}: {str(e)}" + ) + + results.items_migrated['issues'] = migrated + + except Exception as e: + results.errors.append(f"Issue migration failed: {str(e)}") + + def _convert_issue_format(self, issue: IssueData, + source_platform: str, + target_platform: str) -> Dict[str, Any]: + """Convert issue format between platforms""" + # Basic conversion that works across platforms + converted = { + 'title': issue.title, + 'description': issue.description, + 'state': self._convert_state(issue.state, source_platform, target_platform), + 'labels': issue.labels + } + + # Platform-specific adjustments + if target_platform == 'GitLab': + converted['iid'] = issue.id + if 'type' in issue.labels: + converted['issue_type'] = issue.labels['type'] + + elif target_platform == 'Bitbucket': + converted['content'] = { + 'raw': issue.description + } + + return converted diff --git a/src/guardian/services/platform/base.py b/src/guardian/services/platform/base.py new file mode 100644 index 0000000..f156db9 --- /dev/null +++ b/src/guardian/services/platform/base.py @@ -0,0 +1,63 @@ +# src/guardian/services/platforms/base.py +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import List, Dict, Any, Optional +from datetime import datetime + +@dataclass +class IssueData: + """Common issue format across platforms""" + id: str + title: str + description: str + state: str + created_at: datetime + updated_at: datetime + labels: List[str] + assignees: List[str] + comments: int + platform_specific: Dict[str, Any] + +@dataclass +class PRData: + """Common pull request format across platforms""" + id: str + title: str + description: str + state: str + source_branch: str + target_branch: str + created_at: datetime + updated_at: datetime + labels: List[str] + reviewers: List[str] + comments: int + platform_specific: Dict[str, Any] + +class GitPlatform(ABC): + """Base class for platform-specific operations""" + + def __init__(self, token: str): + self.token = token + self.session = self._create_session() + + @abstractmethod + def _create_session(self): + """Create authenticated session""" + pass + + @abstractmethod + def get_issues(self, owner: str, repo: str) -> List[IssueData]: + """Get repository issues""" + pass + + @abstractmethod + def get_pull_requests(self, owner: str, repo: str) -> List[PRData]: + """Get repository pull requests""" + pass + + @abstractmethod + def migrate_to(self, target_platform: 'GitPlatform', + source_repo: str, target_repo: str) -> bool: + """Migrate repository to another platform""" + pass diff --git a/src/guardian/services/platform/github.py b/src/guardian/services/platform/github.py new file mode 100644 index 0000000..870bbe7 --- /dev/null +++ b/src/guardian/services/platform/github.py @@ -0,0 +1,40 @@ +# src/guardian/services/platforms/github.py +class GitHubPlatform(GitPlatform): + """GitHub-specific implementation""" + + def _create_session(self): + session = requests.Session() + session.headers.update({ + 'Authorization': f'Bearer {self.token}', + 'Accept': 'application/vnd.github.v3+json', + 'User-Agent': 'Guardian-Git-Tool' + }) + return session + + def get_issues(self, owner: str, repo: str) -> List[IssueData]: + response = self.session.get( + f"https://api.github.com/repos/{owner}/{repo}/issues" + ) + response.raise_for_status() + + issues = [] + for item in response.json(): + issues.append(IssueData( + id=str(item['number']), + title=item['title'], + description=item['body'] or '', + state=item['state'], + created_at=datetime.fromisoformat(item['created_at'].rstrip('Z')), + updated_at=datetime.fromisoformat(item['updated_at'].rstrip('Z')), + labels=[l['name'] for l in item['labels']], + assignees=[a['login'] for a in item['assignees']], + comments=item['comments'], + platform_specific={ + 'node_id': item['node_id'], + 'url': item['html_url'] + } + )) + return issues + + def migrate_to(self, target_platform: GitPlatform, + source_repo: str, target_repo: str) diff --git a/src/guardian/services/ssh.py b/src/guardian/services/ssh.py index 263246d..b441e61 100644 --- a/src/guardian/services/ssh.py +++ b/src/guardian/services/ssh.py @@ -1,12 +1,8 @@ # src/guardian/services/ssh.py -from pathlib import Path import subprocess -from typing import Optional +from pathlib import Path +from typing import List, Dict, Optional from guardian.core import Service, Result -from rich.console import Console -from rich.prompt import Confirm - -console = Console() class SSHManager(Service): """SSH key management service""" @@ -14,45 +10,53 @@ def __init__(self): super().__init__() self.ssh_dir = Path.home() / '.ssh' self.ssh_dir.mkdir(mode=0o700, exist_ok=True) - - def check_existing_keys(self) -> dict: - """Check for existing SSH keys""" - keys = { - 'rsa': (self.ssh_dir / 'id_rsa', self.ssh_dir / 'id_rsa.pub'), - 'ed25519': (self.ssh_dir / 'id_ed25519', self.ssh_dir / 'id_ed25519.pub'), - 'ecdsa': (self.ssh_dir / 'id_ecdsa', self.ssh_dir / 'id_ecdsa.pub') - } - - existing = {} - for key_type, (priv, pub) in keys.items(): - if priv.exists() and pub.exists(): - existing[key_type] = { - 'private': priv, - 'public': pub, - 'permissions': oct(priv.stat().st_mode)[-3:] - } - - return existing + + def list_ssh_keys(self) -> Result: + """List all SSH keys with their content""" + try: + key_types = { + 'rsa': ('id_rsa', 'id_rsa.pub'), + 'ed25519': ('id_ed25519', 'id_ed25519.pub'), + 'ecdsa': ('id_ecdsa', 'id_ecdsa.pub') + } + + keys = [] + for key_type, (private, public) in key_types.items(): + pub_path = self.ssh_dir / public + if pub_path.exists(): + try: + content = pub_path.read_text().strip() + keys.append({ + 'type': key_type, + 'path': str(pub_path), + 'content': content + }) + except Exception as e: + self.logger.warning(f"Could not read key {pub_path}: {e}") + + return self.create_result( + True, + f"Found {len(keys)} SSH keys", + {'keys': keys} + ) + except Exception as e: + return self.create_result( + False, + "Failed to list SSH keys", + error=e + ) def generate_key(self, email: str, force: bool = False) -> Result: """Generate new SSH key""" - existing = self.check_existing_keys() + key_path = self.ssh_dir / 'id_ed25519' - if existing and not force: - # Show existing keys - console.print("\n[yellow]Existing SSH keys found:[/yellow]") - for key_type, info in existing.items(): - console.print(f"• {key_type}: {info['public']}") - - if not Confirm.ask("\nDo you want to create a new key anyway?"): - return self.create_result( - False, - "Operation cancelled by user" - ) + if key_path.exists() and not force: + return self.create_result( + False, + f"SSH key already exists at {key_path}. Use --force to overwrite." + ) try: - key_path = self.ssh_dir / 'id_ed25519' - cmd = [ 'ssh-keygen', '-t', 'ed25519', @@ -63,6 +67,7 @@ def generate_key(self, email: str, force: bool = False) -> Result: subprocess.run(cmd, check=True) key_path.chmod(0o600) + (key_path.parent / f"{key_path.name}.pub").chmod(0o644) return self.create_result( True, @@ -76,36 +81,3 @@ def generate_key(self, email: str, force: bool = False) -> Result: "Failed to generate SSH key", error=e ) - - def backup_keys(self, backup_dir: Optional[Path] = None) -> Result: - """Backup existing SSH keys""" - if backup_dir is None: - backup_dir = self.config_dir / 'backups' / 'ssh' - - backup_dir.mkdir(parents=True, exist_ok=True) - - try: - existing = self.check_existing_keys() - if not existing: - return self.create_result( - False, - "No SSH keys found to backup" - ) - - import shutil - for key_type, info in existing.items(): - shutil.copy2(info['private'], backup_dir) - shutil.copy2(info['public'], backup_dir) - - return self.create_result( - True, - f"SSH keys backed up to {backup_dir}", - {'backup_dir': str(backup_dir)} - ) - - except Exception as e: - return self.create_result( - False, - "Failed to backup SSH keys", - error=e - ) diff --git a/src/guardian/utils/tree.py b/src/guardian/utils/tree.py new file mode 100644 index 0000000..36dc8a9 --- /dev/null +++ b/src/guardian/utils/tree.py @@ -0,0 +1,120 @@ +# src/guardian/utils/tree.py +import click +from rich.tree import Tree +from rich.console import Console +from pathlib import Path +from typing import List, Optional, Union + +class CommandTreeGenerator: + """Generate command trees for CLI documentation""" + + def __init__(self, cli: click.Group): + self.cli = cli + self.console = Console() + + def generate_markdown(self) -> str: + """Generate markdown representation of command tree""" + lines = ["# Command Tree\n"] + self._add_command_to_markdown(self.cli, lines) + return "\n".join(lines) + + def _add_command_to_markdown(self, command: Union[click.Group, click.Command], + lines: List[str], level: int = 0): + """Recursively build markdown tree""" + prefix = " " * level + if isinstance(command, click.Group): + lines.append(f"{prefix}* {command.name}/") + if command.help: + lines.append(f"{prefix} - {command.help}") + + # Sort commands for consistent output + sorted_commands = sorted(command.commands.items(), + key=lambda x: x[0]) + + for _, cmd in sorted_commands: + self._add_command_to_markdown(cmd, lines, level + 1) + else: + lines.append(f"{prefix}* {command.name}") + if command.help: + lines.append(f"{prefix} - {command.help}") + + def print_tree(self): + """Print rich tree representation of commands""" + tree = Tree("guardian", guide_style="bold bright_blue") + self._add_command_to_tree(self.cli, tree) + self.console.print(tree) + + def _add_command_to_tree(self, command: Union[click.Group, click.Command], + tree: Tree): + """Recursively build rich tree""" + if isinstance(command, click.Group): + for name, cmd in sorted(command.commands.items()): + branch = tree.add( + f"[bold cyan]{name}[/bold cyan]" + + (f"\n[dim]{cmd.help}[/dim]" if cmd.help else "") + ) + if isinstance(cmd, click.Group): + self._add_command_to_tree(cmd, branch) + +class ProjectTreeGenerator: + """Generate tree documentation for project templates""" + + def __init__(self, root_path: Path): + self.root_path = Path(root_path) + self.console = Console() + + def generate_markdown(self, + ignore_patterns: Optional[List[str]] = None) -> str: + """Generate markdown representation of project tree""" + if ignore_patterns is None: + ignore_patterns = ['__pycache__', '*.pyc', '.git', 'venv'] + + lines = [f"# Project Structure: {self.root_path.name}\n"] + self._add_path_to_markdown(self.root_path, lines, ignore_patterns) + return "\n".join(lines) + + def _add_path_to_markdown(self, path: Path, lines: List[str], + ignore_patterns: List[str], level: int = 0): + """Recursively build markdown tree""" + prefix = " " * level + + # Skip ignored patterns + if any(path.match(pattern) for pattern in ignore_patterns): + return + + if path.is_dir(): + lines.append(f"{prefix}* {path.name}/") + + # Sort entries for consistent output + entries = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name)) + + for entry in entries: + self._add_path_to_markdown(entry, lines, ignore_patterns, level + 1) + else: + lines.append(f"{prefix}* {path.name}") + + def print_tree(self, ignore_patterns: Optional[List[str]] = None): + """Print rich tree representation of project""" + if ignore_patterns is None: + ignore_patterns = ['__pycache__', '*.pyc', '.git', 'venv'] + + tree = Tree( + f"[bold]{self.root_path.name}[/bold]", + guide_style="bold bright_blue" + ) + self._add_path_to_tree(self.root_path, tree, ignore_patterns) + self.console.print(tree) + + def _add_path_to_tree(self, path: Path, tree: Tree, + ignore_patterns: List[str]): + """Recursively build rich tree""" + if any(path.match(pattern) for pattern in ignore_patterns): + return + + if path.is_dir(): + branch = tree.add(f"[bold cyan]{path.name}/[/bold cyan]") + entries = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name)) + for entry in entries: + self._add_path_to_tree(entry, branch, ignore_patterns) + else: + tree.add(f"[green]{path.name}[/green]")