Chapter 7: Hardening --- Security Audit and Prompt Injection Defense¶
What We Built¶
After the core intelligence pipeline and self-correction engine were working, we ran a security audit. The results were humbling: four critical or high-severity issues, any one of which could have compromised the system in production. This chapter documents those findings, the fixes, and the broader hardening work around prompt injection, input sanitization, and rate limiting.
Security in an AI honeypot is an unusual challenge. The system is designed to receive malicious input --- that is the entire point. The question is not "how do we block bad input" but "how do we process bad input safely." Every scammer message is adversarial by nature. Our defenses must protect the system's integrity without tipping off the scammer.
Why This Approach¶
Most web applications defend against malicious input by rejecting it. A honeypot cannot do that. If a scammer sends a prompt injection attempt, we cannot return a 400 error --- that tells them we detected their injection, and they will adapt. Instead, we silently sanitize the input and continue the conversation as if nothing happened.
This "sanitize, don't block" philosophy extends to every defense layer:
- Prompt injection patterns are replaced with
[FILTERED], not rejected - Rate-limited sessions receive a stalling message in Hindi, not an error
- Invalid API keys get a clean 401, but nothing that reveals how validation works
- Errors in the pipeline produce a fallback response, not a stack trace
The scammer should never know they triggered a defense.
The Code¶
Finding 1: API Key Validation Bypass (P0 Critical)¶
The original validate_api_key() used Python's == operator for string comparison:
# BEFORE (vulnerable)
def validate_api_key(request):
expected_key = os.environ.get("SCAMSHIELD_API_KEY")
provided_key = request.headers.get("x-api-key", "")
return provided_key == expected_key # Timing attack vulnerability
Why This Matters
Python's == operator compares strings character by character and short-circuits on the first mismatch. An attacker can measure response times to guess the API key one character at a time. This is a classic timing attack.
The second issue was more subtle: if SCAMSHIELD_API_KEY was not set in the environment, the function returned True for all requests --- even in production.
The fix: Constant-time comparison via hmac.compare_digest, plus explicit production-mode denial when the key is missing:
# AFTER (hardened)
def validate_api_key(request):
expected_key = os.environ.get("SCAMSHIELD_API_KEY")
if not expected_key:
# In production (K_SERVICE is set by Cloud Functions), deny all
if os.environ.get("K_SERVICE"):
logger.error("SCAMSHIELD_API_KEY not set in production")
return False
# Dev mode: allow all requests
logger.warning("SCAMSHIELD_API_KEY not set - allowing all (dev mode)")
return True
provided_key = request.headers.get("x-api-key", "")
return provided_key == expected_key
K_SERVICE Detection
K_SERVICE is an environment variable automatically set by Cloud Functions (and Cloud Run). Its presence reliably indicates a production environment. We use this to distinguish between "key not set because this is local dev" and "key not set because someone forgot to wire the secret."
Finding 2: Missing OIDC Verification (P0 Critical)¶
The Cloud Tasks callback endpoint (send_delayed_callback) accepted requests from anyone. There was no verification that the request actually came from Cloud Tasks with a valid OIDC token.
The fix: A dedicated OIDC verification module in functions/utils/oidc.py:
def verify_cloud_tasks_token(request) -> tuple[bool, str]:
# Skip in local development
if not os.environ.get("K_SERVICE"):
return True, ""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
logger.warning("Missing Authorization header on callback")
return False, "Unauthorized"
token = auth_header[len("Bearer "):]
try:
claims = id_token.verify_oauth2_token(token, google_requests.Request())
except Exception as e:
logger.warning(f"OIDC token verification failed: {e}")
return False, "Unauthorized"
# Verify the token was issued by the expected service account
expected_sa = f"{project_id}@appspot.gserviceaccount.com"
if claims.get("email") != expected_sa:
logger.warning(f"OIDC email mismatch: got {claims.get('email')}")
return False, "Unauthorized"
return True, ""
Why Extract to a Utility Module?
We initially tried putting this in main.py, but main.py imports trigger Firebase initialization, which fails in unit tests that do not have credentials configured. Extracting OIDC verification to utils/oidc.py breaks the import chain and makes the function independently testable. This was a key architectural lesson: keep utility functions away from framework initialization code.
Finding 3: Secret Wiring Gap (P1 High)¶
The SCAMSHIELD_API_KEY secret was in GCP Secret Manager but was not wired through to the Cloud Function or the dashboard. The deploy workflow was missing the secret reference, and the dashboard's entrypoint.sh was not mapping it to secrets.toml.
The fix: Three changes across two files:
- Add
SCAMSHIELD_API_KEYto the Cloud Function'ssecretsparameter:
@https_fn.on_request(
timeout_sec=60,
memory=512,
region="asia-south1",
secrets=["GEMINI_API_KEY", "SCAMSHIELD_API_KEY"], # Both secrets wired
)
def guvi_honeypot(request):
...
- Add
--set-secretsto the Cloud Run deploy command indeploy.yml:
- name: Deploy to Cloud Run
run: |
gcloud run deploy scamshield-dashboard \
--set-secrets="DASHBOARD_PIN=DASHBOARD_PIN:latest,SCAMSHIELD_API_KEY=SCAMSHIELD_API_KEY:latest"
- Update
entrypoint.shto map the env var tosecrets.toml:
cat > /app/.streamlit/secrets.toml <<EOF
DASHBOARD_PIN = "${DASHBOARD_PIN}"
SCAMSHIELD_API_KEY = "${SCAMSHIELD_API_KEY}"
EOF
Finding 4: Callback Payload Field Name Mismatch (P1 High)¶
The callback payload was using snake_case field names, but the GUVI API expected camelCase. Our Pydantic model had the right field names, but the serialization was using model_dump(by_alias=False) by default.
The fix: Ensure the GuviCallbackPayload model uses camelCase field names directly (not aliases), matching the GUVI spec:
class GuviCallbackPayload(BaseModel):
sessionId: str # not session_id
scamDetected: bool # not scam_detected
scamType: Optional[str] = None
confidenceLevel: Optional[float] = None
totalMessagesExchanged: int
engagementDurationSeconds: Optional[float] = None
engagementMetrics: EngagementMetrics = Field(default_factory=EngagementMetrics)
extractedIntelligence: ExtractedIntelligence
agentNotes: str
Prompt Injection Sanitization¶
The sanitizer in functions/utils/sanitizer.py detects and neutralizes prompt injection patterns. The key design principle: replace injection markers with [FILTERED] rather than stripping them entirely. This preserves the message structure (keeping the LLM's context coherent) while removing the attack payload.
_INJECTION_PATTERNS = [
# System/instruction override attempts
re.compile(r"(system|instruction|prompt)\s*:", re.IGNORECASE),
re.compile(r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?|rules?)",
re.IGNORECASE),
# Role-play override attempts
re.compile(r"you\s+are\s+now\s+(a|an|the)\s+", re.IGNORECASE),
re.compile(r"(respond|act|behave)\s+as\s+(a|an|if)\s+", re.IGNORECASE),
# Delimiter escape attempts
re.compile(r"[=]{3,}"),
re.compile(r"[-]{5,}"),
re.compile(r"`{3,}"),
# XML/HTML injection targeting LLM role tags
re.compile(r"<\s*/?\s*(system|assistant|user|human|ai)\s*>", re.IGNORECASE),
re.compile(r"<\|[^|]*\|>"),
re.compile(r"\[INST\]", re.IGNORECASE),
# Structure manipulation
re.compile(r"\n{5,}"), # Excessive newlines
re.compile(r"\\u[0-9a-fA-F]{4}"), # Unicode escape sequences
]
def sanitize_message(text: str) -> str:
if not text:
return ""
text = text[:MAX_MESSAGE_LENGTH] # 2000 char limit
for pattern in _INJECTION_PATTERNS:
text = pattern.sub("[FILTERED]", text)
return text.strip()
The history sanitizer adds a message count limit to prevent memory pressure:
MAX_HISTORY_MESSAGES = 20
def sanitize_history(messages: list) -> list:
truncated = messages[-MAX_HISTORY_MESSAGES:]
return [
{**msg, "text": sanitize_message(msg.get("text", ""))}
for msg in truncated
]
Why Not Block Injection Attempts?
A scammer who sends "ignore all previous instructions" is almost certainly testing the system. If we return an error or change behavior, we reveal that we detected the injection. By silently replacing the pattern with [FILTERED], the scammer sees a normal response and does not know their injection was neutralized. The conversation continues, and we keep extracting intelligence.
Rate Limiting¶
Rate limiting uses Firestore atomic counters to enforce two limits per session:
| Limit | Value | Purpose |
|---|---|---|
| Per-session total | 100 messages | Prevent runaway sessions from consuming resources |
| Per-minute per session | 10 messages/min | Prevent burst abuse from automated tools |
MAX_MESSAGES_PER_SESSION = 100
MAX_MESSAGES_PER_MINUTE = 10
def check_rate_limit(session_id: str) -> tuple[bool, str]:
@fs.transactional
def update_in_transaction(transaction):
doc = doc_ref.get(transaction=transaction)
if doc.exists:
data = doc.to_dict()
total = data.get("total_messages", 0)
minute_key = data.get("minute_key", 0)
minute_count = data.get("minute_count", 0)
if total >= MAX_MESSAGES_PER_SESSION:
return False, f"Session limit reached ({MAX_MESSAGES_PER_SESSION})"
if minute_key == current_minute:
if minute_count >= MAX_MESSAGES_PER_MINUTE:
return False, f"Rate limit reached ({MAX_MESSAGES_PER_MINUTE}/min)"
new_minute_count = minute_count + 1
else:
new_minute_count = 1 # New minute window
transaction.update(doc_ref, {
"total_messages": total + 1,
"minute_key": current_minute,
"minute_count": new_minute_count,
})
# ... (new doc case)
return True, ""
When rate-limited, the response is a stalling message in Hindi --- maintaining the honeypot character while throttling the request:
if not allowed:
return GuviResponse(
status="success",
reply="Ek minute ruko beta, bahut zyada messages aa rahe hain. "
"Thoda der mein baat karte hain.",
# ... (rate limited response)
).model_dump()
Stalling as a Feature
The rate limit response translates roughly to: "Wait a minute child, too many messages are coming. Let's talk in a bit." This is perfectly in character for an elderly Indian victim persona. The scammer perceives a slow, confused target --- not a system defending itself.
Error Fallback¶
Every error in the pipeline produces a valid 200 response with a stalling reply, never a stack trace:
except Exception as e:
logger.exception(f"Error processing request: {e}")
return GuviResponse(
status="success",
reply="Ek minute, network slow hai. Thodi der mein message karta hoon.",
scamDetected=False,
extractedIntelligence=ExtractedIntelligence(),
agentNotes=f"Error fallback: {str(e)[:100]}",
).model_dump()
The GUVI spec requires: "always return 200." If we return a 500, the evaluator scores the entire turn as failed. By returning a stalling reply, we keep the conversation alive and give the next turn a chance to succeed.
Key Architectural Decision¶
Block vs. sanitize injection attempts: we chose sanitize.
In most applications, blocking malicious input is the right call. In a honeypot, blocking reveals information. If a scammer sends a prompt injection and gets an error, they know:
- The system detected their injection
- The system is automated (a human would not respond to injection syntax)
- Their next attempt should use a different technique
By sanitizing instead of blocking, we reveal nothing. The [FILTERED] markers are invisible to the scammer (they only see the LLM's response), and the conversation continues normally. The scammer learns nothing about our defenses.
This principle --- reveal nothing, continue the engagement --- is the security philosophy of the entire system. Error messages are stalling messages. Rate limits are confused pauses. Injection defenses are silent replacements. Every defense maintains the illusion of a real conversation.
What We Learned¶
-
Audit early, audit often. Four critical/high issues found in a single audit pass. If we had deployed without the audit, any of these could have been exploited. The timing-attack vulnerability on API key validation and the missing OIDC check were both straightforward to exploit.
-
Constant-time comparison is non-negotiable for secrets. It is easy to forget that
==leaks timing information. Usehmac.compare_digest()for any secret comparison --- API keys, tokens, PINs. -
Extract utilities from initialization-heavy modules. The OIDC verifier needed to be testable without Firebase credentials. Putting it in
utils/oidc.pyinstead ofmain.pybroke the import chain that triggers Firebase initialization. This pattern applies broadly: keep utility functions in modules that do not import framework-specific code at the module level. -
Sanitize in layers. Message-level sanitization (2000 char limit, pattern replacement) catches individual injection attempts. History-level sanitization (20 message limit) catches memory pressure attacks. Rate limiting catches volume attacks. Each layer handles a different attack vector.
-
Error responses are part of your security surface. A 500 error with a stack trace tells an attacker your technology stack, framework version, and internal function names. A generic stalling message tells them nothing. In production, every code path must produce a safe response.