# PYTHON-DESER-001: Unsafe Pickle Deserialization

> **Severity:** CRITICAL | **CWE:** CWE-502

- **Language:** Python
- **Category:** Deserialization
- **URL:** https://codepathfinder.dev/registry/python/deserialization/PYTHON-DESER-001
- **Detection:** `pathfinder scan --ruleset python/PYTHON-DESER-001 --project .`

## Description

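Unsafe pickle deserialization: untrusted data flows to `pickle.loads()`. A pickle stream can name any importable callable and have it invoked during unpickling, so an attacker who controls the payload can execute arbitrary code on the server. For untrusted input, use a data-only format such as JSON (`json.loads()`) instead.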
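
To make the risk concrete, here is a minimal sketch of how unpickling runs a callable chosen by whoever produced the bytes. The `Payload` class is hypothetical and exists only for illustration. The harmless `eval("6 * 7")` stands in for something like `os.system(...)` that a real attacker would use.

```python
import pickle


class Payload:
    """Hypothetical attacker-defined class, used only for illustration."""

    def __reduce__(self):
        # pickle stores this (callable, args) pair in the byte stream and
        # calls it during loads(). A benign expression stands in for a
        # real attack command here.
        return (eval, ("6 * 7",))


malicious_bytes = pickle.dumps(Payload())

# The receiver does not need the Payload class: loads() resolves the
# callable named in the stream (builtins.eval) and invokes it.
result = pickle.loads(malicious_bytes)
print(result)
```

The value returned by `pickle.loads()` is whatever the embedded callable returned, not a `Payload` instance, which is why type checks performed after unpickling come too late.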

## Vulnerable Code

```python
import pickle
from flask import Flask, request

app = Flask(__name__)

@app.route('/api/load_data', methods=['POST'])
def load_user_data():
    """
    CRITICAL VULNERABILITY: Deserializing untrusted pickle data!
    """
    # Source: User-controlled input
    serialized_data = request.data

    # Sink: Unsafe deserialization
    user_data = pickle.loads(serialized_data)  # RCE here!

    return {'data': user_data}

# Attack:
# POST /api/load_data
# Body: <malicious pickle payload>
# Result: Arbitrary code execution on server
```

## Secure Code

```python
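import base64
import hashlib
import hmac
import json
import pickle

from flask import Flask, request

app = Flask(__name__)
# Placeholder only: load the real key from configuration or a secret store.
SECRET_KEY = 'your-secret-key-here'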

@app.route('/api/load_data', methods=['POST'])
def load_user_data():
    """
    SECURE: Use JSON for untrusted data, not pickle!
    """
    try:
        # Use JSON instead of pickle
        user_data = json.loads(request.data)
        return {'data': user_data}
    except json.JSONDecodeError:
        return {'error': 'Invalid JSON'}, 400

# If you MUST use pickle with trusted sources:
@app.route('/api/load_trusted', methods=['POST'])
def load_trusted_data():
    """
    LESS UNSAFE: Verify HMAC signature before unpickling.
    Only use this for data you control!
    """
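    # Reject bodies that are not JSON objects or lack the expected fields
    data = request.get_json(silent=True) or {}
    try:
        signed_data = base64.b64decode(data['signed_data'])
        signature = data['signature']
    except (KeyError, TypeError, ValueError):
        return {'error': 'Malformed request'}, 400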

    # Verify HMAC signature
    expected = hmac.new(SECRET_KEY.encode(), signed_data, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature, expected):
        return {'error': 'Invalid signature'}, 403

    # Only unpickle if signature is valid
    obj = pickle.loads(signed_data)
    return {'data': str(obj)}
```
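
The `/api/load_trusted` endpoint above only shows the verifying side. As a rough sketch of the full round trip, the helper below produces a body in the shape that endpoint reads (`signed_data` as base64, `signature` as a hex HMAC-SHA256 digest). The names `sign_payload` and `verify_and_load` and the demo key are assumptions made for illustration, and the code runs without Flask so it can be tried in isolation.

```python
import base64
import hashlib
import hmac
import pickle

# Assumption: a shared demo key. In practice, load it from a secret store.
SECRET_KEY = b"demo-key-for-illustration-only"


def sign_payload(obj) -> dict:
    """Pickle obj and return a JSON-ready body with its HMAC signature."""
    raw = pickle.dumps(obj)
    signature = hmac.new(SECRET_KEY, raw, hashlib.sha256).hexdigest()
    return {
        "signed_data": base64.b64encode(raw).decode("ascii"),
        "signature": signature,
    }


def verify_and_load(body: dict):
    """Mirror of the endpoint's check: verify the HMAC, then unpickle."""
    raw = base64.b64decode(body["signed_data"])
    expected = hmac.new(SECRET_KEY, raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(body["signature"], expected):
        raise ValueError("Invalid signature")
    return pickle.loads(raw)


body = sign_payload({"user": "alice", "roles": ["admin"]})
restored = verify_and_load(body)

# Replacing the payload after signing invalidates the signature.
forged = base64.b64encode(b"attacker bytes").decode("ascii")
tampered = dict(body, signed_data=forged)
try:
    verify_and_load(tampered)
    tamper_rejected = False
except ValueError:
    tamper_rejected = True
```

The signature only proves that the bytes came from someone holding the key. If the key leaks, or if a key holder signs attacker-influenced objects, unpickling is as dangerous as before, which is why JSON remains the preferred option.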

## Detection Rule (Python SDK)

```python
from rules.python_decorators import python_rule
from codepathfinder import calls, flows
from codepathfinder.presets import PropagationPresets


@python_rule(
    id="PYTHON-DESER-001",
    name="Unsafe Pickle Deserialization",
    severity="CRITICAL",
    category="deserialization",
    cwe="CWE-502",
    cve="CVE-2021-3177",
    tags="python,deserialization,pickle,rce,untrusted-data,owasp-a08,cwe-502,remote-code-execution,critical,security,intra-procedural",
    message="Unsafe pickle deserialization: Untrusted data flows to pickle.loads() which can execute arbitrary code. Use json.loads() instead.",
    owasp="A08:2021",
)
def detect_pickle_deserialization():
    """
    Detects unsafe pickle deserialization where user input flows to pickle.loads() within a single function.

    LIMITATION: Only detects intra-procedural flows (within one function).
    Will NOT detect if user input is in one function and pickle.loads is in another.

    Example vulnerable code:
        user_data = request.data
        obj = pickle.loads(user_data)  # RCE!
    """
    return flows(
        from_sources=[
            calls("request.data"),
            calls("request.get_data"),
            calls("request.GET"),
            calls("request.POST"),
            calls("request.COOKIES"),
            calls("input"),
            calls("*.data"),
            calls("*.GET"),
            calls("*.POST"),
            calls("*.read"),
            calls("*.recv"),
        ],
        to_sinks=[
            calls("pickle.loads"),
            calls("pickle.load"),
            calls("_pickle.loads"),
            calls("_pickle.load"),
            calls("*.loads"),
            calls("*.load"),
        ],
        sanitized_by=[
            calls("*.validate"),
            calls("*.verify_signature"),
            calls("*.verify"),
            calls("hmac.compare_digest"),
        ],
        propagates_through=PropagationPresets.standard(),
        scope="local",  # CRITICAL: Only intra-procedural analysis works
    )
```
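
Going by the limitation stated in the rule's docstring, a flow shaped like the following sketch would presumably go unreported with `scope="local"`, because the source and the sink sit in different functions. The function names are hypothetical, and a `SimpleNamespace` stands in for a Flask request so the snippet runs on its own.

```python
import pickle
from types import SimpleNamespace


def read_body(request):
    # Source: attacker-controlled bytes are read in this function.
    return request.data


def decode(blob):
    # Sink: the unpickling happens in a different function.
    return pickle.loads(blob)


def handle(request):
    # The tainted value crosses function boundaries, so no single
    # function contains both the source and the sink.
    return decode(read_body(request))


# Stand-in for a Flask request object, for illustration only.
fake_request = SimpleNamespace(data=pickle.dumps({"ok": True}))
print(handle(fake_request))
```

Code structured this way is still vulnerable, so a clean scan result from this rule is not evidence that helper-based deserialization paths are safe. They need manual review.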

---

Source: https://codepathfinder.dev/registry/python/deserialization/PYTHON-DESER-001
Code Pathfinder — Open source, type-aware SAST with cross-file dataflow analysis
