Phase 1 — Deep Dive
// Sub-phase 1a · Maigret + Holehe · schema · module stubs
1a Module File Structure
modules/
├── username_enum/
│ ├── __init__.py
│ ├── maigret_wrapper.py ← BD implements, LD reviews
│ └── holehe_wrapper.py ← BD implements, LD reviews
├── dns_intel/
│ ├── __init__.py
│ ├── dnspython_wrapper.py ← Phase 1b
│ └── harvester_wrapper.py ← Phase 1b
├── geo_intel/
│ ├── __init__.py
│ ├── exif_extractor.py ← Phase 1c
│ └── ip_locator.py ← Phase 1c
└── nlp_intel/
├── __init__.py
└── spacy_ner.py ← Phase 1d
Phase 1a — New SQLite Tables (SA-approved schema)
-- One row per (profile, username, site) probe produced by maigret_wrapper.
-- Re-runs append rows; dedup is presumably handled at insert/query time —
-- NOTE(review): confirm, the schema has no UNIQUE constraint.
CREATE TABLE IF NOT EXISTS username_hits (
id INTEGER PRIMARY KEY,
profile_file TEXT, -- source profile JSON this hit belongs to
username TEXT, -- username (or alias) that was probed
site TEXT, -- site name as reported by maigret
url TEXT, -- profile URL on that site
status TEXT, -- found|not_found|error
response_ms INTEGER, -- per-site response time reported by maigret
raw_json TEXT, -- full raw hit payload for forensics/replay
retrieved_at TEXT -- UTC timestamp (now_utc() in the wrapper)
);
-- One row per (profile, email, service) check produced by holehe_wrapper.
CREATE TABLE IF NOT EXISTS email_service_hits (
id INTEGER PRIMARY KEY,
profile_file TEXT, -- source profile JSON this hit belongs to
email TEXT, -- email address that was checked
service TEXT, -- service name as reported by holehe
domain TEXT, -- service domain
registered INTEGER, -- 1=yes 0=no
rate_limited INTEGER, -- 1=yes (result inconclusive when set)
raw_json TEXT, -- full raw hit payload for forensics/replay
retrieved_at TEXT -- UTC timestamp
);
maigret_wrapper.py — Stub with Pseudocode
MODULE_META = {
"name": "maigret_wrapper",
"version": "1.0.0",
"phase": "recon_discovery",
"requires": ["maigret>=0.4.4"],
"license": "MIT",
"input_tables": [],
"output_tables": ["username_hits"],
}
FUNCTION validate() → bool:
// Check maigret is importable
TRY import maigret → return True
EXCEPT ImportError → print install hint → return False
FUNCTION _run_maigret(username: str) → list[dict]:
// Option A: subprocess (most reliable across maigret versions)
// NOTE(review): maigret's --json flag takes a report FORMAT (simple|ndjson)
// and writes report files rather than streaming to stdout — confirm stdout
// support for the pinned version, or pass a report path and read that file
// instead of result.stdout.
result = subprocess.run(
["maigret", username, "--json", "simple"],
capture_output=True, timeout=120
)
PARSE result.stdout as JSON → extract site hits
RETURN [{site, url, status, response_ms, raw}]
// Option B: direct library call (faster, version-dependent)
// from maigret import MaigretSite — use if API is stable
FUNCTION _normalize(username, raw_hits, profile_file) → list[tuple]:
FOR EACH hit in raw_hits:
// Map to the schema's three-valued status (found|not_found|error) — do not
// collapse errors into not_found, or transient failures look like misses.
// TODO confirm maigret's error statuses (e.g. "Unknown") for the pinned version.
status = "found" if hit.status == "Claimed"
         else "error" if hit.is_error
         else "not_found"
YIELD (profile_file, username, hit.site,
hit.url, status, hit.response_ms,
json.dumps(hit.raw), now_utc())
FUNCTION run(dry_run=False) → dict:
IF dry_run:
validate() → print status → RETURN {"dry_run": True}
IF NOT validate(): RETURN {}
conn = _init_db(DB_PATH) // creates username_hits if not exists
summary = {}
FOR EACH profile_file, profile in load_profiles(PROFILES_DIR):
usernames = _extract_usernames(profile) // profile.username + aliases
hits = 0
FOR EACH username in usernames:
raw = _run_maigret(username)
rows = list(_normalize(username, raw, profile_file)) // materialize: _normalize YIELDs, and len() below needs a sequence
conn.executemany(INSERT_SQL, rows)
hits += len(rows)
sleep(WAIT)
summary[basename(profile_file)] = hits
conn.commit() // without an explicit commit, sqlite rolls back the inserts on close
conn.close()
RETURN summary
holehe_wrapper.py — Stub
FUNCTION _run_holehe(email: str) → list[dict]:
// Holehe exposes an async Python API, but there is no single
// holehe_check_email() entry point — holehe ships one async function per
// service (holehe.modules.*) plus helpers in holehe.core.
// NOTE(review): confirm the exact call pattern against holehe>=1.6.1; typical
// usage gathers the per-service coroutines over one shared httpx.AsyncClient
// rather than calling asyncio.run() once per email.
import holehe.core as holehe_core
results = asyncio.run(holehe_core.holehe_check_email(email))
RETURN [{service, domain, registered: bool, rate_limited: bool, raw}]
FUNCTION run(dry_run=False) → dict:
// same contract as all modules
IF dry_run → validate, count profiles, RETURN {dry_run: True}
FOR EACH profile:
emails = _extract_emails(profile)
FOR EACH email:
hits = _run_holehe(email)
_store_email_hits(conn, profile_file, email, hits)
RETURN summary
Phase 1 — Regression Test Requirements
tests/test_phase1.py — QA Owns
class TestMaigretWrapper:
test_dry_run_returns_dict()
test_validate_checks_import()
test_normalize_maps_status() # "Claimed" → "found"
test_store_inserts_to_db() # uses tmp sqlite fixture
test_run_skips_on_no_usernames() # empty profile
test_run_handles_subprocess_error() # maigret not installed
class TestHolehe:
test_dry_run_returns_dict()
test_store_registered_flag()
test_rate_limited_flag_stored()
class TestSchemaPhase1:
test_username_hits_table_exists()
test_email_service_hits_table_exists()
test_schema_is_additive() # phase 0 tables unchanged
module_registry.json — Phase 1a Additions
{
"name": "maigret_wrapper",
"path": "modules/username_enum/maigret_wrapper.py",
"phase": "recon_discovery",
"enabled": true,
"description": "Username enumeration via Maigret across 3000+ sites",
"requires": ["maigret>=0.4.4"],
"output_tables": ["username_hits"]
},
{
"name": "holehe_wrapper",
"path": "modules/username_enum/holehe_wrapper.py",
"phase": "recon_discovery",
"enabled": true,
"description": "Email service registration check via Holehe (120+ services)",
"requires": ["holehe>=1.6.1"],
"output_tables": ["email_service_hits"]
}
Context Snapshots
// Compressed history · calibration artifacts · multi-role alignment
What a Snapshot Is
A context snapshot (SNAP-xxx) is a machine-readable + human-readable document issued by the SM at every significant handoff point. Its purpose is to give any team member — or a new LLM context — enough compressed history to resume work correctly without reading the entire project history. Every snapshot is tagged, versioned, and stored in docs/snapshots/.
PROJECT: mod-osint · OSINT orchestration platform
PHASE_COMPLETE: 0 — Foundation
PHASE_ACTIVE: 1a — username/email recon wrappers
REPO_ROOT: ~/mod-osint/
PYTHON: 3.9+ · venv · miniconda confirmed working
CANONICAL_STRUCTURE:
run.py · orchestrator.py · module_registry.json · requirements.txt
.osint_keys.example · BLUEPRINT.md · README.md · install_modules.py
modules/__init__.py
modules/profile_intel/__init__.py
modules/profile_intel/passive_identity_recon.py DONE
modules/profile_intel/breach_data_lookup.py DONE
modules/profile_intel/profile_merge.py DONE
osint_profiles/example.json DONE
SQLITE_SCHEMA_V1: results · breach_hits (both exist after first run)
MODULE_CONTRACT: run(dry_run: bool = False) → dict · MUST NOT hard-exit · MUST load config from env
LOADER: importlib.util.spec_from_file_location — venv-safe — NO sys.path games
KEY_DECISIONS:
ADR-001: modules/ (lowercase) — PEP8, case-sensitive FS safe
ADR-002: File-based module loading — eliminates venv import failures
ADR-003: Wrapper-first strategy — OSS tools over original reimplementation
ADR-004: Soft failures — modules warn+skip, never hard-exit pipeline
KNOWN_DEBT: api/auth.py · api/rbac.py — hardcoded secrets (Phase 4 fix)
REGRESSION_GATE: python run.py --dry-run exits 0 · always · on every commit
NEXT_TASK: BD: implement modules/username_enum/maigret_wrapper.py per ADR-003
REF: REF:SNAP-P0-FINAL in all Phase 1a work
INHERITS: REF:SNAP-P0-FINAL — all prior context applies
SPRINT_GOAL: Two working OSS wrappers in recon_discovery phase with schema + tests
BRANCH: feat/phase-1a-username-email (cut from develop)
NEW_TABLES_THIS_SPRINT:
username_hits (id, profile_file, username, site, url, status, response_ms, raw_json, retrieved_at)
email_service_hits (id, profile_file, email, service, domain, registered, rate_limited, raw_json, retrieved_at)
TOOLS_TO_WRAP:
maigret>=0.4.4 — subprocess preferred (json output flag) — fallback: library
holehe>=1.6.1 — async library call — asyncio.run() wrapper
ASSIGNMENTS:
BD → implement both wrappers · write unit test stubs
QA → write integration tests · fixture: tmp sqlite · fixture: mock subprocess
LD → review PRs · update registry + install_modules
SA → review schema before BD starts (no schema changes after BD starts)
SM → issue SNAP-P1A-002 on merge to develop
GATE_CRITERIA:
python run.py --list-modules → maigret_wrapper [ready]
python run.py --list-modules → holehe_wrapper [ready]
python run.py --dry-run → phase recon_discovery has 2 modules, both complete
pytest tests/test_phase1a.py -v → all pass
bandit modules/username_enum/ → no HIGH findings
BLOCKS: None at sprint start
SIGNAL: Use REF:SNAP-P1A-001 in all commits, PRs, and handoff docs this sprint
Snapshot Naming Convention
SNAP-{PHASE}-{SEQUENCE}
SNAP-P0-FINAL Phase 0 closing snapshot
SNAP-P1A-001 Phase 1a sprint 1 kickoff
SNAP-P1A-002 Phase 1a closing (post-merge)
SNAP-P1B-001 Phase 1b kickoff
SNAP-P2-001 Phase 2 kickoff
SNAP-HOTFIX-001 Emergency hotfix context
docs/snapshots/
├── SNAP-P0-FINAL.md
├── SNAP-P1A-001.md
├── SNAP-P1A-002.md ← SM writes this after 1a merges
└── ...
Every snapshot references its parent:
INHERITS: REF:SNAP-P0-FINAL
Using Snapshots for LLM Context Resumption
When development requires LLM assistance across session boundaries, paste the most recent SNAP document as the first message. The snapshot provides: current phase, completed artifacts, active branch, module contract, key decisions (ADRs), known debt, and next task. This replaces the need to re-read the entire conversation history and prevents regression to stale assumptions.
// Meta-prompt template for LLM context resumption:
"You are continuing development on mod-osint.
Current context snapshot: [paste SNAP-P1A-001]
Task: Implement modules/username_enum/maigret_wrapper.py
Contract: run(dry_run=False) → dict, no hard exits, env config only
Schema: [paste username_hits CREATE TABLE]
Style: match existing modules in modules/profile_intel/
Reference implementation: modules/profile_intel/passive_identity_recon.py"
Pseudocode & Stubs
// Phase 2 correlation engine · Phase 3 report gen · Phase 4 API
Phase 2 — Correlation Engine (entity_resolver.py)
CLASS EntityResolver:
// Reads all intelligence tables, builds canonical subject records
FUNCTION resolve_all(conn) → list[Subject]:
// Step 1: Collect all unique identifiers across all source tables
identifiers = {}
identifiers["email"] = SELECT DISTINCT query_value FROM breach_hits WHERE query_type='email'
identifiers["username"] = SELECT DISTINCT username FROM username_hits WHERE status='found'
identifiers["name"] = SELECT DISTINCT name FROM results WHERE name IS NOT NULL
identifiers["phone"] = SELECT DISTINCT query_value FROM breach_hits WHERE query_type='phone'
identifiers["ip"] = SELECT DISTINCT ip FROM ip_geo_hits
// Step 2: Group identifiers by profile_file (known subject)
by_profile = GROUP all identifiers BY profile_file
// Step 3: For each profile, create/update a Subject record
subjects = []
FOR EACH profile_file, id_set in by_profile:
subject = get_or_create_subject(conn, profile_file)
upsert_identifiers(conn, subject.id, id_set)
subject.confidence = ConfidenceScorer.score(subject, id_set)
subjects.append(subject)
// Step 4: Cross-subject linking (same email in two profiles?)
detect_cross_profile_links(conn, subjects)
RETURN subjects
CLASS ConfidenceScorer:
WEIGHTS = {
"email_exact": 1.0, // exact email match = strongest signal
"username_found": 0.7,
"name_exact": 0.6,
"name_fuzzy": 0.3,
"phone_exact": 0.9,
"ip_corroborate": 0.4,
"corroboration": 0.1, // +0.1 per additional source confirming
}
FUNCTION score(subject, id_set) → float:
score = 0.0
sources_confirming = COUNT DISTINCT source_tables for this subject
score += WEIGHTS[each matched id type]
score += WEIGHTS["corroboration"] * (sources_confirming - 1)
RETURN min(1.0, score) // clamp to [0.0, 1.0]
Phase 3 — HTML Report Generator (html_report.py)
FUNCTION generate_html_report(subject_id: int, output_path: str) → str:
// Fetch all data for this subject
subject = fetch_subject(conn, subject_id)
recon_hits = fetch_recon(conn, subject.profile_file)
breach_hits = fetch_breaches(conn, subject.profile_file)
usernames = fetch_username_hits(conn, subject.profile_file)
timeline = fetch_timeline(conn, subject_id)
geo_points = fetch_geo(conn, subject.profile_file)
// Load Jinja2 template
env = jinja2.Environment(loader=FileSystemLoader("modules/report_gen/templates"))
template = env.get_template("report.html.j2")
// Render with context
html = template.render(
subject=subject,
recon_hits=recon_hits,
breach_hits=breach_hits,
usernames=usernames,
timeline=timeline,
geo_points=geo_points,
generated_at=utcnow(),
confidence_pct=int(subject.confidence * 100)
)
WRITE html to output_path
RETURN output_path
FUNCTION generate_pdf(subject_id, output_path) → str:
// Use a unique temp file — a hardcoded /tmp path collides across concurrent
// runs and is a predictable-path (symlink) risk.
tmp_html = tempfile.mkstemp(suffix=".html")[1]
html_path = generate_html_report(subject_id, tmp_html)
weasyprint.HTML(html_path).write_pdf(output_path)
RETURN output_path
Phase 4 — FastAPI Route Stubs (api/routes/profiles.py)
ROUTER = APIRouter(prefix="/api/v1/profiles", tags=["profiles"])
@ROUTER.post("/", response_model=ProfileResponse, status_code=201)
ASYNC FUNCTION create_profile(
data: ProfileCreate, // Pydantic model — validates all fields
current_user = require_role("analyst")
) → ProfileResponse:
validate_profile_data(data)
path = save_profile_json(data, PROFILES_DIR)
RETURN ProfileResponse(id=uuid(), path=path, status="created")
@ROUTER.post("/{profile_id}/run")
ASYNC FUNCTION trigger_run(
profile_id: str,
phases: list[str] = Query(default=["profile_intel"]), // NOTE(review): without Query(), FastAPI treats a list param as a request-body field — confirm intended location
current_user = require_role("analyst")
) → RunResponse:
run_id = uuid()
// Dispatch to background task — don't block the HTTP response.
// background_tasks must be injected: add "background_tasks: BackgroundTasks"
// to the route signature — it is not a global.
background_tasks.add_task(execute_pipeline, run_id, profile_id, phases)
RETURN RunResponse(run_id=run_id, status="queued")
@ROUTER.get("/{profile_id}/report")
ASYNC FUNCTION get_report(
profile_id: str,
fmt: str = "json", // json | html | pdf | stix | csv
current_user = require_role("viewer")
) → FileResponse | JSONResponse:
MATCH fmt:
"pdf" → generate_pdf(profile_id) → FileResponse
"html" → generate_html(profile_id) → FileResponse
"stix" → generate_stix(profile_id) → JSONResponse
"csv" → export_csv(profile_id) → FileResponse
_ → get_merged_json(profile_id) → JSONResponse
Module Contract Compliance Checker (Phase 6 — Extension Framework)
FUNCTION validate_module_contract(module_path: str) → ValidationResult:
// Load module without calling run() — spec_from_file_location returns a
// ModuleSpec, not a module; it must be instantiated and executed. Note that
// exec_module DOES run module top-level code, so stubs must keep import-time
// side effects out of module scope.
spec = spec_from_file_location(name, path)
mod = module_from_spec(spec); spec.loader.exec_module(mod)
checks = []
// Check 1: has MODULE_META
checks.append(hasattr(mod, "MODULE_META"))
// Check 2: has run() with correct signature
sig = inspect.signature(mod.run)
checks.append("dry_run" in sig.parameters)
// Check 3: run() return annotation is dict — under
// "from __future__ import annotations" the annotation is the string "dict"
checks.append(sig.return_annotation in (dict, "dict"))
// Check 4: has validate()
checks.append(hasattr(mod, "validate"))
// Check 5: has schema() if output_tables is non-empty
has_outputs = len(mod.MODULE_META.get("output_tables", [])) > 0
IF has_outputs: checks.append(hasattr(mod, "schema"))
// Check 6: dry_run=True returns without side effects
result = mod.run(dry_run=True)
checks.append(isinstance(result, dict))
checks.append(result.get("dry_run") == True)
RETURN ValidationResult(passed=all(checks), checks=checks)