Code Security

Why Annual Pentesting Fails Fast-Moving Teams (And What Replaces It)

Amartya | CodeAnt AI Code Review Platform
Sonali Sood

Founding GTM, CodeAnt AI

Most teams still run penetration testing once a year. But their applications don’t change once a year. They change every week: new endpoints, updated authentication flows, third-party integrations, infrastructure changes.

That mismatch creates a structural problem. Security is being tested at one cadence, while risk is being introduced at another.

The result is what we can call the deployment velocity gap: the time between when a vulnerability enters the system and when it is actually detected.

In an annual testing model, that gap can stretch for months. A system may be “secure” at the moment of testing, but every change that follows creates new, untested surface area. By the time the next test arrives, the application has already evolved far beyond what was originally evaluated.

This is not a failure of pentesting itself. It’s a mismatch between how often systems change and how often they are tested.

To understand why this gap exists, and why it continues to grow in modern engineering teams, we need to look at how annual penetration testing actually works in practice, and what it does (and does not) cover.

The Problem with Annual Pentesting

Annual penetration testing gives you a snapshot of your security posture at a single point in time. That’s its core limitation. It evaluates the system as it exists during the testing window, not as it evolves afterward.

At the time of the test, the findings are valid. Vulnerabilities are identified, reports are generated, and remediation begins. On paper, the system looks secure.

But the system does not stay static. In the weeks and months following the test:

  • new features are deployed

  • APIs are modified

  • authentication flows change

  • dependencies are updated

  • infrastructure is reconfigured

Each of these changes introduces new code paths and new attack surfaces. None of them are part of the original assessment. This creates a false sense of security. The system has been tested, but only in its previous state. What exists in production today is already different from what was evaluated.

There is also a second limitation. Annual pentesting is inherently time-bound. Testers operate within a fixed engagement window, typically one to two weeks. Within that timeframe, they prioritize high-impact areas, explore likely attack paths, and validate findings.

But no matter how skilled the testers are, coverage is limited by time. Edge cases, rarely triggered flows, and complex multi-step vulnerabilities often fall outside that window. So the model has two constraints:

  • it tests only a moment in time

  • it tests only a portion of the system

As systems grow more complex and deployment frequency increases, these constraints become more significant.

Security risk is no longer tied to what was tested. It is tied to what has changed since the test. That leads to a more practical question:

If vulnerabilities are being introduced continuously, how large is the gap between introduction and detection in your system? That is what we need to measure next.

How to Measure Your Organization's Gap

The deployment velocity gap is the time between when a vulnerability is introduced into production and when it is detected by security testing. For annual penetration testing programs, the maximum gap is 365 days. The average gap, given that vulnerabilities are introduced continuously throughout the year, is approximately 180 days, half the testing interval.
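The half-the-interval claim can be sanity-checked in a few lines. The dates below are illustrative assumptions: one test at year end, vulnerabilities introduced at a steady weekly rate.

```python
import datetime
import statistics

# Illustrative assumption: one annual test on Dec 31, vulnerabilities
# introduced at a steady weekly rate through the year.
test_date = datetime.date(2024, 12, 31)
introduced = [datetime.date(2024, 1, 1) + datetime.timedelta(weeks=w)
              for w in range(52)]

# Exposure window: days from introduction until the annual test can detect it.
exposure_days = [(test_date - d).days for d in introduced]

print(max(exposure_days))              # worst case: 365 days
print(statistics.mean(exposure_days))  # average: about half the interval
```

The exact average depends on when in the year the introductions cluster, but under uniform introduction it always lands near half the testing interval.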

This isn't abstract. You can calculate your organization's specific risk exposure:

import datetime
from dataclasses import dataclass
from typing import List, Dict
import statistics

@dataclass
class DeploymentEvent:
    date: datetime.date
    commit_count: int
    files_changed: int
    security_relevant_changes: int  # Auth, API, DB layer changes
    introduced_vulnerabilities: int  # Known from retrospective analysis

@dataclass
class SecurityTestEvent:
    date: datetime.date
    test_type: str  # 'annual', 'quarterly', 'monthly', 'continuous'
    findings_count: int
    critical_findings: int

def calculate_deployment_velocity_gap(
    deployment_history: List[DeploymentEvent],
    security_test_history: List[SecurityTestEvent],
    year: int
) -> Dict:
    """
    Calculate the actual deployment velocity gap for an organization.
    Uses historical deployment and testing data to quantify exposure windows.
    """

    # Filter to the analysis year
    deployments = [d for d in deployment_history
                   if d.date.year == year]
    tests = [t for t in security_test_history
             if t.date.year == year]
    tests.sort(key=lambda x: x.date)

    if not tests:
        return {'error': 'No security tests in period'}

    # Calculate testing intervals
    test_dates = [t.date for t in tests]
    intervals = []

    if len(test_dates) > 1:
        for i in range(1, len(test_dates)):
            gap_days = (test_dates[i] - test_dates[i-1]).days
            intervals.append(gap_days)

    # Add gap from last test to end of year (remaining exposure)
    year_end = datetime.date(year, 12, 31)
    remaining = (year_end - test_dates[-1]).days
    intervals.append(remaining)

    # Calculate deployments in each gap window
    gap_analysis = []

    window_start = datetime.date(year, 1, 1)
    for test in tests:
        window_end = test.date

        # Count deployments in this window
        window_deployments = [
            d for d in deployments
            if window_start <= d.date < window_end
        ]

        security_changes = sum(d.security_relevant_changes for d in window_deployments)
        total_commits = sum(d.commit_count for d in window_deployments)

        # Estimate vulnerability introduction rate
        # Industry baseline: ~1 security-relevant bug per 1000 LOC changed
        # Security-relevant changes have higher density
        estimated_vulns_introduced = security_changes * 0.05  # 5% introduce a finding

        gap_analysis.append({
            'window_start': window_start.isoformat(),
            'window_end': window_end.isoformat(),
            'gap_days': (window_end - window_start).days,
            'deployments_in_window': len(window_deployments),
            'total_commits': total_commits,
            'security_relevant_changes': security_changes,
            'estimated_vulns_introduced': round(estimated_vulns_introduced, 1),
            'test_at_end': test.test_type,
            'findings_at_test': test.findings_count
        })

        window_start = test.date

    # Calculate aggregate metrics, including the untested tail after the last test
    all_gap_days = [g['gap_days'] for g in gap_analysis] + [remaining]
    max_gap = max(all_gap_days)
    avg_gap = statistics.mean(all_gap_days)

    return {
        'year': year,
        'total_deployments': len(deployments),
        'total_security_tests': len(tests),
        'testing_intervals': intervals,
        'max_exposure_window_days': max_gap,
        'average_exposure_window_days': round(avg_gap, 0),
        'total_security_relevant_changes': sum(
            d.security_relevant_changes for d in deployments
        ),
        'estimated_total_vulns_introduced': round(
            sum(d.security_relevant_changes for d in deployments) * 0.05, 0
        ),
        'gap_analysis': gap_analysis,
        'risk_assessment': classify_gap_risk(max_gap, avg_gap)
    }

def classify_gap_risk(max_gap: int, avg_gap: float) -> str:
    if max_gap > 180 or avg_gap > 90:
        return 'CRITICAL — Unacceptably long exposure windows'
    elif max_gap > 90 or avg_gap > 45:
        return 'HIGH — Significant exposure windows between tests'
    elif max_gap > 30 or avg_gap > 14:
        return 'MEDIUM — Manageable with process discipline'
    else:
        return 'LOW — Continuous testing adequately covers deployment velocity'
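As a back-of-the-envelope companion to the model above, the same gap metrics can be read straight off the testing interval. The cadences below are illustrative, and introduction is assumed uniform within each interval.

```python
# Hypothetical cadences (interval length in days). Under a uniform-introduction
# assumption, the worst-case gap equals the interval and the average is half of it.
cadences = {"annual": 365, "quarterly": 91, "monthly": 30, "sprint": 14}

gaps = {name: {"max_gap_days": interval, "avg_gap_days": interval / 2}
        for name, interval in cadences.items()}

for name, g in gaps.items():
    print(f"{name:>9}: max={g['max_gap_days']:3d}d  avg={g['avg_gap_days']:6.1f}d")
```

Moving from annual to sprint-cadence testing shrinks the average exposure window from roughly half a year to about a week.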

The Attack Surface Drift Model

Attack surface doesn't just grow; it drifts. New endpoints get added. Old endpoints get modified. Authentication logic gets refactored. Infrastructure gets reconfigured. The attack surface tested in January is materially different from the attack surface that exists in August.

@dataclass
class AttackSurfaceSnapshot:
    """Represents the attack surface at a point in time"""
    date: datetime.date
    api_endpoints: set          # All discoverable API endpoints
    auth_mechanisms: set        # Auth patterns in use (JWT, session, API key)
    external_integrations: set  # Third-party services integrated
    open_ports: set             # Exposed network ports
    subdomains: set             # Discoverable subdomains
    javascript_bundles: list    # Frontend bundle hashes
    dependencies: dict          # {package: version} dependency map
    service_accounts: set       # Cloud/K8s service accounts

def calculate_attack_surface_drift(
    snapshot_a: AttackSurfaceSnapshot,
    snapshot_b: AttackSurfaceSnapshot
) -> Dict:
    """
    Quantify how much the attack surface changed between two snapshots.
    Each change is a potential security regression point.
    """

    drift = {}

    # New endpoints added since last test
    new_endpoints = snapshot_b.api_endpoints - snapshot_a.api_endpoints
    removed_endpoints = snapshot_a.api_endpoints - snapshot_b.api_endpoints

    drift['api_endpoints'] = {
        'added': list(new_endpoints),
        'removed': list(removed_endpoints),
        'net_change': len(new_endpoints) - len(removed_endpoints),
        'security_risk': 'Each new endpoint is untested attack surface',
        'high_risk_additions': [
            ep for ep in new_endpoints
            if any(word in ep.lower() for word in
                   ['admin', 'export', 'delete', 'upload', 'import', 'webhook'])
        ]
    }

    # New authentication mechanisms
    new_auth = snapshot_b.auth_mechanisms - snapshot_a.auth_mechanisms
    drift['auth_mechanisms'] = {
        'added': list(new_auth),
        'security_risk': 'New auth mechanisms may have configuration vulnerabilities'
            if new_auth else 'None',
    }

    # New external integrations (new credential chains)
    new_integrations = snapshot_b.external_integrations - snapshot_a.external_integrations
    drift['external_integrations'] = {
        'added': list(new_integrations),
        'security_risk': 'Each integration adds credential exposure surface and SSRF targets',
        'count': len(new_integrations)
    }

    # New subdomains (potential subdomain takeover targets, broader CORS surface)
    new_subdomains = snapshot_b.subdomains - snapshot_a.subdomains
    drift['subdomains'] = {
        'added': list(new_subdomains),
        'count': len(new_subdomains)
    }

    # Dependency changes (new CVE exposure)
    dep_changes = {}
    for pkg, version in snapshot_b.dependencies.items():
        if pkg not in snapshot_a.dependencies:
            dep_changes[pkg] = {'status': 'added', 'version': version}
        elif snapshot_a.dependencies[pkg] != version:
            dep_changes[pkg] = {
                'status': 'updated',
                'old_version': snapshot_a.dependencies[pkg],
                'new_version': version
            }

    for pkg in snapshot_a.dependencies:
        if pkg not in snapshot_b.dependencies:
            dep_changes[pkg] = {'status': 'removed'}

    drift['dependencies'] = {
        'changes': dep_changes,
        'new_packages': sum(1 for v in dep_changes.values() if v['status'] == 'added'),
        'updated_packages': sum(1 for v in dep_changes.values() if v['status'] == 'updated'),
        'removed_packages': sum(1 for v in dep_changes.values() if v['status'] == 'removed')
    }

    # Risk score for the drift period
    drift_risk_score = (
        len(new_endpoints) * 3 +
        len(drift['api_endpoints']['high_risk_additions']) * 5 +
        len(new_auth) * 8 +
        len(new_integrations) * 4 +
        len(new_subdomains) * 2 +
        drift['dependencies']['new_packages'] * 1
    )

    drift['overall_drift_risk_score'] = drift_risk_score
    drift['risk_classification'] = (
        'CRITICAL' if drift_risk_score > 50 else
        'HIGH' if drift_risk_score > 25 else
        'MEDIUM' if drift_risk_score > 10 else
        'LOW'
    )
    drift['days_since_last_test'] = (snapshot_b.date - snapshot_a.date).days

    return drift
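The core of the drift calculation is plain set difference between two snapshots. A toy pair of endpoint inventories (the paths are hypothetical) shows the mechanics:

```python
# Hypothetical endpoint inventories from two scans of the same application.
january = {"/api/users", "/api/orders"}
august = {"/api/users", "/api/orders", "/api/admin/export", "/api/webhooks"}

added = august - january   # surface the January test never saw
removed = january - august

# Same keyword heuristic the drift model uses for high-risk additions.
high_risk = {ep for ep in added
             if any(w in ep.lower() for w in ("admin", "export", "delete", "webhook"))}

print(sorted(added))       # both additions were out of scope in January
print(sorted(high_risk))   # both also match a high-risk keyword
```

Everything in `added` was out of scope for the January test, which is exactly the exposure the drift model is built to quantify.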

Part 2: What Annual Penetration Testing Actually Tests (And Misses)

The Annual Test Artifact Problem

An annual penetration test evaluates the application as it exists on the day testing begins, and only that. Every change made after testing begins introduces new potential vulnerabilities that the test won't cover.

The gap between what the test covers and what actually runs in production grows every day after the test window closes.

The Semantic Difference Between "Tested" and "Secure"

This distinction is the core of the annual vs continuous debate:

| Statement | What It Means | What It Does NOT Mean |
|---|---|---|
| "We have a pentest certificate" | An engagement was conducted on a specific date | The application is currently secure |
| "Zero critical findings" | No critical vulnerabilities existed on the test date | No critical vulnerabilities exist today |
| "All findings remediated" | Specific findings from that date were fixed | No new findings were introduced since |
| "SOC 2 compliant" | Controls were in place during the audit period | Every control is operating effectively right now |
| "Last pentest: clean" | The tested version had acceptable findings | The current version, with months of new code, is acceptable |

The compliance-driven annual test answers: "Were we secure on a specific date?" The operational question is: "Are we secure right now?" Those are different questions, and they require different testing models to answer.

Part 3: The Continuous Penetration Testing Model: How It Actually Works

Continuous penetration testing is not "running a scanner on every commit." It's a structured program with four distinct layers, each operating at a different frequency and depth:

Continuous Security Testing Architecture:

LAYER 1: Automated Security Regression (Every commit, seconds)

  • What: Automated checks for known patterns in new code

  • Tools: SAST, SCA, secret scanning

  • Coverage: Known vulnerability patterns, dependency CVEs, committed secrets

  • Not covered: Novel business logic issues, chain vulnerabilities, auth bypass

  • Value: Catches low-hanging fruit before it ships

  • Limitation: High false positive rate, misses everything complex
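As a concrete example of what Layer 1 does on every commit, here is a minimal secret-scanning check in the style of the tools listed above. The AWS access key ID format is real and well documented, but a production ruleset contains hundreds of such patterns.

```python
import re

# AWS access key IDs follow a documented format: "AKIA" followed by
# 16 uppercase alphanumerics. One pattern stands in for a full ruleset.
AWS_ACCESS_KEY = re.compile(r"\bAKIA[0-9A-Z]{16}\b")

def scan_commit(diff_text: str) -> list:
    """Return any access-key-shaped strings found in a commit diff."""
    return AWS_ACCESS_KEY.findall(diff_text)

leaked = scan_commit('aws_key = "AKIAABCDEFGHIJKLMNOP"  # oops')
print(leaked)   # the key-shaped string is flagged before it ships
```

Checks like this run in seconds, which is what makes per-commit enforcement feasible; the trade-off, as noted above, is that pattern matching catches only what it has a pattern for.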

LAYER 2: Continuous Attack Surface Monitoring (Daily, automated)

  • What: Tracking changes to the external attack surface

  • Tools: Subdomain monitoring, endpoint enumeration, SSL certificate monitoring

  • Coverage: New subdomains, new exposed ports, new API endpoints

  • Not covered: Internal logic changes, auth configuration changes

  • Value: Immediate notification of surface expansion

  • Limitation: External-only visibility

LAYER 3: Targeted Security Testing (Every sprint/2 weeks, semi-automated)

  • What: Security-focused testing of every changed component

  • Tools: Authenticated API testing, change-aware testing automation

  • Coverage: New endpoints, modified auth flows, changed data models

  • Not covered: Complex multi-step exploit chains, novel attack patterns

  • Value: Tests new code within days of shipping

  • Limitation: Requires understanding of what changed

LAYER 4: Deep Manual + AI Testing (Monthly/quarterly, comprehensive)

  • What: Full penetration testing methodology against current attack surface

  • Tools: Full testing suite, AI-powered code review, manual testing

  • Coverage: Complete attack surface including business logic, chains, novel patterns

  • Not covered: N/A, this is the comprehensive layer

  • Value: Finds everything the automated layers miss

  • Limitation: Resource-intensive, not instantaneous
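One way to operationalize the four layers is to key each one off a pipeline event. A minimal illustrative mapping (the event and layer names here are this sketch's own, not a standard API):

```python
# Illustrative mapping of pipeline events to the four layers above.
LAYER_TRIGGERS = {
    'commit': ['automated_security_regression'],    # Layer 1: every commit
    'daily_tick': ['attack_surface_monitoring'],    # Layer 2: daily
    'sprint_end': ['targeted_security_testing'],    # Layer 3: every sprint
    'month_end': ['deep_manual_and_ai_testing'],    # Layer 4: monthly/quarterly
}

def layers_triggered(event: str) -> list:
    """Return which testing layers a given pipeline event should kick off."""
    return LAYER_TRIGGERS.get(event, [])
```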

What "Continuous" Actually Means Operationally

The word "continuous" is used loosely in the security industry. Here's the precise operational meaning across different implementation models:

| Model | Test Frequency | Coverage Per Test | Who Tests | Typical Cost | Best For |
| --- | --- | --- | --- | --- | --- |
| Annual | Once per year | Full scope | External firm | High (project) | Compliance-only requirements |
| Semi-annual | Twice per year | Full scope | External firm | High × 2 | Slow-moving applications |
| Quarterly | 4× per year | Full scope | External firm | High × 4 | Moderate deployment velocity |
| Monthly | 12× per year | Targeted scope | External firm | Medium × 12 | High deployment velocity |
| Sprint-cadence | Every 2 weeks | Change-aware targeted | Embedded or retainer | Medium × 26 | Agile teams, SaaS products |
| Continuous | Ongoing + monthly deep | Layered (see above) | Hybrid | Subscription | Full DevSecOps integration |
| Bug bounty only | Asynchronous | External perspective | Community | Variable | Supplement, not replacement |

Sprint-Cadence Testing: The Most Operationally Effective Model

For engineering teams shipping every 1–2 weeks, sprint-cadence testing is the model that closes the deployment velocity gap most effectively while remaining operationally feasible.

import datetime

class SprintSecurityTestingProgram:
    """
    Operationalizes sprint-cadence security testing.
    Each sprint's changed components are tested before the next sprint begins.
    """

    def __init__(self, repo_url: str, pentest_team_contact: str):
        self.repo_url = repo_url
        self.pentest_team = pentest_team_contact
        self.sprint_history = []

    def analyze_sprint_changes(
        self,
        sprint_start: datetime.date,
        sprint_end: datetime.date,
        merged_prs: list
    ) -> dict:
        """
        Analyze what changed in a sprint to determine security testing scope.
        """

        changed_components = {
            'authentication': [],    # Changes to auth logic
            'authorization': [],     # Changes to access control
            'api_endpoints': [],     # New or modified endpoints
            'data_access': [],       # ORM, database query changes
            'external_integrations': [],  # Third-party API changes
            'infrastructure': [],    # IaC, Kubernetes, CI/CD changes
            'dependencies': [],      # package.json, requirements.txt changes
            'configuration': [],     # Config files, environment changes
        }

        security_relevant_prs = []

        for pr in merged_prs:
            files_changed = pr.get('files_changed', [])

            # Classify changes by security relevance
            classifications = []

            for file in files_changed:
                if any(pattern in file.lower() for pattern in [
                    'auth', 'login', 'jwt', 'token', 'session', 'oauth'
                ]):
                    changed_components['authentication'].append(file)
                    classifications.append('authentication')

                elif any(pattern in file.lower() for pattern in [
                    'permission', 'role', 'acl', 'policy', 'rbac', 'middleware'
                ]):
                    changed_components['authorization'].append(file)
                    classifications.append('authorization')

                elif any(pattern in file.lower() for pattern in [
                    'routes', 'views', 'controllers', 'handlers', 'api'
                ]):
                    changed_components['api_endpoints'].append(file)
                    classifications.append('api_endpoints')

                elif any(pattern in file.lower() for pattern in [
                    'models', 'queries', 'repository', 'dao', 'db', 'orm'
                ]):
                    changed_components['data_access'].append(file)
                    classifications.append('data_access')

                elif file in [
                    'package.json', 'package-lock.json', 'requirements.txt',
                    'Pipfile', 'pom.xml', 'build.gradle', 'go.mod'
                ]:
                    changed_components['dependencies'].append(file)
                    classifications.append('dependencies')

                elif any(pattern in file.lower() for pattern in [
                    'kubernetes', 'k8s', 'helm', 'terraform', 'bicep',
                    '.github/workflows', 'jenkinsfile', 'dockerfile'
                ]):
                    changed_components['infrastructure'].append(file)
                    classifications.append('infrastructure')

            if classifications:
                security_relevant_prs.append({
                    'pr_number': pr.get('number'),
                    'title': pr.get('title'),
                    'author': pr.get('author'),
                    'security_categories': list(set(classifications)),
                    'files_changed': len(files_changed),
                    'security_relevant_files': [
                        f for f in files_changed
                        if any(cat in f.lower() for cat in [
                            'auth', 'api', 'model', 'route', 'middleware'
                        ])
                    ]
                })

        # Determine test depth required for this sprint
        risk_score = (
            len(changed_components['authentication']) * 10 +  # Highest weight
            len(changed_components['authorization']) * 8 +
            len(changed_components['api_endpoints']) * 5 +
            len(changed_components['data_access']) * 6 +
            len(changed_components['infrastructure']) * 7 +
            len(changed_components['external_integrations']) * 5 +
            len(changed_components['dependencies']) * 3
        )

        return {
            'sprint_start': sprint_start.isoformat(),
            'sprint_end': sprint_end.isoformat(),
            'total_prs': len(merged_prs),
            'security_relevant_prs': len(security_relevant_prs),
            'changed_components': changed_components,
            'sprint_risk_score': risk_score,
            'recommended_test_depth': self.classify_test_depth(risk_score),
            'estimated_test_hours': self.estimate_test_hours(risk_score),
            'priority_areas': self.identify_priority_areas(changed_components),
            'security_relevant_pr_details': security_relevant_prs
        }

    def classify_test_depth(self, risk_score: int) -> str:
        if risk_score > 100:
            return 'FULL_DEPTH — Authentication changes require complete auth chain review'
        elif risk_score > 50:
            return 'TARGETED_DEEP — Multiple security-relevant changes require deep testing'
        elif risk_score > 20:
            return 'TARGETED_STANDARD — Specific changed components need focused testing'
        else:
            return 'LIGHTWEIGHT — Minor changes, automated testing sufficient'

    def estimate_test_hours(self, risk_score: int) -> str:
        if risk_score > 100:
            return '8–16 hours'
        elif risk_score > 50:
            return '4–8 hours'
        elif risk_score > 20:
            return '2–4 hours'
        else:
            return '1–2 hours'

    def identify_priority_areas(self, changed_components: dict) -> list:
        priorities = []

        if changed_components['authentication']:
            priorities.append({
                'area': 'Authentication',
                'priority': 1,
                'reason': 'Auth changes have highest security impact',
                'test_focus': 'JWT validation, session management, MFA bypass, brute force'
            })

        if changed_components['authorization']:
            priorities.append({
                'area': 'Authorization',
                'priority': 2,
                'reason': 'Access control changes may introduce privilege escalation',
                'test_focus': 'RBAC, IDOR, cross-tenant access, role bypass'
            })

        if changed_components['data_access']:
            priorities.append({
                'area': 'Data Access Layer',
                'priority': 3,
                'reason': 'ORM changes may introduce injection or IDOR',
                'test_focus': 'SQL injection, NoSQL injection, ownership filter presence'
            })

        return sorted(priorities, key=lambda x: x['priority'])
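To make the classification and scoring above concrete, here is a stripped-down, self-contained version of the same idea, reduced to three categories (the patterns and weights mirror the class above; `classify_file` and `sprint_risk_score` are illustrative helpers, not part of it):

```python
# Substring patterns that flag a changed file as security-relevant
SECURITY_PATTERNS = {
    'authentication': ['auth', 'login', 'jwt', 'token', 'session', 'oauth'],
    'authorization': ['permission', 'role', 'acl', 'policy', 'rbac', 'middleware'],
    'api_endpoints': ['routes', 'views', 'controllers', 'handlers', 'api'],
}
WEIGHTS = {'authentication': 10, 'authorization': 8, 'api_endpoints': 5}

def classify_file(path: str):
    """Return the first security category whose patterns match the path, else None."""
    lower = path.lower()
    for category, patterns in SECURITY_PATTERNS.items():
        if any(p in lower for p in patterns):
            return category
    return None

def sprint_risk_score(changed_files: list) -> int:
    """Sum the weights of all security-relevant changed files."""
    return sum(WEIGHTS[c] for f in changed_files if (c := classify_file(f)))
```

A sprint that touches `src/auth/login.py` and `src/api/routes.py` scores 10 + 5 = 15, which the thresholds above would classify as LIGHTWEIGHT; a sprint touching a dozen auth files quickly crosses into FULL_DEPTH territory.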

Part 4: Attack Surface Monitoring: The Continuous Layer That Runs Automatically

Between active testing sessions, attack surface monitoring provides real-time visibility into surface expansion:

#!/bin/bash
# attack_surface_monitor.sh
# Runs daily via cron — detects surface changes and alerts security team

DOMAIN="company.com"
BASELINE_DIR="/var/security/baselines"
ALERT_EMAIL="security@company.com"
TODAY=$(date +%Y%m%d)

echo "=== Attack Surface Monitor — $(date) ==="

# MODULE 1: Subdomain Discovery
echo "[*] Scanning subdomains..."

# Run multiple tools for better coverage
subfinder -d "$DOMAIN" -silent > /tmp/subdomains_new.txt 2>/dev/null
amass enum -passive -d "$DOMAIN" >> /tmp/subdomains_new.txt 2>/dev/null
sort -u /tmp/subdomains_new.txt > /tmp/subdomains_today.txt

if [ -f "$BASELINE_DIR/subdomains_baseline.txt" ]; then
    # Find new subdomains not in baseline
    NEW_SUBDOMAINS=$(comm -13 \
        <(sort "$BASELINE_DIR/subdomains_baseline.txt") \
        <(sort /tmp/subdomains_today.txt))

    if [ -n "$NEW_SUBDOMAINS" ]; then
        echo "NEW SUBDOMAINS DETECTED:"
        echo "$NEW_SUBDOMAINS" | while read -r subdomain; do
            echo "  + $subdomain"

            # Check if new subdomain is a CNAME to third-party (takeover risk)
            CNAME=$(dig CNAME "$subdomain" +short)
            if [ -n "$CNAME" ]; then
                echo "    → CNAME: $CNAME"

                # Check if the CNAME target is live
                if ! curl -s --head "https://$CNAME" --connect-timeout 5 | grep -q "HTTP"; then
                    echo "    ⚠️  CNAME TARGET MAY BE DANGLING — SUBDOMAIN TAKEOVER RISK"
                fi
            fi
        done

        # Alert security team
        printf 'NEW SUBDOMAINS:\n%s\n' "$NEW_SUBDOMAINS" | \
            mail -s "Attack Surface Change: New Subdomains on $DOMAIN" "$ALERT_EMAIL"
    fi
fi

cp /tmp/subdomains_today.txt "$BASELINE_DIR/subdomains_baseline.txt"

# MODULE 2: SSL Certificate Monitoring (detects new domains/services)
echo "[*] Monitoring Certificate Transparency logs..."

# ct-exposer or crt.sh to find newly issued certs
curl -s "https://crt.sh/?q=%25.$DOMAIN&output=json" | \
    jq -r '.[].name_value' | \
    sort -u > /tmp/cert_domains_today.txt

if [ -f "$BASELINE_DIR/cert_domains_baseline.txt" ]; then
    NEW_CERT_DOMAINS=$(comm -13 \
        <(sort "$BASELINE_DIR/cert_domains_baseline.txt") \
        <(sort /tmp/cert_domains_today.txt))

    if [ -n "$NEW_CERT_DOMAINS" ]; then
        echo "NEW CERTIFICATES ISSUED:"
        echo "$NEW_CERT_DOMAINS"
        # New certs may indicate new services being deployed
    fi
fi

cp /tmp/cert_domains_today.txt "$BASELINE_DIR/cert_domains_baseline.txt"

# MODULE 3: API Endpoint Discovery (monitors production API surface)
echo "[*] Scanning API endpoints..."

# Fetch current OpenAPI spec if available
curl -s "https://api.$DOMAIN/openapi.json" > /tmp/api_spec_today.json 2>/dev/null
curl -s "https://api.$DOMAIN/v3/api-docs" >> /tmp/api_spec_today.json 2>/dev/null

if [ -f "$BASELINE_DIR/api_spec_baseline.json" ] && [ -s /tmp/api_spec_today.json ]; then
    # Extract all endpoint paths from both specs
    jq -r '.paths | keys[]' /tmp/api_spec_today.json | sort > /tmp/endpoints_today.txt
    jq -r '.paths | keys[]' "$BASELINE_DIR/api_spec_baseline.json" | sort > /tmp/endpoints_baseline.txt

    NEW_ENDPOINTS=$(comm -13 /tmp/endpoints_baseline.txt /tmp/endpoints_today.txt)
    REMOVED_ENDPOINTS=$(comm -23 /tmp/endpoints_baseline.txt /tmp/endpoints_today.txt)

    if [ -n "$NEW_ENDPOINTS" ]; then
        echo "NEW API ENDPOINTS:"
        echo "$NEW_ENDPOINTS" | while read -r endpoint; do
            echo "  + $endpoint"
            # Flag high-risk endpoints
            if echo "$endpoint" | grep -qiE "(admin|export|delete|upload|import)"; then
                echo "    ⚠️  HIGH-RISK ENDPOINT PATTERN — schedule security review"
            fi
        done
    fi
fi

cp /tmp/api_spec_today.json "$BASELINE_DIR/api_spec_baseline.json"

# MODULE 4: JavaScript Bundle Analysis (detects new secrets/endpoints in frontend)
echo "[*] Analyzing JavaScript bundles..."

# Get all bundle URLs from the app
curl -s "https://app.$DOMAIN/" | \
    grep -oP 'src="[^"]*\.js[^"]*"' | \
    sed 's/src="//;s/"//' > /tmp/bundle_urls.txt

# Download and scan each bundle
while read -r bundle_url; do
    bundle_hash=$(echo "$bundle_url" | md5sum | cut -d' ' -f1)
    bundle_file="/tmp/bundle_${bundle_hash}.js"

    curl -s "https://app.$DOMAIN${bundle_url}" -o "$bundle_file"

    # Check against baseline hash
    current_hash=$(sha256sum "$bundle_file" | cut -d' ' -f1)
    baseline_hash=$(cat "$BASELINE_DIR/bundle_${bundle_hash}.sha256" 2>/dev/null)

    if [ "$current_hash" != "$baseline_hash" ]; then
        echo "  Bundle changed: $bundle_url"

        # Scan for new secrets
        SECRETS=$(grep -oiE \
            "(AKIA[0-9A-Z]{16}|sk_live_[0-9a-zA-Z]{24,}|ghp_[0-9a-zA-Z]{36})" \
            "$bundle_file")

        if [ -n "$SECRETS" ]; then
            echo "  ⚠️  POTENTIAL SECRETS IN BUNDLE:"
            echo "$SECRETS"
        fi

        echo "$current_hash" > "$BASELINE_DIR/bundle_${bundle_hash}.sha256"
    fi
done < /tmp/bundle_urls.txt

# MODULE 5: Cloud Infrastructure Monitoring
echo "[*] Monitoring cloud attack surface..."

# Check for new public S3 buckets
aws s3api list-buckets --query 'Buckets[*].Name' --output text | \
    tr '\t' '\n' | sort > /tmp/s3_buckets_today.txt

if [ -f "$BASELINE_DIR/s3_buckets_baseline.txt" ]; then
    NEW_BUCKETS=$(comm -13 \
        <(sort "$BASELINE_DIR/s3_buckets_baseline.txt") \
        <(sort /tmp/s3_buckets_today.txt))

    if [ -n "$NEW_BUCKETS" ]; then
        echo "NEW S3 BUCKETS:"
        echo "$NEW_BUCKETS" | while read -r bucket; do
            # Check if new bucket has public access
            PUBLIC=$(aws s3api get-bucket-policy-status \
                --bucket "$bucket" \
                --query 'PolicyStatus.IsPublic' \
                --output text 2>/dev/null)

            if [ "$PUBLIC" = "True" ]; then
                echo "  ⚠️  PUBLIC BUCKET: $bucket — requires immediate review"
            else
                echo "  + $bucket (private)"
            fi
        done
    fi
fi

cp /tmp/s3_buckets_today.txt "$BASELINE_DIR/s3_buckets_baseline.txt"

echo ""
echo "=== Surface Monitor Complete — $(date) ==="
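Every module in the script above follows the same baseline-diff pattern: compare today's observations against yesterday's snapshot and alert only on the difference. The `comm -13` / `comm -23` invocations reduce to set differences; an equivalent sketch in Python (`diff_baseline` is an illustrative helper, not part of the script):

```python
def diff_baseline(baseline: set, today: set) -> tuple:
    """Mirror of `comm -13` / `comm -23` on sorted files:
    returns (entries new today, entries that disappeared since the baseline)."""
    return today - baseline, baseline - today
```

The same two-line diff drives subdomain, certificate, endpoint, and bucket monitoring alike; only the collection step changes per module.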

Automated Security Regression Testing

Beyond surface monitoring, automated regression tests catch the most common vulnerability patterns on every deployment:

import time
import requests
from typing import List, Tuple

class SecurityRegressionSuite:
    """
    Automated security regression tests that run on every deployment.
    Tests for the most common high-impact vulnerabilities.
    These don't replace manual testing — they catch regressions fast.
    """

    def __init__(self, base_url: str, auth_tokens: dict):
        self.base_url = base_url
        self.auth_tokens = auth_tokens  # {user_type: token}
        self.session = requests.Session()

    # ═══════════════════════════════════════════════════════
    # AUTHENTICATION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_endpoint_requires_authentication(self, endpoints: List[str]) -> List[dict]:
        """Verify all non-public endpoints return 401 without auth"""
        failures = []

        for endpoint in endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                allow_redirects=False
            )

            if response.status_code not in [401, 403, 302]:
                failures.append({
                    'test': 'authentication_required',
                    'endpoint': endpoint,
                    'expected': '401/403 (or 302 to login)',
                    'actual': response.status_code,
                    'severity': 'CRITICAL',
                    'finding': f'{endpoint} accessible without authentication'
                })

        return failures

    def test_jwt_algorithm_enforcement(self) -> dict:
        """Test that JWT algorithm is strictly enforced"""
        import json
        import base64

        # Forge a none-algorithm token
        header = base64.urlsafe_b64encode(
            json.dumps({"alg": "none", "typ": "JWT"}).encode()
        ).rstrip(b'=').decode()

        payload = base64.urlsafe_b64encode(
            json.dumps({
                "user_id": "1",
                "role": "admin",
                "exp": 9999999999
            }).encode()
        ).rstrip(b'=').decode()

        forged_token = f"{header}.{payload}."

        response = self.session.get(
            f"{self.base_url}/api/v1/users/profile",
            headers={"Authorization": f"Bearer {forged_token}"}
        )

        return {
            'test': 'jwt_algorithm_none',
            'passed': response.status_code in [401, 403],
            'status_code': response.status_code,
            'severity': 'CRITICAL' if response.status_code == 200 else None,
            'finding': 'JWT none algorithm accepted' if response.status_code == 200 else None
        }

    # ═══════════════════════════════════════════════════════
    # AUTHORIZATION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_cross_user_idor(
        self,
        resource_endpoints: List[str],
        user_a_token: str,
        user_b_resources: List[str]
    ) -> List[dict]:
        """Test that User A cannot access User B's resources"""
        failures = []

        headers_a = {"Authorization": f"Bearer {user_a_token}"}

        for endpoint_pattern in resource_endpoints:
            for resource_id in user_b_resources:
                url = endpoint_pattern.replace('{id}', resource_id)

                response = self.session.get(
                    f"{self.base_url}{url}",
                    headers=headers_a,
                    allow_redirects=False
                )

                if response.status_code == 200:
                    failures.append({
                        'test': 'cross_user_idor',
                        'endpoint': url,
                        'accessing_user': 'user_a',
                        'resource_owner': 'user_b',
                        'status_code': 200,
                        'severity': 'HIGH',
                        'finding': f'IDOR: User A can access User B resource at {url}'
                    })

        return failures

    def test_horizontal_privilege_escalation(
        self,
        admin_endpoints: List[str],
        standard_user_token: str
    ) -> List[dict]:
        """Test that standard users cannot reach admin-only endpoints (vertical privilege escalation)"""
        failures = []

        headers = {"Authorization": f"Bearer {standard_user_token}"}

        for endpoint in admin_endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                headers=headers,
                allow_redirects=False
            )

            if response.status_code == 200:
                failures.append({
                    'test': 'privilege_escalation',
                    'endpoint': endpoint,
                    'token_type': 'standard_user',
                    'severity': 'CRITICAL',
                    'finding': f'Standard user accesses admin endpoint: {endpoint}'
                })

        return failures

    # ═══════════════════════════════════════════════════════
    # CORS REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_cors_origin_reflection(self, api_endpoints: List[str]) -> List[dict]:
        """Test that CORS does not reflect arbitrary origins"""
        failures = []

        test_origin = "https://evil-test-origin-regression.com"

        for endpoint in api_endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                headers={
                    "Origin": test_origin,
                    "Authorization": f"Bearer {self.auth_tokens.get('standard')}"
                },
                allow_redirects=False
            )

            acao = response.headers.get('Access-Control-Allow-Origin', '')
            acac = response.headers.get('Access-Control-Allow-Credentials', '')

            if acao == test_origin and acac.lower() == 'true':
                failures.append({
                    'test': 'cors_origin_reflection',
                    'endpoint': endpoint,
                    'reflected_origin': test_origin,
                    'credentials_allowed': True,
                    'severity': 'CRITICAL',
                    'finding': f'CORS reflects arbitrary origin with credentials at {endpoint}'
                })

        return failures

    # ═══════════════════════════════════════════════════════
    # INJECTION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_sql_injection_time_based(
        self,
        injectable_endpoints: List[Tuple[str, str, str]]  # (endpoint, method, param)
    ) -> List[dict]:
        """
        Test for SQL injection using time-based detection.
        Safe — doesn't read any data, only measures response time difference.
        """
        failures = []

        safe_payload = "test_value_regression_123"
        # MySQL time-based payload: SLEEP(3); note MySQL's -- comment needs trailing whitespace
        sqli_payload_mysql = "test' AND SLEEP(3)-- -"
        # PostgreSQL equivalent (swap in when the backend is Postgres): pg_sleep(3)
        sqli_payload_pg = "test' AND pg_sleep(3)-- -"

        for endpoint, method, param in injectable_endpoints:
            # Baseline response time
            start = time.time()
            safe_response = self.session.request(
                method,
                f"{self.base_url}{endpoint}",
                params={param: safe_payload} if method == 'GET' else None,
                json={param: safe_payload} if method == 'POST' else None,
                headers={"Authorization": f"Bearer {self.auth_tokens.get('standard')}"},
                timeout=10
            )
            baseline_time = time.time() - start

            # Test with MySQL payload
            start = time.time()
            sqli_response = self.session.request(
                method,
                f"{self.base_url}{endpoint}",
                params={param: sqli_payload_mysql} if method == 'GET' else None,
                json={param: sqli_payload_mysql} if method == 'POST' else None,
                headers={"Authorization": f"Bearer {self.auth_tokens.get('standard')}"},
                timeout=15
            )
            test_time = time.time() - start

            # If response is significantly delayed → time-based SQL injection
            if test_time > baseline_time + 2.5:  # 2.5s threshold for 3s sleep
                failures.append({
                    'test': 'sql_injection_time_based',
                    'endpoint': endpoint,
                    'parameter': param,
                    'baseline_time': round(baseline_time, 2),
                    'injection_time': round(test_time, 2),
                    'delay': round(test_time - baseline_time, 2),
                    'severity': 'HIGH',
                    'finding': f'Time-based SQL injection at {endpoint} parameter {param}'
                })

        return failures

    def run_full_regression_suite(
        self,
        endpoints: dict,  # {endpoint_type: [endpoints]}
        user_accounts: dict  # {user_type: {token, resources}}
    ) -> dict:
        """Run the complete regression suite and return consolidated results"""

        all_failures = []

        # Authentication tests
        auth_failures = self.test_endpoint_requires_authentication(
            endpoints.get('authenticated', [])
        )
        all_failures.extend(auth_failures)

        jwt_result = self.test_jwt_algorithm_enforcement()
        if not jwt_result['passed']:
            all_failures.append(jwt_result)

        # Authorization tests
        if 'user_a' in user_accounts and 'user_b' in user_accounts:
            idor_failures = self.test_cross_user_idor(
                endpoints.get('resource', []),
                user_accounts['user_a']['token'],
                user_accounts['user_b']['resource_ids']
            )
            all_failures.extend(idor_failures)

        priv_esc_failures = self.test_horizontal_privilege_escalation(
            endpoints.get('admin', []),
            user_accounts.get('standard', {}).get('token', '')
        )
        all_failures.extend(priv_esc_failures)

        # CORS tests
        cors_failures = self.test_cors_origin_reflection(
            endpoints.get('api', [])
        )
        all_failures.extend(cors_failures)

        # Summary
        critical = [f for f in all_failures if f.get('severity') == 'CRITICAL']
        high = [f for f in all_failures if f.get('severity') == 'HIGH']

        return {
            'total_failures': len(all_failures),
            'critical': len(critical),
            'high': len(high),
            'passed': len(all_failures) == 0,
            'failures': all_failures,
            'recommendation': 'BLOCK DEPLOYMENT' if critical else
                             'REVIEW BEFORE DEPLOYMENT' if high else
                             'DEPLOY WITH MONITORING'
        }
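On the CI side, the suite's `recommendation` field is what gates the pipeline. A minimal, self-contained sketch of that decision step (the `gate_deployment` helper is hypothetical; it simply mirrors the summary logic at the end of `run_full_regression_suite`):

```python
def gate_deployment(failures: list) -> str:
    """Map regression failures to a pipeline decision (mirrors the suite's summary)."""
    critical = [f for f in failures if f.get('severity') == 'CRITICAL']
    high = [f for f in failures if f.get('severity') == 'HIGH']
    if critical:
        return 'BLOCK DEPLOYMENT'
    if high:
        return 'REVIEW BEFORE DEPLOYMENT'
    return 'DEPLOY WITH MONITORING'

# Example: a single HIGH finding forces a human review before release
print(gate_deployment([{'severity': 'HIGH', 'test': 'cross_user_idor'}]))
```

In practice the CI job would translate 'BLOCK DEPLOYMENT' into a non-zero exit code so the deploy stage never runs, while 'REVIEW BEFORE DEPLOYMENT' can fail the job but allow a manual override.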
import time
import requests
from typing import List, Tuple

class SecurityRegressionSuite:
    """
    Automated security regression tests that run on every deployment.
    Tests for the most common high-impact vulnerabilities.
    These don't replace manual testing — they catch regressions fast.
    """

    def __init__(self, base_url: str, auth_tokens: dict):
        self.base_url = base_url
        self.auth_tokens = auth_tokens  # {user_type: token}
        self.session = requests.Session()

    # ═══════════════════════════════════════════════════════
    # AUTHENTICATION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_endpoint_requires_authentication(self, endpoints: List[str]) -> List[dict]:
        """Verify all non-public endpoints return 401 without auth"""
        failures = []

        for endpoint in endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                allow_redirects=False
            )

            if response.status_code not in [401, 403, 302]:
                failures.append({
                    'test': 'authentication_required',
                    'endpoint': endpoint,
                    'expected': '401/403 (or 302 redirect)',
                    'actual': response.status_code,
                    'severity': 'CRITICAL',
                    'finding': f'{endpoint} accessible without authentication'
                })

        return failures

    def test_jwt_algorithm_enforcement(self) -> dict:
        """Test that JWT algorithm is strictly enforced"""
        import base64
        import json

        # Forge a none-algorithm token
        header = base64.urlsafe_b64encode(
            json.dumps({"alg": "none", "typ": "JWT"}).encode()
        ).rstrip(b'=').decode()

        payload = base64.urlsafe_b64encode(
            json.dumps({
                "user_id": "1",
                "role": "admin",
                "exp": 9999999999
            }).encode()
        ).rstrip(b'=').decode()

        forged_token = f"{header}.{payload}."

        response = self.session.get(
            f"{self.base_url}/api/v1/users/profile",
            headers={"Authorization": f"Bearer {forged_token}"}
        )

        return {
            'test': 'jwt_algorithm_none',
            'passed': response.status_code in [401, 403],
            'status_code': response.status_code,
            'severity': 'CRITICAL' if response.status_code == 200 else None,
            'finding': 'JWT none algorithm accepted' if response.status_code == 200 else None
        }

    # ═══════════════════════════════════════════════════════
    # AUTHORIZATION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_cross_user_idor(
        self,
        resource_endpoints: List[str],
        user_a_token: str,
        user_b_resources: List[str]
    ) -> List[dict]:
        """Test that User A cannot access User B's resources"""
        failures = []

        headers_a = {"Authorization": f"Bearer {user_a_token}"}

        for endpoint_pattern in resource_endpoints:
            for resource_id in user_b_resources:
                url = endpoint_pattern.replace('{id}', resource_id)

                response = self.session.get(
                    f"{self.base_url}{url}",
                    headers=headers_a,
                    allow_redirects=False
                )

                if response.status_code == 200:
                    failures.append({
                        'test': 'cross_user_idor',
                        'endpoint': url,
                        'accessing_user': 'user_a',
                        'resource_owner': 'user_b',
                        'status_code': 200,
                        'severity': 'HIGH',
                        'finding': f'IDOR: User A can access User B resource at {url}'
                    })

        return failures

    def test_horizontal_privilege_escalation(
        self,
        admin_endpoints: List[str],
        standard_user_token: str
    ) -> List[dict]:
        """Test that standard users cannot access admin endpoints"""
        failures = []

        headers = {"Authorization": f"Bearer {standard_user_token}"}

        for endpoint in admin_endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                headers=headers,
                allow_redirects=False
            )

            if response.status_code == 200:
                failures.append({
                    'test': 'privilege_escalation',
                    'endpoint': endpoint,
                    'token_type': 'standard_user',
                    'severity': 'CRITICAL',
                    'finding': f'Standard user accesses admin endpoint: {endpoint}'
                })

        return failures

    # ═══════════════════════════════════════════════════════
    # CORS REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_cors_origin_reflection(self, api_endpoints: List[str]) -> List[dict]:
        """Test that CORS does not reflect arbitrary origins"""
        failures = []

        test_origin = "https://evil-test-origin-regression.com"

        for endpoint in api_endpoints:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                headers={
                    "Origin": test_origin,
                    "Authorization": f"Bearer {self.auth_tokens.get('standard')}"
                },
                allow_redirects=False
            )

            acao = response.headers.get('Access-Control-Allow-Origin', '')
            acac = response.headers.get('Access-Control-Allow-Credentials', '')

            if acao == test_origin and acac.lower() == 'true':
                failures.append({
                    'test': 'cors_origin_reflection',
                    'endpoint': endpoint,
                    'reflected_origin': test_origin,
                    'credentials_allowed': True,
                    'severity': 'CRITICAL',
                    'finding': f'CORS reflects arbitrary origin with credentials at {endpoint}'
                })

        return failures

    # ═══════════════════════════════════════════════════════
    # INJECTION REGRESSION TESTS
    # ═══════════════════════════════════════════════════════

    def test_sql_injection_time_based(
        self,
        injectable_endpoints: List[Tuple[str, str, str]]  # (endpoint, method, param)
    ) -> List[dict]:
        """
        Test for SQL injection using time-based detection.
        Safe — doesn't read any data, only measures response time difference.
        """
        failures = []

        safe_payload = "test_value_regression_123"
        # MySQL time-based payload: SLEEP(3)
        sqli_payload_mysql = "test' AND SLEEP(3)--"
        # PostgreSQL equivalent: pg_sleep(3) (swap in for PostgreSQL targets;
        # the loop below exercises only the MySQL payload)
        sqli_payload_pg = "test' AND pg_sleep(3)--"

        for endpoint, method, param in injectable_endpoints:
            # Baseline response time
            start = time.time()
            safe_response = self.session.request(
                method,
                f"{self.base_url}{endpoint}",
                params={param: safe_payload} if method == 'GET' else None,
                json={param: safe_payload} if method == 'POST' else None,
                headers={"Authorization": f"Bearer {self.auth_tokens.get('standard')}"},
                timeout=10
            )
            baseline_time = time.time() - start

            # Test with MySQL payload
            start = time.time()
            sqli_response = self.session.request(
                method,
                f"{self.base_url}{endpoint}",
                params={param: sqli_payload_mysql} if method == 'GET' else None,
                json={param: sqli_payload_mysql} if method == 'POST' else None,
                headers={"Authorization": f"Bearer {self.auth_tokens.get('standard')}"},
                timeout=15
            )
            test_time = time.time() - start

            # If response is significantly delayed → time-based SQL injection
            if test_time > baseline_time + 2.5:  # 2.5s threshold for 3s sleep
                failures.append({
                    'test': 'sql_injection_time_based',
                    'endpoint': endpoint,
                    'parameter': param,
                    'baseline_time': round(baseline_time, 2),
                    'injection_time': round(test_time, 2),
                    'delay': round(test_time - baseline_time, 2),
                    'severity': 'HIGH',
                    'finding': f'Time-based SQL injection at {endpoint} parameter {param}'
                })

        return failures

    def run_full_regression_suite(
        self,
        endpoints: dict,  # {endpoint_type: [endpoints]}
        user_accounts: dict  # {user_type: {token, resources}}
    ) -> dict:
        """Run the complete regression suite and return consolidated results"""

        all_failures = []

        # Authentication tests
        auth_failures = self.test_endpoint_requires_authentication(
            endpoints.get('authenticated', [])
        )
        all_failures.extend(auth_failures)

        jwt_result = self.test_jwt_algorithm_enforcement()
        if not jwt_result['passed']:
            all_failures.append(jwt_result)

        # Authorization tests
        if 'user_a' in user_accounts and 'user_b' in user_accounts:
            idor_failures = self.test_cross_user_idor(
                endpoints.get('resource', []),
                user_accounts['user_a']['token'],
                user_accounts['user_b']['resource_ids']
            )
            all_failures.extend(idor_failures)

        priv_esc_failures = self.test_horizontal_privilege_escalation(
            endpoints.get('admin', []),
            user_accounts.get('standard', {}).get('token', '')
        )
        all_failures.extend(priv_esc_failures)

        # CORS tests
        cors_failures = self.test_cors_origin_reflection(
            endpoints.get('api', [])
        )
        all_failures.extend(cors_failures)

        # Summary
        critical = [f for f in all_failures if f.get('severity') == 'CRITICAL']
        high = [f for f in all_failures if f.get('severity') == 'HIGH']

        return {
            'total_failures': len(all_failures),
            'critical': len(critical),
            'high': len(high),
            'passed': len(all_failures) == 0,
            'failures': all_failures,
            'recommendation': 'BLOCK DEPLOYMENT' if critical else
                             'REVIEW BEFORE DEPLOYMENT' if high else
                             'DEPLOY WITH MONITORING'
        }
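
The consolidated result from `run_full_regression_suite` is most useful when it gates the pipeline automatically. A minimal sketch of that wiring, with thresholds mirroring the suite's `recommendation` field (the `gate_deployment` helper and the synthetic result dicts are ours, for illustration; no live server is involved):

```python
def gate_deployment(result: dict) -> int:
    """Map a consolidated regression result to a CI exit code.

    0 = deploy (with monitoring), 1 = block deployment,
    2 = review required before deployment.
    """
    if result.get('critical', 0) > 0:
        return 1  # any CRITICAL finding hard-fails the pipeline
    if result.get('high', 0) > 0:
        return 2  # HIGH findings require a human decision
    return 0

# Synthetic results, matching the shape returned by the suite:
print(gate_deployment({'critical': 1, 'high': 0}))  # 1 -> block
print(gate_deployment({'critical': 0, 'high': 2}))  # 2 -> review
print(gate_deployment({'critical': 0, 'high': 0}))  # 0 -> deploy
```

In a CI job, the runner would call `run_full_regression_suite` against a staging deployment and use this exit code to fail or pass the deploy step.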

Part 5: The Economics: Annual vs Continuous Total Cost of Ownership

The surface-level cost comparison (annual pentest = one invoice) consistently underestimates the true cost of the annual model and overestimates the cost of continuous testing:

def calculate_tco_comparison(org_profile: dict) -> dict:
    """
    Calculate Total Cost of Ownership for annual vs continuous security testing.
    Includes direct costs, breach probability adjustment, and remediation costs.
    """

    # Organization profile inputs
    annual_revenue = org_profile['annual_revenue']
    deployment_frequency_per_year = org_profile['deployments_per_year']
    engineering_team_size = org_profile['engineering_team_size']
    avg_engineer_hourly_cost = org_profile['avg_engineer_hourly_cost']
    breach_probability_annual = org_profile['estimated_breach_probability']  # e.g., 0.15 = 15%
    avg_breach_cost = org_profile['avg_breach_cost']  # all-in cost if breach occurs

    # ═══════════════════════════════════════════════════════
    # ANNUAL PENETRATION TESTING MODEL
    # ═══════════════════════════════════════════════════════

    annual_model = {}

    # Direct costs
    annual_model['pentest_cost'] = 25000  # Typical annual pentest (1 week, 1-2 testers)
    annual_model['retest_cost'] = 8000    # Retest after remediation

    # Engineering remediation costs
    # Average: 8 findings, 3 days engineering per finding
    avg_findings = 8
    avg_remediation_days = 3
    annual_model['engineering_remediation_cost'] = (
        avg_findings * avg_remediation_days * 8 *  # 8 hours/day
        avg_engineer_hourly_cost
    )

    # Emergency response costs (for critical findings discovered late)
    # Annual model has longer gap → higher probability of undetected critical issue
    # that then requires emergency response
    prob_emergency_response = 0.35  # 35% chance of emergency security incident
    avg_emergency_response_cost = 50000  # War room, hotfix, communication
    annual_model['expected_emergency_response_cost'] = (
        prob_emergency_response * avg_emergency_response_cost
    )

    # Alert fatigue / wasted engineering time on non-exploitable findings
    # Annual test typically has higher percentage of false positives vs continuous
    annual_model['false_positive_remediation_waste'] = (
        avg_findings * 0.3 *  # 30% false positive rate for annual
        2 * 8 *               # 2 days to discover and document it's a false positive
        avg_engineer_hourly_cost
    )

    # Breach risk — adjusted for longer exposure window
    # Annual model has ~180 day average undetected vulnerability window
    # Breach probability scales with exposure window
    exposure_window_days_annual = 180
    annual_model['adjusted_breach_probability'] = breach_probability_annual * (
        exposure_window_days_annual / 365
    )
    annual_model['expected_breach_cost'] = (
        annual_model['adjusted_breach_probability'] * avg_breach_cost
    )

    annual_model['total_direct_cost'] = (
        annual_model['pentest_cost'] +
        annual_model['retest_cost'] +
        annual_model['engineering_remediation_cost'] +
        annual_model['expected_emergency_response_cost'] +
        annual_model['false_positive_remediation_waste']
    )

    annual_model['total_tco'] = (
        annual_model['total_direct_cost'] +
        annual_model['expected_breach_cost']
    )

    # ═══════════════════════════════════════════════════════
    # CONTINUOUS PENETRATION TESTING MODEL
    # ═══════════════════════════════════════════════════════

    continuous_model = {}

    # Direct costs — subscription model
    continuous_model['monthly_subscription'] = 4500  # Typical continuous program
    continuous_model['annual_subscription_cost'] = continuous_model['monthly_subscription'] * 12

    # Engineering remediation costs — findings caught earlier are cheaper to fix
    # Studies show: 6x cheaper to fix in development vs production
    # Continuous testing catches most issues within 2 weeks of introduction
    continuous_avg_findings = 12  # More findings per year (nothing escapes for 11 months)
    continuous_avg_remediation_days = 1.5  # Caught earlier = simpler fix (feature branch)
    continuous_model['engineering_remediation_cost'] = (
        continuous_avg_findings * continuous_avg_remediation_days * 8 *
        avg_engineer_hourly_cost
    )

    # Emergency response costs — much lower (issues caught before breach)
    prob_emergency_response_continuous = 0.08  # 8% vs 35% for annual
    continuous_model['expected_emergency_response_cost'] = (
        prob_emergency_response_continuous * avg_emergency_response_cost
    )

    # Near-zero false positive waste — continuous testing is more targeted
    continuous_model['false_positive_remediation_waste'] = (
        continuous_avg_findings * 0.05 *  # 5% false positive rate
        1 * 8 *
        avg_engineer_hourly_cost
    )

    # Breach risk — dramatically reduced exposure window
    exposure_window_days_continuous = 14  # 2-week sprint cadence
    continuous_model['adjusted_breach_probability'] = breach_probability_annual * (
        exposure_window_days_continuous / 365
    )
    continuous_model['expected_breach_cost'] = (
        continuous_model['adjusted_breach_probability'] * avg_breach_cost
    )

    continuous_model['total_direct_cost'] = (
        continuous_model['annual_subscription_cost'] +
        continuous_model['engineering_remediation_cost'] +
        continuous_model['expected_emergency_response_cost'] +
        continuous_model['false_positive_remediation_waste']
    )

    continuous_model['total_tco'] = (
        continuous_model['total_direct_cost'] +
        continuous_model['expected_breach_cost']
    )

    # Comparison
    tco_savings = annual_model['total_tco'] - continuous_model['total_tco']

    return {
        'organization_profile': org_profile,
        'annual_model': annual_model,
        'continuous_model': continuous_model,
        'comparison': {
            'annual_tco': round(annual_model['total_tco']),
            'continuous_tco': round(continuous_model['total_tco']),
            'tco_savings': round(tco_savings),
            'savings_percentage': round((tco_savings / annual_model['total_tco']) * 100, 1),
            'breakeven_required_breach_probability': (
                annual_model['total_direct_cost'] - continuous_model['total_direct_cost']
            ) / avg_breach_cost,
            'recommendation': 'Continuous' if tco_savings > 0 else 'Annual',
            'primary_savings_driver': (
                'Breach risk reduction' if continuous_model['expected_breach_cost'] <
                annual_model['expected_breach_cost'] * 0.5
                else 'Engineering efficiency'
            )
        }
    }

# Example calculation:
example_org = {
    'annual_revenue': 10_000_000,
    'deployments_per_year': 52,  # Weekly releases
    'engineering_team_size': 15,
    'avg_engineer_hourly_cost': 100,
    'estimated_breach_probability': 0.12,  # 12% annual breach probability
    'avg_breach_cost': 500_000
}

result = calculate_tco_comparison(example_org)
print(f"Annual model TCO:     ${result['comparison']['annual_tco']:,}")
print(f"Continuous model TCO: ${result['comparison']['continuous_tco']:,}")
print(f"Expected savings:     ${result['comparison']['tco_savings']:,}")

The Economics Summary Table

| Cost Category | Annual Model | Continuous Model | Delta |
| --- | --- | --- | --- |
| Direct testing cost | $25,000–$50,000 | $48,000–$72,000/yr (subscription) | +$10K–$25K |
| Retest cost | $8,000–$15,000 | Included in subscription | -$12K |
| Engineering remediation | $19,200 (8 findings × 3 days) | $14,400 (12 findings × 1.5 days) | -$4,800 |
| False positive waste | $9,600 (30% false positive rate) | $1,600 (5% false positive rate) | -$8,000 |
| Emergency response | $17,500 (35% probability) | $4,000 (8% probability) | -$13,500 |
| Expected breach cost ($500K × probability) | $24,657 (180-day window) | $1,644 (14-day window) | -$23,013 |
| Total TCO | ~$104,000 | ~$80,000 | -$24,000 |

These are illustrative figures for a company with $10M ARR, weekly releases, 15 engineers at $100/hr, a 12% annual breach probability, and a $500K average breach cost.

Part 6: The Maturity Model: Which Testing Cadence Fits Your Organization

The Security Testing Maturity Framework

Not every organization needs, or can operationalize, the same testing model. The right cadence depends on deployment velocity, risk profile, team maturity, and compliance requirements:

| Maturity Level | Description | Deployment Velocity | Testing Model | Minimum Frequency |
| --- | --- | --- | --- | --- |
| Level 0 | No structured security testing | Any | Annual minimum | Annual |
| Level 1 | Compliance-driven testing | Monthly or less | Annual + automated scanning | Annual |
| Level 2 | Risk-aware testing | Bi-weekly | Quarterly + sprint-aware | Quarterly |
| Level 3 | DevSecOps-integrated testing | Weekly | Sprint-cadence + monthly deep | Per-sprint |
| Level 4 | Continuous security program | Daily | Continuous, all layers | Ongoing |
| Level 5 | Security-native development | Continuous | Embedded, automated + weekly deep | Real-time |
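The velocity-to-cadence mapping in the table above can be sketched as a simple lookup. This is illustrative only; the thresholds are assumptions drawn from the table, not hard rules:

```python
def minimum_testing_cadence(deployments_per_year: int) -> str:
    """Map deployment velocity to the minimum testing cadence
    suggested by the maturity table (illustrative thresholds)."""
    if deployments_per_year >= 250:   # daily or continuous delivery
        return "continuous"
    if deployments_per_year >= 52:    # weekly releases
        return "per-sprint"
    if deployments_per_year >= 26:    # bi-weekly releases
        return "quarterly"
    return "annual"                   # monthly or slower

print(minimum_testing_cadence(52))   # weekly shipping -> "per-sprint"
```

In practice, teams sit between levels; the useful question is whether your testing cadence keeps pace with the velocity bucket you are actually in.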

Decision Framework: Annual vs Continuous

Choosing the right testing model depends on three factors:

  • how fast your system changes

  • how sensitive your data is

  • what your compliance requirements demand

Instead of a single answer, use this decision framework.

1. How Often Do You Deploy?

Your deployment frequency directly determines how quickly risk accumulates.

| Deployment Frequency | Recommended Model | Why It Matters | What to Invest In |
| --- | --- | --- | --- |
| Less than monthly | Annual or semi-annual | Attack surface changes slowly | Strong pre-deployment security reviews |
| Monthly to bi-weekly | Quarterly (minimum) | New risk accumulates faster than annual coverage | Quarterly external tests + automated regression |
| Weekly or more | Continuous or sprint-based | Annual testing covers <10% of deployments | Security program aligned with release cadence |

2. What Data Do You Handle?

Data sensitivity changes both risk tolerance and testing frequency requirements.

| Data Type | Recommended Approach | Why |
| --- | --- | --- |
| PII (>10K users), payment data, health data | Quarterly or continuous (minimum annual for compliance) | Breach impact and regulatory exposure are high |
| Business confidential, moderate PII | Annual minimum; quarterly if deploying frequently | Risk grows with deployment velocity |
| Internal tools, low sensitivity | Annual may be sufficient | Lower impact if compromised |

👉 In high-risk environments, the economics shift: breach cost alone often justifies continuous testing.

3. What Is Your Regulatory Environment?

Compliance sets the minimum, not the optimal level of security.

| Framework | Requirement | What It Actually Means |
| --- | --- | --- |
| PCI-DSS Level 1/2 | Annual pentest required | Continuous testing supplements, not replaces |
| SOC 2 Type II | Annual expected | Continuous testing strengthens audit posture |
| HIPAA | Annual risk assessment | Testing frequency is risk-based |
| ISO 27001 | Annual pentest (typical) | Continuous monitoring required |

👉 Key insight: Compliance ≠ sufficient security

4. Do You Have a Security Team?

Your ability to act on findings determines how continuous your model can be.

| Team Setup | Recommended Model | Why |
| --- | --- | --- |
| Dedicated security team (even 1 person) | Continuous testing | Can triage and respond in real time |
| No dedicated team (shared responsibility) | Sprint-based / monthly cadence | Prevents alert overload |
| No team and no plans for one | Quarterly testing | A continuous model will fail operationally |

Final Recommendation

If you simplify everything above, the decision comes down to this:

| Scenario | Recommended Model |
| --- | --- |
| High velocity (weekly+) + sensitive data + budget | Continuous |
| High velocity (weekly+) + sensitive data + limited budget | Quarterly |
| Moderate velocity (monthly) + sensitive data | Quarterly |
| Moderate velocity + low sensitivity | Semi-annual |
| Low velocity (monthly or less) | Annual |
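The scenario table collapses into a small decision function. This is a minimal sketch: the thresholds and parameter names are illustrative assumptions, and combinations the table does not cover default to the closest row:

```python
def recommended_model(deploys_per_month: float,
                      sensitive_data: bool,
                      has_continuous_budget: bool) -> str:
    """Illustrative decision function mirroring the scenario table."""
    if deploys_per_month < 1:                      # low velocity
        return "Annual"
    if deploys_per_month >= 4 and sensitive_data:  # weekly+ shipping
        return "Continuous" if has_continuous_budget else "Quarterly"
    if sensitive_data:                             # moderate velocity
        return "Quarterly"
    return "Semi-annual"                           # moderate velocity, low sensitivity

print(recommended_model(8, True, True))   # -> Continuous
```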

That said, the right testing model is not about preference. It’s about alignment. If your system changes faster than your testing cycle, risk accumulates faster than it is detected.

Part 7: Designing the Continuous Testing Program: The Implementation Playbook

Phase 1: Baseline Establishment (Month 1)




Phase 2: Continuous Operations (Month 2+)




The Finding SLA Matrix for Continuous Programs

Continuous testing requires clear SLAs. Findings arrive continuously, so the team needs defined timelines for each severity:

| Severity | CVSS Range | Acknowledgment SLA | Remediation SLA | Retest SLA | Escalation |
| --- | --- | --- | --- | --- | --- |
| Critical | 9.0–10.0 | 4 hours | 48 hours | Within 24h of fix | C-suite notification |
| High | 7.0–8.9 | 24 hours | 7 days | Within 48h of fix | Security team lead |
| Medium | 4.0–6.9 | 72 hours | 30 days | Within sprint | Engineering manager |
| Low | 0.1–3.9 | 1 week | 90 days | Next quarterly test | Backlog |
| Informational | N/A | 2 weeks | Next roadmap | N/A | None |
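Because findings arrive continuously, SLA assignment is worth automating at intake. A minimal sketch of the matrix above; the severity boundaries follow the table, while the returned field names are illustrative:

```python
from typing import Optional

def sla_for_cvss(cvss: Optional[float]) -> dict:
    """Map a CVSS score to the SLA row in the matrix above."""
    if cvss is None:  # informational findings carry no CVSS score
        return {"severity": "Informational", "ack": "2 weeks", "fix": "next roadmap"}
    if cvss >= 9.0:
        return {"severity": "Critical", "ack": "4 hours", "fix": "48 hours"}
    if cvss >= 7.0:
        return {"severity": "High", "ack": "24 hours", "fix": "7 days"}
    if cvss >= 4.0:
        return {"severity": "Medium", "ack": "72 hours", "fix": "30 days"}
    return {"severity": "Low", "ack": "1 week", "fix": "90 days"}

print(sla_for_cvss(9.8)["severity"])  # -> Critical
```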

Part 8: Common Failure Modes in Continuous Testing Programs

Why Continuous Programs Fail After 6 Months

Organizations that start continuous testing programs often abandon them within 6–12 months. The failure patterns are consistent:

Failure Mode 1: Finding Fatigue Without Triage




Failure Mode 2: Testing Doesn't Track Deployment Changes




Failure Mode 3: Surface Monitoring Without Action




Failure Mode 4: Compliance-Minimum Thinking




Part 9: Metrics That Define a Successful Continuous Program

The KPI Stack for Continuous Security Testing

class ContinuousSecurityProgramMetrics:
    """Track and report continuous security testing program effectiveness"""

    def calculate_program_kpis(self, program_data: dict) -> dict:

        findings_data = program_data['findings']
        test_events = program_data['test_events']
        deployments = program_data['deployments']

        # KPI 1: Mean Time to Detection (MTTD)
        # How long from vulnerability introduction to detection?
        mttd_values = []
        for finding in findings_data:
            if finding.get('introduction_date') and finding.get('detection_date'):
                days = (finding['detection_date'] - finding['introduction_date']).days
                mttd_values.append(days)

        mttd = sum(mttd_values) / len(mttd_values) if mttd_values else None

        # KPI 2: Mean Time to Remediation (MTTR)
        # How long from detection to confirmed fix?
        mttr_values = []
        for finding in findings_data:
            if finding.get('detection_date') and finding.get('remediation_date'):
                days = (finding['remediation_date'] - finding['detection_date']).days
                mttr_values.append(days)

        mttr = sum(mttr_values) / len(mttr_values) if mttr_values else None

        # KPI 3: Vulnerability Introduction Rate
        # New security findings per 100 deployments
        total_findings = len(findings_data)
        total_deployments = len(deployments)
        vuln_rate = (total_findings / total_deployments * 100) if total_deployments else 0

        # KPI 4: Escape Rate
        # Percentage of vulnerabilities NOT caught before production
        # (Found by external researchers or incident response, not internal testing)
        external_discoveries = sum(
            1 for f in findings_data
            if f.get('discovered_by') == 'external'
        )
        escape_rate = (external_discoveries / total_findings * 100) if total_findings else 0

        # KPI 5: SLA Compliance Rate
        # Percentage of findings remediated within defined SLAs
        sla_compliant = sum(
            1 for f in findings_data
            if f.get('remediated_within_sla')
        )
        sla_rate = (sla_compliant / total_findings * 100) if total_findings else 0

        # KPI 6: CVSS Trend
        # Is the average CVSS of findings going up or down over time?
        monthly_avg_cvss = {}
        for finding in findings_data:
            month = finding['detection_date'].strftime('%Y-%m')
            monthly_avg_cvss.setdefault(month, []).append(finding['cvss'])

        cvss_trend = {
            month: sum(scores) / len(scores)
            for month, scores in monthly_avg_cvss.items()
        }

        # KPI 7: Attack Surface Growth Rate
        # How fast is the untested attack surface growing?
        surface_snapshots = program_data.get('surface_snapshots', [])
        if len(surface_snapshots) >= 2:
            first = surface_snapshots[0]
            last = surface_snapshots[-1]
            surface_growth = (
                (len(last['endpoints']) - len(first['endpoints'])) /
                len(first['endpoints']) * 100
            )
        else:
            surface_growth = None

        return {
            'mean_time_to_detection_days': round(mttd, 1) if mttd is not None else 'N/A',
            'mean_time_to_remediation_days': round(mttr, 1) if mttr is not None else 'N/A',
            'vulnerability_introduction_rate_per_100_deployments': round(vuln_rate, 2),
            'escape_rate_percent': round(escape_rate, 1),
            'sla_compliance_rate_percent': round(sla_rate, 1),
            'cvss_trend_by_month': cvss_trend,
            'attack_surface_growth_percent': round(surface_growth, 1) if surface_growth is not None else 'N/A',

            'program_health': self.assess_program_health(mttd, mttr, escape_rate, sla_rate),

            'benchmarks': {
                'mttd_industry_annual': 180,  # days
                'mttd_industry_continuous': 14,
                'mttd_your_program': mttd,
                'mttr_pci_requirement_critical': 1,  # day
                'sla_compliance_target': 95,  # percent
            }
        }

    def assess_program_health(self, mttd, mttr, escape_rate, sla_rate) -> str:
        score = 0

        if mttd and mttd < 14: score += 2
        elif mttd and mttd < 30: score += 1

        if mttr and mttr < 7: score += 2
        elif mttr and mttr < 30: score += 1

        if escape_rate < 5: score += 2
        elif escape_rate < 15: score += 1

        if sla_rate > 95: score += 2
        elif sla_rate > 80: score += 1

        if score >= 7: return 'EXCELLENT'
        elif score >= 5: return 'GOOD'
        elif score >= 3: return 'IMPROVING'
        else: return 'NEEDS_ATTENTION'

The Security Posture Dashboard

| Metric | Annual Model Baseline | Continuous Program Target | Why It Matters |
| --- | --- | --- | --- |
| Mean Time to Detection | ~180 days | <14 days | Determines breach window |
| Mean Time to Remediation | ~45 days (batched quarterly) | <7 days (continuous pipeline) | Reduces risk-open duration |
| Vulnerability Escape Rate | ~25% (found by others first) | <5% | Measures program effectiveness |
| SLA Compliance Rate | ~60% | >95% | Audit evidence quality |
| False Positive Rate | ~40% | <10% | Engineering team trust |
| Attack Surface Coverage | ~70% (tested version drifts) | >95% (weekly updates) | Completeness of protection |
| CVSS Trend (target: declining) | Uncorrelated | Measurable decline | Program impact evidence |

The Gap Closes Every Sprint or It Never Closes

Annual penetration testing assumes your application stays roughly the same all year. It doesn't. If you ship weekly, that assumption breaks within weeks, not months.

What you’re left with is a growing exposure window:

  • new code is deployed

  • new attack surface is introduced

  • new vulnerabilities go untested

This is the deployment velocity gap, and it’s measurable. Continuous penetration testing fixes this by aligning security cadence with deployment cadence.
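Measurable in the same way the TCO model above pro-rates breach risk: if vulnerabilities stay exploitable only for the exposure window, expected breach cost scales linearly with that window. A minimal sketch of the arithmetic:

```python
def expected_breach_cost(annual_breach_probability: float,
                         exposure_window_days: float,
                         avg_breach_cost: float) -> float:
    """Expected breach cost, pro-rated by how long vulnerabilities
    remain undetected (same adjustment the TCO model uses)."""
    adjusted = annual_breach_probability * exposure_window_days / 365
    return adjusted * avg_breach_cost

annual = expected_breach_cost(0.12, 180, 500_000)   # ~180-day detection lag
sprint = expected_breach_cost(0.12, 14, 500_000)    # 2-week sprint cadence
print(f"Exposure cost shrinks ~{annual / sprint:.0f}x")  # 180/14, about 13x
```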

Instead of testing once and hoping it holds, every change gets evaluated within the same cycle it was introduced.

  • New sprint → new attack surface

  • New attack surface → new testing

  • No change goes untested for long

That’s how the gap actually closes. The impact is straightforward:

  • Detection time drops from months to days

  • Undetected vulnerability windows shrink significantly

  • Security becomes part of delivery, not a checkpoint after it

And unlike annual testing, you’re not relying on a single snapshot. You’re building continuous evidence that your system is secure right now.

CodeAnt AI’s continuous pentesting model is built for teams that ship fast:

  • Sprint-cadence security testing aligned with releases

  • Critical findings surfaced within 48 hours

  • Continuous visibility into your real attack surface

No long setup. No waiting months for answers.

Start with a quick scoping call. See what your current gap actually looks like.


FAQs

We pass our annual SOC 2 audit, why do we need continuous testing?

Continuous testing sounds expensive. What's the minimum viable program for a 10-person startup?

How do we convince our CTO that continuous testing is worth the investment over just doing the annual test?

Can we do continuous testing ourselves with an internal team?

What happens to our existing annual pentest commitment if we switch to continuous?
