Real-World Examples

Complete, production-ready security rules you can copy and run immediately. These examples detect real OWASP Top 10 vulnerabilities.

Ready to use. All examples on this page are complete, tested rules that work on first try. Just copy, save as owasp_rules.py, and run pathfinder scan --rules owasp_rules.py --project .

Complete OWASP Top 10 Ruleset

Copy this complete file for production-ready OWASP Top 10 detection:

#!/usr/bin/env python3
"""
OWASP Top 10 Security Rules for Code Pathfinder
Production-ready rules for detecting common web vulnerabilities.

Usage:
    pathfinder scan --rules owasp_top10.py --project /path/to/your/code
"""

from codepathfinder import calls, flows, rule, Or
from codepathfinder.presets import PropagationPresets


# =============================================================================
# A03:2021 - Injection
# =============================================================================

@rule(
    id="owasp-a03-sqli",
    severity="critical",
    cwe="CWE-89",
    owasp="A03:2021",
)
def detect_sql_injection():
    """
    Detects SQL injection where user input flows to SQL execution.

    Covers:
    - Direct SQL execution (cursor.execute)
    - ORM raw queries (Model.raw)
    - Various database adapters

    Example vulnerable code:
        user_id = request.GET.get("id")
        cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    """
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
            calls("request.args.get"),
            calls("request.form.get"),
            calls("input"),
        ],
        to_sinks=[
            calls("execute"),
            calls("executemany"),
            calls("*.execute"),
            calls("*.executemany"),
            calls("*.raw"),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
            calls("*.escape"),
            calls("parameterize"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="owasp-a03-cmdi",
    severity="critical",
    cwe="CWE-78",
    owasp="A03:2021",
)
def detect_command_injection():
    """
    Detects OS command injection.

    Covers:
    - os.system, subprocess calls
    - Shell command execution

    Example vulnerable code:
        filename = request.GET.get("file")
        os.system(f"cat {filename}")
    """
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("system"),
            calls("popen"),
            calls("os.system"),
            calls("subprocess.*"),
        ],
        sanitized_by=[
            calls("shlex.quote"),
            calls("pipes.quote"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="owasp-a03-code-injection",
    severity="critical",
    cwe="CWE-94",
    owasp="A03:2021",
)
def detect_code_injection():
    """
    Detects code injection via eval/exec.

    Example vulnerable code:
        code = request.POST.get("code")
        eval(code)
    """
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("eval"),
            calls("exec"),
            calls("compile"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# A01:2021 - Broken Access Control
# =============================================================================

@rule(
    id="owasp-a01-path-traversal",
    severity="high",
    cwe="CWE-22",
    owasp="A01:2021",
)
def detect_path_traversal():
    """
    Detects path traversal vulnerabilities.

    Example vulnerable code:
        filename = request.GET.get("file")
        with open(f"/uploads/{filename}") as f:
            return f.read()
    """
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("open"),
            calls("*.read"),
            calls("*.write"),
        ],
        sanitized_by=[
            calls("os.path.basename"),
            calls("os.path.normpath"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# A02:2021 - Cryptographic Failures
# =============================================================================

@rule(
    id="owasp-a02-weak-crypto",
    severity="high",
    cwe="CWE-327",
    owasp="A02:2021",
)
def detect_weak_cryptography():
    """
    Detects weak cryptographic algorithms.

    Covers:
    - MD5, SHA1 hashing
    - DES encryption
    """
    return Or(
        calls("hashlib.md5"),
        calls("hashlib.sha1"),
        calls("hashlib.new", match_position={0: ["md5", "sha1", "md4"]}),
        calls("Crypto.Cipher.DES"),
    )


@rule(
    id="owasp-a02-hardcoded-secrets",
    severity="high",
    cwe="CWE-798",
    owasp="A02:2021",
)
def detect_hardcoded_secrets():
    """
    Detects hardcoded passwords and API keys.

    Example vulnerable code:
        db.connect(password="secret123")
    """
    return Or(
        calls("*.connect", match_name={"password": "*"}),
        calls("*.authenticate", match_name={"password": "*"}),
        calls("*", match_name={"api_key": "*", "secret_key": "*"}),
    )


# =============================================================================
# A05:2021 - Security Misconfiguration
# =============================================================================

@rule(
    id="owasp-a05-debug-mode",
    severity="high",
    cwe="CWE-489",
    owasp="A05:2021",
)
def detect_debug_mode():
    """
    Detects debug mode enabled in production.

    Example vulnerable code:
        app.run(debug=True)
    """
    return Or(
        calls("app.run", match_name={"debug": True}),
        calls("*.run", match_name={"debug": True}),
    )


@rule(
    id="owasp-a05-insecure-deserialization",
    severity="critical",
    cwe="CWE-502",
    owasp="A05:2021",
)
def detect_insecure_deserialization():
    """
    Detects insecure deserialization.

    Covers:
    - pickle.loads
    - yaml.unsafe_load
    """
    return Or(
        calls("pickle.loads"),
        calls("pickle.load"),
        calls("yaml.unsafe_load"),
        calls("yaml.load"),
    )


# =============================================================================
# A07:2021 - Identification and Authentication Failures
# =============================================================================

@rule(
    id="owasp-a07-weak-password-hash",
    severity="high",
    cwe="CWE-916",
    owasp="A07:2021",
)
def detect_weak_password_hashing():
    """
    Detects weak password hashing algorithms.

    Example vulnerable code:
        password_hash = hashlib.md5(password.encode()).hexdigest()
    """
    return Or(
        calls("hashlib.md5"),
        calls("hashlib.sha1"),
        calls("hashlib.sha256"),  # SHA256 alone is not sufficient for passwords
    )


# =============================================================================
# A10:2021 - Server-Side Request Forgery (SSRF)
# =============================================================================

@rule(
    id="owasp-a10-ssrf",
    severity="high",
    cwe="CWE-918",
    owasp="A10:2021",
)
def detect_ssrf():
    """
    Detects SSRF where user input flows to HTTP requests.

    Example vulnerable code:
        url = request.GET.get("url")
        requests.get(url)
    """
    return flows(
        from_sources=[
            calls("request.*"),
        ],
        to_sinks=[
            calls("requests.get"),
            calls("requests.post"),
            calls("urllib.request.urlopen"),
            calls("*.urlopen"),
        ],
        sanitized_by=[
            calls("validate_url"),
            calls("is_safe_url"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# Additional Security Patterns
# =============================================================================

@rule(
    id="xpath-injection",
    severity="high",
    cwe="CWE-643",
)
def detect_xpath_injection():
    """Detects XPath injection vulnerabilities"""
    return flows(
        from_sources=[calls("request.*"), calls("input")],
        to_sinks=[calls("*.xpath"), calls("*.find")],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="ldap-injection",
    severity="high",
    cwe="CWE-90",
)
def detect_ldap_injection():
    """Detects LDAP injection vulnerabilities"""
    return flows(
        from_sources=[calls("request.*"), calls("input")],
        to_sinks=[calls("*.search_s"), calls("*.search")],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


if __name__ == "__main__":
    print("OWASP Top 10 Security Rules loaded successfully!")
    print("Run with: pathfinder scan --rules owasp_top10.py --project <path>")

Running the Rules

Save the rules above as owasp_top10.py and run:

# Scan your project
pathfinder scan --rules owasp_top10.py --project /path/to/your/code

# Scan current directory
pathfinder scan --rules owasp_top10.py --project .

# With verbose output
pathfinder scan --rules owasp_top10.py --project . --verbose

Example Output

[critical] owasp-a03-sqli
CWE: CWE-89 | OWASP: A03:2021
Detects SQL injection where user input flows to SQL execution
→ app/views.py:42
Source: request.GET.get (line 38)
Sink: cursor.execute (line 42)
Tainted variable: user_id
Confidence: 85%
Scope: global

Framework-Specific Rules

Django-Specific Rules

from codepathfinder import rule, calls, flows, Or
from codepathfinder.presets import PropagationPresets

@rule(id="django-raw-sql", severity="critical", cwe="CWE-89")
def detect_django_raw_sql():
    """Detects Django raw SQL queries with user input"""
    return flows(
        from_sources=[
            calls("request.GET.get"),
            calls("request.POST.get"),
        ],
        to_sinks=[
            calls("*.raw"),
            calls("*.extra"),
            calls("RawSQL"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

@rule(id="django-debug-true", severity="high", cwe="CWE-489")
def detect_django_debug():
    """Detects DEBUG=True in settings"""
    return calls("*", match_name={"DEBUG": True})

@rule(id="django-secret-key", severity="critical", cwe="CWE-798")
def detect_django_hardcoded_secret():
    """Detects hardcoded SECRET_KEY"""
    return calls("*", match_name={"SECRET_KEY": "*"})

Flask-Specific Rules

@rule(id="flask-debug", severity="high", cwe="CWE-489")
def detect_flask_debug():
    """Detects Flask debug mode"""
    return calls("app.run", match_name={"debug": True})

@rule(id="flask-sqli", severity="critical", cwe="CWE-89")
def detect_flask_sql_injection():
    """Detects SQL injection in Flask apps"""
    return flows(
        from_sources=[
            calls("request.args.get"),
            calls("request.form.get"),
            calls("request.values.get"),
        ],
        to_sinks=[
            calls("*.execute"),
            calls("db.session.execute"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

@rule(id="flask-ssti", severity="critical", cwe="CWE-94")
def detect_flask_template_injection():
    """Detects Server-Side Template Injection"""
    return flows(
        from_sources=[calls("request.*")],
        to_sinks=[
            calls("render_template_string"),
            calls("*.render_string"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Testing Your Rules

Create test files to verify your rules work correctly:

Vulnerable Test Code

Create vulnerable_app.py to test rule detection:

from flask import Flask, request
import sqlite3

app = Flask(__name__)

@app.route('/user')
def get_user():
    # VULNERABLE: SQL Injection
    user_id = request.args.get('id')
    conn = sqlite3.connect('db.sqlite')
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
    return cursor.fetchone()

@app.route('/file')
def read_file():
    # VULNERABLE: Path Traversal
    filename = request.args.get('file')
    with open(f'/uploads/{filename}') as f:
        return f.read()

@app.route('/execute')
def execute_code():
    # VULNERABLE: Code Injection
    code = request.args.get('code')
    eval(code)

if __name__ == '__main__':
    # VULNERABLE: Debug mode
    app.run(debug=True)

Run Detection

pathfinder scan --rules owasp_top10.py --project vulnerable_app.py

This should detect all 4 vulnerabilities!

Next steps:

  • Add these rules to your CI/CD pipeline
  • Customize source/sink patterns for your codebase
  • Add framework-specific sanitizers
  • Create organization-specific rules