Real-World Examples

Complete, production-ready security rules you can copy and run immediately. These examples detect real OWASP Top 10 vulnerabilities.

✓ Ready to use: All examples on this page are complete, tested rules that work on the first try. Just copy, save as owasp_top10.py, and run pathfinder scan --rules owasp_top10.py --project .

Complete OWASP Top 10 Ruleset

Copy this complete file for production-ready OWASP Top 10 detection:

#!/usr/bin/env python3
"""
OWASP Top 10 Security Rules for Code Pathfinder
Production-ready rules for detecting common web vulnerabilities.

Usage:
    pathfinder scan --rules owasp_top10.py --project /path/to/your/code
"""

from codepathfinder import calls, flows, rule, Or
from codepathfinder.presets import PropagationPresets


# =============================================================================
# A03:2021 - Injection
# =============================================================================

@rule(
    id="owasp-a03-sqli",
    severity="critical",
    cwe="CWE-89",
    owasp="A03:2021",
)
def detect_sql_injection():
    """
    Detects SQL injection where user input flows to SQL execution.

    Covers:
    - Direct SQL execution (cursor.execute)
    - ORM raw queries (Model.raw)
    - Various database adapters

    Sources include both the Django-style accessors (request.GET.get,
    request.POST.get) and the Flask-style ones (request.args.get,
    request.form.get), plus the input() builtin.

    Example vulnerable code:
        user_id = request.GET.get("id")
        cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    """
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
            # The example above reads input through the ``.get`` accessor, so
            # the accessor call itself is listed as a source as well.
            calls("request.GET.get"),
            calls("request.POST.get"),
            calls("request.args.get"),
            calls("request.form.get"),
            calls("input"),
        ],
        to_sinks=[
            calls("execute"),
            calls("executemany"),
            calls("*.execute"),
            calls("*.executemany"),
            calls("*.raw"),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
            calls("*.escape"),
            calls("parameterize"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="owasp-a03-cmdi",
    severity="critical",
    cwe="CWE-78",
    owasp="A03:2021",
)
def detect_command_injection():
    """
    Detects OS command injection.

    Taint starts at calls matching request.* or input and is reported when it
    reaches system / popen / os.system / subprocess.* calls. Values passed
    through shlex.quote or pipes.quote are treated as sanitized.

    Example vulnerable code:
        filename = request.GET.get("file")
        os.system(f"cat {filename}")
    """
    untrusted = [calls(pattern) for pattern in ("request.*", "input")]
    process_sinks = [
        calls(pattern)
        for pattern in ("system", "popen", "os.system", "subprocess.*")
    ]
    quoting_helpers = [
        calls(pattern) for pattern in ("shlex.quote", "pipes.quote")
    ]

    return flows(
        from_sources=untrusted,
        to_sinks=process_sinks,
        sanitized_by=quoting_helpers,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="owasp-a03-code-injection",
    severity="critical",
    cwe="CWE-94",
    owasp="A03:2021",
)
def detect_code_injection():
    """
    Detects code injection via eval/exec.

    Sinks are the eval, exec and compile calls; no sanitizers are declared
    for this rule.

    Example vulnerable code:
        code = request.POST.get("code")
        eval(code)
    """
    untrusted = [calls(pattern) for pattern in ("request.*", "input")]
    dynamic_code_sinks = [
        calls(pattern) for pattern in ("eval", "exec", "compile")
    ]

    return flows(
        from_sources=untrusted,
        to_sinks=dynamic_code_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# A01:2021 - Broken Access Control
# =============================================================================

@rule(
    id="owasp-a01-path-traversal",
    severity="high",
    cwe="CWE-22",
    owasp="A01:2021",
)
def detect_path_traversal():
    """
    Detects path traversal vulnerabilities.

    Only os.path.basename is accepted as a sanitizer. os.path.normpath is
    deliberately not: it collapses "a/../b" but leaves leading ".." segments
    and absolute paths untouched, so a normalized path can still escape the
    intended directory.

    Example vulnerable code:
        filename = request.GET.get("file")
        with open(f"/uploads/{filename}") as f:
            return f.read()
    """
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("open"),
            calls("*.read"),
            calls("*.write"),
        ],
        sanitized_by=[
            # basename drops every directory component, which removes the
            # traversal; normpath("../../etc/passwd") is returned unchanged,
            # so it must not suppress findings.
            calls("os.path.basename"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# A02:2021 - Cryptographic Failures
# =============================================================================

@rule(
    id="owasp-a02-weak-crypto",
    severity="high",
    cwe="CWE-327",
    owasp="A02:2021",
)
def detect_weak_cryptography():
    """
    Detects weak cryptographic algorithms.

    Matches:
    - hashlib.md5 and hashlib.sha1 calls
    - hashlib.new when the argument at position 0 is "md5", "sha1" or "md4"
    - Crypto.Cipher.DES
    """
    legacy_digest_names = ["md5", "sha1", "md4"]
    weak_primitives = (
        calls("hashlib.md5"),
        calls("hashlib.sha1"),
        calls("hashlib.new", match_position={0: legacy_digest_names}),
        calls("Crypto.Cipher.DES"),
    )
    return Or(*weak_primitives)


@rule(
    id="owasp-a02-hardcoded-secrets",
    severity="high",
    cwe="CWE-798",
    owasp="A02:2021",
)
def detect_hardcoded_secrets():
    """
    Detects hardcoded passwords and API keys.

    Each keyword of interest (password, api_key, secret_key) has its own
    pattern, so a call that supplies only one of them is still reported.

    Example vulnerable code:
        db.connect(password="secret123")
    """
    return Or(
        calls("*.connect", match_name={"password": "*"}),
        calls("*.authenticate", match_name={"password": "*"}),
        # One constraint per pattern: a single match_name dict holding both
        # keys may require both on the same call (NOTE(review): confirm
        # against the matcher docs). Separate patterns make the either/or
        # intent explicit under both readings.
        calls("*", match_name={"api_key": "*"}),
        calls("*", match_name={"secret_key": "*"}),
    )


# =============================================================================
# A05:2021 - Security Misconfiguration
# =============================================================================

@rule(
    id="owasp-a05-debug-mode",
    severity="high",
    cwe="CWE-489",
    owasp="A05:2021",
)
def detect_debug_mode():
    """
    Detects debug mode enabled in production.

    Flags run(...) calls that pass the keyword debug=True, using both the
    literal app.run pattern and the wildcard *.run pattern.

    Example vulnerable code:
        app.run(debug=True)
    """
    run_targets = ("app.run", "*.run")
    debug_enabled_calls = [
        calls(target, match_name={"debug": True}) for target in run_targets
    ]
    return Or(*debug_enabled_calls)


@rule(
    id="owasp-a05-insecure-deserialization",
    severity="critical",
    cwe="CWE-502",
    # Insecure deserialization (CWE-502) falls under A08:2021, Software and
    # Data Integrity Failures. The rule id keeps its historical "a05" prefix
    # so existing references to it keep working.
    owasp="A08:2021",
)
def detect_insecure_deserialization():
    """
    Detects insecure deserialization.

    Covers:
    - pickle.loads, pickle.load
    - yaml.unsafe_load, yaml.load
    """
    return Or(
        calls("pickle.loads"),
        calls("pickle.load"),
        calls("yaml.unsafe_load"),
        # NOTE(review): no argument constraint is applied here, so yaml.load
        # calls that pass a safe Loader are presumably reported too - confirm
        # and narrow if that proves noisy.
        calls("yaml.load"),
    )


# =============================================================================
# A07:2021 - Identification and Authentication Failures
# =============================================================================

@rule(
    id="owasp-a07-weak-password-hash",
    severity="high",
    cwe="CWE-916",
    owasp="A07:2021",
)
def detect_weak_password_hashing():
    """
    Detects weak password hashing algorithms.

    Matches any hashlib.md5, hashlib.sha1 or hashlib.sha256 call; the rule
    does not check what is being hashed.

    Example vulnerable code:
        password_hash = hashlib.md5(password.encode()).hexdigest()
    """
    fast_digests = (
        "hashlib.md5",
        "hashlib.sha1",
        "hashlib.sha256",  # SHA256 alone is not sufficient for passwords
    )
    return Or(*[calls(digest) for digest in fast_digests])


# =============================================================================
# A10:2021 - Server-Side Request Forgery (SSRF)
# =============================================================================

@rule(
    id="owasp-a10-ssrf",
    severity="high",
    cwe="CWE-918",
    owasp="A10:2021",
)
def detect_ssrf():
    """
    Detects SSRF where user input flows to HTTP requests.

    Sources are calls matching request.*; sinks are requests.get,
    requests.post and the urlopen variants. validate_url and is_safe_url
    are treated as sanitizers.

    Example vulnerable code:
        url = request.GET.get("url")
        requests.get(url)
    """
    untrusted = [calls("request.*")]
    outbound_http = [
        calls(pattern)
        for pattern in (
            "requests.get",
            "requests.post",
            "urllib.request.urlopen",
            "*.urlopen",
        )
    ]
    url_checks = [calls(pattern) for pattern in ("validate_url", "is_safe_url")]

    return flows(
        from_sources=untrusted,
        to_sinks=outbound_http,
        sanitized_by=url_checks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


# =============================================================================
# Additional Security Patterns
# =============================================================================

@rule(
    id="xpath-injection",
    severity="high",
    cwe="CWE-643",
)
def detect_xpath_injection():
    """Detects XPath injection vulnerabilities

    Taint from calls matching request.* or input is reported when it reaches
    a *.xpath or *.find call. No sanitizers are declared.
    """
    untrusted = [calls(pattern) for pattern in ("request.*", "input")]
    query_sinks = [calls(pattern) for pattern in ("*.xpath", "*.find")]

    return flows(
        from_sources=untrusted,
        to_sinks=query_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


@rule(
    id="ldap-injection",
    severity="high",
    cwe="CWE-90",
)
def detect_ldap_injection():
    """Detects LDAP injection vulnerabilities

    Taint from calls matching request.* or input is reported when it reaches
    a *.search_s or *.search call. No sanitizers are declared.
    """
    untrusted = [calls(pattern) for pattern in ("request.*", "input")]
    directory_sinks = [calls(pattern) for pattern in ("*.search_s", "*.search")]

    return flows(
        from_sources=untrusted,
        to_sinks=directory_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )


if __name__ == "__main__":
    # Printed only when this file is executed directly.
    for banner_line in (
        "OWASP Top 10 Security Rules loaded successfully!",
        "Run with: pathfinder scan --rules owasp_top10.py --project <path>",
    ):
        print(banner_line)

Running the Rules

Save the rules above as owasp_top10.py and run:

# Scan your project
pathfinder scan --rules owasp_top10.py --project /path/to/your/code

# Scan current directory
pathfinder scan --rules owasp_top10.py --project .

# With verbose output
pathfinder scan --rules owasp_top10.py --project . --verbose

Example Output

[critical] owasp-a03-sqli
CWE: CWE-89 | OWASP: A03:2021
Detects SQL injection where user input flows to SQL execution
→ app/views.py:42
Source: request.GET.get (line 38)
Sink: cursor.execute (line 42)
Tainted variable: user_id
Confidence: 85%
Scope: global

Framework-Specific Rules

Django-Specific Rules

from codepathfinder import rule, calls, flows, Or
from codepathfinder.presets import PropagationPresets

@rule(id="django-raw-sql", severity="critical", cwe="CWE-89")
def detect_django_raw_sql():
    """Detects Django raw SQL queries with user input

    Sources are request.GET.get / request.POST.get; sinks are *.raw, *.extra
    and RawSQL. No sanitizers are declared.
    """
    request_params = [
        calls(pattern) for pattern in ("request.GET.get", "request.POST.get")
    ]
    raw_sql_sinks = [calls(pattern) for pattern in ("*.raw", "*.extra", "RawSQL")]

    return flows(
        from_sources=request_params,
        to_sinks=raw_sql_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )

@rule(id="django-debug-true", severity="high", cwe="CWE-489")
def detect_django_debug():
    """Detects DEBUG=True in settings

    NOTE(review): the pattern constrains a keyword argument named DEBUG on
    any call. A plain module-level ``DEBUG = True`` assignment is not a call
    and is presumably not matched - confirm against the matcher docs.
    """
    debug_keyword = {"DEBUG": True}
    return calls("*", match_name=debug_keyword)

@rule(id="django-secret-key", severity="critical", cwe="CWE-798")
def detect_django_hardcoded_secret():
    """Detects hardcoded SECRET_KEY

    NOTE(review): the pattern constrains a keyword argument named SECRET_KEY
    on any call. A module-level ``SECRET_KEY = "..."`` assignment is not a
    call and is presumably not matched - confirm against the matcher docs.
    """
    secret_keyword = {"SECRET_KEY": "*"}
    return calls("*", match_name=secret_keyword)

Flask-Specific Rules

@rule(id="flask-debug", severity="high", cwe="CWE-489")
def detect_flask_debug():
    """Detects Flask debug mode

    Matches app.run calls that pass the keyword debug=True.
    """
    debug_keyword = {"debug": True}
    return calls("app.run", match_name=debug_keyword)

@rule(id="flask-sqli", severity="critical", cwe="CWE-89")
def detect_flask_sql_injection():
    """Detects SQL injection in Flask apps

    Sources are the request.args.get / request.form.get / request.values.get
    accessors; sinks are *.execute and db.session.execute. No sanitizers are
    declared.
    """
    request_params = [
        calls(pattern)
        for pattern in (
            "request.args.get",
            "request.form.get",
            "request.values.get",
        )
    ]
    execute_sinks = [
        calls(pattern) for pattern in ("*.execute", "db.session.execute")
    ]

    return flows(
        from_sources=request_params,
        to_sinks=execute_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )

@rule(id="flask-ssti", severity="critical", cwe="CWE-94")
def detect_flask_template_injection():
    """Detects Server-Side Template Injection

    Taint from calls matching request.* is reported when it reaches
    render_template_string or a *.render_string call.
    """
    untrusted = [calls("request.*")]
    template_sinks = [
        calls(pattern)
        for pattern in ("render_template_string", "*.render_string")
    ]

    return flows(
        from_sources=untrusted,
        to_sinks=template_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )

Testing Your Rules

Create test files to verify your rules work correctly:

Vulnerable Test Code

Create vulnerable_app.py to test rule detection:

from flask import Flask, request
import sqlite3

app = Flask(__name__)

@app.route('/user')
def get_user():
    """Intentionally vulnerable fixture: builds a SQL query from a request parameter."""
    # VULNERABLE: SQL Injection
    # Do not "fix" this handler: the f-string below interpolates user_id
    # straight into the query, which is the source-to-sink flow the scan is
    # meant to report.
    user_id = request.args.get('id')
    conn = sqlite3.connect('db.sqlite')
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
    return cursor.fetchone()

@app.route('/file')
def read_file():
    """Intentionally vulnerable fixture: opens a path built from a request parameter."""
    # VULNERABLE: Path Traversal
    # The request-supplied filename is placed into the path unchecked; keep
    # it this way so the scan has a flow into open() to report.
    filename = request.args.get('file')
    with open(f'/uploads/{filename}') as f:
        return f.read()

@app.route('/execute')
def execute_code():
    """Intentionally vulnerable fixture: evaluates a request parameter as Python code."""
    # VULNERABLE: Code Injection
    # The request value is handed to eval() as-is; this is the flow the scan
    # is meant to report. The handler returns nothing - it exists to be
    # scanned, not served.
    code = request.args.get('code')
    eval(code)
if __name__ == '__main__':
    # VULNERABLE: Debug mode
    # debug=True is left on deliberately so the debug-mode rule has a call
    # to match.
    app.run(debug=True)

Run Detection

pathfinder scan --rules owasp_top10.py --project vulnerable_app.py

This should detect all 4 vulnerabilities!

Next Steps:

  • Add these rules to your CI/CD pipeline
  • Customize source/sink patterns for your codebase
  • Add framework-specific sanitizers
  • Create organization-specific rules