Skip to content

Security Architecture

ScamShield AI handles adversarial scam messages that may contain prompt injection attempts, PII from victims, and sensitive financial data. This document covers all security layers: input sanitization, authentication, OIDC verification, rate limiting, and PII logging rules.


Security Layers

flowchart TD
    subgraph "Layer 1: Network"
        CORS["CORS (Allow-Origin: *)"]
        METHOD["Method Check (POST only)"]
    end

    subgraph "Layer 2: Authentication"
        APIKEY["API Key Validation<br/>(x-api-key header)"]
        OIDC["OIDC Token Verification<br/>(Cloud Tasks callbacks)"]
    end

    subgraph "Layer 3: Rate Limiting"
        SESSION_RATE["Per-Session Limits<br/>(100 total, 10/min)"]
    end

    subgraph "Layer 4: Input Sanitization"
        SANITIZE["Prompt Injection Filter<br/>(12 regex patterns)"]
        TRUNCATE["Message Truncation<br/>(2000 chars max)"]
        HISTORY["History Truncation<br/>(20 messages max)"]
    end

    subgraph "Layer 5: Output Safety"
        GEMINI_SAFETY["Gemini Safety Settings<br/>(harassment/hate: BLOCK_NONE)"]
        PII_LOG["PII Logging Rules<br/>(no values at INFO level)"]
    end

    CORS --> METHOD --> APIKEY --> SESSION_RATE --> SANITIZE --> TRUNCATE --> HISTORY --> GEMINI_SAFETY --> PII_LOG

Input Sanitization

File: functions/utils/sanitizer.py

All scammer messages are sanitized before being embedded in Gemini prompts. This prevents prompt injection attacks where a scammer crafts a message designed to override the persona's instructions.

Injection Patterns (12 patterns)

_INJECTION_PATTERNS = [
    # System/instruction override attempts
    re.compile(r"(system|instruction|prompt)\s*:", re.IGNORECASE),
    re.compile(r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?|rules?)", re.IGNORECASE),
    re.compile(r"you\s+are\s+now\s+(a|an|the)\s+", re.IGNORECASE),
    re.compile(r"forget\s+(everything|all|your)\s+(above|previous|prior)", re.IGNORECASE),
    re.compile(r"new\s+(instructions?|rules?|prompt)\s*:", re.IGNORECASE),

    # Delimiter escape attempts
    re.compile(r"[=]{3,}"),     # ═══ persona prompt delimiters
    re.compile(r"[-]{5,}"),     # ----- section breaks
    re.compile(r"[#]{3,}"),     # ### markdown headers

    # Role play override attempts
    re.compile(r"(respond|act|behave)\s+as\s+(a|an|if)\s+", re.IGNORECASE),
    re.compile(r"(do\s+not|don'?t|stop)\s+(be|being|act|playing)\s+", re.IGNORECASE),

    # Code/tag injection
    re.compile(r"`{3,}"),                                          # Triple backticks
    re.compile(r"<\s*/?\s*(system|assistant|user|human|ai)\s*>", re.IGNORECASE),  # XML role tags
    re.compile(r"<\|[^|]*\|>"),                                    # Special delimiters
    re.compile(r"\[INST\]", re.IGNORECASE),                        # Llama-style instruction markers

    # Structure manipulation
    re.compile(r"\n{5,}"),             # Excessive newlines
    re.compile(r"\\u[0-9a-fA-F]{4}"),  # Unicode escape sequences
]

Each matched pattern is replaced with [FILTERED], preserving the rest of the message for scam analysis.

Message Limits

Limit Value Purpose
MAX_MESSAGE_LENGTH 2000 chars Prevents memory pressure from extremely long messages
MAX_HISTORY_MESSAGES 20 messages Limits conversation history size in prompt context

Processing Flow

def sanitize_message(text: str) -> str:
    if not text:
        return ""
    text = text[:MAX_MESSAGE_LENGTH]           # Truncate first
    for pattern in _INJECTION_PATTERNS:
        text = pattern.sub("[FILTERED]", text)  # Replace injection patterns
    return text.strip()

sanitize_history() applies sanitize_message() to each message in the conversation history, limited to the most recent 20 messages.

Sanitization Order

Truncation happens before pattern matching. This prevents attackers from placing injection payloads after 2000 characters to bypass the filter, where the truncation would remove the [FILTERED] replacement but leave the payload intact in a different processing path.


API Key Authentication

File: functions/guvi/handler.py -- validate_api_key()

Inbound Authentication

The guvi_honeypot endpoint requires an x-api-key header. The expected key is read from the SCAMSHIELD_API_KEY environment variable (populated from GCP Secret Manager).

def validate_api_key(request: https_fn.Request) -> bool:
    expected_key = os.environ.get("SCAMSHIELD_API_KEY")
    if not expected_key:
        if os.environ.get("K_SERVICE"):
            logger.error("SCAMSHIELD_API_KEY not set in production - denying request")
            return False
        logger.warning("SCAMSHIELD_API_KEY not set - allowing all requests (dev mode)")
        return True
    provided_key = request.headers.get("x-api-key", "")
    return provided_key == expected_key

Production behavior (K_SERVICE is set by Cloud Functions):

  • SCAMSHIELD_API_KEY configured: Standard key comparison.
  • SCAMSHIELD_API_KEY missing: All requests denied with error log.

Development behavior (no K_SERVICE):

  • SCAMSHIELD_API_KEY configured: Standard key comparison.
  • SCAMSHIELD_API_KEY missing: All requests allowed with warning log.

Outbound Authentication

The callback service (guvi/callback.py) sends the same SCAMSHIELD_API_KEY in the x-api-key header when posting to GUVI's updateHoneyPotFinalResult endpoint.

Constant-Time Comparison

The current implementation uses Python == for key comparison. For a production deployment, this should be replaced with hmac.compare_digest() to prevent timing side-channel attacks. In the hackathon context with short-lived keys, this is an accepted trade-off.


OIDC Token Verification

File: functions/utils/oidc.py -- verify_cloud_tasks_token()

The send_delayed_callback Cloud Function is invoked by Cloud Tasks, not by external users. To prevent unauthorized calls, it verifies the OIDC Bearer token attached by Cloud Tasks.

Verification Flow

sequenceDiagram
    participant CT as Cloud Tasks
    participant CF as send_delayed_callback
    participant GOOGLE as Google OIDC Keys

    CT->>CF: POST /send_delayed_callback<br/>Authorization: Bearer <oidc_token>
    CF->>CF: Extract token from Authorization header
    CF->>GOOGLE: verify_oauth2_token(token)
    GOOGLE-->>CF: claims {email, aud, iss, ...}
    CF->>CF: Verify claims.email == {project}@appspot.gserviceaccount.com
    CF-->>CT: 200 OK (or 403 Unauthorized)

Implementation

def verify_cloud_tasks_token(request) -> tuple[bool, str]:
    # Skip in local dev (no K_SERVICE)
    if not os.environ.get("K_SERVICE"):
        return True, ""

    # Require Bearer token
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return False, "Unauthorized"

    token = auth_header[len("Bearer "):]

    # Verify against Google's public keys
    claims = id_token.verify_oauth2_token(token, google_requests.Request())

    # Verify service account email
    expected_sa = f"{project_id}@appspot.gserviceaccount.com"
    if claims.get("email") != expected_sa:
        return False, "Unauthorized"

    return True, ""

Checks performed:

  1. Token presence: Authorization: Bearer <token> header required.
  2. Token validity: Verified against Google's public OIDC keys (signature, expiry, issuer).
  3. Service account identity: Token email must match {project_id}@appspot.gserviceaccount.com.

Local development: OIDC verification is skipped when K_SERVICE is not set.


Rate Limiting

File: functions/utils/rate_limiter.py -- check_rate_limit()

Limits

Limit Value Scope
MAX_MESSAGES_PER_SESSION 100 Lifetime of a session
MAX_MESSAGES_PER_MINUTE 10 Per-minute sliding window

Implementation

Rate limiting uses Firestore transactions to ensure atomic read-increment-write:

flowchart TD
    START["check_rate_limit(session_id)"]
    START --> TX["Begin Firestore Transaction"]
    TX --> READ["Read rate_limits/{session_id}"]
    READ --> EXISTS{"Document exists?"}

    EXISTS -->|"No"| CREATE["Create: total=1, minute_count=1"]
    EXISTS -->|"Yes"| CHECK_TOTAL{"total >= 100?"}

    CHECK_TOTAL -->|"Yes"| DENY_TOTAL["DENY: Session limit reached"]
    CHECK_TOTAL -->|"No"| CHECK_MINUTE{"Same minute?"}

    CHECK_MINUTE -->|"New minute"| RESET["Reset: minute_count=1"]
    CHECK_MINUTE -->|"Same minute"| CHECK_BURST{"minute_count >= 10?"}

    CHECK_BURST -->|"Yes"| DENY_BURST["DENY: Rate limit reached (10/min)"]
    CHECK_BURST -->|"No"| INCREMENT["Increment: total+1, minute_count+1"]

    CREATE --> ALLOW["ALLOW: (True, '')"]
    RESET --> ALLOW
    INCREMENT --> ALLOW

Fail-open behavior: If Firestore is unavailable or the transaction fails, the request is allowed. This prioritizes availability over strict enforcement.

Minute window: Tracked via minute_key = int(now / 60). When the key changes (new minute), the per-minute counter resets to 1.

Rate-Limited Response

When a request is rate-limited, the handler returns a valid GuviResponse with a stalling reply:

{
    "status": "success",
    "reply": "Ek minute ruko beta, bahut zyada messages aa rahe hain. Thoda der mein baat karte hain.",
    "scamDetected": false,
    "agentNotes": "Rate limited -- stalling scammer"
}

This ensures the evaluator does not score the turn as a failure.


Gemini Safety Settings

File: functions/gemini/client.py

The Gemini client configures safety settings to allow scam-related content through for analysis:

Category Threshold Rationale
HARM_CATEGORY_HARASSMENT BLOCK_NONE Scam messages contain threats and intimidation
HARM_CATEGORY_HATE_SPEECH BLOCK_NONE Scammers may use discriminatory language
HARM_CATEGORY_SEXUALLY_EXPLICIT BLOCK_MEDIUM_AND_ABOVE Only sextortion scams need analysis; block graphic content
HARM_CATEGORY_DANGEROUS_CONTENT BLOCK_NONE Scam tactics involve dangerous claims (arrest threats, financial fraud)

Circuit Breakers

The system uses pybreaker circuit breakers to prevent cascading failures:

Gemini API Circuit Breaker

gemini_breaker = pybreaker.CircuitBreaker(
    fail_max=5,          # Opens after 5 failures
    reset_timeout=30,    # Stays open for 30 seconds
    name="gemini_api",
)

When open, classification falls back to keyword-based scoring. Response generation returns a static Hinglish fallback message.

GUVI Callback Circuit Breaker

callback_breaker = pybreaker.CircuitBreaker(
    fail_max=3,          # Opens after 3 failures
    reset_timeout=60,    # Stays open for 60 seconds
    name="guvi_callback",
)

When open, callbacks are skipped for the duration. Since callbacks are sent every turn, the next turn will retry once the breaker resets.


PII Logging Rules

File: .claude/rules/security.md

Level Allowed Forbidden
INFO Session IDs, scam types, evidence counts ("found 2 UPI IDs"), confidence scores, persona names Full scammer messages, evidence values (actual UPI IDs, bank accounts, phone numbers), API keys
WARNING Rate limit reasons, failed auth source IPs, circuit breaker state changes Evidence values, API keys
ERROR Exception types, stack traces (via logger.exception()), error messages Scammer message content, evidence values
DEBUG Evidence values (for local troubleshooting), full messages API keys (even partial)

Examples

# CORRECT (INFO level)
logger.info(f"Processing session={session_id}")
logger.info(f"Accumulated evidence: UPIs={len(upis)}, Accounts={len(accounts)}")

# INCORRECT (would log PII at INFO)
logger.info(f"Found UPI: {upi_id}")  # DO NOT log actual values
logger.info(f"Scammer said: {message}")  # DO NOT log message content

Secrets Management

Secret Store Consumers Injected Via
GEMINI_API_KEY GCP Secret Manager gemini/client.py Firebase Functions secrets=["GEMINI_API_KEY"] decorator
SCAMSHIELD_API_KEY GCP Secret Manager guvi/handler.py, guvi/callback.py Firebase Functions secrets=["SCAMSHIELD_API_KEY"] decorator
DASHBOARD_PIN GCP Secret Manager dashboard/utils/auth.py Cloud Run --set-secrets

Local Development

  • Copy .env.example to .env.local and fill in values.
  • For the dashboard: create dashboard/.streamlit/secrets.toml manually.
  • Both files are gitignored.

Production

  • Secrets are set once via firebase functions:secrets:set SECRET_NAME.
  • The deploy workflow (GitHub Actions) uses Workload Identity Federation (keyless OIDC) -- no stored service account keys.
  • Cloud Run secrets are mapped via --set-secrets in the deploy command; entrypoint.sh converts environment variables to secrets.toml at container startup.

Threat Model Summary

Threat Mitigation File
Prompt injection via scammer message 12 regex sanitization patterns + 2000 char truncation utils/sanitizer.py
Unauthorized API access API key validation with production-mandatory enforcement guvi/handler.py
Unauthorized Cloud Tasks invocation OIDC token verification against Google public keys utils/oidc.py
DoS via message flooding Per-session rate limiting (100 total, 10/min) utils/rate_limiter.py
Gemini API outage Circuit breaker + keyword fallback classification + static response fallback gemini/client.py
GUVI callback outage Circuit breaker + per-turn retry (next turn resends with latest data) guvi/callback.py
Firestore outage In-memory fallback for all session operations firestore/sessions.py
PII leakage in logs Level-based logging rules: no evidence values at INFO+ .claude/rules/security.md
Timing side-channel on API key Acknowledged limitation; hmac.compare_digest() recommended for production guvi/handler.py
Model hallucination (persona identity drift) Stable facts section in persona prompts; turn-aware directives gemini/prompts/personas/