Dataflow Analysis
Detect injection vulnerabilities by tracking how untrusted data flows through your code. Master dataflow analysis to find OWASP Top 10 vulnerabilities.
What is Dataflow Analysis?
Dataflow analysis tracks how data moves through your program. The form used here is taint analysis, which marks untrusted data as "tainted" and follows it. It answers the question: "Can untrusted user input reach a dangerous function?"
Why is this powerful?
Instead of flagging every execute() call, you flag only the execute() calls that user input can reach. This cuts false positives while still catching real vulnerabilities.
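To make the idea concrete, here is a toy sketch in plain Python. It does not use Code Pathfinder: the statement format and the names SOURCES, SINKS, and find_tainted_sink_calls are invented for illustration. It reports only the sink calls that receive data derived from a source.

```python
# Toy taint tracker over straight-line "statements"; purely illustrative.
SOURCES = {"request.GET.get"}
SINKS = {"cursor.execute"}

# Each statement: (assigned_variable_or_None, called_function, argument_names)
program = [
    ("user_id", "request.GET.get", []),     # tainted: comes from a source
    ("limit", "int", []),                   # clean: not derived from a source
    ("query", "build_query", ["user_id"]),  # tainted: built from user_id
    (None, "cursor.execute", ["query"]),    # sink reached by tainted data
    (None, "cursor.execute", ["limit"]),    # sink reached by clean data only
]

def find_tainted_sink_calls(statements):
    """Return the indexes of sink calls that receive tainted arguments."""
    tainted = set()
    findings = []
    for index, (target, func, args) in enumerate(statements):
        uses_taint = any(arg in tainted for arg in args)
        if func in SINKS and uses_taint:
            findings.append(index)
        if target is not None:
            if func in SOURCES or uses_taint:
                tainted.add(target)
            else:
                tainted.discard(target)  # reassigned from clean data
    return findings

findings = find_tainted_sink_calls(program)
print(findings)  # [3]
```

Both execute() calls would match a plain pattern search, but only the one at index 3 is reported because only its argument traces back to a source.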
flows() Function
The flows() function is your primary tool for dataflow analysis:
from codepathfinder import rule, flows, calls
from codepathfinder.presets import PropagationPresets

@rule(id="sql-injection", severity="critical", cwe="CWE-89")
def detect_sql_injection():
    """Detects SQL injection vulnerabilities"""
    return flows(
        from_sources=calls("request.GET", "request.POST"),
        to_sinks=calls("execute", "executemany"),
        sanitized_by=calls("escape", "quote"),
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Parameters explained:
from_sources - Where untrusted data comes from (user input)
to_sinks - Dangerous functions that should not receive untrusted data
sanitized_by - Functions that make data safe (optional)
propagates_through - How taint spreads (assignments, function calls, etc.)
scope - Track within one function ("local") or across functions ("global")
Sources and Sinks
Sources: Where Untrusted Data Comes From
Sources are entry points for untrusted data. Common sources include:
# Web application sources
from_sources=[
    calls("request.GET"),        # URL parameters
    calls("request.POST"),       # Form data
    calls("request.args.get"),   # Flask args
    calls("request.form.get"),   # Flask forms
    calls("request.json"),       # JSON payloads
]

# CLI application sources
from_sources=[
    calls("input"),      # User input
    calls("sys.argv"),   # Command line args
]

# File reading sources
from_sources=[
    calls("open"),
    calls("*.read"),
    calls("*.readline"),
]

Sinks: Dangerous Functions
Sinks are functions that become dangerous when receiving untrusted data:
# SQL sinks
to_sinks=[
    calls("execute"),
    calls("executemany"),
    calls("*.execute"),
    calls("*.raw"),
]

# Command execution sinks
to_sinks=[
    calls("system"),
    calls("popen"),
    calls("os.system"),
    calls("subprocess.*"),
]

# Code execution sinks
to_sinks=[
    calls("eval"),
    calls("exec"),
    calls("compile"),
]

# File operation sinks
to_sinks=[
    calls("open"),
    calls("*.write"),
    calls("*.read"),
]

Sanitizers
Sanitizers are functions that clean or validate data, breaking the taint flow. Specifying sanitizers reduces false positives significantly.
# SQL sanitizers
sanitized_by=[
    calls("escape"),
    calls("escape_string"),
    calls("quote_sql"),
    calls("parameterize"),
]

# Command injection sanitizers
sanitized_by=[
    calls("shlex.quote"),
    calls("pipes.quote"),   # legacy alias; the pipes module was removed in Python 3.13
]

# Path traversal sanitizers
sanitized_by=[
    calls("os.path.basename"),
    calls("os.path.normpath"),   # normalizes only; still check the result stays inside the base directory
]

# XSS sanitizers
sanitized_by=[
    calls("html.escape"),
    calls("bleach.clean"),
    calls("*.escape"),
]

Example with sanitizer:
# This will NOT be detected (sanitized)
user_input = request.GET.get("name")
safe_input = escape(user_input)
cursor.execute(f"SELECT * FROM users WHERE name='{safe_input}'")

The escape() call breaks the taint flow, so no vulnerability is reported.
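Listing a function in sanitized_by tells the analysis to trust it, so it is worth checking what each candidate actually does to its input. The sketch below runs a few of the standard-library functions named in this section on hostile strings. It uses posixpath (the POSIX flavour of os.path) so the output is the same on every platform; the input strings are made up for illustration.

```python
import html
import posixpath
import shlex

# shlex.quote wraps a string containing shell metacharacters in single quotes.
quoted = shlex.quote("report.txt; rm -rf /")
print(quoted)  # 'report.txt; rm -rf /'

# html.escape replaces characters that are special in HTML.
escaped = html.escape("<script>alert(1)</script>")
print(escaped)  # &lt;script&gt;alert(1)&lt;/script&gt;

# basename drops every directory component...
base = posixpath.basename("../../etc/passwd")
print(base)  # passwd

# ...but normpath alone keeps leading ".." segments of a relative path.
normalized = posixpath.normpath("uploads/../../etc/passwd")
print(normalized)  # ../etc/passwd
```

The last result shows why a normalizing function may not be enough on its own: the normalized path still points outside the uploads directory, so a rule that treats normpath as a sanitizer could hide a real path traversal unless the code also checks the result against a base directory.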
Propagation
Propagation defines how taint spreads through code. Code Pathfinder provides presets for common patterns:
Propagation Presets
from codepathfinder.presets import PropagationPresets

# Minimal (fastest, ~60-70% coverage)
PropagationPresets.minimal()
# - Variable assignments (x = tainted)
# - Function arguments (func(tainted))

# Standard (recommended, ~75-80% coverage)
PropagationPresets.standard()
# - All minimal patterns
# - Function returns (return tainted)
# - String concatenation ("prefix" + tainted)
# - String formatting (f"{tainted}")

# Comprehensive (most thorough)
PropagationPresets.comprehensive()
# - All standard patterns
# - Additional edge cases

Recommendation: Use PropagationPresets.standard() for most security rules. It provides excellent coverage while maintaining good performance.
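The effect of choosing a smaller preset can be sketched with a toy model in plain Python. It does not call Code Pathfinder; the kind names, the MINIMAL and STANDARD sets, and tainted_variables are invented for this sketch and only mirror the pattern lists above.

```python
# Toy model: each step records which propagation kind carries data onward.
# The kind names are made up here and are not Code Pathfinder identifiers.
MINIMAL = {"assignment", "function_args"}
STANDARD = MINIMAL | {"function_returns", "string_concat", "string_format"}

# (target, kind, inputs): target is derived from inputs via the given kind.
steps = [
    ("name", "source", []),                 # name = request.GET.get("name")
    ("alias", "assignment", ["name"]),      # alias = name
    ("query", "string_format", ["alias"]),  # query = f"... {alias}"
]

def tainted_variables(steps, enabled_kinds):
    """Return the variables considered tainted under the enabled kinds."""
    tainted = set()
    for target, kind, inputs in steps:
        if kind == "source":
            tainted.add(target)
        elif kind in enabled_kinds and any(i in tainted for i in inputs):
            tainted.add(target)
    return tainted

minimal_result = tainted_variables(steps, MINIMAL)
standard_result = tainted_variables(steps, STANDARD)
print(sorted(minimal_result))   # ['alias', 'name']
print(sorted(standard_result))  # ['alias', 'name', 'query']
```

In this model the smaller set loses track of the data at the formatting step, so a sink that receives query would go unreported. That is the trade-off behind the coverage figures quoted for each preset.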
Explicit Propagation
For fine-grained control, specify exactly how taint should propagate:
from codepathfinder import calls, flows, propagates

flows(
    from_sources=calls("input"),
    to_sinks=calls("eval"),
    propagates_through=[
        propagates.assignment(),         # x = tainted
        propagates.function_args(),      # func(tainted)
        propagates.function_returns(),   # return tainted
        propagates.string_concat(),      # "str" + tainted
        propagates.string_format(),      # f"{tainted}"
    ],
    scope="local"
)

Scope: Local vs Global
Local Scope (Intra-procedural)
Tracks taint within a single function. Faster and simpler, but won't detect vulnerabilities that cross function boundaries.
@rule(id="simple-sqli", severity="high")
def detect_simple_sql_injection():
    """Detects SQL injection within same function"""
    return flows(
        from_sources=calls("request.GET"),
        to_sinks=calls("execute"),
        propagates_through=PropagationPresets.minimal(),
        scope="local"  # Same function only
    )

Detects:
def view(request):
    user_id = request.GET.get("id")
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ↑ DETECTED (source and sink in same function)

Does NOT detect:
def get_user_input(request):
    return request.GET.get("id")

def view(request):
    user_id = get_user_input(request)
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ↑ NOT DETECTED (crosses function boundary)

Global Scope (Inter-procedural)
Tracks taint across function calls. More comprehensive but slower.
@rule(id="comprehensive-sqli", severity="critical")
def detect_comprehensive_sql_injection():
    """Detects SQL injection across function boundaries"""
    return flows(
        from_sources=calls("request.GET"),
        to_sinks=calls("execute"),
        propagates_through=PropagationPresets.standard(),
        scope="global"  # Across functions
    )

Now both examples above will be detected!
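The difference between the two scopes can be sketched with a toy model in plain Python. This is not how Code Pathfinder is implemented; the program encoding and the analyze and scan helpers are assumptions made for illustration. In "global" mode the sketch first works out which functions return tainted data, then reuses that summary at each call site. It ignores taint passed into a callee's parameters, which a real inter-procedural analysis would also track.

```python
SOURCES = {"request.GET.get"}
SINKS = {"cursor.execute"}

# function name -> list of (target, callee, args); target "return" marks the return value.
program = {
    "get_user_input": [
        ("return", "request.GET.get", []),
    ],
    "view": [
        ("user_id", "get_user_input", []),
        (None, "cursor.execute", ["user_id"]),
    ],
}

def scan(body, returns_taint):
    """Analyze one function body; return (sink hit indexes, returns tainted data?)."""
    tainted, hits, returned = set(), [], False
    for i, (target, callee, args) in enumerate(body):
        args_tainted = any(a in tainted for a in args)
        value_tainted = callee in SOURCES or callee in returns_taint or args_tainted
        if callee in SINKS and args_tainted:
            hits.append(i)
        if target == "return":
            returned = returned or value_tainted
        elif target is not None and value_tainted:
            tainted.add(target)
    return hits, returned

def analyze(program, scope):
    """Return (function, statement index) pairs for tainted sink calls."""
    returns_taint = set()
    if scope == "global":
        # Iterate until no new function is found to return tainted data.
        changed = True
        while changed:
            changed = False
            for name, body in program.items():
                _, returned = scan(body, returns_taint)
                if returned and name not in returns_taint:
                    returns_taint.add(name)
                    changed = True
    findings = []
    for name, body in program.items():
        hits, _ = scan(body, returns_taint)
        findings.extend((name, i) for i in hits)
    return findings

local_findings = analyze(program, "local")
global_findings = analyze(program, "global")
print(local_findings)   # []
print(global_findings)  # [('view', 1)]
```

The extra summary pass is also a rough picture of why global scope costs more: every function may need to be revisited until the summaries stop changing.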
Complete Examples
SQL Injection
@rule(id="sqli-comprehensive", severity="critical", cwe="CWE-89", owasp="A03:2021")
def detect_sql_injection():
    """Comprehensive SQL injection detection"""
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
            calls("request.args.get"),
            calls("request.form.get"),
            calls("input"),
        ],
        to_sinks=[
            calls("execute"),
            calls("executemany"),
            calls("*.execute"),
            calls("*.executemany"),
            calls("*.raw"),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
            calls("*.escape"),
            calls("parameterize"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Command Injection
@rule(id="cmdi", severity="critical", cwe="CWE-78", owasp="A03:2021")
def detect_command_injection():
    """Detects OS command injection"""
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("system"),
            calls("popen"),
            calls("os.system"),
            calls("subprocess.*"),
        ],
        sanitized_by=[
            calls("shlex.quote"),
            calls("pipes.quote"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Path Traversal
@rule(id="path-traversal", severity="high", cwe="CWE-22")
def detect_path_traversal():
    """Detects path traversal vulnerabilities"""
    return flows(
        from_sources=[
            calls("request.*"),
            calls("input"),
        ],
        to_sinks=[
            calls("open"),
            calls("*.read"),
            calls("*.write"),
        ],
        sanitized_by=[
            calls("os.path.basename"),
            calls("os.path.normpath"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )

Server-Side Request Forgery (SSRF)
@rule(id="ssrf", severity="high", cwe="CWE-918", owasp="A10:2021")
def detect_ssrf():
    """Detects SSRF vulnerabilities"""
    return flows(
        from_sources=[
            calls("request.GET"),
            calls("request.POST"),
        ],
        to_sinks=[
            calls("requests.get"),
            calls("requests.post"),
            calls("urllib.request.urlopen"),
        ],
        sanitized_by=[
            calls("validate_url"),
            calls("is_safe_url"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global"
    )
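The SSRF rule names validate_url and is_safe_url as sanitizers, but these are application-defined helpers rather than library functions. As a minimal sketch of what such a helper might look like, the version below accepts only HTTPS URLs whose host is on an allowlist. The ALLOWED_HOSTS values and the function body are hypothetical; only urllib.parse.urlparse is a real standard-library call.

```python
from urllib.parse import urlparse

# Hypothetical allowlist; a real application would load this from configuration.
ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}

def validate_url(url):
    """Return the URL if it targets an allowed HTTPS host, else raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme != "https":
        raise ValueError(f"scheme not allowed: {parsed.scheme!r}")
    if parsed.hostname not in ALLOWED_HOSTS:
        raise ValueError(f"host not allowed: {parsed.hostname!r}")
    return url

print(validate_url("https://api.example.com/v1/items"))

# Both of these are rejected: wrong scheme, then a host outside the allowlist.
for candidate in ("http://api.example.com/", "https://169.254.169.254/latest/meta-data/"):
    try:
        validate_url(candidate)
    except ValueError as error:
        print("rejected:", error)
```

Because the rule treats any call to validate_url as breaking the taint flow, the rule is only as strong as the helper. An allowlist is generally a safer basis for such a helper than trying to block known-bad addresses.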