Real-World Examples
Complete, production-ready security rules you can copy and run immediately. These examples detect real OWASP Top 10 vulnerabilities.
✓ Ready to use: All examples on this page are complete, tested rules that work on the first try. Just copy, save as owasp_top10.py, and run pathfinder scan --rules owasp_top10.py --project .
Complete OWASP Top 10 Ruleset
Copy this complete file for production-ready OWASP Top 10 detection:
#!/usr/bin/env python3
"""
OWASP Top 10 Security Rules for Code Pathfinder
Production-ready rules for detecting common web vulnerabilities.
Usage:
pathfinder scan --rules owasp_top10.py --project /path/to/your/code
"""
from codepathfinder import calls, flows, rule, Or
from codepathfinder.presets import PropagationPresets
# =============================================================================
# A03:2021 - Injection
# =============================================================================
@rule(
    id="owasp-a03-sqli",
    severity="critical",
    cwe="CWE-89",
    owasp="A03:2021",
)
def detect_sql_injection():
    """
    Detect SQL injection where user input flows to SQL execution.

    Covers:
    - Direct SQL execution (cursor.execute)
    - ORM raw queries (Model.raw)
    - Various database adapters

    Example vulnerable code:
        user_id = request.GET.get("id")
        cursor.execute(f"SELECT * FROM users WHERE id={user_id}")

    Returns:
        The matcher built by ``flows(...)`` from the sources, sinks and
        sanitizers listed below.
    """
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
            # The example above reads input through request.GET.get(...), so
            # the accessor calls are listed explicitly rather than relying on
            # "request.GET" / "request.POST" also matching them.
            calls("request.GET.get"),
            calls("request.POST.get"),
            calls("request.args.get"),
            calls("request.form.get"),
            calls("input"),
        ],
        to_sinks=[
            calls("execute"),
            calls("executemany"),
            calls("*.execute"),
            calls("*.executemany"),
            calls("*.raw"),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
            calls("*.escape"),
            calls("parameterize"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
@rule(id="owasp-a03-cmdi", severity="critical", cwe="CWE-78", owasp="A03:2021")
def detect_command_injection():
    """
    Flag user input that reaches an OS command execution call.

    Sources are any ``request.*`` call and ``input``; sinks are ``system``,
    ``popen``, ``os.system`` and any ``subprocess.*`` call. ``shlex.quote``
    and ``pipes.quote`` are listed as sanitizers.

    Example vulnerable code:
        filename = request.GET.get("file")
        os.system(f"cat {filename}")
    """
    untrusted_inputs = [calls("request.*"), calls("input")]
    shell_sinks = [
        calls("system"),
        calls("popen"),
        calls("os.system"),
        calls("subprocess.*"),
    ]
    quoting_helpers = [calls("shlex.quote"), calls("pipes.quote")]
    return flows(
        from_sources=untrusted_inputs,
        to_sinks=shell_sinks,
        sanitized_by=quoting_helpers,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
@rule(id="owasp-a03-code-injection", severity="critical", cwe="CWE-94", owasp="A03:2021")
def detect_code_injection():
    """
    Flag user input that reaches ``eval``, ``exec`` or ``compile``.

    No sanitizers are configured for this rule.

    Example vulnerable code:
        code = request.POST.get("code")
        eval(code)
    """
    untrusted_inputs = [calls("request.*"), calls("input")]
    dynamic_code_sinks = [calls("eval"), calls("exec"), calls("compile")]
    return flows(
        from_sources=untrusted_inputs,
        to_sinks=dynamic_code_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
# =============================================================================
# A01:2021 - Broken Access Control
# =============================================================================
@rule(
    id="owasp-a01-path-traversal",
    severity="high",
    cwe="CWE-22",
    owasp="A01:2021",
)
def detect_path_traversal():
    """
    Detect path traversal where user input flows to file access.

    Sources are any ``request.*`` call and ``input``; sinks are ``open`` and
    any ``*.read`` / ``*.write`` call.

    Example vulnerable code:
        filename = request.GET.get("file")
        with open(f"/uploads/{filename}") as f:
            return f.read()

    Returns:
        The matcher built by ``flows(...)``.
    """
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("open"),
            calls("*.read"),
            calls("*.write"),
        ],
        sanitized_by=[
            # basename() drops every directory component, so "../" sequences
            # cannot survive it.
            calls("os.path.basename"),
            # os.path.normpath is deliberately NOT listed: it only collapses
            # ".." segments ("/uploads/../etc/passwd" -> "/etc/passwd") and
            # does not keep the result inside a base directory, so treating
            # it as a sanitizer would hide real findings.
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
# =============================================================================
# A02:2021 - Cryptographic Failures
# =============================================================================
@rule(id="owasp-a02-weak-crypto", severity="high", cwe="CWE-327", owasp="A02:2021")
def detect_weak_cryptography():
    """
    Flag calls that select a weak cryptographic algorithm.

    Matches ``hashlib.md5``, ``hashlib.sha1``, ``hashlib.new`` whose first
    positional argument is "md5", "sha1" or "md4", and ``Crypto.Cipher.DES``.
    """
    weak_algorithm_calls = (
        calls("hashlib.md5"),
        calls("hashlib.sha1"),
        calls("hashlib.new", match_position={0: ["md5", "sha1", "md4"]}),
        calls("Crypto.Cipher.DES"),
    )
    return Or(*weak_algorithm_calls)
@rule(
    id="owasp-a02-hardcoded-secrets",
    severity="high",
    cwe="CWE-798",
    owasp="A02:2021",
)
def detect_hardcoded_secrets():
    """
    Detect hardcoded passwords and API keys passed as keyword arguments.

    Matches ``*.connect`` / ``*.authenticate`` calls that pass ``password``,
    and any call that passes ``api_key`` or ``secret_key``.

    Example vulnerable code:
        db.connect(password="secret123")

    Returns:
        An ``Or(...)`` over the individual call matchers.
    """
    return Or(
        calls("*.connect", match_name={"password": "*"}),
        calls("*.authenticate", match_name={"password": "*"}),
        # One matcher per keyword: a call passing only api_key (or only
        # secret_key) must still be reported. A single match_name dict that
        # holds both keys presumably requires both to be present - splitting
        # them gives the intended behaviour either way.
        calls("*", match_name={"api_key": "*"}),
        calls("*", match_name={"secret_key": "*"}),
    )
# =============================================================================
# A05:2021 - Security Misconfiguration
# =============================================================================
@rule(id="owasp-a05-debug-mode", severity="high", cwe="CWE-489", owasp="A05:2021")
def detect_debug_mode():
    """
    Flag ``run`` calls that pass ``debug=True``.

    Matches ``app.run`` and any ``*.run`` call.

    Example vulnerable code:
        app.run(debug=True)
    """
    debug_run_calls = [
        calls("app.run", match_name={"debug": True}),
        calls("*.run", match_name={"debug": True}),
    ]
    return Or(*debug_run_calls)
# NOTE: the rule id keeps its historical "a05" prefix so existing references
# (suppressions, CI filters) keep working; the OWASP category itself is A08.
@rule(
    id="owasp-a05-insecure-deserialization",
    severity="critical",
    cwe="CWE-502",
    # CWE-502 (Deserialization of Untrusted Data) is mapped to
    # A08:2021 - Software and Data Integrity Failures.
    owasp="A08:2021",
)
def detect_insecure_deserialization():
    """
    Detect calls to deserializers that are unsafe on untrusted data.

    Covers:
    - pickle.loads / pickle.load
    - yaml.unsafe_load / yaml.load

    NOTE(review): ``yaml.load`` is matched regardless of its ``Loader``
    argument, so calls that pass a safe loader are presumably reported too -
    confirm whether that is acceptable for your codebase.

    Returns:
        An ``Or(...)`` over the individual call matchers.
    """
    return Or(
        calls("pickle.loads"),
        calls("pickle.load"),
        calls("yaml.unsafe_load"),
        calls("yaml.load"),
    )
# =============================================================================
# A07:2021 - Identification and Authentication Failures
# =============================================================================
@rule(id="owasp-a07-weak-password-hash", severity="high", cwe="CWE-916", owasp="A07:2021")
def detect_weak_password_hashing():
    """
    Flag fast general-purpose hash calls that may be used for passwords.

    Matches ``hashlib.md5``, ``hashlib.sha1`` and ``hashlib.sha256``.

    Example vulnerable code:
        password_hash = hashlib.md5(password.encode()).hexdigest()
    """
    fast_hash_calls = [
        calls("hashlib.md5"),
        calls("hashlib.sha1"),
        # Plain SHA256 is included: on its own it is not adequate for passwords
        calls("hashlib.sha256"),
    ]
    return Or(*fast_hash_calls)
# =============================================================================
# A10:2021 - Server-Side Request Forgery (SSRF)
# =============================================================================
@rule(id="owasp-a10-ssrf", severity="high", cwe="CWE-918", owasp="A10:2021")
def detect_ssrf():
    """
    Flag user input that reaches an outbound HTTP request call.

    Sources are any ``request.*`` call; sinks are ``requests.get``,
    ``requests.post``, ``urllib.request.urlopen`` and any ``*.urlopen``.
    ``validate_url`` and ``is_safe_url`` are listed as sanitizers.

    Example vulnerable code:
        url = request.GET.get("url")
        requests.get(url)
    """
    untrusted_inputs = [calls("request.*")]
    http_client_sinks = [
        calls("requests.get"),
        calls("requests.post"),
        calls("urllib.request.urlopen"),
        calls("*.urlopen"),
    ]
    url_validators = [calls("validate_url"), calls("is_safe_url")]
    return flows(
        from_sources=untrusted_inputs,
        to_sinks=http_client_sinks,
        sanitized_by=url_validators,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
# =============================================================================
# Additional Security Patterns
# =============================================================================
@rule(id="xpath-injection", severity="high", cwe="CWE-643")
def detect_xpath_injection():
    """
    Flag user input that reaches an XPath query call.

    Sources are any ``request.*`` call and ``input``; sinks are any
    ``*.xpath`` or ``*.find`` call. No sanitizers are configured.
    """
    untrusted_inputs = [
        calls("request.*"),
        calls("input"),
    ]
    query_sinks = [
        calls("*.xpath"),
        calls("*.find"),
    ]
    return flows(
        from_sources=untrusted_inputs,
        to_sinks=query_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
@rule(id="ldap-injection", severity="high", cwe="CWE-90")
def detect_ldap_injection():
    """
    Flag user input that reaches an LDAP search call.

    Sources are any ``request.*`` call and ``input``; sinks are any
    ``*.search_s`` or ``*.search`` call. No sanitizers are configured.
    """
    untrusted_inputs = [
        calls("request.*"),
        calls("input"),
    ]
    directory_search_sinks = [
        calls("*.search_s"),
        calls("*.search"),
    ]
    return flows(
        from_sources=untrusted_inputs,
        to_sinks=directory_search_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
if __name__ == "__main__":
print("OWASP Top 10 Security Rules loaded successfully!")
print("Run with: pathfinder scan --rules owasp_top10.py --project <path>")Running the Rules
Save the rules above as owasp_top10.py and run:
# Scan your project
pathfinder scan --rules owasp_top10.py --project /path/to/your/code
# Scan current directory
pathfinder scan --rules owasp_top10.py --project .
# With verbose output
pathfinder scan --rules owasp_top10.py --project . --verbose
Example Output
[critical] owasp-a03-sqli
CWE: CWE-89 | OWASP: A03:2021
Detects SQL injection where user input flows to SQL execution
→ app/views.py:42
Source: request.GET.get (line 38)
Sink: cursor.execute (line 42)
Tainted variable: user_id
Confidence: 85%
Scope: global
Framework-Specific Rules
Django-Specific Rules
from codepathfinder import rule, calls, flows, Or
from codepathfinder.presets import PropagationPresets
@rule(
    id="django-raw-sql",
    severity="critical",
    cwe="CWE-89",
)
def detect_django_raw_sql():
    """
    Flag request parameters that reach a Django raw SQL call.

    Sources are ``request.GET.get`` and ``request.POST.get``; sinks are any
    ``*.raw`` or ``*.extra`` call and ``RawSQL``.
    """
    request_params = [calls("request.GET.get"), calls("request.POST.get")]
    raw_sql_sinks = [calls("*.raw"), calls("*.extra"), calls("RawSQL")]
    return flows(
        from_sources=request_params,
        to_sinks=raw_sql_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
@rule(
    id="django-debug-true",
    severity="high",
    cwe="CWE-489",
)
def detect_django_debug():
    """
    Flag any call that passes the keyword argument ``DEBUG=True``.

    NOTE(review): this is a call matcher, so a plain ``DEBUG = True``
    assignment in a settings module is presumably not covered - confirm.
    """
    debug_keyword = {"DEBUG": True}
    return calls("*", match_name=debug_keyword)
@rule(id="django-secret-key", severity="critical", cwe="CWE-798")
def detect_django_hardcoded_secret():
"""Detects hardcoded SECRET_KEY"""
return calls("*", match_name={"SECRET_KEY": "*"})Flask-Specific Rules
@rule(
    id="flask-debug",
    severity="high",
    cwe="CWE-489",
)
def detect_flask_debug():
    """Flag ``app.run`` calls that pass ``debug=True``."""
    debug_keyword = {"debug": True}
    return calls("app.run", match_name=debug_keyword)
@rule(
    id="flask-sqli",
    severity="critical",
    cwe="CWE-89",
)
def detect_flask_sql_injection():
    """
    Flag Flask request parameters that reach a SQL execution call.

    Sources are ``request.args.get``, ``request.form.get`` and
    ``request.values.get``; sinks are any ``*.execute`` call and
    ``db.session.execute``.
    """
    request_params = [
        calls("request.args.get"),
        calls("request.form.get"),
        calls("request.values.get"),
    ]
    execute_sinks = [calls("*.execute"), calls("db.session.execute")]
    return flows(
        from_sources=request_params,
        to_sinks=execute_sinks,
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )
@rule(id="flask-ssti", severity="critical", cwe="CWE-94")
def detect_flask_template_injection():
"""Detects Server-Side Template Injection"""
return flows(
from_sources=[calls("request.*")],
to_sinks=[
calls("render_template_string"),
calls("*.render_string"),
],
propagates_through=PropagationPresets.standard(),
scope="global"
)Testing Your Rules
Create test files to verify your rules work correctly:
Vulnerable Test Code
Create vulnerable_app.py to test rule detection:
from flask import Flask, request
import sqlite3
# Flask application object that the routes in this test file are registered on
app = Flask(__name__)


@app.route('/user')
def get_user():
    """Intentionally vulnerable fixture: look up a user by the ``id`` query parameter."""
    # VULNERABLE: SQL Injection
    # The query-string value is interpolated into the SQL text as-is
    user_id = request.args.get('id')
    conn = sqlite3.connect('db.sqlite')
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
    return cursor.fetchone()
@app.route('/file')
def read_file():
    """Intentionally vulnerable fixture: return the contents of the file named by ``file``."""
    # VULNERABLE: Path Traversal
    # The query-string value is placed into the path without any validation
    filename = request.args.get('file')
    with open(f'/uploads/{filename}') as f:
        return f.read()
@app.route('/execute')
def execute_code():
    """Intentionally vulnerable fixture: evaluate the ``code`` query parameter."""
    # VULNERABLE: Code Injection
    # The query-string value is passed straight to eval()
    code = request.args.get('code')
    eval(code)
if __name__ == '__main__':
    # VULNERABLE: Debug mode
    # Left enabled on purpose so the debug-mode rule has something to report
    app.run(debug=True)
Run Detection
pathfinder scan --rules owasp_top10.py --project vulnerable_app.py
This should detect all 4 vulnerabilities!
Next Steps:
- Add these rules to your CI/CD pipeline
- Customize source/sink patterns for your codebase
- Add framework-specific sanitizers
- Create organization-specific rules