Chapter 5: Intelligence Pipeline --- Cross-Session Learning
What We Built
The intelligence pipeline is the operational core of ScamShield AI. It coordinates five stages --- classification, persona selection, evidence extraction, state management, and callback reporting --- into a single processing pass for every incoming scammer message. But the real power is not in the per-turn pipeline. It is in what happens across sessions: an evidence index in Firestore that lets the system recognize repeat offenders and adjust its behavior before the scammer even finishes their opening line.
This chapter walks through the orchestrator that ties everything together, the callback service that reports intelligence to the evaluation system, and the cross-session learning mechanism that turns isolated conversations into a connected intelligence network.
Why This Approach
A naive honeypot processes each message in isolation: classify, respond, done. That works for demos, but it fails in two important ways:
- Evidence is fragile. A scammer mentions their UPI ID in message 3, their phone number in message 7, and their bank account in message 9. If you only extract from the latest message, you lose context. If you re-extract from the full history every turn, you need to deduplicate. We chose to extract from the full conversation on every turn and merge with accumulated evidence using set-union deduplication.
- Scammers recycle identities. The same UPI ID, fraud@oksbi, might appear across dozens of sessions. Without cross-session linking, each session starts from zero. With an evidence index, the system can flag a known scammer on the first message of a new session and immediately adopt more aggressive extraction tactics.
The orchestrator pattern gives us a single coordination point where all these concerns are handled in sequence, with clear data flow between stages.
The Code
The Orchestrator
The Orchestrator class lives in functions/engine/orchestrator.py. Its process() method is the pipeline entry point:
class Orchestrator:
    def process(
        self,
        session: SessionState,
        new_message: str,
        conversation_history: List[Dict],
        metadata: GuviMetadata,
        cross_session_match: Optional[Dict] = None,
        context: Optional[PipelineContext] = None,
        pre_extracted_evidence: Optional[ExtractedIntelligence] = None,
    ) -> ProcessingResult:
        # Step 0: Inject turn-aware quality directives
        self._inject_quality_directives(context, session.message_count + 1)

        # Step 1: Classify scam type (even for known scammers --- they switch tactics)
        scam_type, confidence = self._classify_message(...)

        # Step 1b: Cross-session confidence boost (proportional to match count)
        if is_known_scammer:
            boost = 0.1 * min(match_count, 3)  # Max 0.3 for 3+ matches
            confidence = min(confidence + boost, 0.95)

        # Step 2: Select persona based on scam type + language
        persona = self._select_persona(...)

        # Step 3: Use pre-extracted evidence (avoids redundant regex pass)
        evidence = pre_extracted_evidence or self._extract_evidence(full_text)

        # Step 4: Self-correction --- determine strategy adjustment
        strategy_adjustment = determine_strategy_adjustment(...)

        # Step 5: Generate persona response (with all context injected)
        response = self._generate_persona_response(...)

        # Step 6: Determine conversation state
        state = self._determine_state(...)

        return ProcessingResult(...)
Notice the design: the handler pre-extracts evidence before calling the orchestrator. This lets the handler use that evidence for cross-session lookup, then pass it through to avoid a redundant regex pass inside the orchestrator. One extraction, two consumers.
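The confidence boost in Step 1b is easy to check in isolation. The following is a minimal sketch, not the project's code: `boost_confidence` is a hypothetical helper name, and it assumes that a session with zero matches is simply not a known scammer and keeps its original score.

```python
def boost_confidence(confidence: float, match_count: int) -> float:
    """Apply the cross-session boost: 0.1 per prior match, counting at most 3."""
    if match_count <= 0:
        # Assumption: no matches means "not a known scammer"; leave the score alone.
        return confidence
    boost = 0.1 * min(match_count, 3)     # at most 0.3, however many matches exist
    return min(confidence + boost, 0.95)  # cap so the score never reaches certainty


if __name__ == "__main__":
    for base, matches in [(0.60, 1), (0.60, 5), (0.85, 2)]:
        print(base, matches, round(boost_confidence(base, matches), 2))
```

The cap matters more than the boost size: a classifier score of 0.85 with two prior matches would otherwise exceed 1.0, so it is clamped to 0.95.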
Evidence Accumulation and Deduplication
Evidence accumulates across turns using set-union merging in functions/firestore/sessions.py:
from typing import Any, Dict

EVIDENCE_FIELDS = [
    "upiIds", "bankAccounts", "phoneNumbers", "phishingLinks",
    "emailAddresses", "suspiciousKeywords", "ifscCodes",
    "cryptoWallets", "aadhaarNumbers", "panNumbers",
    "amounts", "caseIds", "policyNumbers", "orderNumbers",
]


def merge_evidence_locally(
    existing_evidence: Dict[str, Any],
    new_evidence: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge per-field evidence lists, dropping duplicates."""
    merged = {}
    for field in EVIDENCE_FIELDS:
        existing = existing_evidence.get(field, [])
        new = new_evidence.get(field, [])
        # Set union removes duplicates; the resulting order is arbitrary.
        combined = list(set(existing) | set(new))
        # _FIELD_LIMITS is defined outside this excerpt and caps selected fields.
        limit = _FIELD_LIMITS.get(field)
        if limit is not None:
            combined = combined[:limit]
        merged[field] = combined
    return merged
The _FIELD_LIMITS dict caps suspiciousKeywords at 15 entries to prevent unbounded growth. Every other field is uncapped because high-value evidence (UPI IDs, bank accounts) is rare and each item is worth preserving.
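One detail to keep in mind when a cap is applied after a set union: Python sets have no guaranteed iteration order, so slicing the union keeps an arbitrary subset and can drop items that were already stored. The sketch below shows an order-preserving variant as an illustration only. `merge_ordered`, `merge_evidence`, and `FIELD_LIMITS` are hypothetical names, and the only limit assumed is the cap of 15 on suspiciousKeywords described above.

```python
from typing import Any, Dict, List, Optional

# Assumption: only the suspiciousKeywords cap is known from the text.
FIELD_LIMITS: Dict[str, int] = {"suspiciousKeywords": 15}


def merge_ordered(existing: List[Any], new: List[Any],
                  limit: Optional[int] = None) -> List[Any]:
    """Union of two lists that keeps first-seen order, then applies the cap."""
    # dict.fromkeys drops duplicates while preserving insertion order.
    combined = list(dict.fromkeys([*existing, *new]))
    return combined if limit is None else combined[:limit]


def merge_evidence(existing: Dict[str, List[Any]],
                   new: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Merge every field present in either dict."""
    fields = sorted(set(existing) | set(new))
    return {
        f: merge_ordered(existing.get(f, []), new.get(f, []), FIELD_LIMITS.get(f))
        for f in fields
    }


if __name__ == "__main__":
    old = {"upiIds": ["fraud@oksbi"], "suspiciousKeywords": ["urgent", "blocked"]}
    fresh = {"upiIds": ["fraud@oksbi", "pay@ybl"], "suspiciousKeywords": ["kyc"]}
    print(merge_evidence(old, fresh))
```

With this variant, older evidence always survives the cap and newer items fill the remaining slots. Whether that trade-off is preferable to plain set union depends on which keywords are more valuable to keep.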
The Callback Service
The callback service in functions/guvi/callback.py sends intelligence reports to the evaluation endpoint. It uses a circuit breaker pattern (via pybreaker) to avoid hammering a failing endpoint:
import pybreaker

# Circuit breaker: opens after 3 failures, stays open for 60s
callback_breaker = pybreaker.CircuitBreaker(
    fail_max=3,
    reset_timeout=60,
    name="guvi_callback",
)


class GuviCallbackService:
    GUVI_CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult"

    def send_final_result(self, session, scam_type, total_messages, scam_detected=None):
        # final_scam_detected, actual_message_count, and session_id are set
        # earlier in the method; those lines are omitted from this excerpt.
        payload = GuviCallbackPayload(
            sessionId=session.guvi_session_id,
            scamDetected=final_scam_detected,
            scamType=scam_type,
            confidenceLevel=round(session.confidence, 2),
            totalMessagesExchanged=actual_message_count,
            extractedIntelligence=session.extracted_evidence,
            agentNotes=self._generate_agent_notes(session, scam_type, actual_message_count),
            # ...
        )
        try:
            # Route the send through the breaker so an open circuit fails fast.
            return callback_breaker.call(self._send_with_retries, payload, session_id)
        except pybreaker.CircuitBreakerError:
            logger.error("Circuit breaker OPEN, skipping session=%s", session_id)
            return False
The retry logic is deliberately conservative: MAX_RETRIES = 1 per turn. Since callbacks are sent on every turn (more on this in Chapter 9), a failed attempt will be retried with fresher data on the next message. There is no need for aggressive retry --- the next turn is the retry.
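To illustrate how "the next turn is the retry" interacts with the breaker settings above, here is a simplified stand-in written in plain Python. It is not pybreaker's implementation and its edge-case behavior may differ; `SimpleBreaker` and its injectable `clock` parameter are hypothetical, chosen so the timing can be exercised without waiting 60 seconds.

```python
import time
from typing import Callable, Optional


class SimpleBreaker:
    """Toy circuit breaker: opens after fail_max failures, retries after a timeout."""

    def __init__(self, fail_max: int = 3, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def is_open(self) -> bool:
        """True while the circuit is open and the timeout has not elapsed."""
        if self.opened_at is None:
            return False
        return self.clock() - self.opened_at < self.reset_timeout

    def call(self, send: Callable[[], bool]) -> bool:
        """Run one send attempt; return False without sending while open."""
        if self.is_open():
            return False  # skip the doomed request; a later turn will try again
        try:
            ok = send()
        except Exception:
            ok = False
        if ok:
            self.failures = 0
            self.opened_at = None
        else:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = self.clock()  # open (or re-open) the circuit
        return ok


if __name__ == "__main__":
    breaker = SimpleBreaker()
    for turn in range(1, 6):
        sent = breaker.call(lambda: False)  # simulate an endpoint that is down
        print(f"turn {turn}: sent={sent} open={breaker.is_open()}")
```

After the third failed turn the breaker opens, so later turns return immediately instead of spending latency on a request that is likely to fail.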
Cross-Session Evidence Index
The evidence index is the mechanism that enables cross-session learning. Each evidence item (UPI ID, phone number, bank account) gets its own document in the evidence_index Firestore collection, with a list of all sessions where it appeared:
def store_evidence_index(session_id, evidence, scam_type, source):
    # evidence_items and timestamp are built earlier in the function;
    # those lines are omitted from this excerpt.
    for ev_type, value in evidence_items:
        key = _normalize_evidence_key(ev_type, value)  # e.g., "upi:fraud@oksbi"
        doc_ref = db.collection(EVIDENCE_INDEX_COLLECTION).document(key)
        doc = doc_ref.get()  # snapshot needed for the existence check below
        if doc.exists:
            updates = {
                "sessions": ArrayUnion([session_id]),  # Race-safe append
                "last_seen": timestamp,
            }
            if scam_type:
                updates["scam_types"] = ArrayUnion([scam_type])
            doc_ref.update(updates)
        else:
            doc_ref.set({
                "type": ev_type,
                "value": value,
                "sessions": [session_id],
                "scam_types": [scam_type] if scam_type else [],
                "first_seen": timestamp,
                "total_occurrences": 1,
            })
ArrayUnion for Race Safety
Firestore's ArrayUnion is critical here. Multiple Cloud Function instances may be processing different sessions concurrently. A naive read-modify-write would lose updates. ArrayUnion atomically appends to the array without reading first.
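To make the lost-update problem concrete, here is a pure-Python model that does not call Firestore. `read_modify_write_race` replays the unlucky interleaving in which two writers read before either writes, and `array_union` models a union applied to the currently stored value. Both function names are hypothetical.

```python
from typing import Dict, List


def read_modify_write_race(store: Dict[str, List[str]], a: str, b: str) -> None:
    """Two writers read the same snapshot, then write back one after the other."""
    snap_a = list(store["sessions"])   # writer A reads
    snap_b = list(store["sessions"])   # writer B reads before A has written
    store["sessions"] = snap_a + [a]   # writer A writes
    store["sessions"] = snap_b + [b]   # writer B overwrites A's append


def array_union(store: Dict[str, List[str]], field: str, values: List[str]) -> None:
    """Add each value to the stored list unless it is already present."""
    for value in values:
        if value not in store[field]:
            store[field].append(value)


if __name__ == "__main__":
    racy = {"sessions": ["s1"]}
    read_modify_write_race(racy, "s2", "s3")
    print("read-modify-write:", racy["sessions"])

    safe = {"sessions": ["s1"]}
    array_union(safe, "sessions", ["s2"])
    array_union(safe, "sessions", ["s3"])
    print("union:", safe["sessions"])
```

In the first case one session ID is silently lost. In the second case both survive because each union is applied to whatever is stored at that moment.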
The lookup side uses find_matching_evidence(), which searches the index for any evidence items that match the current session's extracted data. When a match is found, the handler passes it to the orchestrator, which boosts confidence and injects aggressive extraction tactics into the persona prompt.
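The lookup can be pictured with an in-memory index. This is a sketch under stated assumptions, not the real `find_matching_evidence()`: the `FIELD_TO_TYPE` mapping, the `normalize_key` rule (trim and lowercase), and the return shape are all hypothetical, chosen only to match the "upi:fraud@oksbi" key format shown earlier.

```python
from typing import Dict, List, Set

# Hypothetical mapping from evidence fields to index key prefixes.
FIELD_TO_TYPE: Dict[str, str] = {
    "upiIds": "upi",
    "phoneNumbers": "phone",
    "bankAccounts": "bank",
}


def normalize_key(ev_type: str, value: str) -> str:
    """Build an index key such as 'upi:fraud@oksbi' (assumed normalization)."""
    return f"{ev_type}:{value.strip().lower()}"


def find_matches(index: Dict[str, Set[str]],
                 evidence: Dict[str, List[str]],
                 current_session: str) -> Dict[str, List[str]]:
    """Return {index_key: [prior session ids]} for items seen in other sessions."""
    matches: Dict[str, List[str]] = {}
    for field, ev_type in FIELD_TO_TYPE.items():
        for value in evidence.get(field, []):
            key = normalize_key(ev_type, value)
            # Exclude the current session so it never matches itself.
            prior = sorted(index.get(key, set()) - {current_session})
            if prior:
                matches[key] = prior
    return matches


if __name__ == "__main__":
    index = {"upi:fraud@oksbi": {"s1", "s2"}}
    print(find_matches(index, {"upiIds": ["Fraud@OKSBI"]}, current_session="s3"))
```

Normalizing before lookup is what lets "Fraud@OKSBI" in a new session hit the entry stored as "fraud@oksbi" in an earlier one.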
The Callback Flow
Here is the full sequence from message arrival to intelligence report:
sequenceDiagram
participant GUVI as GUVI Evaluator
participant Handler as Webhook Handler
participant Firestore as Firestore
participant Orchestrator as Orchestrator
participant Gemini as Gemini Flash
participant Callback as Callback Service
participant GuviAPI as GUVI API
GUVI->>Handler: POST /guvi_honeypot (scammer message)
Handler->>Handler: Validate API key + rate limit
Handler->>Firestore: Get/create session
Handler->>Handler: Extract evidence (regex + keywords)
Handler->>Firestore: find_matching_evidence()
Firestore-->>Handler: Cross-session matches
Handler->>Orchestrator: process(session, message, evidence, matches)
Orchestrator->>Gemini: Classify scam type
Gemini-->>Orchestrator: KYC_BANKING (0.85)
Orchestrator->>Orchestrator: Select persona + strategy adjustment
Orchestrator->>Gemini: Generate persona response
Gemini-->>Orchestrator: "Haan ji, mera account verify karna hai..."
Orchestrator-->>Handler: ProcessingResult
Handler->>Handler: Merge evidence (set union)
Handler->>Firestore: Batch update session (single write)
Handler->>Firestore: Store evidence index
Handler->>Callback: send_final_result()
Callback->>GuviAPI: POST updateHoneyPotFinalResult
GuviAPI-->>Callback: 200 OK
Handler-->>GUVI: GuviResponse (reply + intelligence)
Cloud Tasks Scheduler (Delayed Callback)
We also built a delayed callback mechanism using Cloud Tasks in functions/tasks/callback_scheduler.py. The idea: schedule a callback to fire after 10 seconds of inactivity, cancelling and rescheduling on each new message:
import json

CALLBACK_DELAY_SECONDS = 10


def schedule_callback_task(session_id: str) -> bool:
    # Cancel existing task (if any)
    try:
        client.delete_task(name=task_name)
    except Exception:
        pass  # Task doesn't exist, that's fine

    # Schedule new task with OIDC authentication
    task = {
        "name": task_name,
        "http_request": {
            "url": callback_url,
            "body": json.dumps({"sessionId": session_id}).encode(),
            "oidc_token": {
                "service_account_email": CLOUD_TASKS_SA_EMAIL,
                "audience": callback_url,
            },
        },
        "schedule_time": timestamp,  # now + 10s
    }
    response = client.create_task(parent=parent, task=task)
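The cancel-and-reschedule behavior is a debounce: each new message pushes the callback further out, and it fires only once the session has been quiet for the full delay. The sketch below models that in memory and does not use the Cloud Tasks client. `DebounceScheduler` and its explicit `now` arguments are hypothetical, used so the timing can be checked without real waiting.

```python
from typing import Dict, List


class DebounceScheduler:
    """In-memory model of cancel-and-reschedule for one callback per session."""

    def __init__(self, delay: float = 10.0) -> None:
        self.delay = delay
        self.due: Dict[str, float] = {}  # session_id -> time the callback fires

    def on_message(self, session_id: str, now: float) -> None:
        """Replace any pending callback; this stands in for delete + create."""
        self.due[session_id] = now + self.delay

    def fire_due(self, now: float) -> List[str]:
        """Pop and return the sessions whose quiet period has elapsed."""
        ready = sorted(s for s, t in self.due.items() if t <= now)
        for session_id in ready:
            del self.due[session_id]
        return ready


if __name__ == "__main__":
    scheduler = DebounceScheduler(delay=10)
    scheduler.on_message("session-a", now=0)
    scheduler.on_message("session-a", now=6)   # pushes the callback to t=16
    print(scheduler.fire_due(now=10))          # nothing is due yet
    print(scheduler.fire_due(now=16))          # the callback fires once
```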
OIDC Authentication Is Non-Negotiable
The Cloud Tasks scheduler uses OIDC tokens so the callback endpoint can verify the request came from Cloud Tasks, not an arbitrary caller. Without this, anyone who discovers the callback endpoint URL could trigger fake callbacks. See Chapter 7 for the full OIDC verification story.
Key Architectural Decision
Immediate vs. delayed callback: we chose both.
The original design used only delayed callbacks --- wait for the conversation to end (10 seconds of inactivity), then send the intelligence report. This made logical sense: wait until you have all the evidence before reporting.
But the evaluation system could stop at any turn. If the evaluator stopped at turn 3 and we had not sent a callback yet, we scored zero on intelligence reporting. The delayed approach was architecturally clean but operationally fragile.
The solution: send callbacks on every turn starting from turn 1, using the GUVI endpoint's overwrite semantics (updateHoneyPotFinalResult --- "update" means each call replaces the previous). The delayed callback via Cloud Tasks remains as a safety net for edge cases where the handler response completes but the per-turn callback fails.
This "belt and suspenders" approach means the evaluation system always has our latest intelligence, regardless of when it decides to stop the conversation.
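The effect of per-turn reporting with overwrite semantics can be modeled in a few lines. This is an illustrative simulation only: `FakeEvaluator` is a hypothetical stand-in for an endpoint where each report replaces the previous one, and the report fields are simplified.

```python
from typing import Dict, List, Optional


class FakeEvaluator:
    """Keeps only the most recent report per session (overwrite semantics)."""

    def __init__(self) -> None:
        self.latest: Dict[str, dict] = {}

    def update_final_result(self, session_id: str, report: dict) -> None:
        self.latest[session_id] = report  # replaces any earlier report


def run_conversation(evaluator: FakeEvaluator, session_id: str,
                     turns: List[List[str]], stop_after: Optional[int] = None) -> None:
    """Send a cumulative report on every turn; the evaluator may stop at any turn."""
    evidence: set = set()
    for turn, found in enumerate(turns, start=1):
        evidence |= set(found)
        evaluator.update_final_result(session_id, {
            "totalMessagesExchanged": turn,
            "evidence": sorted(evidence),
        })
        if turn == stop_after:
            break  # the evaluator ends the conversation here


if __name__ == "__main__":
    evaluator = FakeEvaluator()
    turns = [[], ["fraud@oksbi"], ["+919876543210"], ["1234567890"]]
    run_conversation(evaluator, "s1", turns, stop_after=3)
    print(evaluator.latest["s1"])
```

Wherever the conversation stops, the stored report reflects everything gathered up to that turn, which is the property the per-turn callback is meant to guarantee.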
What We Learned
- Pre-extract, then pass through. Running evidence extraction once in the handler and passing results to the orchestrator eliminates redundant work. The orchestrator does not need to know how evidence was extracted; it just needs the results.
- Set-union is the right merge strategy for evidence. Items are either present or absent. There is no concept of "partially extracted." Deduplication via Python set() is fast and correct.
- Circuit breakers prevent cascade failures. When the GUVI callback endpoint goes down, the circuit breaker stops us from spending latency budget on doomed requests. The next turn will retry with fresher data.
- Cross-session learning compounds over time. The more sessions we process, the more evidence accumulates in the index, and the faster we can identify repeat offenders. This is a flywheel: each conversation makes the system smarter for the next one.
- Firestore's ArrayUnion is underrated. It solves the concurrent-append problem without transactions, which keeps write latency low in a serverless environment where multiple instances may be processing simultaneously.