Skip to content

Adding a New Evidence Extractor

Evidence extractors use regex patterns to identify scam-related data in conversation text: UPI IDs, bank accounts, phone numbers, and more. This tutorial walks through adding a new extractor type end-to-end.

Extractor Architecture

functions/extractors/
  regex_patterns.py    # All regex-based extractors + extract_all_evidence()
  keywords.py          # Keyword category detection

Evidence flows through the system as follows:

Scammer message text
  → extract_all_evidence(text)      # Returns dict with snake_case keys
  → transform_to_guvi_format(dict)  # Converts to camelCase for GUVI API
  → ExtractedIntelligence(**dict)    # Pydantic model
  → Stored in Firestore + sent in response/callback

Step 1: Add the Regex Pattern

Open functions/extractors/regex_patterns.py and add your pattern. Follow the existing section structure.

Example: Adding GSTIN (GST Identification Number) Extraction

GSTIN format: 2-digit state code + 10-char PAN + 1 entity number + Z + 1 check digit. Example: 27AADCB2230M1ZR

# ============== GSTIN EXTRACTION ==============

# GSTIN: 2-digit state code (01-37) + PAN (10 chars) + entity number + Z + check digit
GSTIN_PATTERN = re.compile(
    r'\b(\d{2}[A-Z]{5}\d{4}[A-Z]\d[A-Z][A-Z0-9])\b'
)

# Valid state codes (01-37, covering all Indian states/UTs)
_VALID_STATE_CODES = {str(i).zfill(2) for i in range(1, 38)}


def extract_gstin(text: str) -> List[str]:
    """Extract GST Identification Numbers from text.

    Valid format: 27AADCB2230M1ZR (2-digit state + PAN + entity + Z + check).
    State code must be between 01 and 37.

    Args:
        text: Conversation text to search.

    Returns:
        List of valid GSTIN strings found.
    """
    matches = GSTIN_PATTERN.findall(text.upper())
    valid = [m for m in matches if m[:2] in _VALID_STATE_CODES]
    return sorted(set(valid))

Pattern Design Guidelines

  1. Use word boundaries (\b) to prevent matching inside larger strings
  2. Validate beyond regex -- Use post-match validation (state codes, checksums, length) to reduce false positives
  3. Document the format in the docstring with examples
  4. Handle case insensitivity -- Either use re.IGNORECASE or normalize with .upper()/.lower()
  5. Return sorted, deduplicated lists -- Use sorted(set(...)) for consistency

Step 2: Wire into extract_all_evidence()

Add your extractor to the extract_all_evidence() function in the same file:

def extract_all_evidence(text: str) -> Dict[str, Any]:
    """Extract all evidence types from text."""
    urls = extract_urls(text)
    emails = extract_emails(text)
    upi_ids = extract_upi_ids(text)

    upi_set = {u.lower() for u in upi_ids}
    emails = [e for e in emails if e.lower() not in upi_set]

    return {
        "upi_ids": upi_ids,
        "bank_accounts": extract_bank_accounts(text),
        "phone_numbers": extract_phone_numbers(text),
        "email_addresses": emails,
        "urls": urls,
        "phishing_links": [url for url in urls if is_suspicious_url(url)],
        "amounts": extract_amounts(text),
        "ifsc_codes": extract_ifsc_codes(text),
        "aadhaar_numbers": extract_aadhaar_numbers(text),
        "pan_numbers": extract_pan_numbers(text),
        "crypto_wallets": extract_crypto_wallets(text),
        "case_ids": extract_case_ids(text),
        "policy_numbers": extract_policy_numbers(text),
        "order_numbers": extract_order_numbers(text),
        "gstin_numbers": extract_gstin(text),          # Add here
    }

Step 3: Update transform_to_guvi_format()

Add the camelCase mapping:

def transform_to_guvi_format(evidence: Dict[str, Any]) -> Dict[str, Any]:
    """Transform internal snake_case evidence to GUVI camelCase format."""
    phishing = evidence.get("phishing_links", [])
    all_urls = evidence.get("urls", [])
    merged_links = sorted(set(phishing) | set(all_urls))

    return {
        "bankAccounts": evidence.get("bank_accounts", []),
        "upiIds": evidence.get("upi_ids", []),
        # ... existing fields ...
        "orderNumbers": evidence.get("order_numbers", []),
        "gstinNumbers": evidence.get("gstin_numbers", []),  # Add here
    }

Step 4: Add Field to ExtractedIntelligence Model

If this is a new evidence type (not fitting into an existing field), add it to the Pydantic model in functions/guvi/models.py:

class ExtractedIntelligence(BaseModel):
    """Intelligence extracted during the honeypot conversation."""

    bankAccounts: List[str] = Field(default_factory=list, ...)
    upiIds: List[str] = Field(default_factory=list, ...)
    # ... existing fields ...
    orderNumbers: List[str] = Field(default_factory=list, ...)
    gstinNumbers: List[str] = Field(                          # Add here
        default_factory=list,
        description="GST Identification Numbers",
    )

Step 5: Add Keywords (If Relevant)

If your evidence type is associated with specific scam keywords, add them to functions/extractors/keywords.py:

KEYWORD_CATEGORIES = {
    # ... existing categories ...
    "tax_fraud": [
        "GST", "GSTIN", "GST registration", "GST refund",
        "tax return", "income tax refund", "TDS refund",
        "GST verification", "GST expired",
    ],
}

Update category_weights in get_keyword_score() if the new category should affect the suspicion score:

category_weights = {
    # ... existing weights ...
    "tax_fraud": 0.10,
}

Step 6: Write Parametrized Tests

Create comprehensive tests in tests/guvi/test_extractors.py:

import pytest
from extractors.regex_patterns import extract_gstin


class TestGSTINExtraction:
    @pytest.mark.parametrize(
        "text,expected",
        [
            # Valid GSTIN
            ("GST number: 27AADCB2230M1ZR", ["27AADCB2230M1ZR"]),
            ("GSTIN: 07AAECR4756Q1Z2", ["07AAECR4756Q1Z2"]),

            # Multiple GSTINs
            (
                "Seller: 27AADCB2230M1ZR, Buyer: 09AAECI3721N1Z8",
                ["09AAECI3721N1Z8", "27AADCB2230M1ZR"],
            ),

            # Invalid state code (99 is not a valid state)
            ("GSTIN 99AADCB2230M1ZR", []),

            # Too short / too long
            ("27AADCB2230M1Z", []),    # Missing check digit
            ("27AADCB2230M1ZRX", []),  # Extra character

            # No GSTIN present
            ("No tax info here", []),
            ("Random numbers 1234567890", []),

            # Case insensitivity (should normalize to uppercase)
            ("gstin: 27aadcb2230m1zr", ["27AADCB2230M1ZR"]),

            # Embedded in longer text
            (
                "Please verify your GST registration 27AADCB2230M1ZR before proceeding",
                ["27AADCB2230M1ZR"],
            ),
        ],
    )
    def test_gstin_extraction(self, text, expected):
        result = extract_gstin(text)
        assert result == expected

    def test_gstin_not_confused_with_pan(self):
        """PAN is 10 characters; GSTIN is 15. Ensure no cross-contamination."""
        text = "PAN: ABCPD1234F is different from GSTIN: 27ABCPD1234F1Z5"
        result = extract_gstin(text)
        assert "ABCPD1234F" not in result

Test Coverage Checklist

  • [ ] Standard valid patterns (at least 3 examples)
  • [ ] Multiple matches in one text
  • [ ] No matches when pattern is absent
  • [ ] Invalid format variations (too short, too long, wrong characters)
  • [ ] False positive prevention (similar patterns that should NOT match)
  • [ ] Case handling (uppercase, lowercase, mixed)
  • [ ] Pattern embedded in surrounding text
  • [ ] Edge cases specific to your pattern (e.g., boundary digits for state codes)

Step 7: Test Edge Cases

Pay special attention to false positives. Common traps:

Overlapping Patterns

def test_gstin_does_not_match_random_alphanumeric(self):
    """15-char alphanumeric strings that happen to match the regex."""
    text = "Reference: AB12345678901CD"  # Not a real GSTIN
    result = extract_gstin(text)
    assert result == []

Interaction with Other Extractors

def test_no_cross_contamination_with_bank_accounts(self):
    """Ensure GSTIN digits aren't extracted as bank accounts."""
    text = "GSTIN: 27AADCB2230M1ZR"
    from extractors.regex_patterns import extract_bank_accounts
    banks = extract_bank_accounts(text)
    assert banks == []  # The digits in GSTIN should not match

Complete Example: Evidence Flow

Here is how a new extractor integrates with the full pipeline:

1. Scammer sends: "Verify your GST: 27AADCB2230M1ZR immediately"

2. Handler calls _extract_evidence_from_full_conversation()
   → extract_all_evidence(text)
     → extract_gstin(text) returns ["27AADCB2230M1ZR"]
   → transform_to_guvi_format(evidence)
     → {"gstinNumbers": ["27AADCB2230M1ZR"], ...}

3. ExtractedIntelligence model created:
   → evidence.gstinNumbers == ["27AADCB2230M1ZR"]

4. Evidence stored in Firestore session + evidence_index

5. Response includes:
   → "extractedIntelligence": {"gstinNumbers": ["27AADCB2230M1ZR"], ...}

6. Callback includes cumulative evidence

Existing Extractors Reference

Extractor Function Pattern Examples
UPI IDs extract_upi_ids() fraud@oksbi, scammer@paytm
Bank Accounts extract_bank_accounts() 9-18 digit numbers with context
Phone Numbers extract_phone_numbers() 9876543210, +91-8765432109, 011-23456789
Emails extract_emails() fake@domain.com
URLs extract_urls() http://..., www.scam.xyz
Amounts extract_amounts() Rs. 50,000, 5 lakh rupees
IFSC Codes extract_ifsc_codes() SBIN0001234
Aadhaar Numbers extract_aadhaar_numbers() 12 digits, Verhoeff-validated
PAN Numbers extract_pan_numbers() ABCPD1234F
Crypto Wallets extract_crypto_wallets() BTC, ETH, Tron addresses
Case IDs extract_case_ids() FIR-2025-12345, CBI-2025-1234
Policy Numbers extract_policy_numbers() POL12345678, LIC policy 12345678
Order Numbers extract_order_numbers() OD123456789, AWB1234567890