Data Flow

How data moves through the system — from external APIs to stored artifacts and queryable history.

Pipeline

One pipeline, two depths: light (daily triage) and full (weekly assessment) produce the same artifact structure. The CLI writes JSON artifacts at each stage; the API reads them to serve the frontend. SQLite write-through provides queryable history, but the JSON on disk is authoritative.

Gather → Reflect → Hypothesize → Research → Score → Annotate → Finalize
  P1       P1.5        P2          P2.5       P3       P4          P5

Artifacts: artifacts/reports/weekly/{DATE}/{RUN_ID}/

| Phase | Actor | Daily (light) | Weekly (full) |
|---|---|---|---|
| 1 — Gather | Automation | yes | yes |
| 1.5 — Reflect | Agent | no | yes |
| 2 — Hypothesize | Agent | scan only | full assess + generate |
| 2.5 — Research | Agent | no | yes |
| 3 — Score | Agent | no | yes |
| 4 — Annotate | Agent | executive_summary only | full annotations |
| 5 — Finalize | Automation | no | yes |

Phase 1 — Gather (Automation)

openfin review gather --depth light|full → ReviewOrchestrator.run(depth=...)

| Gather step | light | full | Source |
|---|---|---|---|
| Positions | yes | yes | app.list_positions() (SnapTrade) |
| Market data (indices, sectors) | yes | yes | app.get_market_overview() (Finnhub) |
| Macro indicators | yes | yes | app.get_macro_snapshot() (FRED) |
| Macro calendar | yes | yes | investments/macro_calendar.yaml (manual) |
| Watchlist quotes | yes | yes | app.get_watchlist_report() (Finnhub) |
| News per symbol | yes | yes | app.get_watchlist_news() (Finnhub) |
| Quotes | yes | yes | app.get_quotes() (Finnhub) |
| Forex, commodities | yes | yes | app.get_forex_quotes() / app.get_commodity_quotes() |
| Earnings dates | yes | yes | app.get_earnings_report() (Finnhub) |
| Hypotheses | yes | yes | get_hypotheses_for_thesis() (DB) |
| Search | no | yes | app.get_batch_symbol_search() (Brave) |
| Prior scores | no | yes | get_score_history() (DB) |
| Analysis signals | no | yes | get_latest_snapshot("stock_analysis") (DB) |
| Social signals | no | yes | get_latest_snapshot("social_signal") (DB) |

Then:

  1. compute_portfolio_context() — aggregates positions into total value, cost basis, P&L, day change, per-symbol weight%.
  2. _compute_risk_snapshot() — real risk metrics: concentration (>15% weight), volatility (>3% day / >10% week).
  3. Queries hypotheses — active + recently resolved per thesis from thesis_hypotheses.
  4. Computes thesis health — recency-weighted confirmed/invalidated ratio → untested/strong/mixed/weakening/failing (see the sketch after this list).
  5. Computes time pressure — elapsed time vs thesis time_horizon → early/mid/late/overdue.
  6. Loads macro calendar from investments/macro_calendar.yaml, filtered to the next 30 days and rendered with countdown labels.
  7. Builds context packets — thesis packets (with hypotheses, health, pressure, upcoming earnings), holding packets, and legacy symbol contexts.
  8. Persists artifacts, inserts review_runs row (status="collecting").
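
Thesis health (step 4) and time pressure (step 5) are the two derived signals the rest of the pipeline leans on. A minimal sketch of how they might be computed, assuming a 4-week half-life for recency weighting and even thirds for the horizon buckets (both constants are assumptions, not the actual orchestrator values):

```python
from datetime import datetime, timedelta

# Hypothetical sketch of thesis health / time pressure; the half-life and
# ratio thresholds are assumptions, not the real ReviewOrchestrator constants.
def thesis_health(outcomes: list[tuple[str, datetime]], now: datetime) -> str:
    """outcomes: (status, resolved_at) pairs from thesis_hypotheses."""
    confirmed = invalidated = 0.0
    for status, resolved_at in outcomes:
        weight = 0.5 ** ((now - resolved_at).days / 7 / 4)  # 4-week half-life
        if status == "confirmed":
            confirmed += weight
        elif status == "invalidated":
            invalidated += weight
    total = confirmed + invalidated
    if total == 0:
        return "untested"
    ratio = confirmed / total
    if ratio >= 0.75:
        return "strong"
    if ratio >= 0.50:
        return "mixed"
    if ratio >= 0.25:
        return "weakening"
    return "failing"

def time_pressure(created: datetime, horizon: timedelta, now: datetime) -> str:
    """Elapsed fraction of the thesis time_horizon -> pressure label."""
    frac = (now - created) / horizon
    if frac < 1 / 3:
        return "early"
    if frac < 2 / 3:
        return "mid"
    if frac <= 1.0:
        return "late"
    return "overdue"
```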

Context packets

Two primary packet types (used for scoring):

Thesis packets (theses/{SLUG}.md) — one per active thesis. Contains:

  • Narrative, time horizon, thesis health (from hypothesis outcomes), time pressure
  • Active hypotheses (claim + invalidation + linked evidence) and recently resolved hypotheses
  • Upcoming earnings dates with countdown
  • Per-symbol: market data + valuation, enriched position (P&L, weight%), prior review scores, analysis signals, social signals, news (with age labels like "2d ago"), search

Holding packets (holdings/{SYMBOL}.md) — one per portfolio position. Contains:

  • Enriched position: qty, avg price, market value, cost basis, P&L, gain%, weight%, tax status
  • Prior review scores, analysis signals, social signals
  • Market data + valuation, news (with age labels), search, thesis references

Legacy packets (context/{SYMBOL}.md) — deprecated, still generated for backward compat.

Artifacts: inputs.json (ReviewInputs), context.json (ReviewContext), theses/{SLUG}.md, holdings/{SYMBOL}.md, summary.json (ReviewSummary), overview.md (with catalyst calendar section)
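
For orientation, a rough sketch of what assembling one holding packet could look like. The field names follow the list above, but the builder itself is hypothetical, not the actual packet code:

```python
from pathlib import Path

# Hypothetical holding-packet builder; dict keys mirror the fields listed
# above but are assumptions, not the real packet-generation code.
def write_holding_packet(run_dir: Path, symbol: str, pos: dict,
                         scores: list[dict], news: list[dict]) -> Path:
    lines = [f"# {symbol} holding packet", "", "## Position"]
    lines += [
        f"- qty: {pos['qty']}, avg price: {pos['avg_price']}",
        f"- market value: {pos['market_value']}, cost basis: {pos['cost_basis']}",
        f"- P&L: {pos['pnl']} ({pos['gain_pct']:.1f}%), weight: {pos['weight_pct']:.1f}%",
        "", "## Prior review scores",
    ]
    lines += [f"- {s['metric']}: {s['score']} ({s['run_date']})" for s in scores]
    lines += ["", "## News"]
    lines += [f"- [{n['age_label']}] {n['headline']}" for n in news]
    path = run_dir / "holdings" / f"{symbol}.md"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines) + "\n")
    return path
```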

Phase 1.5 — Reflect on Prior Week (Agent, weekly only)

Agent retrieves prior run context and compares against this week's data:

openfin review list -n 5                    # find last finalized run
openfin review show <PRIOR_RUN_ID>          # prior scores, annotations, decisions
openfin review lookup <THESIS_SLUG>         # observation history
openfin thesis status <SLUG>                # hypothesis state + health

Reflection covers: hypothesis quality (were resolved hypotheses well-formed?), decision accuracy (did last week's actions play out?), thesis evolution (did catalyst states shift?).

Recorded as an annotation:

openfin review annotate --run-id <RUN_ID> --field "obs:<thesis-slug>" \
  --value "REFLECTION: <what changed, hypothesis quality, decision accuracy>"

Data flow: DB reads (review_runs, rubric_scores, review_annotations, thesis_hypotheses) → agent reasoning → observations.json + review_annotations row.

Phase 2 — Hypothesis Assessment (Agent)

Agent reads theses/{SLUG}.md and assesses prior hypotheses against new data:

openfin review hypothesis update <ID> --status confirmed --resolution "What happened"
openfin review hypothesis create <SLUG> --claim "If A then B" --invalidation "Unless C"
openfin review evidence NVDA --run-id ID --source-type news --claim "..." \
  --direction confirming --hypothesis-id <ID>

Hypotheses persist in thesis_hypotheses across review runs. Evidence can link to a hypothesis via --hypothesis-id. Each thesis should maintain at least 3 active hypotheses, including one bear case with a BEAR: prefix.

Catalyst state assessment recorded as annotation: CATALYST_STATE: PRICED_IN|ABSORBING|DIVERGENT|IMPULSE_RISK.

Daily (light) mode: scan hypotheses for approaching horizons or contradictory evidence; flag but don't do full assessment.

Data flow: thesis packets (filesystem) → agent reasoning → thesis_hypotheses rows + decision_evidence rows + observations.json.

Phase 2.5 — Self-Directed Research (Agent, weekly only)

When packet data is thin, a hypothesis needs verification, or reflection flagged a gap:

openfin research news <SYMBOL> --limit 10          # deeper news pull
openfin research search "<specific query>"          # targeted web search
openfin research sec-filings <SYMBOL> --type 10-Q   # recent filings list
openfin research sec-read <SYMBOL> --type 8-K       # read a filing

Findings are recorded as evidence linked to hypotheses. Targeted, not exhaustive — 3-5 queries per review is typical.

Data flow: external APIs → agent reasoning → decision_evidence rows (linked to hypotheses).

Phase 3 — Rubric Scoring (Agent, weekly only)

openfin review score, openfin review evidence

Agent reads thesis packets for thesis_alignment and holding packets for news_sentiment / valuation_signal:

openfin review score NVDA -m thesis_alignment -s 8 -r "Strong narrative" --run-id ID
openfin review score NVDA -m news_sentiment -s 7 -r "Positive coverage" --run-id ID
openfin review score NVDA -m valuation_signal -s 6 -r "Fair value" --run-id ID
openfin review score NVDA -m social_signal -s 5 -r "Mixed signals" --run-id ID

Each command updates scoring.json (ReviewScoring) and writes through to rubric_scores / decision_evidence tables.

Rubric weights (~/.openfin/scoring/*.yaml): thesis_alignment (0.35), news_sentiment (0.25), valuation_signal (0.25), social_signal (0.15).

Data flow: thesis + holding packets → agent reasoning → scoring.json + rubric_scores rows + decision_evidence rows.

Phase 4 — Annotation (Agent)

openfin review annotate — writes narrative fields:

openfin review annotate --run-id ID --field executive_summary --value "Markets showed..."

Fields:

  • executive_summary, portfolio_summary, market_summary, macro_summary, watchlist_summary, news_summary → write through to summary.json.
  • obs:-prefixed fields → accumulate in observations.json + review_annotations rows.
  • daily_summary, daily_action_items → DB only.
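
A small sketch of that routing rule as one might implement it (hypothetical function; the real logic sits behind openfin review annotate and includes the DB write-through):

```python
import json
from pathlib import Path

# Hypothetical routing sketch for `openfin review annotate`; names are
# assumptions, not the actual CLI implementation.
SUMMARY_FIELDS = {"executive_summary", "portfolio_summary", "market_summary",
                  "macro_summary", "watchlist_summary", "news_summary"}
DB_ONLY_FIELDS = {"daily_summary", "daily_action_items"}

def route_annotation(run_dir: Path, field: str, value: str) -> None:
    """Decide which artifact (if any) an annotation field writes to."""
    if field in SUMMARY_FIELDS:
        target = run_dir / "summary.json"
    elif field.startswith("obs:"):
        target = run_dir / "observations.json"
    elif field in DB_ONLY_FIELDS:
        return  # DB write-through only; no filesystem artifact
    else:
        raise ValueError(f"unknown annotation field: {field}")
    data = json.loads(target.read_text()) if target.exists() else {}
    if field.startswith("obs:"):
        data.setdefault(field, []).append(value)  # obs: fields accumulate
    else:
        data[field] = value
    target.write_text(json.dumps(data, indent=2))
    # the best-effort review_annotations DB write would follow here
```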

Data flow: agent reasoning → summary.json + observations.json + review_annotations rows.

Phase 5 — Finalize (Automation, weekly only)

openfin review finalize

  1. Composite scores: weighted average per symbol (thesis_alignment=0.35, news_sentiment=0.25, valuation_signal=0.25, social_signal=0.15). Confidence = submitted weights / total weights (worked example after this list).
  2. Actions: BUY_MORE (≥0.70), HOLD (≥0.40), TRIM (≥0.20), EXIT (<0.20). Modulated by thesis health + time pressure.
  3. Decisions: one Decision per symbol (action, confidence, rationale, evidence, scorecard, entry_zone, exit_trigger, sizing_note).
  4. Backfills action items into summary.json, generates report.md.
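
A worked example of steps 1 and 2, assuming rubric scores are on a 0-10 scale and composites are normalized to 0-1 before the action thresholds apply (the normalization is inferred from the thresholds, not confirmed):

```python
# Worked example of the Phase 5 composite math; the /10 normalization is an
# inference from the 0-1 action thresholds, and the thesis-health /
# time-pressure modulation (step 2) is omitted.
WEIGHTS = {"thesis_alignment": 0.35, "news_sentiment": 0.25,
           "valuation_signal": 0.25, "social_signal": 0.15}

def finalize_symbol(scores: dict[str, float]) -> tuple[float, float, str]:
    """scores: metric -> 0-10 rubric score; missing metrics lower confidence."""
    submitted = {m: w for m, w in WEIGHTS.items() if m in scores}
    if not submitted:
        raise ValueError("no scores submitted")
    total_weight = sum(submitted.values())
    composite = sum(scores[m] * w for m, w in submitted.items()) / (total_weight * 10)
    confidence = total_weight / sum(WEIGHTS.values())
    if composite >= 0.70:
        action = "BUY_MORE"
    elif composite >= 0.40:
        action = "HOLD"
    elif composite >= 0.20:
        action = "TRIM"
    else:
        action = "EXIT"
    return composite, confidence, action

# The NVDA scores from Phase 3 (8, 7, 6, 5):
print(finalize_symbol({"thesis_alignment": 8, "news_sentiment": 7,
                       "valuation_signal": 6, "social_signal": 5}))
# -> approximately (0.68, 1.0, 'HOLD')
```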

Data flow: scoring.json → composite math → decisions.json + report.md + backfill rubric_scores composites + set review_runs.status = "finalized".

Artifact Tree

{RUN_ID}/
  metadata.json       # run_id, date, trigger
  inputs.json         # P1: raw data + portfolio_context + macro_events (ReviewInputs)
  context.json        # P1: symbol contexts with prior scores + signals (ReviewContext)
  theses/{SLUG}.md    # P1: per-thesis scoring packet
  holdings/{SYM}.md   # P1: per-holding scoring packet
  context/{SYM}.md    # P1: legacy per-symbol packet (deprecated)
  summary.json        # P1+4: narratives + portfolio_context + action items (ReviewSummary)
  overview.md         # P1: compact overview with catalyst calendar
  observations.json   # P1.5+2+4: accumulated obs: annotations (filesystem audit trail)
  scoring.json        # P2-3: scores + evidence (ReviewScoring)
  decisions.json      # P5: final decisions + risk snapshot (ReviewDecisions)
  report.md           # P5: final rendered report

Persistence: Dual-Write Pattern

The review pipeline uses the filesystem as the source of truth, with best-effort DB write-through:

Agent CLI command
    ├── Write to filesystem artifact (scoring.json, observations.json, summary.json)
    └── DB write-through (swallows exceptions with warning if DB unavailable)
         ├── rubric_scores
         ├── decision_evidence
         └── review_annotations

This ensures the review can always proceed even if the DB is unavailable; the DB provides queryable history and cross-run trends.
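
A minimal sketch of the pattern for a single score write (table and column names are assumptions; the real commands live in cli/review.py):

```python
import json
import logging
import sqlite3
from pathlib import Path

log = logging.getLogger(__name__)

# Minimal dual-write sketch; the rubric_scores columns are assumptions.
def record_score(run_dir: Path, db_path: Path, row: dict) -> None:
    # 1. Filesystem first: this write must succeed for the review to proceed.
    scoring = run_dir / "scoring.json"
    data = json.loads(scoring.read_text()) if scoring.exists() else {"scores": []}
    data["scores"].append(row)
    scoring.write_text(json.dumps(data, indent=2))

    # 2. Best-effort DB write-through: swallow failures with a warning.
    try:
        with sqlite3.connect(db_path) as conn:
            conn.execute(
                "INSERT INTO rubric_scores (run_id, symbol, metric, score, rationale)"
                " VALUES (?, ?, ?, ?, ?)",
                (row["run_id"], row["symbol"], row["metric"],
                 row["score"], row["rationale"]),
            )
    except sqlite3.Error as exc:
        log.warning("DB write-through failed, continuing: %s", exc)
```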

Database (write-through)

| Table | Phases | Purpose |
|---|---|---|
| review_runs | 1, 5 | Run metadata, status (collecting → scoring → finalized) |
| thesis_hypotheses | 2 | AI-generated hypotheses, persist across runs |
| rubric_scores | 3, 5 | Score history, composite backfill at finalize |
| decision_evidence | 2-3 | Evidence audit trail (optional hypothesis_id link) |
| review_annotations | 1.5, 2, 4 | Annotation audit trail (obs: fields + summaries) |
| data_snapshots | 1 | Historical market data snapshots (versioned JSON payloads) |
| thesis_snapshots | on save | Thesis audit log (YAML is source of truth) |

Payload Versioning

JSON payloads in data_snapshots and filesystem artifacts carry a _v field for schema versioning. See db/payload_version.py.

Write path: stamp(data_type, json_str) injects "_v": N before storage.

Read path: upgrade(data_type, raw) walks the upgrade chain from the payload's _v to current, strips _v, returns a clean dict for Model.model_validate().
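
A condensed sketch of the two paths, compatible with the upgrade-function example below (the registry shape is an assumption; db/payload_version.py is the real implementation):

```python
import json
from typing import Callable

# Condensed sketch of db/payload_version.py; registry internals are assumptions.
CURRENT_VERSIONS: dict[str, int] = {"position": 2}
_UPGRADES: dict[tuple[str, int], Callable[[dict], dict]] = {}

def register_upgrade(data_type: str, from_version: int):
    def deco(fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
        _UPGRADES[(data_type, from_version)] = fn
        return fn
    return deco

def stamp(data_type: str, json_str: str) -> str:
    """Write path: inject the current _v before storage."""
    d = json.loads(json_str)
    d["_v"] = CURRENT_VERSIONS[data_type]
    return json.dumps(d)

def upgrade(data_type: str, raw: str) -> dict:
    """Read path: walk the chain from the payload's _v to current, strip _v."""
    d = json.loads(raw)
    v = d.pop("_v", 0)  # pre-versioning payloads are treated as version 0
    while v < CURRENT_VERSIONS[data_type]:
        fn = _UPGRADES.get((data_type, v))
        if fn is None:  # gap: let Pydantic defaults fill missing fields
            break
        d = fn(d)
        v = d.pop("_v", v + 1)
    return d
```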

When a model shape changes:

  1. Bump the version in CURRENT_VERSIONS[data_type].
  2. Register an upgrade function:
    @register_upgrade("position", from_version=1)
    def _pos_v1_to_v2(d: dict) -> dict:
        d.setdefault("new_field", None)
        d["_v"] = 2
        return d
    
  3. Old payloads auto-upgrade on read. New payloads get the new version stamp.

Graceful degradation: if no upgrade registered for a version gap, Pydantic defaults fill missing fields. Only removals/renames need explicit upgrade functions.

Integration points:

| Path / Function | Read/Write | Stamps/Upgrades |
|---|---|---|
| db/repository.py:save_snapshots() | Write | stamp() |
| api/routers/portfolio.py:_parse_positions() | Read | upgrade("position") |
| utils/watchlist.py:load_portfolio_positions() | Read | upgrade("position") |
| services/review_orchestrator.py:_persist() | Write | stamp("review_*") |
| services/review_orchestrator.py:load_cached_report() | Read | upgrade("review_*") |
| cli/review.py (score/finalize/annotate) | Read+Write | upgrade() + stamp() |
| services/review_orchestrator.py:_gather_analysis_signals() | Read | upgrade("stock_analysis") |
| services/review_orchestrator.py:_gather_social_signals() | Read | upgrade("social_signal") |

Pre-versioning data (no _v field) is treated as version 0.

API Routes

| Endpoint | Source | Returns |
|---|---|---|
| GET /api/runs/ | review_runs table | Run list with status/dates |
| GET /api/runs/{id}/summary | summary.json | Narratives + portfolio context |
| GET /api/runs/{id}/decisions | decisions.json | Decisions + risk snapshot + scorecards |
| GET /api/runs/{id}/scoring | scoring.json | Scores + evidence |
| GET /api/runs/{id}/artifacts/theses/{file} | theses/{file} | Thesis packet |
| GET /api/runs/{id}/artifacts/holdings/{file} | holdings/{file} | Holding packet |
| GET /api/market/ | inputs.json | Indices, sectors, macro |
| GET /api/scores/history/{symbol} | rubric_scores table | Historical scores |

Detail views read JSON from disk. Only /api/runs/ and /api/scores/history/ query the DB.
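
As a sketch of the disk-backed pattern (FastAPI is assumed from the api/routers/ layout; ARTIFACT_ROOT and the glob lookup are illustrative):

```python
import json
from pathlib import Path

from fastapi import APIRouter, HTTPException

# Sketch of a disk-backed detail route; FastAPI and ARTIFACT_ROOT are
# assumptions inferred from api/routers/ and the artifact tree above.
router = APIRouter(prefix="/api/runs")
ARTIFACT_ROOT = Path("artifacts/reports/weekly")

@router.get("/{run_id}/decisions")
def get_decisions(run_id: str) -> dict:
    # Detail views read JSON from disk; only run lists and score history hit the DB.
    matches = list(ARTIFACT_ROOT.glob(f"*/{run_id}/decisions.json"))
    if not matches:
        raise HTTPException(status_code=404, detail="run not found or not finalized")
    return json.loads(matches[0].read_text())
```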

Notes

  • The system does not call LLM APIs. All AI reasoning happens externally — the agent reads packets, reasons, and writes results back via CLI commands.
  • Narrative fields in summary.json are agent-written text, not computed.
  • Structured data lives in inputs.json; /api/market/ serves it directly.
  • PortfolioContext is computed at collection time and persisted in both inputs.json and summary.json.
  • Thesis health and time pressure are computed at query time from thesis_hypotheses, not stored.
  • Hypotheses outlive review runs — they track the evolving understanding of a thesis across weeks.
  • observations.json accumulates during the review and is persisted to review_annotations rows for cross-run queries.