# Data Flow
How data moves through the system — from external APIs to stored artifacts and queryable history.
## Pipeline
One pipeline, two depths: `light` (daily triage) and `full` (weekly assessment) produce the same artifact structure. The CLI writes JSON artifacts at each stage; the API reads them to serve the frontend. SQLite write-through provides queryable history, but JSON on disk is authoritative.
```
Gather → Reflect → Hypothesize → Research → Score → Annotate → Finalize
  P1       P1.5        P2          P2.5      P3        P4         P5
```
Artifacts: `artifacts/reports/weekly/{DATE}/{RUN_ID}/`
| Phase | Actor | Daily (light) | Weekly (full) |
|---|---|---|---|
| 1 — Gather | Automation | yes | yes |
| 1.5 — Reflect | Agent | — | yes |
| 2 — Hypothesize | Agent | scan only | full assess + generate |
| 2.5 — Research | Agent | — | yes |
| 3 — Score | Agent | — | yes |
| 4 — Annotate | Agent | executive_summary only | full annotations |
| 5 — Finalize | Automation | — | yes |
## Phase 1 — Gather (Automation)
`openfin review gather --depth light|full` → `ReviewOrchestrator.run(depth=...)`
| Gather step | light | full | Source |
|---|---|---|---|
| Positions | yes | yes | app.list_positions() (SnapTrade) |
| Market data (indices, sectors) | yes | yes | app.get_market_overview() (Finnhub) |
| Macro indicators | yes | yes | app.get_macro_snapshot() (FRED) |
| Macro calendar | yes | yes | investments/macro_calendar.yaml (manual) |
| Watchlist quotes | yes | yes | app.get_watchlist_report() (Finnhub) |
| News per symbol | yes | yes | app.get_watchlist_news() (Finnhub) |
| Quotes | yes | yes | app.get_quotes() (Finnhub) |
| Forex, commodities | yes | yes | app.get_forex_quotes() / app.get_commodity_quotes() |
| Earnings dates | yes | yes | app.get_earnings_report() (Finnhub) |
| Hypotheses | yes | yes | get_hypotheses_for_thesis() (DB) |
| Search | — | yes | app.get_batch_symbol_search() (Brave) |
| Prior scores | — | yes | get_score_history() (DB) |
| Analysis signals | — | yes | get_latest_snapshot("stock_analysis") (DB) |
| Social signals | — | yes | get_latest_snapshot("social_signal") (DB) |
Then:

- `compute_portfolio_context()` — aggregates positions into total value, cost basis, P&L, day change, per-symbol weight%.
- `_compute_risk_snapshot()` — real risk metrics: concentration (>15% weight), volatility (>3% day / >10% week).
- Queries hypotheses — active + recently resolved per thesis from `thesis_hypotheses`.
- Computes thesis health — recency-weighted confirmed/invalidated ratio → untested/strong/mixed/weakening/failing.
- Computes time pressure — elapsed time vs thesis `time_horizon` → early/mid/late/overdue.
- Loads macro calendar — `investments/macro_calendar.yaml` filtered to the next 30 days, rendered with countdown labels.
- Builds context packets — thesis packets (with hypotheses, health, pressure, upcoming earnings), holding packets, and legacy symbol contexts.
- Persists artifacts, inserts a `review_runs` row (status=`"collecting"`).
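The health and pressure buckets above can be sketched as follows. The bucket names come from this page; the exponential recency weighting, half-life, and threshold values are illustrative assumptions, not the actual implementation:

```python
from datetime import datetime

# Hypothetical sketch: recency-weighted confirmed/invalidated ratio.
# Half-life and bucket cutoffs are assumptions for illustration.
def thesis_health(resolved: list[tuple[str, datetime]],
                  now: datetime, half_life_days: float = 30.0) -> str:
    confirmed = invalidated = 0.0
    for status, resolved_at in resolved:
        age = (now - resolved_at).days
        weight = 0.5 ** (age / half_life_days)  # recent outcomes count more
        if status == "confirmed":
            confirmed += weight
        elif status == "invalidated":
            invalidated += weight
    total = confirmed + invalidated
    if total == 0:
        return "untested"
    ratio = confirmed / total
    if ratio >= 0.75:
        return "strong"
    if ratio >= 0.5:
        return "mixed"
    if ratio >= 0.25:
        return "weakening"
    return "failing"

# Time pressure: elapsed fraction of the thesis time horizon.
def time_pressure(created: datetime, horizon_days: int, now: datetime) -> str:
    elapsed = (now - created).days / horizon_days
    if elapsed > 1.0:
        return "overdue"
    if elapsed > 0.66:
        return "late"
    if elapsed > 0.33:
        return "mid"
    return "early"
```

Both values are derived on the fly from `thesis_hypotheses` rows rather than stored, so they always reflect the latest resolutions.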
### Context packets
Two primary packet types (used for scoring):
Thesis packets (`theses/{SLUG}.md`) — one per active thesis. Contains:
- Narrative, time horizon, thesis health (from hypothesis outcomes), time pressure
- Active hypotheses (claim + invalidation + linked evidence) and recently resolved hypotheses
- Upcoming earnings dates with countdown
- Per-symbol: market data + valuation, enriched position (P&L, weight%), prior review scores, analysis signals, social signals, news (with age labels like "2d ago"), search
Holding packets (`holdings/{SYMBOL}.md`) — one per portfolio position. Contains:
- Enriched position: qty, avg price, market value, cost basis, P&L, gain%, weight%, tax status
- Prior review scores, analysis signals, social signals
- Market data + valuation, news (with age labels), search, thesis references
Legacy packets (`context/{SYMBOL}.md`) — deprecated, still generated for backward compat.
Artifacts: `inputs.json` (ReviewInputs), `context.json` (ReviewContext), `theses/{SLUG}.md`, `holdings/{SYMBOL}.md`, `summary.json` (ReviewSummary), `overview.md` (with catalyst calendar section)
## Phase 1.5 — Reflect on Prior Week (Agent, weekly only)
Agent retrieves prior run context and compares against this week's data:
```
openfin review list -n 5              # find last finalized run
openfin review show <PRIOR_RUN_ID>    # prior scores, annotations, decisions
openfin review lookup <THESIS_SLUG>   # observation history
openfin thesis status <SLUG>          # hypothesis state + health
```
Reflection covers: hypothesis quality (were resolved hypotheses well-formed?), decision accuracy (did last week's actions play out?), thesis evolution (did catalyst states shift?).
Recorded as an annotation:
```
openfin review annotate --run-id <RUN_ID> --field "obs:<thesis-slug>" \
  --value "REFLECTION: <what changed, hypothesis quality, decision accuracy>"
```
Data flow: DB reads (`review_runs`, `rubric_scores`, `review_annotations`, `thesis_hypotheses`) → agent reasoning → `observations.json` + `review_annotations` row.
## Phase 2 — Hypothesis Assessment (Agent)
Agent reads `theses/{SLUG}.md` and assesses prior hypotheses against new data:
```
openfin review hypothesis update <ID> --status confirmed --resolution "What happened"
openfin review hypothesis create <SLUG> --claim "If A then B" --invalidation "Unless C"
openfin review evidence NVDA --run-id ID --source-type news --claim "..." \
  --direction confirming --hypothesis-id <ID>
```
Hypotheses persist in `thesis_hypotheses` across review runs. Evidence can link to a hypothesis via `--hypothesis-id`. Each thesis maintains at least 3 active hypotheses, including one bear case marked with a `BEAR:` prefix.
Catalyst state assessment is recorded as an annotation: `CATALYST_STATE: PRICED_IN|ABSORBING|DIVERGENT|IMPULSE_RISK`.
Daily (light) mode: scan hypotheses for approaching horizons or contradictory evidence; flag issues but skip the full assessment.
Data flow: thesis packets (filesystem) → agent reasoning → `thesis_hypotheses` rows + `decision_evidence` rows + `observations.json`.
## Phase 2.5 — Self-Directed Research (Agent, weekly only)
When packet data is thin, a hypothesis needs verification, or reflection flagged a gap:
```
openfin research news <SYMBOL> --limit 10     # deeper news pull
openfin research search "<specific query>"    # targeted web search
openfin research sec-filings <SYMBOL> --type 10-Q   # recent filings list
openfin research sec-read <SYMBOL> --type 8-K       # read a filing
```
Findings recorded as evidence linked to hypotheses. Targeted, not exhaustive — 3-5 queries per review is typical.
Data flow: external APIs → agent reasoning → `decision_evidence` rows (linked to hypotheses).
## Phase 3 — Rubric Scoring (Agent, weekly only)
`openfin review score`, `openfin review evidence`
Agent reads thesis packets for `thesis_alignment` and holding packets for `news_sentiment` / `valuation_signal`:
```
openfin review score NVDA -m thesis_alignment -s 8 -r "Strong narrative" --run-id ID
openfin review score NVDA -m news_sentiment -s 7 -r "Positive coverage" --run-id ID
openfin review score NVDA -m valuation_signal -s 6 -r "Fair value" --run-id ID
openfin review score NVDA -m social_signal -s 5 -r "Mixed signals" --run-id ID
```
Each command updates `scoring.json` (ReviewScoring) and writes through to the `rubric_scores` / `decision_evidence` tables.

Rubric weights (`~/.openfin/scoring/*.yaml`): `thesis_alignment` (0.35), `news_sentiment` (0.25), `valuation_signal` (0.25), `social_signal` (0.15).
Data flow: thesis + holding packets → agent reasoning → `scoring.json` + `rubric_scores` rows + `decision_evidence` rows.
## Phase 4 — Annotation (Agent)
`openfin review annotate` — writes narrative fields:
```
openfin review annotate --run-id ID --field executive_summary --value "Markets showed..."
```
Fields: `executive_summary`, `portfolio_summary`, `market_summary`, `macro_summary`, `watchlist_summary`, `news_summary` → write through to `summary.json`. `obs:`-prefixed fields → accumulate to `observations.json` + `review_annotations` rows. `daily_summary`, `daily_action_items` → DB only.

Data flow: agent reasoning → `summary.json` + `observations.json` + `review_annotations` rows.
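The field routing above can be summarized in a small dispatch sketch. This helper is hypothetical; the page does not specify how unknown fields are rejected, and the DB table assumed for daily-only fields is a guess:

```python
# Field lists come from this page; the routing function itself and the
# table for daily-only fields are illustrative assumptions.
SUMMARY_FIELDS = {
    "executive_summary", "portfolio_summary", "market_summary",
    "macro_summary", "watchlist_summary", "news_summary",
}
DB_ONLY_FIELDS = {"daily_summary", "daily_action_items"}

def annotation_targets(field: str) -> list[str]:
    """Return the destinations an annotated field is written to."""
    if field in SUMMARY_FIELDS:
        return ["summary.json"]
    if field.startswith("obs:"):
        # obs: fields accumulate on disk and mirror to the DB audit trail.
        return ["observations.json", "review_annotations"]
    if field in DB_ONLY_FIELDS:
        return ["review_annotations"]  # assumed table for DB-only fields
    raise ValueError(f"unknown annotation field: {field}")
```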
## Phase 5 — Finalize (Automation, weekly only)
`openfin review finalize`
- Composite scores — weighted average per symbol (`thesis_alignment=0.35`, `news_sentiment=0.25`, `valuation_signal=0.25`, `social_signal=0.15`). Confidence = submitted weights / total weights.
- Actions — `BUY_MORE` (≥0.70), `HOLD` (≥0.40), `TRIM` (≥0.20), `EXIT` (<0.20). Modulated by thesis health + time pressure.
- Decisions — one `Decision` per symbol (action, confidence, rationale, evidence, scorecard, entry_zone, exit_trigger, sizing_note).
- Backfills action items into `summary.json`, generates `report.md`.
Data flow: `scoring.json` → composite math → `decisions.json` + `report.md` + backfill `rubric_scores` composites + set `review_runs.status = "finalized"`.
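The composite and action logic can be sketched in a few lines. The weights and thresholds come from this page; the 0-10 to 0-1 normalization and the function names are assumptions, and the thesis-health/time-pressure modulation is omitted:

```python
# Weights from the rubric config (~/.openfin/scoring/*.yaml).
WEIGHTS = {
    "thesis_alignment": 0.35,
    "news_sentiment": 0.25,
    "valuation_signal": 0.25,
    "social_signal": 0.15,
}

def composite(scores: dict[str, float]) -> tuple[float, float]:
    """scores maps metric -> 0-10 score; missing metrics were not submitted."""
    submitted = {m: w for m, w in WEIGHTS.items() if m in scores}
    total = sum(submitted.values())
    if total == 0:
        return 0.0, 0.0
    # Assumed normalization: weighted mean of 0-10 scores scaled to 0-1.
    value = sum(scores[m] * w for m, w in submitted.items()) / total / 10
    confidence = total / sum(WEIGHTS.values())  # submitted / total weights
    return value, confidence

def action(score: float) -> str:
    """Threshold mapping, before health/pressure modulation."""
    if score >= 0.70:
        return "BUY_MORE"
    if score >= 0.40:
        return "HOLD"
    if score >= 0.20:
        return "TRIM"
    return "EXIT"
```

Under these assumptions, scores of 8/7/6/5 across the four metrics compose to 0.68 with full confidence, which maps to `HOLD`.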
## Artifact Tree
```
{RUN_ID}/
  metadata.json       # run_id, date, trigger
  inputs.json         # P1: raw data + portfolio_context + macro_events (ReviewInputs)
  context.json        # P1: symbol contexts with prior scores + signals (ReviewContext)
  theses/{SLUG}.md    # P1: per-thesis scoring packet
  holdings/{SYM}.md   # P1: per-holding scoring packet
  context/{SYM}.md    # P1: legacy per-symbol packet (deprecated)
  summary.json        # P1+4: narratives + portfolio_context + action items (ReviewSummary)
  overview.md         # P1: compact overview with catalyst calendar
  observations.json   # P1.5+2+4: accumulated obs: annotations (filesystem audit trail)
  scoring.json        # P2-3: scores + evidence (ReviewScoring)
  decisions.json      # P5: final decisions + risk snapshot (ReviewDecisions)
  report.md           # P5: final rendered report
```
## Persistence: Dual-Write Pattern
The review pipeline uses the filesystem as the source of truth with best-effort DB write-through:
```
Agent CLI command
├── Write to filesystem artifact (scoring.json, observations.json, summary.json)
└── DB write-through (swallows exceptions with warning if DB unavailable)
    ├── rubric_scores
    ├── decision_evidence
    └── review_annotations
```
This ensures the review can always proceed even if the DB is unavailable. The DB provides queryable history and cross-run trends.
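A minimal sketch of the pattern, where `record_score` and the `db_insert` callback are illustrative stand-ins rather than the actual function names:

```python
import json
import logging
from pathlib import Path

log = logging.getLogger("openfin.review")

def record_score(run_dir: Path, payload: dict, db_insert) -> None:
    # 1. Authoritative write: update the JSON artifact on disk.
    artifact = run_dir / "scoring.json"
    artifact.write_text(json.dumps(payload, indent=2))
    # 2. Best-effort write-through: failures are logged, never raised,
    #    so the review proceeds even when the DB is unavailable.
    try:
        db_insert(payload)
    except Exception as exc:
        log.warning("DB write-through failed, continuing: %s", exc)
```

The key design choice is the order: the artifact write happens first and unconditionally, so a DB outage can never lose review output.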
## Database (write-through)
| Table | Phases | Purpose |
|---|---|---|
| `review_runs` | 1, 5 | Run metadata, status (collecting → scoring → finalized) |
| `thesis_hypotheses` | 2 | AI-generated hypotheses, persist across runs |
| `rubric_scores` | 3, 5 | Score history, composite backfill at finalize |
| `decision_evidence` | 2-3 | Evidence audit trail (optional `hypothesis_id` link) |
| `review_annotations` | 1.5, 2, 4 | Annotation audit trail (`obs:` fields + summaries) |
| `data_snapshots` | 1 | Historical market data snapshots (versioned JSON payloads) |
| `thesis_snapshots` | on save | Thesis audit log (YAML is source of truth) |
## Payload Versioning
JSON payloads in `data_snapshots` and filesystem artifacts carry a `_v` field for schema versioning. See `db/payload_version.py`.

Write path: `stamp(data_type, json_str)` injects `"_v": N` before storage.

Read path: `upgrade(data_type, raw)` walks the upgrade chain from the payload's `_v` to current, strips `_v`, and returns a clean dict for `Model.model_validate()`.
When a model shape changes:

- Bump the version in `CURRENT_VERSIONS[data_type]`.
- Register an upgrade function:

  ```python
  @register_upgrade("position", from_version=1)
  def _pos_v1_to_v2(d: dict) -> dict:
      d.setdefault("new_field", None)
      d["_v"] = 2
      return d
  ```

- Old payloads auto-upgrade on read; new payloads get the new version stamp.
Graceful degradation: if no upgrade is registered across a version gap, Pydantic defaults fill missing fields. Only removals/renames need explicit upgrade functions.
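End to end, the mechanism can be sketched like this. The names (`CURRENT_VERSIONS`, `register_upgrade`, `stamp`, `upgrade`) mirror the API described above for `db/payload_version.py`, but the internals here are illustrative assumptions:

```python
import json
from typing import Callable

CURRENT_VERSIONS: dict[str, int] = {"position": 2}
_UPGRADES: dict[tuple[str, int], Callable[[dict], dict]] = {}

def register_upgrade(data_type: str, from_version: int):
    def decorator(fn):
        _UPGRADES[(data_type, from_version)] = fn
        return fn
    return decorator

def stamp(data_type: str, json_str: str) -> str:
    """Write path: inject the current schema version before storage."""
    d = json.loads(json_str)
    d["_v"] = CURRENT_VERSIONS[data_type]
    return json.dumps(d)

def upgrade(data_type: str, raw: str) -> dict:
    """Read path: walk the upgrade chain, strip _v, return a clean dict."""
    d = json.loads(raw)
    v = d.get("_v", 0)  # pre-versioning payloads are treated as version 0
    while v < CURRENT_VERSIONS[data_type]:
        fn = _UPGRADES.get((data_type, v))
        if fn is None:      # gap in the chain: rely on Pydantic defaults
            break
        d = fn(d)
        v = d["_v"]
    d.pop("_v", None)
    return d

@register_upgrade("position", from_version=1)
def _pos_v1_to_v2(d: dict) -> dict:
    d.setdefault("new_field", None)
    d["_v"] = 2
    return d
```

Note that `upgrade` chains registered functions one hop at a time, so a v0 payload can reach v2 through intermediate upgrades without any function needing to know about distant versions.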
Integration points:
| Path | Function | Stamps/Upgrades |
|---|---|---|
| `db/repository.py:save_snapshots()` | Write | `stamp()` |
| `api/routers/portfolio.py:_parse_positions()` | Read | `upgrade("position")` |
| `utils/watchlist.py:load_portfolio_positions()` | Read | `upgrade("position")` |
| `services/review_orchestrator.py:_persist()` | Write | `stamp("review_*")` |
| `services/review_orchestrator.py:load_cached_report()` | Read | `upgrade("review_*")` |
| `cli/review.py` (score/finalize/annotate) | Read+Write | `upgrade()` + `stamp()` |
| `services/review_orchestrator.py:_gather_analysis_signals()` | Read | `upgrade("stock_analysis")` |
| `services/review_orchestrator.py:_gather_social_signals()` | Read | `upgrade("social_signal")` |
Pre-versioning data (no `_v` field) is treated as version 0.
## API Routes
| Endpoint | Source | Returns |
|---|---|---|
| `GET /api/runs/` | `review_runs` table | Run list with status/dates |
| `GET /api/runs/{id}/summary` | `summary.json` | Narratives + portfolio context |
| `GET /api/runs/{id}/decisions` | `decisions.json` | Decisions + risk snapshot + scorecards |
| `GET /api/runs/{id}/scoring` | `scoring.json` | Scores + evidence |
| `GET /api/runs/{id}/artifacts/theses/{file}` | `theses/{file}` | Thesis packet |
| `GET /api/runs/{id}/artifacts/holdings/{file}` | `holdings/{file}` | Holding packet |
| `GET /api/market/` | `inputs.json` | Indices, sectors, macro |
| `GET /api/scores/history/{symbol}` | `rubric_scores` table | Historical scores |
Detail views read JSON from disk. Only `/api/runs/` and `/api/scores/history/` query the DB.
## Notes
- The system does not call LLM APIs. All AI reasoning happens externally — the agent reads packets, reasons, and writes results back via CLI commands.
- Narrative fields in `summary.json` are agent-written text, not computed.
- Structured data lives in `inputs.json`; `/api/market/` serves it directly.
- `PortfolioContext` is computed at collection time and persisted in both `inputs.json` and `summary.json`.
- Thesis health and time pressure are computed at query time from `thesis_hypotheses`, not stored.
- Hypotheses outlive review runs — they track the evolving understanding of a thesis across weeks.
- `observations.json` accumulates during the review and is persisted to `review_annotations` rows for cross-run queries.