Security Architecture¶
ScamShield AI handles adversarial scam messages that may contain prompt injection attempts, PII from victims, and sensitive financial data. This document covers all security layers: input sanitization, authentication, OIDC verification, rate limiting, and PII logging rules.
Security Layers¶
flowchart TD
subgraph "Layer 1: Network"
CORS["CORS (Allow-Origin: *)"]
METHOD["Method Check (POST only)"]
end
subgraph "Layer 2: Authentication"
APIKEY["API Key Validation<br/>(x-api-key header)"]
OIDC["OIDC Token Verification<br/>(Cloud Tasks callbacks)"]
end
subgraph "Layer 3: Rate Limiting"
SESSION_RATE["Per-Session Limits<br/>(100 total, 10/min)"]
end
subgraph "Layer 4: Input Sanitization"
SANITIZE["Prompt Injection Filter<br/>(12 regex patterns)"]
TRUNCATE["Message Truncation<br/>(2000 chars max)"]
HISTORY["History Truncation<br/>(20 messages max)"]
end
subgraph "Layer 5: Output Safety"
GEMINI_SAFETY["Gemini Safety Settings<br/>(harassment/hate: BLOCK_NONE)"]
PII_LOG["PII Logging Rules<br/>(no values at INFO level)"]
end
CORS --> METHOD --> APIKEY --> SESSION_RATE --> SANITIZE --> TRUNCATE --> HISTORY --> GEMINI_SAFETY --> PII_LOG
Input Sanitization¶
File: functions/utils/sanitizer.py
All scammer messages are sanitized before being embedded in Gemini prompts. This prevents prompt injection attacks where a scammer crafts a message designed to override the persona's instructions.
Injection Patterns (12 patterns)¶
_INJECTION_PATTERNS = [
# System/instruction override attempts
re.compile(r"(system|instruction|prompt)\s*:", re.IGNORECASE),
re.compile(r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?|rules?)", re.IGNORECASE),
re.compile(r"you\s+are\s+now\s+(a|an|the)\s+", re.IGNORECASE),
re.compile(r"forget\s+(everything|all|your)\s+(above|previous|prior)", re.IGNORECASE),
re.compile(r"new\s+(instructions?|rules?|prompt)\s*:", re.IGNORECASE),
# Delimiter escape attempts
re.compile(r"[=]{3,}"), # ═══ persona prompt delimiters
re.compile(r"[-]{5,}"), # ----- section breaks
re.compile(r"[#]{3,}"), # ### markdown headers
# Role play override attempts
re.compile(r"(respond|act|behave)\s+as\s+(a|an|if)\s+", re.IGNORECASE),
re.compile(r"(do\s+not|don'?t|stop)\s+(be|being|act|playing)\s+", re.IGNORECASE),
# Code/tag injection
re.compile(r"`{3,}"), # Triple backticks
re.compile(r"<\s*/?\s*(system|assistant|user|human|ai)\s*>", re.IGNORECASE), # XML role tags
re.compile(r"<\|[^|]*\|>"), # Special delimiters
re.compile(r"\[INST\]", re.IGNORECASE), # Llama-style instruction markers
# Structure manipulation
re.compile(r"\n{5,}"), # Excessive newlines
re.compile(r"\\u[0-9a-fA-F]{4}"), # Unicode escape sequences
]
Each matched pattern is replaced with [FILTERED], preserving the rest of the message for scam analysis.
Message Limits¶
| Limit | Value | Purpose |
|---|---|---|
MAX_MESSAGE_LENGTH |
2000 chars | Prevents memory pressure from extremely long messages |
MAX_HISTORY_MESSAGES |
20 messages | Limits conversation history size in prompt context |
Processing Flow¶
def sanitize_message(text: str) -> str:
if not text:
return ""
text = text[:MAX_MESSAGE_LENGTH] # Truncate first
for pattern in _INJECTION_PATTERNS:
text = pattern.sub("[FILTERED]", text) # Replace injection patterns
return text.strip()
sanitize_history() applies sanitize_message() to each message in the conversation history, limited to the most recent 20 messages.
Sanitization Order
Truncation happens before pattern matching. This prevents attackers from placing injection payloads after 2000 characters to bypass the filter, where the truncation would remove the [FILTERED] replacement but leave the payload intact in a different processing path.
API Key Authentication¶
File: functions/guvi/handler.py -- validate_api_key()
Inbound Authentication¶
The guvi_honeypot endpoint requires an x-api-key header. The expected key is read from the SCAMSHIELD_API_KEY environment variable (populated from GCP Secret Manager).
def validate_api_key(request: https_fn.Request) -> bool:
expected_key = os.environ.get("SCAMSHIELD_API_KEY")
if not expected_key:
if os.environ.get("K_SERVICE"):
logger.error("SCAMSHIELD_API_KEY not set in production - denying request")
return False
logger.warning("SCAMSHIELD_API_KEY not set - allowing all requests (dev mode)")
return True
provided_key = request.headers.get("x-api-key", "")
return provided_key == expected_key
Production behavior (K_SERVICE is set by Cloud Functions):
SCAMSHIELD_API_KEYconfigured: Standard key comparison.SCAMSHIELD_API_KEYmissing: All requests denied with error log.
Development behavior (no K_SERVICE):
SCAMSHIELD_API_KEYconfigured: Standard key comparison.SCAMSHIELD_API_KEYmissing: All requests allowed with warning log.
Outbound Authentication¶
The callback service (guvi/callback.py) sends the same SCAMSHIELD_API_KEY in the x-api-key header when posting to GUVI's updateHoneyPotFinalResult endpoint.
Constant-Time Comparison
The current implementation uses Python == for key comparison. For a production deployment, this should be replaced with hmac.compare_digest() to prevent timing side-channel attacks. In the hackathon context with short-lived keys, this is an accepted trade-off.
OIDC Token Verification¶
File: functions/utils/oidc.py -- verify_cloud_tasks_token()
The send_delayed_callback Cloud Function is invoked by Cloud Tasks, not by external users. To prevent unauthorized calls, it verifies the OIDC Bearer token attached by Cloud Tasks.
Verification Flow¶
sequenceDiagram
participant CT as Cloud Tasks
participant CF as send_delayed_callback
participant GOOGLE as Google OIDC Keys
CT->>CF: POST /send_delayed_callback<br/>Authorization: Bearer <oidc_token>
CF->>CF: Extract token from Authorization header
CF->>GOOGLE: verify_oauth2_token(token)
GOOGLE-->>CF: claims {email, aud, iss, ...}
CF->>CF: Verify claims.email == {project}@appspot.gserviceaccount.com
CF-->>CT: 200 OK (or 403 Unauthorized)
Implementation¶
def verify_cloud_tasks_token(request) -> tuple[bool, str]:
# Skip in local dev (no K_SERVICE)
if not os.environ.get("K_SERVICE"):
return True, ""
# Require Bearer token
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return False, "Unauthorized"
token = auth_header[len("Bearer "):]
# Verify against Google's public keys
claims = id_token.verify_oauth2_token(token, google_requests.Request())
# Verify service account email
expected_sa = f"{project_id}@appspot.gserviceaccount.com"
if claims.get("email") != expected_sa:
return False, "Unauthorized"
return True, ""
Checks performed:
- Token presence:
Authorization: Bearer <token>header required. - Token validity: Verified against Google's public OIDC keys (signature, expiry, issuer).
- Service account identity: Token email must match
{project_id}@appspot.gserviceaccount.com.
Local development: OIDC verification is skipped when K_SERVICE is not set.
Rate Limiting¶
File: functions/utils/rate_limiter.py -- check_rate_limit()
Limits¶
| Limit | Value | Scope |
|---|---|---|
MAX_MESSAGES_PER_SESSION |
100 | Lifetime of a session |
MAX_MESSAGES_PER_MINUTE |
10 | Per-minute sliding window |
Implementation¶
Rate limiting uses Firestore transactions to ensure atomic read-increment-write:
flowchart TD
START["check_rate_limit(session_id)"]
START --> TX["Begin Firestore Transaction"]
TX --> READ["Read rate_limits/{session_id}"]
READ --> EXISTS{"Document exists?"}
EXISTS -->|"No"| CREATE["Create: total=1, minute_count=1"]
EXISTS -->|"Yes"| CHECK_TOTAL{"total >= 100?"}
CHECK_TOTAL -->|"Yes"| DENY_TOTAL["DENY: Session limit reached"]
CHECK_TOTAL -->|"No"| CHECK_MINUTE{"Same minute?"}
CHECK_MINUTE -->|"New minute"| RESET["Reset: minute_count=1"]
CHECK_MINUTE -->|"Same minute"| CHECK_BURST{"minute_count >= 10?"}
CHECK_BURST -->|"Yes"| DENY_BURST["DENY: Rate limit reached (10/min)"]
CHECK_BURST -->|"No"| INCREMENT["Increment: total+1, minute_count+1"]
CREATE --> ALLOW["ALLOW: (True, '')"]
RESET --> ALLOW
INCREMENT --> ALLOW
Fail-open behavior: If Firestore is unavailable or the transaction fails, the request is allowed. This prioritizes availability over strict enforcement.
Minute window: Tracked via minute_key = int(now / 60). When the key changes (new minute), the per-minute counter resets to 1.
Rate-Limited Response¶
When a request is rate-limited, the handler returns a valid GuviResponse with a stalling reply:
{
"status": "success",
"reply": "Ek minute ruko beta, bahut zyada messages aa rahe hain. Thoda der mein baat karte hain.",
"scamDetected": false,
"agentNotes": "Rate limited -- stalling scammer"
}
This ensures the evaluator does not score the turn as a failure.
Gemini Safety Settings¶
File: functions/gemini/client.py
The Gemini client configures safety settings to allow scam-related content through for analysis:
| Category | Threshold | Rationale |
|---|---|---|
HARM_CATEGORY_HARASSMENT |
BLOCK_NONE |
Scam messages contain threats and intimidation |
HARM_CATEGORY_HATE_SPEECH |
BLOCK_NONE |
Scammers may use discriminatory language |
HARM_CATEGORY_SEXUALLY_EXPLICIT |
BLOCK_MEDIUM_AND_ABOVE |
Only sextortion scams need analysis; block graphic content |
HARM_CATEGORY_DANGEROUS_CONTENT |
BLOCK_NONE |
Scam tactics involve dangerous claims (arrest threats, financial fraud) |
Circuit Breakers¶
The system uses pybreaker circuit breakers to prevent cascading failures:
Gemini API Circuit Breaker¶
gemini_breaker = pybreaker.CircuitBreaker(
fail_max=5, # Opens after 5 failures
reset_timeout=30, # Stays open for 30 seconds
name="gemini_api",
)
When open, classification falls back to keyword-based scoring. Response generation returns a static Hinglish fallback message.
GUVI Callback Circuit Breaker¶
callback_breaker = pybreaker.CircuitBreaker(
fail_max=3, # Opens after 3 failures
reset_timeout=60, # Stays open for 60 seconds
name="guvi_callback",
)
When open, callbacks are skipped for the duration. Since callbacks are sent every turn, the next turn will retry once the breaker resets.
PII Logging Rules¶
File: .claude/rules/security.md
| Level | Allowed | Forbidden |
|---|---|---|
| INFO | Session IDs, scam types, evidence counts ("found 2 UPI IDs"), confidence scores, persona names | Full scammer messages, evidence values (actual UPI IDs, bank accounts, phone numbers), API keys |
| WARNING | Rate limit reasons, failed auth source IPs, circuit breaker state changes | Evidence values, API keys |
| ERROR | Exception types, stack traces (via logger.exception()), error messages |
Scammer message content, evidence values |
| DEBUG | Evidence values (for local troubleshooting), full messages | API keys (even partial) |
Examples¶
# CORRECT (INFO level)
logger.info(f"Processing session={session_id}")
logger.info(f"Accumulated evidence: UPIs={len(upis)}, Accounts={len(accounts)}")
# INCORRECT (would log PII at INFO)
logger.info(f"Found UPI: {upi_id}") # DO NOT log actual values
logger.info(f"Scammer said: {message}") # DO NOT log message content
Secrets Management¶
| Secret | Store | Consumers | Injected Via |
|---|---|---|---|
GEMINI_API_KEY |
GCP Secret Manager | gemini/client.py |
Firebase Functions secrets=["GEMINI_API_KEY"] decorator |
SCAMSHIELD_API_KEY |
GCP Secret Manager | guvi/handler.py, guvi/callback.py |
Firebase Functions secrets=["SCAMSHIELD_API_KEY"] decorator |
DASHBOARD_PIN |
GCP Secret Manager | dashboard/utils/auth.py |
Cloud Run --set-secrets |
Local Development¶
- Copy
.env.exampleto.env.localand fill in values. - For the dashboard: create
dashboard/.streamlit/secrets.tomlmanually. - Both files are gitignored.
Production¶
- Secrets are set once via
firebase functions:secrets:set SECRET_NAME. - The deploy workflow (GitHub Actions) uses Workload Identity Federation (keyless OIDC) -- no stored service account keys.
- Cloud Run secrets are mapped via
--set-secretsin the deploy command;entrypoint.shconverts environment variables tosecrets.tomlat container startup.
Threat Model Summary¶
| Threat | Mitigation | File |
|---|---|---|
| Prompt injection via scammer message | 12 regex sanitization patterns + 2000 char truncation | utils/sanitizer.py |
| Unauthorized API access | API key validation with production-mandatory enforcement | guvi/handler.py |
| Unauthorized Cloud Tasks invocation | OIDC token verification against Google public keys | utils/oidc.py |
| DoS via message flooding | Per-session rate limiting (100 total, 10/min) | utils/rate_limiter.py |
| Gemini API outage | Circuit breaker + keyword fallback classification + static response fallback | gemini/client.py |
| GUVI callback outage | Circuit breaker + per-turn retry (next turn resends with latest data) | guvi/callback.py |
| Firestore outage | In-memory fallback for all session operations | firestore/sessions.py |
| PII leakage in logs | Level-based logging rules: no evidence values at INFO+ | .claude/rules/security.md |
| Timing side-channel on API key | Acknowledged limitation; hmac.compare_digest() recommended for production |
guvi/handler.py |
| Model hallucination (persona identity drift) | Stable facts section in persona prompts; turn-aware directives | gemini/prompts/personas/ |