Cross-File Taint Analysis

User input enters in app.py, flows through a helper in utils.py, hits cursor.execute() in db.py. Most tools cannot see this. Code Pathfinder traces the full chain.

What is Taint Analysis?

Taint analysis tracks data that you do not trust. You mark where untrusted data enters your program (a source), you mark where that data would be dangerous (a sink), and the engine figures out whether there is a path between the two. If tainted data reaches a sink without passing through a sanitizer, you have a vulnerability.

SOURCE  request.args.get("username")
| assigned to variable
| passed as function argument
| concatenated into string
SINK    cursor.execute(query)
FINDING: tainted data reaches sink without sanitization

That is the entire idea. The interesting part is what happens between source and sink. Real codebases do not put request.args.get() and cursor.execute() on adjacent lines in the same function. The input gets assigned to a variable, passed to a helper, returned from that helper, concatenated into a query string, and eventually handed to a database cursor three files away. Code Pathfinder follows all of that.
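One way to picture the idea is as a path search over a graph of data-flow steps. The sketch below is a toy model, not Code Pathfinder's implementation: the graph contents, the node labels, and the find_taint_path helper are all made up for illustration.

```python
from collections import deque

# Hypothetical data-flow graph: each key flows into the values listed for it.
SOURCE = 'request.args.get("username")'
SINK = "cursor.execute(query)"

FLOWS = {
    SOURCE: ["username"],
    "username": ["query"],
    "query": [SINK],
}

# Same program, but the value passes through escape() before the query is built.
FLOWS_SANITIZED = {
    SOURCE: ["username"],
    "username": ["escape(username)"],
    "escape(username)": ["query"],
    "query": [SINK],
}


def find_taint_path(flows, source, sink, sanitizers=frozenset()):
    """Breadth-first search from source to sink that never passes through a sanitizer."""
    queue = deque([[source]])
    seen = {source}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == sink:
            return path
        for nxt in flows.get(node, []):
            if nxt in seen or nxt in sanitizers:
                continue
            seen.add(nxt)
            queue.append(path + [nxt])
    return None  # no unsanitized path exists


finding = find_taint_path(FLOWS, SOURCE, SINK)
clean = find_taint_path(FLOWS_SANITIZED, SOURCE, SINK, sanitizers={"escape(username)"})
```

Here finding holds the full chain from source to sink, while clean is None because the only route runs through the sanitizer node.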

Scope: Local vs Global

Every dataflow rule has a scope parameter. It controls how far the engine looks.

scope="local" tracks taint within a single function. If the source and sink are in the same function body, it will find the connection. It is fast, but it misses anything that crosses a function boundary.

# scope="local" detects this:
def view(request):
    user_id = request.args.get("id")
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ^ source and sink in the same function. Found.

# scope="local" does NOT detect this:
def get_input(request):
    return request.args.get("id")

def view(request):
    user_id = get_input(request)
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
    # ^ source is inside get_input(). Missed.

scope="global" tracks taint across function calls and across files. It follows data through arguments, return values, assignments, and string operations, no matter how many hops or files sit between source and sink.

# scope="global" detects both examples above,
# plus cases where the source and sink live in different files entirely.

Use scope="local" when you need speed and you know the pattern is always self-contained. Use scope="global" for everything else. All 190+ rules that ship with Code Pathfinder use scope="global" out of the box.
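To see why a per-function view misses the second example, here is a rough sketch built on Python's standard ast module (it needs Python 3.9+ for ast.unparse). It only checks whether a source call and a sink call appear in the same function body, which is much cruder than real dataflow tracking, and the names SOURCE, SINK, called_names, and local_hits are assumptions made for this illustration.

```python
import ast

SOURCE = "request.args.get"   # assumed source call name
SINK = "cursor.execute"       # assumed sink call name


def called_names(func_node):
    """Return the dotted names of every call made inside one function body."""
    return {
        ast.unparse(node.func)
        for node in ast.walk(func_node)
        if isinstance(node, ast.Call)
    }


def local_hits(code):
    """Names of functions that contain both the source call and the sink call."""
    tree = ast.parse(code)
    return [
        fn.name
        for fn in ast.walk(tree)
        if isinstance(fn, ast.FunctionDef) and {SOURCE, SINK} <= called_names(fn)
    ]


SAME_FUNCTION = '''
def view(request):
    user_id = request.args.get("id")
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
'''

SPLIT_ACROSS_FUNCTIONS = '''
def get_input(request):
    return request.args.get("id")

def view(request):
    user_id = get_input(request)
    cursor.execute(f"SELECT * FROM users WHERE id={user_id}")
'''

found_locally = local_hits(SAME_FUNCTION)
missed_locally = local_hits(SPLIT_ACROSS_FUNCTIONS)
```

The first snippet is reported because view() holds both calls. The second yields nothing, since no single function contains both; connecting get_input() to view() requires following the call, which is what a global scope adds.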

Defining Sources

Sources tell the engine where untrusted data enters. You can use the simple calls() matcher or the more precise QueryType system.

Simple sources with calls()

from codepathfinder import calls

from_sources=[
    calls("request.args.get"),
    calls("request.form.get"),
    calls("input"),
]

Precise sources with QueryType

QueryType lets you define a type once and match its methods everywhere. You list the fully qualified names the engine should resolve, plus glob patterns as a fallback.

from codepathfinder import QueryType

class FlaskRequest(QueryType):
    fqns = ["flask"]
    patterns = ["*request"]

# Now use .method() to select which methods are sources:
from_sources=[
    FlaskRequest.method("get", "args", "form", "values",
                        "get_json", "cookies", "headers"),
]

The .method() call returns a MethodMatcher. You can chain qualifiers onto it, but for sources, you usually just need the method names.

Defining Sinks

Sinks are the dangerous functions. The key feature here is .tracks(), which tells the engine which argument position matters for taint.

from codepathfinder import QueryType

class DBCursor(QueryType):
    fqns = ["sqlite3.Cursor", "mysql.connector.cursor.MySQLCursor",
            "psycopg2.extensions.cursor", "pymysql.cursors.Cursor"]
    patterns = ["*Cursor"]
    match_subclasses = True

to_sinks=[
    DBCursor.method("execute", "executemany").tracks(0),
]

The .tracks(0) is doing something important. cursor.execute() takes two arguments: the SQL string at position 0 and the parameter tuple at position 1. You only care whether tainted data reaches the SQL string. The parameter tuple is the safe path. Without .tracks(0), the engine would flag cursor.execute("SELECT * FROM users WHERE name = ?", (username,)) as vulnerable, because username is tainted and it reaches the call. With .tracks(0), the engine knows that position 1 is safe, and that parameterized query gets a clean bill of health.

This single feature eliminates an entire class of false positives that trips up most static analysis tools.
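The decision that argument tracking makes can be sketched in a few lines. This is a toy model of the logic only; is_finding and its parameters are hypothetical and say nothing about how the engine stores taint internally.

```python
def is_finding(tainted_positions, tracked_positions=None):
    """Decide whether a sink call should be reported.

    tainted_positions: indexes of the arguments that carry tainted data.
    tracked_positions: indexes the rule cares about; None means every argument.
    """
    if tracked_positions is None:
        return bool(tainted_positions)
    return bool(set(tainted_positions) & set(tracked_positions))


# cursor.execute("SELECT ... WHERE name = ?", (username,)): taint sits at position 1.
parameterized = {1}
# cursor.execute("SELECT ... '" + username + "'"): taint sits at position 0.
concatenated = {0}

untracked_result = is_finding(parameterized)                  # flagged: a false positive
tracked_safe_result = is_finding(parameterized, [0])          # not flagged
tracked_unsafe_result = is_finding(concatenated, [0])         # still flagged
```

Restricting the check to position 0 drops the false positive on the parameterized call while keeping the real finding on the concatenated one.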

Sanitizers

Sanitizers break the taint chain. If tainted data passes through a sanitizer before reaching a sink, no finding is reported.

sanitized_by=[
    calls("escape"),
    calls("escape_string"),
    calls("html.escape"),
    calls("shlex.quote"),
    calls("bleach.clean"),
]

How sanitizers work in practice:

user_input = request.args.get("name")
safe_input = escape(user_input)         # taint stops here
cursor.execute(f"SELECT * FROM users WHERE name='{safe_input}'")

The escape() call breaks the taint chain. The engine sees that tainted data was sanitized before it reached execute(), so it does not report a finding.
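Two of the sanitizers listed above, html.escape and shlex.quote, come from the Python standard library, so their effect is easy to check directly. The payload strings below are invented for the demonstration. Note that each function neutralizes input for one output context (HTML and shell respectively), so which sanitizers a rule should accept depends on the sink it guards.

```python
import html
import shlex

# html.escape replaces characters that are significant in HTML markup.
payload = "<script>alert(1)</script>"
escaped = html.escape(payload)
print(escaped)

# shlex.quote wraps a string so a POSIX shell treats it as one literal argument.
filename = "report.txt; rm -rf /"
quoted = shlex.quote(filename)
print(quoted)
```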

Propagation

Propagation defines how taint moves through code. You can list individual primitives, but the simplest approach is PropagationPresets.standard().

from codepathfinder.presets import PropagationPresets

# standard() covers:
#   - variable assignments     (x = tainted)
#   - function arguments       (func(tainted))
#   - function return values   (return tainted)
#   - string concatenation     ("prefix" + tainted)
#   - string formatting        (f"{tainted}")
#
# This handles ~75-80% of real-world taint flows.
propagates_through=PropagationPresets.standard()

For most security rules, standard() is the right choice. It catches assignments, function calls, returns, and string operations. If you need less coverage for speed, there is PropagationPresets.minimal() which only tracks assignments and function arguments. If you want everything the engine supports, use PropagationPresets.comprehensive().
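The trade-off between presets can be illustrated with a toy model. The sketch below is not the engine's algorithm: the step kinds, the PROGRAM list, and the propagate helper are assumptions for illustration, with MINIMAL and STANDARD mirroring the coverage described above.

```python
# Step kinds each preset follows, per the descriptions above.
MINIMAL = {"assign", "argument"}
STANDARD = MINIMAL | {"return", "concat", "format"}

# (kind, target, inputs): target receives data from inputs via that kind of step.
PROGRAM = [
    ("assign", "username", ["request_value"]),   # username = request.args.get(...)
    ("argument", "name", ["username"]),          # query_user(username)
    ("concat", "query", ["name"]),               # "SELECT ..." + name
    ("argument", "execute_arg0", ["query"]),     # cursor.execute(query)
]


def propagate(program, tainted, enabled):
    """Spread taint through the program until nothing changes (a fixed point)."""
    tainted = set(tainted)
    changed = True
    while changed:
        changed = False
        for kind, target, inputs in program:
            if kind in enabled and target not in tainted and tainted & set(inputs):
                tainted.add(target)
                changed = True
    return tainted


with_standard = propagate(PROGRAM, {"request_value"}, STANDARD)
with_minimal = propagate(PROGRAM, {"request_value"}, MINIMAL)
```

With the standard set, taint reaches execute_arg0. With the minimal set, it stops at name because the concatenation step is not followed, so the flow into the sink goes unseen.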

Full Working Example

Here is a complete rule using the @python_rule decorator. This is what a production rule looks like. You can copy this into a file, point the scanner at your project, and it will work.

from rules.python_decorators import python_rule
from codepathfinder import calls, flows, QueryType
from codepathfinder.presets import PropagationPresets


class FlaskRequest(QueryType):
    fqns = ["flask"]
    patterns = ["*request"]


class DBCursor(QueryType):
    fqns = ["sqlite3.Cursor", "mysql.connector.cursor.MySQLCursor",
            "psycopg2.extensions.cursor", "pymysql.cursors.Cursor"]
    patterns = ["*Cursor"]
    match_subclasses = True


@python_rule(
    id="PYTHON-FLASK-SEC-003",
    name="Flask SQL Injection via Tainted String",
    severity="CRITICAL",
    category="flask",
    cwe="CWE-89",
    tags="python,flask,sql-injection,database,owasp-a03,cwe-89",
    message="User input flows to SQL execution without parameterization. Use parameterized queries.",
    owasp="A03:2021",
)
def detect_flask_sql_injection():
    """Detects Flask request data flowing to SQL execution."""
    return flows(
        from_sources=[
            FlaskRequest.method("get", "args", "form", "values",
                                "get_json", "cookies", "headers"),
        ],
        to_sinks=[
            DBCursor.method("execute", "executemany").tracks(0),
        ],
        sanitized_by=[
            calls("escape"),
            calls("escape_string"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="global",
    )

Let's break down what this rule does. It defines two types: FlaskRequest for where input enters, and DBCursor for where SQL gets executed. The from_sources list says "any call to .get(), .args, .form, or similar methods on a Flask request object is tainted." The to_sinks list says "if that tainted data reaches argument position 0 of .execute() or .executemany() on any database cursor, that is a finding." The scope="global" means the engine will follow the data across function calls and file boundaries.

Cross-File SQL Injection

This is where things get interesting. Most real Flask apps do not put route handlers and database queries in the same file. The route lives in app.py, the database logic lives in db.py, and user input crosses that boundary through a function call. Here is what that looks like.

app.py

"""Cross-file SQL injection: source in routes, sink in db layer."""
from flask import Flask, request
from db import query_user

app = Flask(__name__)


@app.route('/user')
def get_user():
    username = request.args.get('username')
    result = query_user(username)
    return str(result)

db.py

"""DB layer with raw SQL -- sink for tainted input from app.py."""
import sqlite3


def get_connection():
    return sqlite3.connect('app.db')


def query_user(name):
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE name = '" + name + "'")
    return cursor.fetchall()

The vulnerability is clear when you see both files side by side. request.args.get('username') in app.py produces tainted data. That data gets passed as the username argument to query_user(). Inside db.py, the name parameter (which is the same tainted value) gets concatenated into a SQL string and handed to cursor.execute().

A grep-based tool would find cursor.execute() in db.py but would have no idea whether name came from user input or from a hardcoded config value. A single-file analyzer would see the source in app.py and the sink in db.py but would not connect them. Code Pathfinder with scope="global" follows the chain: request.args.get() to username to query_user(username) to name to cursor.execute(). It reports the full path.
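The step a single-file analyzer lacks is linking a call in one file to the definition in another. A minimal sketch of that linking, using only the standard ast module (Python 3.9+), might look like the following. It handles just "from module import name" imports and plain function calls, the FILES contents are trimmed copies of the example above, and defined_functions and cross_file_calls are hypothetical helpers rather than anything from Code Pathfinder.

```python
import ast

FILES = {
    "app.py": '''
from flask import Flask, request
from db import query_user

def get_user():
    username = request.args.get('username')
    return str(query_user(username))
''',
    "db.py": '''
def query_user(name):
    cursor.execute("SELECT * FROM users WHERE name = '" + name + "'")
''',
}


def defined_functions(files):
    """Map (module, function name) to the file that defines it."""
    index = {}
    for path, code in files.items():
        module = path.removesuffix(".py")
        for node in ast.walk(ast.parse(code)):
            if isinstance(node, ast.FunctionDef):
                index[(module, node.name)] = path
    return index


def cross_file_calls(files):
    """List (caller file, callee name, callee file) for calls to imported functions."""
    index = defined_functions(files)
    edges = []
    for path, code in files.items():
        tree = ast.parse(code)
        imported = {}  # local name -> (module, original name)
        for node in ast.walk(tree):
            if isinstance(node, ast.ImportFrom):
                for alias in node.names:
                    imported[alias.asname or alias.name] = (node.module, alias.name)
        for node in ast.walk(tree):
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                target = imported.get(node.func.id)
                if target in index:
                    edges.append((path, target[1], index[target]))
    return edges


edges = cross_file_calls(FILES)
```

The single edge it produces, from app.py to query_user in db.py, is the bridge that lets taint on the username argument continue as the name parameter on the other side.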

The fix is a parameterized query:

def query_user(name):
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE name = ?", (name,))
    return cursor.fetchall()

With .tracks(0) on the sink, the engine knows that name now flows into argument position 1 (the parameter tuple), not position 0 (the SQL string). No finding.
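The difference between the two versions of query_user() can be reproduced with an in-memory SQLite database. This is a self-contained demonstration, not part of the example project: the table contents, the payload, and the _unsafe/_safe function names are invented here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("alice",), ("bob",)])

# Classic injection payload: closes the string literal and adds an always-true clause.
payload = "nobody' OR '1'='1"


def query_user_unsafe(name):
    """Builds the SQL by concatenation, so the payload becomes part of the query."""
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE name = '" + name + "'")
    return cursor.fetchall()


def query_user_safe(name):
    """Passes the value as a bound parameter, so it is only ever compared as data."""
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE name = ?", (name,))
    return cursor.fetchall()


leaked_rows = query_user_unsafe(payload)
safe_rows = query_user_safe(payload)
```

The concatenated query returns every row in the table, while the parameterized one returns nothing because no user has that literal name.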

Running a Scan

Point the scanner at your project and specify the ruleset:

pathfinder scan --project . --ruleset python/flask/PYTHON-FLASK-SEC-003

For SARIF output (useful for CI and GitHub code scanning):

pathfinder scan --project . \
  --ruleset python/flask/PYTHON-FLASK-SEC-003 \
  --output sarif \
  --output-file results.sarif

To scan with all Python/Flask rules at once:

pathfinder scan --project . --ruleset python/flask

The scanner outputs the file, line number, and the full taint chain from source to sink. On GitHub, it can post inline review comments directly on pull requests pointing to the exact lines.
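If you want to post-process the SARIF file in CI, a short script is usually enough. The sketch below relies only on fields defined by the SARIF 2.1.0 format (runs, results, ruleId, locations). The sample document is hand-written for illustration and is not actual scanner output, and summarize is a hypothetical helper.

```python
import json

# Hand-written sample in SARIF 2.1.0 shape; not real scanner output.
SAMPLE_SARIF = json.dumps({
    "version": "2.1.0",
    "runs": [{
        "tool": {"driver": {"name": "example-scanner"}},
        "results": [{
            "ruleId": "PYTHON-FLASK-SEC-003",
            "level": "error",
            "message": {"text": "User input flows to SQL execution."},
            "locations": [{
                "physicalLocation": {
                    "artifactLocation": {"uri": "db.py"},
                    "region": {"startLine": 12},
                }
            }],
        }],
    }],
})


def summarize(sarif_text):
    """Return (rule id, file, line) for every reported location."""
    findings = []
    for run in json.loads(sarif_text).get("runs", []):
        for result in run.get("results", []):
            for loc in result.get("locations", []):
                phys = loc.get("physicalLocation", {})
                findings.append((
                    result.get("ruleId"),
                    phys.get("artifactLocation", {}).get("uri"),
                    phys.get("region", {}).get("startLine"),
                ))
    return findings


summary = summarize(SAMPLE_SARIF)
```

In a pipeline you would read results.sarif from disk instead of the sample string, then fail the build or open a ticket when the summary is non-empty.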

Try it live: Open PYTHON-FLASK-SEC-003 in the playground to run this rule against the cross-file test case in your browser. No install needed.

190+ rules ship with scope="global" out of the box, covering SQL injection, command injection, path traversal, SSRF, XSS, deserialization, and more across Flask, Django, FastAPI, and Pyramid. Browse them all in the rule registry.