diff --git a/.github/workflows/precommit.yaml b/.github/workflows/precommit.yaml new file mode 100644 index 0000000..e151b79 --- /dev/null +++ b/.github/workflows/precommit.yaml @@ -0,0 +1,23 @@ +--- +name: precommit + +on: + pull_request: + push: + branches: [main] + +permissions: + actions: read + checks: write + contents: read + pull-requests: write + +jobs: + pre-commit: + runs-on: ubuntu-24.04 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.11' + - uses: pre-commit/action@v3.0.1 diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml new file mode 100644 index 0000000..8144c65 --- /dev/null +++ b/.github/workflows/tests.yaml @@ -0,0 +1,53 @@ +--- +name: Tests + +on: + push: + branches: [main] + pull_request: + +jobs: + test: + name: Run Tests + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[test]" + + - name: Run tests + run: | + pytest cpk_lib_python_aws/tests/ -v + + + test-installation: + name: Test Package Installation + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.x' + + - name: Install package + run: | + python -m pip install --upgrade pip + pip install -e . + + - name: Test CLI command + run: | + aws-access-auditor --help diff --git a/.gitignore b/.gitignore index b7faf40..2825610 100644 --- a/.gitignore +++ b/.gitignore @@ -1,207 +1,95 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[codz] *$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg +*.cover *.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.egg-info/ +*.egg-info/ +*.egg-link +*.installed.cfg +*.log *.manifest +*.mo +*.pot +*.py.cover +*.py[codz] +*.pyc +*.pyd +*.pyo +*.sage.py +*.so *.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ +*.swp +.Python +.abstra/ +.cache .coverage .coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py.cover +.cursorignore +.cursorindexingignore +.dmypy.json +.eggs/ +.env +.envrc .hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook +.installed.cfg .ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock -#poetry.toml - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. -# https://pdm-project.org/en/latest/usage/project/#working-with-version-control -#pdm.lock -#pdm.toml -.pdm-python +.mypy_cache/ +.nox/ .pdm-build/ - -# pixi -# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. -#pixi.lock -# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one -# in the .venv directory. It is recommended not to include this directory in version control. +.pdm-python .pixi - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.envrc -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings +.pybuilder/ +.pypirc +.pyre/ +.pytest_cache/ +.pytype/ +.ropeproject +.ruff_cache/ +.scrapy .spyderproject .spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation +.tox/ +.venv +.vscode/ +.webassets-cache /site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols +ENV/ +MANIFEST +__marimo__/ +__pycache__/ +__pypackages__/ +build/ +celerybeat-schedule +celerybeat.pid +cover/ +coverage.xml cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Abstra -# Abstra is an AI-powered process automation framework. -# Ignore directories containing user credentials, local state, and settings. -# Learn more at https://abstra.io/docs -.abstra/ - -# Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore -# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, -# you could uncomment the following to ignore the entire vscode folder -# .vscode/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc - -# Cursor -# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to -# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data -# refer to https://docs.cursor.com/context/ignore-files -.cursorignore -.cursorindexingignore - -# Marimo -marimo/_static/ +db.sqlite3 +db.sqlite3-journal +develop-eggs/ +dist/ +dmypy.json +docs/_build/ +downloads/ +eggs/ +env.bak/ +env/ +htmlcov/ +instance/ +ipython_config.py +lib/ +lib64/ +local_settings.py marimo/_lsp/ -__marimo__/ +marimo/_static/ +nosetests.xml +parts/ +pip-delete-this-directory.txt +pip-log.txt +profile_default/ +sdist/ +share/python-wheels/ +target/ +var/ +venv.bak/ +venv/ +wheels/ diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000..b8bce2e --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,9 @@ +title = "Gitleaks Configuration" + +[allowlist] +description = "Allowlist for test files" +paths = [ + '''cpk_lib_python_github/.*tests/.*''', + '''.*conftest\.py''', + '''.*test_.*\.py''', +] diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..8516fd6 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,111 @@ +--- +repos: + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-merge-conflict + - id: check-added-large-files + args: [--maxkb=500] + - id: trailing-whitespace + - id: detect-private-key + - id: end-of-file-fixer + - id: fix-encoding-pragma + - id: file-contents-sorter + files: ^(requirements.*\.txt|\.gitignore)$ + - id: check-case-conflict + - id: mixed-line-ending + args: [--fix=lf] + # ----------------------------- + # Checkov is a static code analysis tool for scanning infrastructure as code (IaC) files for misconfigurations + # that may lead to security or compliance problems. + # ----------------------------- + # Checkov includes more than 750 predefined policies to check for common misconfiguration issues. + # Checkov also supports the creation and contribution of custom policies. + # ----------------------------- + # - repo: https://github.com/bridgecrewio/checkov.git + # rev: 3.2.174 + # hooks: + # - id: checkov + + # ----------------------------- + # Python Code Formatting with Black + # ----------------------------- + - repo: https://github.com/psf/black + rev: 25.1.0 + hooks: + - id: black + language_version: python3 + files: \.py$ + args: [--config=pyproject.toml] + + # ----------------------------- + # Python Import Sorting with isort (complements Black) + # ----------------------------- + - repo: https://github.com/pycqa/isort + rev: 6.0.1 + hooks: + - id: isort + files: \.py$ + args: [--profile=black, --line-length=88] + + # ----------------------------- + # Python Code Quality with Pylint + # ----------------------------- + - repo: https://github.com/pycqa/pylint + rev: v3.3.7 + hooks: + - id: pylint + args: [--rcfile=pyproject.toml] + files: \.py$ + additional_dependencies: [PyJWT, requests, toml, colorama, setuptools, boto3, botocore, pyyaml, certifi, urllib3] + + # ----------------------------- + # Gitleaks SAST tool for detecting and preventing hardcoded secrets like passwords, api keys, and tokens in git repos + # ----------------------------- + # If you are knowingly committing something that is not a secret and gitleaks is catching it, + # you can add an inline comment of '# gitleaks:allow' to the end of that line in your file. + # This will instructs gitleaks to ignore that secret - example: + # some_non_secret_value = a1b2c3d4e5f6g7h8i9j0 # gitleaks:allow + # ----------------------------- + - repo: https://github.com/gitleaks/gitleaks + rev: v8.27.2 + hooks: + - id: gitleaks + args: ['--config=.gitleaks.toml'] + # ----------------------------- + # # Generates Table of Contents in Markdown files + # # ----------------------------- + - repo: https://github.com/frnmst/md-toc + rev: 9.0.0 + hooks: + - id: md-toc + args: [-p, github] # CLI options + # ----------------------------- + # YAML Linting on yaml files for pre-commit and github actions + # ----------------------------- + - repo: https://github.com/adrienverge/yamllint + rev: v1.37.1 + hooks: + - id: yamllint + name: Check YAML syntax with yamllint + args: [--strict, -c=.yamllint.yaml, '.'] + always_run: true + pass_filenames: true + + # ----------------------------- + # GitHub Actions Workflow Linting on .github/workflows/*.yml files + # ----------------------------- + - repo: https://github.com/rhysd/actionlint + rev: v1.7.7 + hooks: + - id: actionlint + + - repo: local + hooks: + - id: toml build + name: test the .toml package health + entry: pip3 install . + language: python + pass_filenames: false + always_run: true diff --git a/.yamllint.yaml b/.yamllint.yaml new file mode 100644 index 0000000..0ccdc36 --- /dev/null +++ b/.yamllint.yaml @@ -0,0 +1,35 @@ +--- +yaml-files: + - '*.yaml' + - '*.yml' + - '.yamllint' + +rules: + anchors: enable + braces: enable + brackets: enable + colons: enable + commas: enable + comments: + level: warning + comments-indentation: + level: warning + document-end: disable + document-start: + level: warning + empty-lines: enable + empty-values: disable + float-values: disable + hyphens: enable + indentation: enable + key-duplicates: enable + key-ordering: disable + # line-length: + # max: 150 + # level: warning + new-line-at-end-of-file: enable + new-lines: enable + octal-values: disable + quoted-strings: disable + trailing-spaces: enable + truthy: disable diff --git a/README.md b/README.md new file mode 100644 index 0000000..18bb24e --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# ☁️ CPK AWS Python Libraries + +A comprehensive collection of Python libraries for AWS automation, management, and integration. This package provides a suite of tools designed to simplify AWS operations for development teams, CI/CD pipelines, and automation scripts. + +## 📋 Overview + +**CPK AWS Python Libraries** is a modular collection of AWS-related utilities designed for: + +- 🔍 **SSO Auditing & Compliance** - AWS Single Sign-On analysis and reporting +- 🔐 **Identity Management** - User, group, and permission set automation +- 📊 **Security Analysis** - Comprehensive AWS access auditing +- 🚀 **CI/CD Integration** - Seamless pipeline automation for AWS governance +- 🔧 **Development Tools** - CLI utilities for daily AWS administration tasks diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..4cdcfeb --- /dev/null +++ b/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +"""CPK Python AWS Library - Comprehensive AWS utilities and tools.""" + +from cpk_lib_python_aws.aws_access_auditor import AWSSSOAuditor +from cpk_lib_python_aws.aws_access_auditor import Config as SSOConfig +from cpk_lib_python_aws.shared import AWSBaseClient, AWSError, OutputSink + +__version__ = "1.0.0" +__all__ = [ + "AWSSSOAuditor", + "SSOConfig", + "OutputSink", + "AWSBaseClient", + "AWSError", +] diff --git a/cpk_lib_python_aws/__init__.py b/cpk_lib_python_aws/__init__.py new file mode 100644 index 0000000..e9f69eb --- /dev/null +++ b/cpk_lib_python_aws/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +"""CPK Python AWS Library - Comprehensive AWS utilities and tools.""" + +# Import SSO Auditor components +from .aws_access_auditor import AWSSSOAuditor +from .aws_access_auditor import Config as SSOConfig + +# Import shared components +from .shared import AWSBaseClient, AWSError, OutputSink + +__version__ = "1.0.0" +__all__ = [ + # Shared components + "OutputSink", + "AWSBaseClient", + "AWSError", + # SSO Auditor components + "AWSSSOAuditor", + "SSOConfig", +] diff --git a/cpk_lib_python_aws/aws_access_auditor/README.md b/cpk_lib_python_aws/aws_access_auditor/README.md new file mode 100644 index 0000000..0dd484d --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/README.md @@ -0,0 +1,391 @@ +# 🔍 AWS SSO Auditor + +A powerful CLI tool for auditing AWS Single Sign-On (SSO) configurations, analyzing permission sets, groups, and assignments across AWS accounts. This tool simplifies AWS SSO compliance and security audits by providing comprehensive reporting and analysis capabilities. + +## 📋 Table of Contents + +- [Features](#-features) +- [Installation](#-installation) +- [Quick Start](#-quick-start) +- [Usage Examples](#-usage-examples) +- [Command Reference](#-command-reference) +- [Environment Variables](#-environment-variables) +- [Sample Outputs](#-sample-outputs) +- [Common Use Cases](#-common-use-cases) +- [Configuration](#-configuration) +- [Python Usage](#-python-usage) + +## ✨ Features + +- 🔍 **Comprehensive SSO Auditing**: Analyze AWS SSO groups, permission sets, and account assignments +- 👥 **Group Analysis**: Detailed group membership and permission mapping +- 🔐 **Permission Set Analysis**: In-depth analysis of AWS managed, customer managed, and inline policies +- 📊 **Multi-format Output**: Support for JSON and YAML output formats +- 🎯 **Account-specific Auditing**: Focus audits on specific AWS accounts +- 📁 **Flexible Output Management**: Configurable output directories with timestamp support +- 🌍 **Environment Variables**: Full support for environment-based configuration +- 🎨 **Rich Console Output**: Colorized, well-formatted output with progress indicators +- 📝 **Debug Mode**: Detailed logging for troubleshooting +- 🏗️ **Professional Architecture**: Modular design with proper error handling + +## 🚀 Installation + +### Prerequisites + +- AWS SSO enabled in your AWS organization +- AWS credentials with appropriate SSO permissions: + - `sso-admin:*` permissions + - `identitystore:*` permissions + - `organizations:ListAccounts` permission + +### Install from Source + +```bash +pip install git+https://github.com/opencpk/cpk-lib-python-aws.git@main +``` + +### Verify Installation + +```bash +aws-access-auditor --help +``` + +## 🎯 Quick Start + +### 1. Set up AWS Credentials + +```bash +# Using AWS CLI profiles +export AWS_PROFILE=sso-admin + +# Or using environment variables +export AWS_REGION=us-east-1 +``` + +### 2. Run Your First Audit + +```bash +aws-access-auditor 123456789012 --output-format json +``` + +### 3. Generate Comprehensive Reports + +```bash +aws-access-auditor 123456789012 --output-format both --output-dir ./audit-reports +``` + +## 📖 Usage Examples + +### 🔍 Basic Auditing + +#### Audit a specific AWS account: + +```bash +aws-access-auditor 123456789012 +``` + +#### Audit with JSON output only: + +```bash +aws-access-auditor 123456789012 --output-format json +``` + +#### Audit with YAML output only: + +```bash +aws-access-auditor 123456789012 --output-format yaml +``` + +#### Audit with both formats: + +```bash +aws-access-auditor 123456789012 --output-format both +``` + +### 📁 Output Management + +#### Custom output directory: + +```bash +aws-access-auditor 123456789012 --output-dir ./my-audit-reports +``` + +#### Disable timestamps in filenames: + +```bash +aws-access-auditor 123456789012 --no-timestamp +``` + +### 🌍 Region and Profile Configuration + +#### Specify AWS region: + +```bash +aws-access-auditor 123456789012 --aws-region us-west-2 +``` + +#### Use specific AWS profile: + +```bash +aws-access-auditor 123456789012 --aws-profile sso-admin-profile +``` + +### 🔇 Quiet and Debug Modes + +#### Quiet mode (no console output): + +```bash +aws-access-auditor 123456789012 --quiet +``` + +#### Debug mode (detailed logging): + +```bash +aws-access-auditor 123456789012 --debug +``` + +### 🐛 Debug & Help + +#### Show help: + +```bash +aws-access-auditor --help +``` + +## 📚 Command Reference + +| Argument | Description | Default | +| ----------------- | ---------------------------------------------------- | ------------------------- | +| `account_id` | AWS Account ID to audit (required) | - | +| `--output-format` | Output format: `json`, `yaml`, or `both` | `both` | +| `--output-dir` | Output directory path | `./aws-sso-audit-results` | +| `--aws-region` | AWS region | `us-east-1` | +| `--aws-profile` | AWS profile to use | None | +| `--quiet` `-q` | Suppress console output and logging, only save files | `False` | +| `--debug` | Enable debug logging | `False` | +| `--no-timestamp` | Don't include timestamp in filenames | `False` | +| `--help` | Show help message | - | + +## 🌍 Environment Variables + +| Variable | Description | Example | +| ------------------------------- | ------------------------ | ----------------- | +| `AWS_REGION` | AWS region | `us-east-1` | +| `AWS_PROFILE` | AWS profile to use | `sso-admin` | +| `AWS_ACCESS_AUDITOR_OUTPUT_DIR` | Default output directory | `./audit-reports` | +| `AWS_ACCESS_AUDITOR_DEBUG` | Enable debug mode | `true` | +| `AWS_ACCESS_AUDITOR_QUIET` | Enable quiet mode | `true` | + +### Setting Environment Variables + +```bash +# Basic configuration +export AWS_REGION=us-east-1 + +# Advanced configuration +export AWS_ACCESS_AUDITOR_OUTPUT_DIR=./audit-reports +export AWS_ACCESS_AUDITOR_DEBUG=true + +# Then use shorter commands +aws-access-auditor 123456789012 +``` + +## 🎨 Sample Outputs + +### 📊 Successful Audit Output + +```bash +$ aws-access-auditor 123456789012 --output-format json --debug +``` + +**Console Output:** + +``` +⏳ Initializing AWS clients... +⏳ Starting audit for account: 123456789012 +⏳ Retrieving account assignments... +🔍 Found 13 assignments +⏳ Processing assignments... +⏳ Processing group: 90967fb4-d4e1-7019-c6a2-3b4d2a8c7e5f +⏳ Processing permission set: arn:aws:sso:::permissionSet/ssoins-1234567890abcdef/ps-1234567890abcdef +⏳ Finalizing audit results... +✅ Results saved to: ./aws-sso-audit-results/aws_sso_audit_123456789012_20250107_124443.json + +📊 AWS SSO Audit Summary +🆔 Account: 123456789012 +📅 Generated: 2025-01-07T12:44:43.717 +👥 Groups: 3 +🔐 Permission Sets: 5 +🔗 Assignments: 13 +``` + +### 📋 Sample JSON Output Structure + +```json +{ + "metadata": { + "generated_at": "2025-01-07T12:44:43.717", + "account_id": "123456789012", + "sso_instance_arn": "arn:aws:sso:::instance/ssoins-1234567890abcdef", + "identity_store_id": "d-1234567890", + "auditor_version": "1.0.0", + "config": { + "aws_region": "us-east-1", + "output_formats": ["json"] + } + }, + "sso_groups_summary": ["Developers", "Administrators", "ReadOnlyUsers"], + "sso_permission_sets_summary": [ + "AdministratorAccess", + "DeveloperAccess", + "ReadOnlyAccess", + "PowerUserAccess", + "CustomDataAccess" + ], + "sso_groups": [ + { + "GroupId": "90967fb4-d4e1-7019-c6a2-3b4d2a8c7e5f", + "DisplayName": "Developers", + "Description": "Development team members", + "Members": [ + { + "UserId": "90967fb4-b2c1-70a8-b8a2-1b2c3d4e5f6g", + "UserName": "john.doe", + "DisplayName": "John Doe", + "Email": "john.doe@company.com" + } + ], + "PermissionSets": [ + { + "Name": "DeveloperAccess", + "Description": "Developer permissions", + "Policies": { + "managed_policies": [ + { + "Name": "PowerUserAccess", + "Arn": "arn:aws:iam::aws:policy/PowerUserAccess" + } + ], + "customer_managed_policies": [], + "inline_policy": null + } + } + ] + } + ], + "permission_sets": [ + { + "Name": "DeveloperAccess", + "Description": "Developer permissions", + "CreatedDate": "2024-01-15T10:30:00Z", + "SessionDuration": "PT8H", + "Policies": { + "managed_policies": [ + { + "Name": "PowerUserAccess", + "Arn": "arn:aws:iam::aws:policy/PowerUserAccess" + } + ], + "customer_managed_policies": [], + "inline_policy": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::company-dev-bucket/*" + } + ] + } + }, + "AssignedGroups": ["90967fb4-d4e1-7019-c6a2-3b4d2a8c7e5f"] + } + ], + "summary": { + "total_groups": 3, + "total_permission_sets": 5, + "total_assignments": 13 + } +} +``` + +### ❌ Error Output Examples + +#### Account not found: + +```bash +$ aws-access-auditor 999999999999 +``` + +**Output:** + +``` +❌ AWS SSO Auditor Error: No permission sets found for account 999999999999 +``` + +#### Invalid credentials: + +```bash +$ aws-access-auditor 123456789012 +``` + +**Output:** + +``` +❌ Unexpected error: Unable to locate credentials +``` + +### 🔇 Quiet Mode Output + +```bash +$ aws-access-auditor 123456789012 --quiet +``` + +**Output:** (No console output, only files generated) + +### 🐛 Debug Mode Output + +```bash +$ aws-access-auditor 123456789012 --debug +``` + +## 🐍 Python Usage + +If you prefer to use this tool as a Python library in your scripts: + +### Programmatic Usage + +```python -c " +from cpk_lib_python_aws.aws_access_auditor import AWSSSOAuditor, Config, OutputFormatter +from cpk_lib_python_aws.shared import OutputSink + +config = Config( + output_directory='./test-results-1', + include_timestamp=False, + quiet=False +) + +output_sink = OutputSink(quiet=True) +auditor = AWSSSOAuditor(config, output_sink) +formatter = OutputFormatter(config, output_sink) + +results = auditor.audit_account('123456789012') +formatter.save_results(results, '123456789012') +" +``` + +### Without Timestamps + +``` +./aws-sso-audit-results/ +├── aws_sso_audit_123456789012.json +└── aws_sso_audit_123456789012.yaml +``` + +## 📄 License + +This project is licensed under the GPLv3 License. + +## 🤝 Contributing + +Contributions are welcome! Please feel free to submit issues and pull requests. diff --git a/cpk_lib_python_aws/aws_access_auditor/__init__.py b/cpk_lib_python_aws/aws_access_auditor/__init__.py new file mode 100644 index 0000000..55c36c5 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/__init__.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +"""AWS SSO Auditor - Professional AWS SSO auditing and compliance tool.""" + +from ..shared import OutputSink +from .auditor import AWSSSOAuditor +from .aws_client_manager import AWSClientManager +from .config import Config +from .exceptions import ( + AWSSSOAuditorError, + ConfigurationError, + InsufficientPermissionsError, +) +from .formatters import OutputFormatter + +__version__ = "1.0.0" +__all__ = [ + "AWSSSOAuditor", + "Config", + "AWSSSOAuditorError", + "InsufficientPermissionsError", # Keep this as is since it's defined in your local exceptions + "ConfigurationError", + "OutputFormatter", + "AWSClientManager", + "OutputSink", +] diff --git a/cpk_lib_python_aws/aws_access_auditor/__main__.py b/cpk_lib_python_aws/aws_access_auditor/__main__.py new file mode 100644 index 0000000..d2719d2 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/__main__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +"""Entry point for running AWS SSO Auditor as a module.""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cpk_lib_python_aws/aws_access_auditor/auditor.py b/cpk_lib_python_aws/aws_access_auditor/auditor.py new file mode 100644 index 0000000..b3b8742 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/auditor.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +"""Core AWS SSO auditing functionality.""" +import json +import logging +from datetime import datetime +from typing import Any, Dict, List + +from .aws_client_manager import AWSClientManager +from .config import Config +from .exceptions import AWSSSOAuditorError +from .utils import clean_aws_response, safe_get_nested + +logger = logging.getLogger(__name__) + + +class NullOutputSink: + """Null object pattern for output sink.""" + + def progress(self, message): + """Display progress message (no-op).""" + + def debug_info(self, message): + """Display debug information (no-op).""" + + def warning(self, message): + """Display warning message (no-op).""" + + def info(self, message): + """Display info message (no-op).""" + + def error(self, message): + """Display error message (no-op).""" + + +class AWSSSOAuditor: + """Main AWS SSO auditing class.""" + + def __init__(self, config: Config = None, output_sink=None): + """Initialize the AWS SSO Auditor.""" + self.config = config or Config() + self.config.validate() + + # Initialize output sink + self.output_sink = output_sink or NullOutputSink() + + # Initialize AWS clients + self.output_sink.progress("Initializing AWS clients...") + self.aws_manager = AWSClientManager(self.config) + + # Store frequently used references + self.sso_admin_client = self.aws_manager.sso_admin_client + self.identitystore_client = self.aws_manager.identitystore_client + self.organizations_client = self.aws_manager.organizations_client + self.identity_store_id = self.aws_manager.identity_store_id + self.instance_arn = self.aws_manager.instance_arn + + # Show client info in debug mode + if self.config.debug: + client_info = self.aws_manager.get_client_info() + logger.debug("AWS Client Info: %s", client_info) + self.output_sink.debug_info(f"Connected to SSO instance: {self.instance_arn}") + + logger.info("AWS SSO Auditor initialized successfully") + + # pylint: disable=too-many-branches + def audit_account(self, account_id: str) -> Dict[str, Any]: + """Perform complete audit of SSO access for the given account.""" + logger.info("Starting AWS SSO audit for account: %s", account_id) + self.output_sink.progress(f"Starting audit for account: {account_id}") + + try: + # Get all account assignments (only for permission sets assigned to this account) + self.output_sink.progress("Retrieving account assignments...") + assignments = self.get_all_account_assignments(account_id) + self.output_sink.debug_info(f"Found {len(assignments)} assignments") + + # Organize data + groups_data = {} + permission_sets_data = {} + + self.output_sink.progress("Processing assignments...") + for assignment in assignments: + principal_type = assignment["PrincipalType"] + principal_id = assignment["PrincipalId"] + permission_set_arn = assignment["PermissionSetArn"] + + if principal_type == "GROUP": + if principal_id not in groups_data: + self.output_sink.progress(f"Processing group: {principal_id}") + group_details = self.get_group_details(principal_id) + group_members = self.get_group_members(principal_id) + groups_data[principal_id] = { + **group_details, + "Members": group_members, + "PermissionSets": [], + } + + # Get full permission set details for this group + permission_set_details = self.get_permission_set_details(permission_set_arn) + permission_set_policies = self.get_permission_set_policies(permission_set_arn) + + permission_set_full_details = { + **permission_set_details, + "Policies": permission_set_policies, + } + + groups_data[principal_id]["PermissionSets"].append(permission_set_full_details) + + # Collect permission set data (only for those with assignments to this account) + if permission_set_arn not in permission_sets_data: + self.output_sink.progress(f"Processing permission set: {permission_set_arn}") + permission_set_details = self.get_permission_set_details(permission_set_arn) + permission_set_policies = self.get_permission_set_policies(permission_set_arn) + permission_sets_data[permission_set_arn] = { + **permission_set_details, + "Policies": permission_set_policies, + "AssignedGroups": [], + } + + if principal_type == "GROUP": + if ( + principal_id + not in permission_sets_data[permission_set_arn]["AssignedGroups"] + ): + permission_sets_data[permission_set_arn]["AssignedGroups"].append( + principal_id + ) + + # Create simple lists for summary + group_names = [group["DisplayName"] for group in groups_data.values()] + permission_set_names = [ + ps.get("Name", "Unknown") for ps in permission_sets_data.values() + ] + + self.output_sink.progress("Finalizing audit results...") + + # Build final result + result = { + "metadata": { + "generated_at": datetime.now().isoformat(), + "account_id": account_id, + "sso_instance_arn": self.instance_arn, + "identity_store_id": self.identity_store_id, + "auditor_version": "1.0.0", + "config": { + "aws_region": self.config.aws_region, + "output_formats": self.config.output_formats, + }, + }, + "sso_groups_summary": group_names, + "sso_permission_sets_summary": permission_set_names, + "sso_groups": list(groups_data.values()), + "permission_sets": list(permission_sets_data.values()), + "summary": { + "total_groups": len(groups_data), + "total_permission_sets": len(permission_sets_data), + "total_assignments": len(assignments), + }, + } + + logger.info("Audit completed successfully for account %s", account_id) + return result + + except Exception as e: + logger.error("Audit failed for account %s: %s", account_id, e) + raise AWSSSOAuditorError(f"Failed to audit account {account_id}: {e}") from e + + def get_permission_sets_for_account(self, account_id: str) -> List[str]: + """Get only permission sets that are provisioned/assigned to the specific account.""" + try: + permission_sets = [] + paginator = self.sso_admin_client.get_paginator( + "list_permission_sets_provisioned_to_account" + ) + + for page in paginator.paginate(InstanceArn=self.instance_arn, AccountId=account_id): + permission_sets.extend(page["PermissionSets"]) + + logger.info( + "Found %d permission sets provisioned to account %s", + len(permission_sets), + account_id, + ) + return permission_sets + except Exception as e: + logger.error("Error getting permission sets for account %s: %s", account_id, e) + return [] + + def get_account_assignments_for_permission_set( + self, permission_set_arn: str, account_id: str + ) -> List[Dict[str, Any]]: + """Get account assignments for a specific permission set and account.""" + try: + assignments = [] + paginator = self.sso_admin_client.get_paginator("list_account_assignments") + + for page in paginator.paginate( + InstanceArn=self.instance_arn, + AccountId=account_id, + PermissionSetArn=permission_set_arn, + ): + assignments.extend(page["AccountAssignments"]) + + return assignments + except Exception as e: + logger.error( + "Error getting account assignments for permission set %s: %s", permission_set_arn, e + ) + return [] + + def get_all_account_assignments(self, account_id: str) -> List[Dict[str, Any]]: + """Get all account assignments for the given account. + + Only checks permission sets that are provisioned to this account. + """ + all_assignments = [] + + # Get only permission sets that are provisioned to this account + permission_sets = self.get_permission_sets_for_account(account_id) + + # Then get assignments for each provisioned permission set + for permission_set_arn in permission_sets: + assignments = self.get_account_assignments_for_permission_set( + permission_set_arn, account_id + ) + all_assignments.extend(assignments) + + logger.info("Found %d total assignments for account %s", len(all_assignments), account_id) + return all_assignments + + def get_group_details(self, group_id: str) -> Dict[str, Any]: + """Get group details including name and description.""" + try: + response = self.identitystore_client.describe_group( + IdentityStoreId=self.identity_store_id, GroupId=group_id + ) + return { + "GroupId": response["GroupId"], + "DisplayName": response["DisplayName"], + "Description": response.get("Description", ""), + } + except Exception as e: + logger.error("Error getting group details for %s: %s", group_id, e) + return {"GroupId": group_id, "DisplayName": "Unknown", "Description": ""} + + def get_group_members(self, group_id: str) -> List[Dict[str, Any]]: + """Get all members of a group.""" + try: + members = [] + paginator = self.identitystore_client.get_paginator("list_group_memberships") + + for page in paginator.paginate( + IdentityStoreId=self.identity_store_id, GroupId=group_id + ): + for membership in page["GroupMemberships"]: + user_id = membership["MemberId"]["UserId"] + user_details = self.get_user_details(user_id) + members.append(user_details) + + return members + except Exception as e: + logger.error("Error getting group members for %s: %s", group_id, e) + return [] + + def get_user_details(self, user_id: str) -> Dict[str, Any]: + """Get user details including username and display name.""" + try: + response = self.identitystore_client.describe_user( + IdentityStoreId=self.identity_store_id, UserId=user_id + ) + return { + "UserId": response["UserId"], + "UserName": response["UserName"], + "DisplayName": response.get("DisplayName", response["UserName"]), + "Email": safe_get_nested(response, ["Emails", 0, "Value"], ""), + } + except Exception as e: + logger.error("Error getting user details for %s: %s", user_id, e) + return {"UserId": user_id, "UserName": "Unknown", "DisplayName": "Unknown", "Email": ""} + + def get_permission_set_details(self, permission_set_arn: str) -> Dict[str, Any]: + """Get permission set details.""" + try: + response = self.sso_admin_client.describe_permission_set( + InstanceArn=self.instance_arn, PermissionSetArn=permission_set_arn + ) + return clean_aws_response(response["PermissionSet"]) + except Exception as e: + logger.error("Error getting permission set details for %s: %s", permission_set_arn, e) + return {} + + def get_permission_set_policies(self, permission_set_arn: str) -> Dict[str, Any]: + """Get all policies attached to a permission set.""" + policies = {"managed_policies": [], "customer_managed_policies": [], "inline_policy": None} + + try: + # Get AWS managed policies + managed_paginator = self.sso_admin_client.get_paginator( + "list_managed_policies_in_permission_set" + ) + for page in managed_paginator.paginate( + InstanceArn=self.instance_arn, PermissionSetArn=permission_set_arn + ): + policies["managed_policies"].extend(page["AttachedManagedPolicies"]) + + # Get customer managed policies + customer_paginator = self.sso_admin_client.get_paginator( + "list_customer_managed_policy_references_in_permission_set" + ) + for page in customer_paginator.paginate( + InstanceArn=self.instance_arn, PermissionSetArn=permission_set_arn + ): + for policy_ref in page["CustomerManagedPolicyReferences"]: + policy_details = self.get_customer_managed_policy_details(policy_ref) + policies["customer_managed_policies"].append(policy_details) + + # Get inline policy + try: + inline_response = self.sso_admin_client.get_inline_policy_for_permission_set( + InstanceArn=self.instance_arn, PermissionSetArn=permission_set_arn + ) + if inline_response.get("InlinePolicy"): + policies["inline_policy"] = json.loads(inline_response["InlinePolicy"]) + except self.sso_admin_client.exceptions.ResourceNotFoundException: + # No inline policy exists + pass + + except Exception as e: + logger.error("Error getting policies for permission set %s: %s", permission_set_arn, e) + + return policies + + def get_customer_managed_policy_details(self, policy_ref: Dict[str, Any]) -> Dict[str, Any]: + """Get details for customer managed policy.""" + try: + return { + "Name": policy_ref["Name"], + "Path": policy_ref.get("Path", "/"), + "Type": "CustomerManaged", + "Note": "Policy document not retrieved - requires target account access", + } + except Exception as e: + logger.error("Error getting customer managed policy details: %s", e) + return policy_ref diff --git a/cpk_lib_python_aws/aws_access_auditor/aws_client_manager.py b/cpk_lib_python_aws/aws_access_auditor/aws_client_manager.py new file mode 100644 index 0000000..71dd5f3 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/aws_client_manager.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +"""AWS client management for SSO auditing - extends shared base.""" + +import logging +from typing import Any, Dict + +from ..shared import AWSBaseClient +from .config import Config +from .exceptions import AWSClientError, SSOInstanceNotFoundError + +logger = logging.getLogger(__name__) + + +class AWSClientManager(AWSBaseClient): + """Manages AWS clients specific to SSO auditing.""" + + def __init__(self, config: Config): + """Initialize AWS clients with SSO-specific configuration.""" + super().__init__(region=config.aws_region, profile=config.aws_profile) + self.config = config + + # SSO-specific clients + self.sso_admin_client = None + self.identitystore_client = None + self.organizations_client = None + + # SSO instance information + self.sso_instance = None + self.identity_store_id = None + self.instance_arn = None + + self._initialize_sso_clients() + + def _initialize_sso_clients(self) -> None: + """Initialize SSO-specific AWS clients.""" + try: + # Initialize AWS clients + self.sso_admin_client = self.session.client("sso-admin") + self.identitystore_client = self.session.client("identitystore") + self.organizations_client = self.session.client("organizations") + + logger.info("SSO-specific AWS clients initialized successfully") + + # Discover SSO instance + self._discover_sso_instance() + + except Exception as e: + logger.error("Failed to initialize SSO clients: %s", e) + raise AWSClientError(f"Error initializing SSO clients: {e}") from e + + def _discover_sso_instance(self) -> None: + """Discover and validate SSO instance.""" + try: + response = self.sso_admin_client.list_instances() + if not response["Instances"]: + raise SSOInstanceNotFoundError("No SSO instances found in this AWS account") + + self.sso_instance = response["Instances"][0] + self.identity_store_id = self.sso_instance["IdentityStoreId"] + self.instance_arn = self.sso_instance["InstanceArn"] + + logger.info("SSO instance discovered: %s", self.instance_arn) + + except Exception as e: + if "No SSO instances found" in str(e): + raise + logger.error("Failed to discover SSO instance: %s", e) + raise SSOInstanceNotFoundError(f"Failed to get SSO instance: {e}") from e + + def get_client_info(self) -> Dict[str, Any]: + """Get information about configured SSO clients.""" + base_info = { + "region": self.region, + "profile": self.profile, + "caller_identity": self.get_caller_identity(), + } + + sso_info = { + "sso_instance_arn": self.instance_arn, + "identity_store_id": self.identity_store_id, + "has_sso_admin": self.sso_admin_client is not None, + "has_identity_store": self.identitystore_client is not None, + "has_organizations": self.organizations_client is not None, + } + + return {**base_info, **sso_info} diff --git a/cpk_lib_python_aws/aws_access_auditor/cli.py b/cpk_lib_python_aws/aws_access_auditor/cli.py new file mode 100644 index 0000000..daf8a45 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/cli.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +"""CLI interface for AWS SSO Auditor.""" + +import argparse +import logging +import sys +from typing import List + +from ..shared import OutputSink # <-- CHANGED: Import from shared instead of local +from .auditor import AWSSSOAuditor +from .config import Config +from .exceptions import AWSSSOAuditorError +from .formatters import OutputFormatter + + +def setup_logging(debug: bool = False, quiet: bool = False) -> None: + """Setup logging configuration.""" + if quiet: + level = logging.ERROR + elif debug: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig( + level=level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.FileHandler("aws_access_auditor.log"), logging.StreamHandler()], + ) + + +def create_parser() -> argparse.ArgumentParser: + """Create CLI argument parser.""" + parser = argparse.ArgumentParser( + description="Audit AWS SSO Groups and Permission Sets for an account", + prog="aws-access-auditor", + ) + + parser.add_argument("account_id", help="AWS Account ID to audit") + + parser.add_argument( + "--output-format", + choices=["json", "yaml", "both"], + default="both", + help="Output format (default: both)", + ) + + parser.add_argument( + "--output-dir", + default="./aws-sso-audit-results", + help="Output directory (default: ./aws-sso-audit-results)", + ) + + parser.add_argument("--aws-region", default="us-east-1", help="AWS region (default: us-east-1)") + + parser.add_argument("--aws-profile", help="AWS profile to use") + + parser.add_argument( + "--quiet", "-q", action="store_true", help="Suppress console output, only save files" + ) + + parser.add_argument("--debug", action="store_true", help="Enable debug logging") + + parser.add_argument( + "--no-timestamp", action="store_true", help="Don't include timestamp in filenames" + ) + + return parser + + +def main(args: List[str] = None) -> int: + """Main CLI entry point.""" + parser = create_parser() + parsed_args = parser.parse_args(args) + + # Setup logging + setup_logging(parsed_args.debug, parsed_args.quiet) + logger = logging.getLogger(__name__) + + # Create configuration + output_formats = ( + [parsed_args.output_format] if parsed_args.output_format != "both" else ["json", "yaml"] + ) + + config = Config( + aws_region=parsed_args.aws_region, + aws_profile=parsed_args.aws_profile, + output_formats=output_formats, + output_directory=parsed_args.output_dir, + include_timestamp=not parsed_args.no_timestamp, + debug=parsed_args.debug, + quiet=parsed_args.quiet, + ) + + # Create output sink for clean console management + output = OutputSink(config.quiet, config.debug) + + try: + # Initialize auditor and formatter + output.progress("Initializing AWS SSO Auditor...") + auditor = AWSSSOAuditor(config, output) + formatter = OutputFormatter(config, output) + + # Run audit + output.info(f"Starting audit for account: {parsed_args.account_id}") + logger.info("Starting audit for account: %s", parsed_args.account_id) + + results = auditor.audit_account(parsed_args.account_id) + + # Save results + output.progress("Saving results to files...") + saved_files = formatter.save_results(results, parsed_args.account_id) + logger.info("Results saved to: %s", ", ".join(saved_files)) + + # Display results using output sink + formatter.display_results(results) + output.success(f"Results saved to: {', '.join(saved_files)}") + + # Show summary in debug mode + if config.debug: + summary = results.get("summary", {}) + output.debug_info( + f"Processed {summary.get('total_groups', 0)} groups, " + f"{summary.get('total_permission_sets', 0)} permission sets" + ) + + logger.info("Audit completed successfully") + return 0 + + except AWSSSOAuditorError as e: + logger.error("AWS SSO Auditor Error: %s", e) + output.error(f"AWS SSO Auditor Error: {e}") + return 1 + except Exception as e: + logger.error("Unexpected error: %s", e) + output.error(f"Unexpected error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cpk_lib_python_aws/aws_access_auditor/config.py b/cpk_lib_python_aws/aws_access_auditor/config.py new file mode 100644 index 0000000..aba5dbb --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/config.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +"""Configuration management for AWS SSO Auditor.""" + +import os +from dataclasses import dataclass +from typing import List, Optional + +from .exceptions import ConfigurationError + + +@dataclass +class Config: + """Configuration for AWS SSO Auditor.""" + + # AWS Configuration + aws_region: str = "us-east-1" + aws_profile: Optional[str] = None + timeout: int = 30 + + # Output Configuration + output_formats: List[str] = None + output_directory: str = "." + include_timestamp: bool = True + + # Behavior Configuration + debug: bool = False + quiet: bool = False + + def __post_init__(self): + """Initialize configuration from environment variables.""" + if self.output_formats is None: + self.output_formats = ["json", "yaml"] + + # Override with environment variables + self.aws_region = os.getenv("AWS_REGION", self.aws_region) + self.aws_profile = os.getenv("AWS_PROFILE", self.aws_profile) + + self.output_directory = os.getenv("AWS_ACCESS_AUDITOR_OUTPUT_DIR", self.output_directory) + if os.getenv("AWS_ACCESS_AUDITOR_DEBUG", "").lower() == "true": + self.debug = True + if os.getenv("AWS_ACCESS_AUDITOR_QUIET", "").lower() == "true": + self.quiet = True + + def validate(self) -> None: + """Validate configuration settings.""" + valid_formats = ["json", "yaml", "both"] + for fmt in self.output_formats: + if fmt not in valid_formats: + raise ConfigurationError( + f"Invalid output format: {fmt}. Must be one of {valid_formats}" + ) + + if self.timeout <= 0: + raise ConfigurationError("Timeout must be greater than 0") diff --git a/cpk_lib_python_aws/aws_access_auditor/exceptions.py b/cpk_lib_python_aws/aws_access_auditor/exceptions.py new file mode 100644 index 0000000..bc098f5 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/exceptions.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +"""Custom exceptions for AWS SSO Auditor.""" + +from ..shared.exceptions import AWSError + + +class AWSSSOAuditorError(AWSError): + """Base exception for AWS SSO Auditor.""" + + +class InsufficientPermissionsError(AWSSSOAuditorError): + """Raised when insufficient AWS permissions.""" + + +class ConfigurationError(AWSSSOAuditorError): + """Raised when configuration is invalid.""" + + +class SSOInstanceNotFoundError(AWSSSOAuditorError): + """Raised when no SSO instance is found.""" + + +class AWSClientError(AWSSSOAuditorError): + """Raised when AWS client initialization fails.""" diff --git a/cpk_lib_python_aws/aws_access_auditor/formatters.py b/cpk_lib_python_aws/aws_access_auditor/formatters.py new file mode 100644 index 0000000..0411cb6 --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/formatters.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +"""Output formatting utilities for AWS SSO Auditor.""" + +import json +import logging +import os +from datetime import datetime +from typing import Any, Dict, List + +import yaml + +from .config import Config + +logger = logging.getLogger(__name__) + + +class OutputFormatter: + """Handles output formatting and file operations.""" + + def __init__(self, config: Config, output_sink=None): + """Initialize formatter with configuration and optional output sink.""" + self.config = config + self.output_sink = output_sink + self._ensure_output_directory() + + def _ensure_output_directory(self) -> None: + """Ensure output directory exists.""" + try: + os.makedirs(self.config.output_directory, exist_ok=True) + logger.debug("Output directory ensured: %s", self.config.output_directory) + except Exception as e: + logger.error( + "Failed to create output directory %s: %s", self.config.output_directory, e + ) + raise + + def save_results(self, data: Dict[str, Any], account_id: str) -> List[str]: + """Save results to files based on configuration.""" + saved_files = [] + + timestamp = ( + datetime.now().strftime("%Y%m%d_%H%M%S") if self.config.include_timestamp else "" + ) + + for format_type in self.config.output_formats: + if format_type in ["json", "both"]: + json_file = self._save_json(data, account_id, timestamp) + saved_files.append(json_file) + + if format_type in ["yaml", "both"]: + yaml_file = self._save_yaml(data, account_id, timestamp) + saved_files.append(yaml_file) + + return saved_files + + def _save_json(self, data: Dict[str, Any], account_id: str, timestamp: str) -> str: + """Save data as JSON file.""" + filename_parts = ["aws_sso_audit", account_id] + if timestamp: + filename_parts.append(timestamp) + filename = "_".join(filename_parts) + ".json" + + filepath = os.path.join(self.config.output_directory, filename) + + with open(filepath, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, default=str) + + logger.info("JSON results saved to: %s", filepath) + return filepath + + def _save_yaml(self, data: Dict[str, Any], account_id: str, timestamp: str) -> str: + """Save data as YAML file.""" + filename_parts = ["aws_sso_audit", account_id] + if timestamp: + filename_parts.append(timestamp) + filename = "_".join(filename_parts) + ".yaml" + + filepath = os.path.join(self.config.output_directory, filename) + + with open(filepath, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False) + + logger.info("YAML results saved to: %s", filepath) + return filepath + + def display_results(self, data: Dict[str, Any]) -> None: + """Display results to console using output sink if available.""" + if self.config.quiet: + return + + if self.output_sink: + # Use output sink for clean display + self.output_sink.separator() + self.output_sink.info("AWS SSO AUDIT RESULTS") + self.output_sink.separator() + self.output_sink.print_raw(json.dumps(data, indent=2, default=str)) + else: + # Fallback to direct print (backward compatibility) + print("\n" + "=" * 80) + print("AWS SSO AUDIT RESULTS") + print("=" * 80) + print(json.dumps(data, indent=2, default=str)) + + def format_summary(self, data: Dict[str, Any]) -> str: + """Format a summary of audit results.""" + summary = data.get("summary", {}) + metadata = data.get("metadata", {}) + + lines = [ + "📊 AWS SSO Audit Summary", + f"🆔 Account: {metadata.get('account_id', 'Unknown')}", + f"📅 Generated: {metadata.get('generated_at', 'Unknown')}", + f"👥 Groups: {summary.get('total_groups', 0)}", + f"🔐 Permission Sets: {summary.get('total_permission_sets', 0)}", + f"🔗 Assignments: {summary.get('total_assignments', 0)}", + ] + + return "\n".join(lines) diff --git a/cpk_lib_python_aws/aws_access_auditor/utils.py b/cpk_lib_python_aws/aws_access_auditor/utils.py new file mode 100644 index 0000000..004a42a --- /dev/null +++ b/cpk_lib_python_aws/aws_access_auditor/utils.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +"""Utility functions for AWS SSO Auditor.""" + +from typing import Any, Dict, List + +from ..shared.utils import validate_account_id as base_validate_account_id + + +def validate_account_id(account_id: str) -> bool: + """Validate AWS account ID format (wrapper for shared function).""" + return base_validate_account_id(account_id) + + +def format_permission_set_arn(instance_arn: str, permission_set_name: str) -> str: + """Format permission set ARN from instance ARN and name.""" + parts = instance_arn.split(":") + if len(parts) >= 6: + account = parts[4] + instance_id = instance_arn.split("/")[-1] + return f"arn:aws:sso:::{account}:permissionSet/{instance_id}/{permission_set_name}" + return permission_set_name + + +def safe_get_nested(data: Dict[str, Any], keys: List[str], default: Any = None) -> Any: + """Safely get nested dictionary value.""" + current = data + for key in keys: + if isinstance(current, dict) and key in current: + current = current[key] + else: + return default + return current + + +def clean_aws_response(response: Dict[str, Any]) -> Dict[str, Any]: + """Clean AWS API response by removing metadata.""" + cleaned = response.copy() + # Remove common AWS metadata keys + metadata_keys = ["ResponseMetadata", "NextToken", "IsTruncated"] + for key in metadata_keys: + cleaned.pop(key, None) + return cleaned + + +def format_timestamp(timestamp) -> str: + """Format AWS timestamp for display.""" + if hasattr(timestamp, "isoformat"): + return timestamp.isoformat() + return str(timestamp) diff --git a/cpk_lib_python_aws/shared/__init__.py b/cpk_lib_python_aws/shared/__init__.py new file mode 100644 index 0000000..6c37377 --- /dev/null +++ b/cpk_lib_python_aws/shared/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +"""Shared AWS utilities and base classes.""" + +from .aws_base import AWSBaseClient +from .exceptions import AWSError, CredentialsError, PermissionsError, RegionError +from .output_sink import OutputSink +from .utils import get_aws_regions, validate_account_id + +__all__ = [ + "AWSBaseClient", + "AWSError", + "CredentialsError", + "RegionError", + "PermissionsError", # This matches your exceptions.py + "validate_account_id", + "get_aws_regions", + "OutputSink", +] diff --git a/cpk_lib_python_aws/shared/aws_base.py b/cpk_lib_python_aws/shared/aws_base.py new file mode 100644 index 0000000..b5c7740 --- /dev/null +++ b/cpk_lib_python_aws/shared/aws_base.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +"""Base AWS client management shared across packages.""" + +import logging +from abc import ABC +from typing import Any, Dict, Optional + +import boto3 + +from .exceptions import AWSError, CredentialsError + +logger = logging.getLogger(__name__) + + +class AWSBaseClient(ABC): + """Base class for AWS service clients with common functionality.""" + + def __init__(self, region: str = "us-east-1", profile: Optional[str] = None): + """Initialize base AWS client.""" + self.region = region + self.profile = profile + self.session = None + self._initialize_session() + + def _initialize_session(self) -> None: + """Initialize boto3 session with optional profile.""" + try: + session_kwargs = {"region_name": self.region} + if self.profile: + session_kwargs["profile_name"] = self.profile + + self.session = boto3.Session(**session_kwargs) + logger.info("AWS session initialized for region: %s", self.region) + + except Exception as e: + logger.error("Failed to initialize AWS session: %s", e) + raise CredentialsError(f"Failed to initialize AWS session: {e}") from e + + def get_caller_identity(self) -> Dict[str, Any]: + """Get current AWS caller identity.""" + try: + sts_client = self.session.client("sts") + return sts_client.get_caller_identity() + except Exception as e: + raise AWSError(f"Failed to get caller identity: {e}") from e diff --git a/cpk_lib_python_aws/shared/exceptions.py b/cpk_lib_python_aws/shared/exceptions.py new file mode 100644 index 0000000..d03b659 --- /dev/null +++ b/cpk_lib_python_aws/shared/exceptions.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +"""Shared AWS exceptions.""" + + +class AWSError(Exception): + """Base AWS error for all AWS-related exceptions.""" + + +class CredentialsError(AWSError): + """Raised when AWS credentials are invalid or missing.""" + + +class RegionError(AWSError): + """Raised when AWS region is invalid or unsupported.""" + + +class PermissionsError(AWSError): + """Raised when insufficient AWS permissions.""" diff --git a/cpk_lib_python_aws/shared/output_sink.py b/cpk_lib_python_aws/shared/output_sink.py new file mode 100644 index 0000000..7edf25c --- /dev/null +++ b/cpk_lib_python_aws/shared/output_sink.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +"""Shared output sink for managing console output across AWS tools.""" + +import sys + + +class OutputSink: + """Manages console output with different verbosity levels across AWS tools.""" + + def __init__(self, quiet: bool = False, debug: bool = False): + """Initialize output sink with verbosity settings.""" + self.quiet = quiet + self.debug = debug + + def info(self, message: str) -> None: + """Print informational message (suppressed in quiet mode).""" + if not self.quiet: + print(message) + + def success(self, message: str) -> None: + """Print success message (suppressed in quiet mode).""" + if not self.quiet: + print(f"✅ {message}") + + def warning(self, message: str) -> None: + """Print warning message (always shown unless quiet).""" + if not self.quiet: + print(f"⚠️ {message}") + + def error(self, message: str) -> None: + """Print error message (always shown, even in quiet mode).""" + print(f"❌ {message}", file=sys.stderr) + + def debug_info(self, message: str) -> None: + """Print debug message (only in debug mode).""" + if self.debug and not self.quiet: + print(f"🔍 {message}") + + def progress(self, message: str) -> None: + """Print progress message (only in debug mode).""" + if self.debug and not self.quiet: + print(f"⏳ {message}") + + def separator(self, char: str = "=", length: int = 80) -> None: + """Print separator line (suppressed in quiet mode).""" + if not self.quiet: + print(char * length) + + def print_raw(self, message: str, file=None) -> None: + """Print raw message without formatting (respects quiet mode for stdout).""" + if file == sys.stderr: + # Always print to stderr (errors) + print(message, file=file) + elif not self.quiet: + # Print to stdout only if not quiet + print(message, file=file) + + def metric(self, name: str, value: str) -> None: + """Print metric information (debug mode only).""" + if self.debug and not self.quiet: + print(f"📊 {name}: {value}") + + def timing(self, operation: str, duration: float) -> None: + """Print timing information (debug mode only).""" + if self.debug and not self.quiet: + print(f"⏱️ {operation}: {duration:.2f}s") diff --git a/cpk_lib_python_aws/shared/utils.py b/cpk_lib_python_aws/shared/utils.py new file mode 100644 index 0000000..51f3088 --- /dev/null +++ b/cpk_lib_python_aws/shared/utils.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +"""Shared AWS utility functions.""" +import re +from typing import List + +import boto3 + + +def validate_account_id(account_id: str) -> bool: + """Validate AWS account ID format.""" + if not account_id: + return False + + # AWS account IDs are 12-digit numbers + pattern = r"^\d{12}$" + return bool(re.match(pattern, account_id)) + + +def get_aws_regions() -> List[str]: + """Get list of AWS regions.""" + try: + ec2 = boto3.client("ec2", region_name="us-east-1") + response = ec2.describe_regions() + return [region["RegionName"] for region in response["Regions"]] + except Exception: + # Fallback to common regions if API call fails + return [ + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "eu-west-1", + "eu-west-2", + "eu-central-1", + "ap-southeast-1", + "ap-southeast-2", + "ap-northeast-1", + ] + + +def format_arn(service: str, region: str, account_id: str, resource: str) -> str: + """Format AWS ARN.""" + return f"arn:aws:{service}:{region}:{account_id}:{resource}" diff --git a/cpk_lib_python_aws/tests/__init__.py b/cpk_lib_python_aws/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cpk_lib_python_aws/tests/aws_access_auditor/__init__.py b/cpk_lib_python_aws/tests/aws_access_auditor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cpk_lib_python_aws/tests/aws_access_auditor/test_auditor.py b/cpk_lib_python_aws/tests/aws_access_auditor/test_auditor.py new file mode 100644 index 0000000..82b9c5f --- /dev/null +++ b/cpk_lib_python_aws/tests/aws_access_auditor/test_auditor.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +"""Tests for AWS Access Auditor module.""" +from unittest.mock import Mock, patch + +from cpk_lib_python_aws.aws_access_auditor.auditor import AWSSSOAuditor, NullOutputSink +from cpk_lib_python_aws.aws_access_auditor.config import Config + + +class TestNullOutputSink: + """Test the NullOutputSink class.""" + + def test_null_output_sink_methods(self): + """Test that all NullOutputSink methods can be called without error.""" + sink = NullOutputSink() + + # All methods should return None and not raise exceptions + assert sink.progress("test message") is None + assert sink.debug_info("test message") is None + assert sink.warning("test message") is None + assert sink.info("test message") is None + assert sink.error("test message") is None + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_auditor_initialization_with_default_config(self, mock_aws_manager): + """Test auditor initialization with default configuration.""" + # Mock the AWS client manager + mock_manager_instance = Mock() + mock_manager_instance.sso_admin_client = Mock() + mock_manager_instance.identitystore_client = Mock() + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + + # Verify initialization + assert auditor.config is not None + assert isinstance(auditor.output_sink, NullOutputSink) + assert auditor.identity_store_id == "d-123456789" + assert auditor.instance_arn == "arn:aws:sso:::instance/ssoins-123456789" + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_auditor_initialization_with_custom_config(self, mock_aws_manager): + """Test auditor initialization with custom configuration.""" + mock_manager_instance = Mock() + mock_manager_instance.sso_admin_client = Mock() + mock_manager_instance.identitystore_client = Mock() + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-west-2"} + mock_aws_manager.return_value = mock_manager_instance + + config = Config(aws_region="us-west-2", debug=True) + output_sink = Mock() + + auditor = AWSSSOAuditor(config, output_sink) + + assert auditor.config.aws_region == "us-west-2" + assert auditor.config.debug is True + assert auditor.output_sink == output_sink + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_get_permission_sets_for_account_success(self, mock_aws_manager): + """Test successful retrieval of permission sets for account.""" + # Setup mocks + mock_manager_instance = Mock() + mock_sso_client = Mock() + mock_paginator = Mock() + + mock_sso_client.get_paginator.return_value = mock_paginator + mock_paginator.paginate.return_value = [ + { + "PermissionSets": [ + "arn:aws:sso:::permissionSet/ps-123", + "arn:aws:sso:::permissionSet/ps-456", + ] + } + ] + + mock_manager_instance.sso_admin_client = mock_sso_client + mock_manager_instance.identitystore_client = Mock() + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + result = auditor.get_permission_sets_for_account("123456789012") + + assert len(result) == 2 + assert "arn:aws:sso:::permissionSet/ps-123" in result + assert "arn:aws:sso:::permissionSet/ps-456" in result + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_get_permission_sets_for_account_failure(self, mock_aws_manager): + """Test handling of errors when retrieving permission sets.""" + # Setup mocks to raise exception + mock_manager_instance = Mock() + mock_sso_client = Mock() + mock_sso_client.get_paginator.side_effect = Exception("AWS API Error") + + mock_manager_instance.sso_admin_client = mock_sso_client + mock_manager_instance.identitystore_client = Mock() + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + result = auditor.get_permission_sets_for_account("123456789012") + + # Should return empty list on error + assert result == [] + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_get_group_details_success(self, mock_aws_manager): + """Test successful retrieval of group details.""" + mock_manager_instance = Mock() + mock_identity_client = Mock() + + mock_identity_client.describe_group.return_value = { + "GroupId": "group-123", + "DisplayName": "Test Group", + "Description": "A test group", + } + + mock_manager_instance.sso_admin_client = Mock() + mock_manager_instance.identitystore_client = mock_identity_client + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + result = auditor.get_group_details("group-123") + + assert result["GroupId"] == "group-123" + assert result["DisplayName"] == "Test Group" + assert result["Description"] == "A test group" + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_get_group_details_failure(self, mock_aws_manager): + """Test handling of errors when retrieving group details.""" + mock_manager_instance = Mock() + mock_identity_client = Mock() + mock_identity_client.describe_group.side_effect = Exception("Group not found") + + mock_manager_instance.sso_admin_client = Mock() + mock_manager_instance.identitystore_client = mock_identity_client + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + result = auditor.get_group_details("group-123") + + # Should return default values on error + assert result["GroupId"] == "group-123" + assert result["DisplayName"] == "Unknown" + assert result["Description"] == "" + + @patch("cpk_lib_python_aws.aws_access_auditor.auditor.AWSClientManager") + def test_audit_account_basic_flow(self, mock_aws_manager): + """Test basic audit_account flow with minimal data.""" + mock_manager_instance = Mock() + mock_sso_client = Mock() + mock_identity_client = Mock() + + # Mock get_all_account_assignments to return empty list + mock_manager_instance.sso_admin_client = mock_sso_client + mock_manager_instance.identitystore_client = mock_identity_client + mock_manager_instance.organizations_client = Mock() + mock_manager_instance.identity_store_id = "d-123456789" + mock_manager_instance.instance_arn = "arn:aws:sso:::instance/ssoins-123456789" + mock_manager_instance.get_client_info.return_value = {"region": "us-east-1"} + mock_aws_manager.return_value = mock_manager_instance + + auditor = AWSSSOAuditor() + + # Mock the get_permission_sets_for_account to return empty list + auditor.get_permission_sets_for_account = Mock(return_value=[]) + + result = auditor.audit_account("123456789012") + + # Verify basic structure + assert "metadata" in result + assert "sso_groups" in result + assert "permission_sets" in result + assert "summary" in result + assert result["metadata"]["account_id"] == "123456789012" + assert result["summary"]["total_groups"] == 0 + assert result["summary"]["total_permission_sets"] == 0 + assert result["summary"]["total_assignments"] == 0 diff --git a/cpk_lib_python_aws/tests/aws_access_auditor/test_cli.py b/cpk_lib_python_aws/tests/aws_access_auditor/test_cli.py new file mode 100644 index 0000000..330bc1e --- /dev/null +++ b/cpk_lib_python_aws/tests/aws_access_auditor/test_cli.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +"""Tests for AWS SSO Auditor CLI module.""" +import argparse +import logging +from unittest.mock import Mock, patch + +import pytest + +from cpk_lib_python_aws.aws_access_auditor.cli import create_parser, main, setup_logging +from cpk_lib_python_aws.aws_access_auditor.config import Config +from cpk_lib_python_aws.aws_access_auditor.exceptions import AWSSSOAuditorError + + +class TestSetupLogging: + """Test the setup_logging function.""" + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.logging.basicConfig") + def test_setup_logging_default(self, mock_basic_config): + """Test setup_logging with default parameters.""" + setup_logging() + + mock_basic_config.assert_called_once() + call_args = mock_basic_config.call_args + assert call_args[1]["level"] == logging.INFO + assert "%(asctime)s - %(name)s - %(levelname)s - %(message)s" in call_args[1]["format"] + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.logging.basicConfig") + def test_setup_logging_debug(self, mock_basic_config): + """Test setup_logging with debug enabled.""" + setup_logging(debug=True) + + call_args = mock_basic_config.call_args + assert call_args[1]["level"] == logging.DEBUG + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.logging.basicConfig") + def test_setup_logging_quiet(self, mock_basic_config): + """Test setup_logging with quiet enabled.""" + setup_logging(quiet=True) + + call_args = mock_basic_config.call_args + assert call_args[1]["level"] == logging.ERROR + + +class TestCreateParser: + """Test the create_parser function.""" + + def test_create_parser_basic(self): + """Test that parser is created with correct structure.""" + parser = create_parser() + + assert isinstance(parser, argparse.ArgumentParser) + assert parser.prog == "aws-access-auditor" + + def test_parser_required_arguments(self): + """Test parsing with required arguments only.""" + parser = create_parser() + args = parser.parse_args(["123456789012"]) + + assert args.account_id == "123456789012" + assert args.output_format == "both" + assert args.output_dir == "./aws-sso-audit-results" + assert args.aws_region == "us-east-1" + assert args.aws_profile is None + assert args.quiet is False + assert args.debug is False + assert args.no_timestamp is False + + def test_parser_all_arguments(self): + """Test parsing with all arguments provided.""" + parser = create_parser() + args = parser.parse_args( + [ + "123456789012", + "--output-format", + "json", + "--output-dir", + "/tmp/results", + "--aws-region", + "us-west-2", + "--aws-profile", + "my-profile", + "--quiet", + "--debug", + "--no-timestamp", + ] + ) + + assert args.account_id == "123456789012" + assert args.output_format == "json" + assert args.output_dir == "/tmp/results" + assert args.aws_region == "us-west-2" + assert args.aws_profile == "my-profile" + assert args.quiet is True + assert args.debug is True + assert args.no_timestamp is True + + def test_parser_invalid_output_format(self): + """Test parser rejects invalid output format.""" + parser = create_parser() + + with pytest.raises(SystemExit): + parser.parse_args(["123456789012", "--output-format", "invalid"]) + + def test_parser_missing_account_id(self): + """Test parser requires account_id.""" + parser = create_parser() + + with pytest.raises(SystemExit): + parser.parse_args([]) + + +class TestMain: + """Test the main function.""" + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputFormatter") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputSink") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging") + def test_main_success(self, mock_setup_logging, mock_output_sink, mock_auditor, mock_formatter): + """Test successful main execution.""" + # Setup mocks + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor_instance = Mock() + mock_auditor_instance.audit_account.return_value = { + "metadata": {"account_id": "123456789012"}, + "summary": {"total_groups": 5, "total_permission_sets": 3}, + } + mock_auditor.return_value = mock_auditor_instance + + mock_formatter_instance = Mock() + mock_formatter_instance.save_results.return_value = ["file1.json", "file2.yaml"] + mock_formatter.return_value = mock_formatter_instance + + # Run main + result = main(["123456789012"]) + + # Verify success + assert result == 0 + mock_setup_logging.assert_called_once() + mock_auditor_instance.audit_account.assert_called_once_with("123456789012") + mock_formatter_instance.save_results.assert_called_once() + mock_formatter_instance.display_results.assert_called_once() + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputFormatter") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputSink") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging") + def test_main_with_custom_args( + self, mock_setup_logging, mock_output_sink, mock_auditor, mock_formatter + ): + """Test main with custom arguments.""" + # Setup mocks + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor_instance = Mock() + mock_auditor_instance.audit_account.return_value = { + "metadata": {"account_id": "123456789012"}, + "summary": {}, + } + mock_auditor.return_value = mock_auditor_instance + + mock_formatter_instance = Mock() + mock_formatter_instance.save_results.return_value = ["file1.json"] + mock_formatter.return_value = mock_formatter_instance + + # Run main with custom args + result = main( + ["123456789012", "--output-format", "json", "--aws-region", "eu-west-1", "--debug"] + ) + + # Verify + assert result == 0 + mock_setup_logging.assert_called_once_with(True, False) # debug=True, quiet=False + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputSink") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging") + def test_main_aws_access_auditor_error( + self, mock_setup_logging, mock_output_sink, mock_auditor + ): + """Test main handling AWSSSOAuditorError.""" + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor.side_effect = AWSSSOAuditorError("Test error") + + result = main(["123456789012"]) + + assert result == 1 + assert mock_setup_logging.called # Verify setup_logging was called + mock_output_instance.error.assert_called_with("AWS SSO Auditor Error: Test error") + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputSink") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging") + def test_main_unexpected_error(self, mock_setup_logging, mock_output_sink, mock_auditor): + """Test main handling unexpected errors.""" + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor.side_effect = Exception("Unexpected error") + + result = main(["123456789012"]) + + assert result == 1 + assert mock_setup_logging.called # Verify setup_logging was called + mock_output_instance.error.assert_called_with("Unexpected error: Unexpected error") + + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputFormatter") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.OutputSink") + @patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging") + def test_main_config_creation( + self, mock_setup_logging, mock_output_sink, mock_auditor, mock_formatter + ): + """Test that Config is created correctly from CLI args.""" + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor_instance = Mock() + mock_auditor_instance.audit_account.return_value = { + "metadata": {"account_id": "123456789012"}, + "summary": {}, + } + mock_auditor.return_value = mock_auditor_instance + + mock_formatter_instance = Mock() + mock_formatter_instance.save_results.return_value = ["file1.json"] + mock_formatter.return_value = mock_formatter_instance + + result = main( + [ + "123456789012", + "--output-format", + "yaml", + "--output-dir", + "/custom/dir", + "--aws-region", + "ap-southeast-1", + "--aws-profile", + "test-profile", + "--no-timestamp", + "--quiet", + ] + ) + + # Verify Config was created with correct parameters + assert result == 0 + assert mock_setup_logging.called # Verify setup_logging was called + + # Check that auditor was called with a config + call_args = mock_auditor.call_args + config = call_args[0][0] # First argument should be config + + assert isinstance(config, Config) + assert config.aws_region == "ap-southeast-1" + assert config.aws_profile == "test-profile" + assert config.output_formats == ["yaml"] + assert config.output_directory == "/custom/dir" + assert config.include_timestamp is False # no-timestamp flag + assert config.quiet is True + + def test_main_both_output_format(self): + """Test that 'both' output format expands to json and yaml.""" + with patch("cpk_lib_python_aws.aws_access_auditor.cli.setup_logging"), patch( + "cpk_lib_python_aws.aws_access_auditor.cli.OutputSink" + ) as mock_output_sink, patch( + "cpk_lib_python_aws.aws_access_auditor.cli.AWSSSOAuditor" + ) as mock_auditor, patch( + "cpk_lib_python_aws.aws_access_auditor.cli.OutputFormatter" + ) as mock_formatter: + + mock_output_instance = Mock() + mock_output_sink.return_value = mock_output_instance + + mock_auditor_instance = Mock() + mock_auditor_instance.audit_account.return_value = { + "metadata": {"account_id": "123456789012"}, + "summary": {}, + } + mock_auditor.return_value = mock_auditor_instance + + mock_formatter_instance = Mock() + mock_formatter_instance.save_results.return_value = ["file1.json", "file2.yaml"] + mock_formatter.return_value = mock_formatter_instance + + result = main(["123456789012", "--output-format", "both"]) + + assert result == 0 + + # Verify config has both formats + call_args = mock_auditor.call_args + config = call_args[0][0] + assert set(config.output_formats) == {"json", "yaml"} + + def test_main_invalid_args(self): + """Test main with invalid arguments.""" + # This should exit due to argparse error + with pytest.raises(SystemExit): + main(["123456789012", "--invalid-arg"]) + + +class TestCLIIntegration: + """Integration-style tests for CLI components.""" + + def test_config_from_parser_args(self): + """Test creating Config from parsed arguments.""" + parser = create_parser() + args = parser.parse_args( + [ + "123456789012", + "--output-format", + "json", + "--output-dir", + "/test/dir", + "--aws-region", + "us-west-2", + "--debug", + "--no-timestamp", + ] + ) + + # This mimics what main() does with the args + output_formats = [args.output_format] if args.output_format != "both" else ["json", "yaml"] + + config = Config( + aws_region=args.aws_region, + aws_profile=args.aws_profile, + output_formats=output_formats, + output_directory=args.output_dir, + include_timestamp=not args.no_timestamp, + debug=args.debug, + quiet=args.quiet, + ) + + assert config.aws_region == "us-west-2" + assert config.output_formats == ["json"] + assert config.output_directory == "/test/dir" + assert config.include_timestamp is False + assert config.debug is True + assert config.quiet is False diff --git a/cpk_lib_python_aws/tests/aws_access_auditor/test_config.py b/cpk_lib_python_aws/tests/aws_access_auditor/test_config.py new file mode 100644 index 0000000..72d3c7d --- /dev/null +++ b/cpk_lib_python_aws/tests/aws_access_auditor/test_config.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +"""Tests for AWS Access Auditor configuration module.""" +import os + +import pytest + +from cpk_lib_python_aws.aws_access_auditor.config import Config +from cpk_lib_python_aws.aws_access_auditor.exceptions import ConfigurationError + + +def test_default_config_values(): + """Test that default configuration values are set correctly.""" + config = Config() + assert config.aws_region == "us-east-1" + assert config.output_formats == ["json", "yaml"] + assert config.output_directory == "." + assert config.include_timestamp is True + assert config.debug is False + assert config.quiet is False + assert config.timeout == 30 + assert config.aws_profile is None + + +def test_config_validation_valid_formats(): + """Test that valid output formats pass validation.""" + config = Config(output_formats=["json"]) + config.validate() + + config = Config(output_formats=["yaml"]) + config.validate() + + config = Config(output_formats=["both"]) + config.validate() + + +def test_config_validation_invalid_format(): + """Test that invalid output formats raise ConfigurationError.""" + config = Config(output_formats=["invalid"]) + with pytest.raises(ConfigurationError, match="Invalid output format: invalid"): + config.validate() + + +def test_environment_variable_override(): + """Test that environment variables override default values.""" + # Set environment variables + os.environ["AWS_REGION"] = "eu-west-1" + os.environ["AWS_ACCESS_AUDITOR_DEBUG"] = "true" + os.environ["AWS_ACCESS_AUDITOR_QUIET"] = "true" + + try: + config = Config() + assert config.aws_region == "eu-west-1" + assert config.debug is True + assert config.quiet is True + finally: + # Clean up environment variables + os.environ.pop("AWS_REGION", None) + os.environ.pop("AWS_ACCESS_AUDITOR_DEBUG", None) + os.environ.pop("AWS_ACCESS_AUDITOR_QUIET", None) + + +def test_constructor_overrides(): + """Test that constructor parameters override defaults.""" + config = Config( + aws_region="ap-southeast-1", output_directory="/tmp/test", debug=True, timeout=60 + ) + assert config.aws_region == "ap-southeast-1" + assert config.output_directory == "/tmp/test" + assert config.debug is True + assert config.timeout == 60 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7549f6a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,123 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "cpk-lib-python-aws" +version = "1.0.0" +description = "CPK Python AWS Library - Collection of professional AWS tools and utilities" +authors = [ + {name = "Your Name", email = "your.email@example.com"} +] +readme = "README.md" +license = {text = "GPL-3.0"} +requires-python = ">=3.8.1" +dependencies = [ + "boto3>=1.26.0", + "botocore>=1.29.0", + "pyyaml>=6.0", + "certifi>=2023.7.22", + "urllib3>=1.25.4,<2.0.0", # Changed this line +] + +[project.optional-dependencies] +test = [ + "pytest>=7.0.0", + "pytest-cov>=4.0.0", + "pytest-mock>=3.10.0", +] +dev = [ + "pytest>=7.0.0", + "pytest-cov>=4.0.0", + "black>=23.0.0", + "flake8>=6.0.0", # This should work with Python 3.8+ + "mypy>=1.0.0", + "pylint>=2.17.0", + "boto3-stubs[sso-admin,identitystore,organizations]>=1.26.0", +] + +[project.scripts] +aws-access-auditor = "cpk_lib_python_aws.aws_access_auditor.cli:main" + +[project.urls] +Homepage = "https://github.com/opencpk/cpk-lib-python-aws" +Repository = "https://github.com/opencpk/cpk-lib-python-aws" +Documentation = "https://github.com/opencpk/cpk-lib-python-aws#readme" + +[tool.setuptools.packages.find] +where = ["."] +include = ["cpk_lib_python_aws*"] + +[tool.black] +line-length = 100 +target-version = ['py38'] + +[tool.mypy] +python_version = "3.8" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true + +# 🎯 PYLINT CONFIGURATION - This should fix your import issues +[tool.pylint.master] +# Add current directory to Python path for imports +init-hook = 'import sys; sys.path.append(".")' +# Don't cache results to avoid stale import issues +persistent = false + +[tool.pylint.format] +max-line-length = 100 + +[tool.pylint.messages_control] +disable = [ + "broad-exception-caught", + "too-many-arguments", + "too-many-locals", + "import-error", # This should fix your import issues + "no-name-in-module", # Also helps with module import issues + "wrong-import-order", # Less strict about import order + "ungrouped-imports", # Allow flexible import grouping + "too-few-public-methods", + "invalid-name", +] + +[tool.pylint.design] +max-statements = 60 +max-module-lines = 1200 +max-attributes = 15 # Allow more class attributes +max-public-methods = 25 # Allow more public methods + +[tool.pylint.typecheck] +# Don't complain about missing members in these modules +ignored-modules = [ + "boto3", + "botocore", + "yaml", + "cpk_lib_python_aws", +] + +[tool.pylint.imports] +# Allow relative imports within the package +allow-any-import-level = true + + +[tool.pytest.ini_options] +testpaths = [ + "cpk_lib_python_aws/tests" +] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +addopts = [ + "--strict-markers", + "--strict-config", + "--verbose", + "--cov=cpk_lib_python_aws", + "--cov-report=term-missing", + "--cov-report=html" +] +markers = [ + "unit: Unit tests with mocked responses", + "integration: Integration tests with real AWS API calls", + "slow: Slow running tests" +]