if True:
    import logging
    import os
    from datetime import date, datetime, timedelta
    from typing import Any, Dict, List, Optional, Tuple

    from app.v1.services.stock_history import get_stock_history
    from app.v1.services.zerodha.client import ZerodhaClient
    from app.v1.services.early_movers_gpt import gpt_evaluate_candidates, gpt_limit_per_side, gpt_meta

    logger = logging.getLogger(__name__)

    # Mongo collection name used to persist computed snapshots.
    EARLY_MOVERS_SNAPSHOT_COLLECTION = os.getenv("EARLY_MOVERS_SNAPSHOT_COLLECTION", "early_movers_snapshots")

    # Universe / filters
    MIN_DAILY_CANDLES = int(os.getenv("EARLY_MOVERS_MIN_DAILY_CANDLES", "240"))
    MIN_PRICE = float(os.getenv("EARLY_MOVERS_MIN_PRICE", "30"))
    MAX_PRICE = float(os.getenv("EARLY_MOVERS_MAX_PRICE", "3000"))
    # NOTE: Defaults are intentionally NON-ZERO to prevent NAV-like / illiquid instruments
    # from polluting the list when env is misconfigured.
    MIN_AVG_VOL_20D = float(os.getenv("EARLY_MOVERS_MIN_AVG_VOL_20D", "50000"))
    MIN_AVG_DV_20D = float(os.getenv("EARLY_MOVERS_MIN_AVG_DV_20D", "10000000"))  # 1 Cr
    MIN_VOL_DAYS_20D = int(os.getenv("EARLY_MOVERS_MIN_VOL_DAYS_20D", "20"))

    # Structural heartbeat: if ATR as % of price is too small, breakout is meaningless.
    # Value is in percent of price (e.g. 0.15 => 0.15%).
    MIN_ATR_PCT = float(os.getenv("EARLY_MOVERS_MIN_ATR_PCT", "0.15"))

    # Optional: exclude NAV-like / fund-like / ETF symbols and names.
    # Comma-separated; parsed by _parse_csv_words (stripped, upper-cased, de-duped).
    EXCLUDE_KEYWORDS_CSV = os.getenv(
        "EARLY_MOVERS_EXCLUDE_KEYWORDS",
        "LIQUID,ETF,BEES,GOLD,SILVER,CASH",
    )
    EXCLUDE_SUFFIXES_CSV = os.getenv(
        "EARLY_MOVERS_EXCLUDE_SUFFIXES",
        "ADD,BETF,IETF",
    )

    # Optional: treat GPT confidence as a veto for candidate inclusion.
    # NOTE(review): default "0" presumably disables the veto — confirm at the usage site.
    GPT_MIN_CONFIDENCE = float(os.getenv("EARLY_MOVERS_GPT_MIN_CONFIDENCE", "0"))

    # Output
    TOP_N = int(os.getenv("EARLY_MOVERS_TOP_N", "20"))
    MIN_SCORE = int(os.getenv("EARLY_MOVERS_MIN_SCORE", "70"))

    # Candidates used for optional GPT evaluation
    CANDIDATE_LIMIT_PER_SIDE = int(os.getenv("EARLY_MOVERS_CANDIDATE_LIMIT_PER_SIDE", "50"))

    # Final ranking blend if GPT is enabled
    GPT_BLEND_WEIGHT = float(os.getenv("EARLY_MOVERS_GPT_BLEND_WEIGHT", "0.30"))

    # Market context
    INDEX_SYMBOL = (os.getenv("EARLY_MOVERS_INDEX_SYMBOL", "NIFTY 50") or "NIFTY 50").strip().upper()


    # Risk penalties applied to the final blended score.
    # Keep conservative: only known flags penalize; unknown flags = 0.
    RISK_PENALTY_POINTS: Dict[str, int] = {
        "GAP_DAY": 6,
        "HIGH_VOLATILITY": 10,
        "VOLUME_SPIKE": 6,
        "EXTENDED_BREAKOUT": 8,
        "EXTENDED_BREAKDOWN": 8,
        "BAD_ATR": 15,
    }


    def _safe_float(v: Any) -> float:
        try:
            if v is None:
                return 0.0
            return float(v)
        except Exception:
            return 0.0


    def _parse_csv_words(v: Any) -> List[str]:
        if v is None:
            return []
        if isinstance(v, (list, tuple)):
            parts = [str(x) for x in v]
        else:
            parts = str(v).split(",")
        out: List[str] = []
        for p in parts:
            s = str(p or "").strip().upper()
            if s:
                out.append(s)
        return list(dict.fromkeys(out))


    # Materialize the exclusion lists once at import time (normalized, de-duped).
    _EXCLUDE_KEYWORDS = _parse_csv_words(EXCLUDE_KEYWORDS_CSV)
    _EXCLUDE_SUFFIXES = _parse_csv_words(EXCLUDE_SUFFIXES_CSV)


    def _is_excluded_instrument(*, symbol: str, name: Optional[str] = None) -> bool:
        """Return True when the instrument should be dropped from the universe.

        An empty symbol, a configured suffix match on the symbol, or a configured
        keyword appearing anywhere in the symbol/name all exclude the instrument.
        """
        sym = (symbol or "").strip().upper()
        if not sym:
            return True
        nm = (name or "").strip().upper()

        # Suffix filter (safer than substring for short tokens like "ADD").
        if any(suffix and sym.endswith(suffix) for suffix in _EXCLUDE_SUFFIXES):
            return True

        # Substring filter over the combined symbol + name haystack.
        haystack = f"{sym} {nm}".strip()
        return any(word and word in haystack for word in _EXCLUDE_KEYWORDS)


    def _sma(values: List[float], period: int) -> Optional[float]:
        if period <= 0:
            return None
        if len(values) < period:
            return None
        w = values[-period:]
        return sum(w) / float(period)


    def _true_ranges(candles: List[Dict[str, Any]]) -> List[float]:
        """Per-candle true range: max of H-L, |H-prevC|, |L-prevC|.

        The first candle has no previous close, so its TR is just H-L.
        """
        ranges: List[float] = []
        prev_close: Optional[float] = None
        for candle in candles:
            hi = _safe_float(candle.get("high"))
            lo = _safe_float(candle.get("low"))
            if prev_close is None:
                ranges.append(float(hi - lo))
            else:
                ranges.append(float(max(hi - lo, abs(hi - prev_close), abs(lo - prev_close))))
            prev_close = _safe_float(candle.get("close"))
        return ranges


    def _atr(candles: List[Dict[str, Any]], period: int) -> Optional[float]:
        """Average true range over the last *period* candles.

        Needs period+1 candles: the extra leading candle supplies prev_close for
        the first TR in the averaged window. Returns None when unavailable.
        """
        if period <= 0 or len(candles) < period + 1:
            return None
        window_trs = _true_ranges(candles[-(period + 1):])[-period:]
        if not window_trs:
            return None
        return sum(window_trs) / float(len(window_trs))


    def _pct(a: float, b: float) -> Optional[float]:
        # (a-b)/b * 100
        if b == 0:
            return None
        return (a - b) / b * 100.0


    def _return_pct(latest: float, past: float) -> Optional[float]:
        if past == 0:
            return None
        return (latest / past - 1.0) * 100.0


    def _market_context(db, zerodha_client: ZerodhaClient) -> Dict[str, Any]:
        """Compute a simple market regime tag from the NIFTY index.

        Returns a dict with:
          - tag: "BULLISH" | "BEARISH" | "SIDEWAYS" | "UNKNOWN"
          - index: resolved index symbol (falls back to INDEX_SYMBOL)
          - nifty_5d_return: 5-session index return in percent, or None

        BULLISH requires SMA20 >= SMA50 with both rising over the last 5
        sessions; BEARISH is the mirror image. Kept lightweight and robust:
        any failure (index missing, no data, API error) yields UNKNOWN.
        """
        try:
            t0 = datetime.utcnow()
            logger.info("[EarlyMovers] market_context start")

            # Some datasets don't mark instrument_type as INDEX; prefer correctness
            # over strictness and resolve the index by symbol in widening steps.
            idx = db["stocks"].find_one(
                {
                    "exchange": "NSE",
                    "instrument_token": {"$ne": None},
                    "symbol": INDEX_SYMBOL,
                },
                {"_id": 0, "instrument_token": 1, "symbol": 1},
            )

            if not idx:
                # Common alternate spellings of the NIFTY 50 symbol.
                idx = db["stocks"].find_one(
                    {
                        "exchange": "NSE",
                        "instrument_token": {"$ne": None},
                        "symbol": {"$in": ["NIFTY 50", "NIFTY50", "NIFTY"]},
                    },
                    {"_id": 0, "instrument_token": 1, "symbol": 1},
                )

            if not idx:
                # As a last attempt, allow any NIFTY* symbol but still require an instrument_token.
                idx = db["stocks"].find_one(
                    {
                        "exchange": "NSE",
                        "instrument_token": {"$ne": None},
                        "symbol": {"$regex": r"^NIFTY", "$options": "i"},
                    },
                    {"_id": 0, "instrument_token": 1, "symbol": 1},
                )

            token = idx.get("instrument_token") if isinstance(idx, dict) else None
            if token is None:
                logger.info("[EarlyMovers] market_context done | tag=UNKNOWN (no token) ms=%d", int((datetime.utcnow() - t0).total_seconds() * 1000))
                return {"tag": "UNKNOWN", "index": INDEX_SYMBOL, "nifty_5d_return": None}

            # FIX: fetch 120 calendar days (~80+ trading sessions). The previous
            # 40-day window yielded < 50 closes, so the SMA50 checks below could
            # never pass and the tag was effectively always SIDEWAYS.
            now = datetime.utcnow()
            frm = (now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=120)).strftime("%Y-%m-%d")
            to = now.strftime("%Y-%m-%d")

            logger.info("[EarlyMovers] market_context fetch start | token=%s from=%s to=%s", str(token), frm, to)
            raw = zerodha_client.get_historical_data_records(int(token), interval="day", from_date=frm, to_date=to)
            if not raw:
                logger.info("[EarlyMovers] market_context done | tag=UNKNOWN (no data) ms=%d", int((datetime.utcnow() - t0).total_seconds() * 1000))
                return {"tag": "UNKNOWN", "index": idx.get("symbol") or INDEX_SYMBOL, "nifty_5d_return": None}

            # Normalize closes: keep only positive values from well-formed rows.
            closes: List[float] = []
            for r in raw:
                if not isinstance(r, dict):
                    continue
                closes.append(_safe_float(r.get("close")))
            closes = [c for c in closes if c > 0]
            # Fewer than 55 closes is still usable for the 5d return / SMA20;
            # the SMA50 slope comparison below simply degrades to SIDEWAYS.

            sma20_now = _sma(closes, 20)
            sma50_now = _sma(closes, 50)
            # "prev" = the same SMAs evaluated 5 sessions ago, to estimate slope.
            sma20_prev = _sma(closes[:-5], 20) if len(closes) >= 25 else None
            sma50_prev = _sma(closes[:-5], 50) if len(closes) >= 55 else None

            tag = "SIDEWAYS"
            if sma20_now and sma50_now and sma20_prev and sma50_prev:
                slope20 = sma20_now - sma20_prev
                slope50 = sma50_now - sma50_prev
                if sma20_now >= sma50_now and slope20 > 0 and slope50 > 0:
                    tag = "BULLISH"
                elif sma20_now <= sma50_now and slope20 < 0 and slope50 < 0:
                    tag = "BEARISH"

            nifty_5d = None
            if len(closes) >= 6:
                nifty_5d = _return_pct(closes[-1], closes[-6])

            out = {
                "tag": tag,
                "index": idx.get("symbol") or INDEX_SYMBOL,
                "nifty_5d_return": nifty_5d,
            }
            logger.info(
                "[EarlyMovers] market_context done | tag=%s nifty_5d=%s ms=%d",
                out.get("tag"),
                out.get("nifty_5d_return"),
                int((datetime.utcnow() - t0).total_seconds() * 1000),
            )
            return out
        except Exception:
            # Best-effort context: never break snapshot building, but leave a
            # traceback instead of failing silently (was a bare swallow before).
            logger.exception("[EarlyMovers] market_context failed; returning UNKNOWN")
            return {"tag": "UNKNOWN", "index": INDEX_SYMBOL, "nifty_5d_return": None}


    def _parse_day(d: Any) -> Optional[date]:
        if isinstance(d, date) and not isinstance(d, datetime):
            return d
        if isinstance(d, datetime):
            return d.date()
        if isinstance(d, str) and d.strip():
            try:
                # date-only or full datetime ISO
                return datetime.fromisoformat(d.strip().replace("Z", "+00:00")).date()
            except Exception:
                return None
        return None


    def _aggregate_ohlcv(dailies: List[Dict[str, Any]], *, mode: str, limit: int = 12) -> List[Dict[str, Any]]:
        """Aggregate daily candles into weekly/monthly OHLCV.

        Args:
            dailies: daily candles as dicts with date/open/high/low/close/volume.
            mode: 'week' (ISO calendar weeks) or 'month'; any other value
                returns [] rather than raising.
            limit: keep at most the trailing *limit* aggregated candles
                (a value <= 0 keeps all).

        Returns:
            Aggregated candles, each {"date": period-end ISO date, "open",
            "high", "low", "close", "volume"}. The in-progress current period
            is dropped so a partial week/month never leaks into the output.
        """
        if not dailies:
            return []

        # Bucket dailies by period key, remembering first-seen key order.
        buckets: Dict[str, List[Dict[str, Any]]] = {}
        keys_in_order: List[str] = []

        for c in dailies:
            dt = _parse_day(c.get("date"))
            if not dt:
                # Rows with unparseable dates are silently skipped.
                continue

            if mode == "week":
                y, w, _ = dt.isocalendar()
                k = f"{y:04d}-W{w:02d}"
            elif mode == "month":
                k = f"{dt.year:04d}-{dt.month:02d}"
            else:
                return []

            if k not in buckets:
                buckets[k] = []
                keys_in_order.append(k)
            buckets[k].append(c)

        out: List[Dict[str, Any]] = []
        for k in keys_in_order:
            rows = buckets.get(k) or []
            if not rows:
                continue

            # Ensure in chronological order within bucket
            rows_sorted = sorted(rows, key=lambda x: (_parse_day(x.get("date")) or date.min))
            # Standard OHLCV aggregation: first open, max high, min low, last close, summed volume.
            o = _safe_float(rows_sorted[0].get("open"))
            h = max(_safe_float(r.get("high")) for r in rows_sorted)
            l = min(_safe_float(r.get("low")) for r in rows_sorted)
            cl = _safe_float(rows_sorted[-1].get("close"))
            v = sum(_safe_float(r.get("volume")) for r in rows_sorted)
            end_dt = _parse_day(rows_sorted[-1].get("date"))
            out.append({"date": end_dt.isoformat() if end_dt else k, "open": o, "high": h, "low": l, "close": cl, "volume": v})

        # Drop the in-progress current period candle (prevents partial week/month leakage).
        if out:
            last_dt = _parse_day(out[-1].get("date"))

            def _last_business_day_of_month(d: date) -> date:
                # Weekend-adjusted last calendar day (ignores exchange holidays).
                if d.month == 12:
                    next_month = date(d.year + 1, 1, 1)
                else:
                    next_month = date(d.year, d.month + 1, 1)
                last = next_month - timedelta(days=1)
                while last.weekday() >= 5:
                    last -= timedelta(days=1)
                return last

            drop_last = False
            if isinstance(last_dt, date):
                if mode == "week":
                    # Consider a week complete only at Friday close.
                    drop_last = last_dt.weekday() != 4
                elif mode == "month":
                    # NOTE(review): a month whose last business day is an exchange
                    # holiday would also be dropped here — accepted trade-off.
                    drop_last = last_dt != _last_business_day_of_month(last_dt)

            if drop_last:
                out = out[:-1]

        if limit > 0:
            out = out[-limit:]
        return out


    def _risk_flags_basic(candles: List[Dict[str, Any]], *, key_level: float, bias: str, atr10: Optional[float], atr_ratio: Optional[float], vol_ratio: Optional[float]) -> List[str]:
        """Derive coarse risk flags for the latest candle relative to its key level.

        Returns a list drawn from: GAP_DAY, HIGH_VOLATILITY, VOLUME_SPIKE,
        NEAR_RESISTANCE / NEAR_SUPPORT (bias-dependent), EXTENDED_BREAKOUT /
        EXTENDED_BREAKDOWN, BAD_ATR. Empty input yields no flags.
        """
        if not candles:
            return []

        flags: List[str] = []
        latest = candles[-1] or {}
        close = _safe_float(latest.get("close"))
        open_ = _safe_float(latest.get("open"))
        prior = candles[-2] if len(candles) >= 2 else None
        prior_close = _safe_float((prior or {}).get("close"))

        # Opening gap of >= 1.5% vs the prior close.
        if prior_close > 0 and abs(open_ - prior_close) / prior_close * 100.0 >= 1.5:
            flags.append("GAP_DAY")

        if atr_ratio is not None and atr_ratio > 0.95:
            flags.append("HIGH_VOLATILITY")

        if vol_ratio is not None and vol_ratio > 1.8:
            flags.append("VOLUME_SPIKE")

        if key_level > 0 and close > 0:
            # Tag proximity to the level (within 1.5%) to keep the snapshot explainable.
            if abs(key_level - close) / key_level * 100.0 <= 1.5:
                if bias == "BULLISH":
                    flags.append("NEAR_RESISTANCE")
                elif bias == "BEARISH":
                    flags.append("NEAR_SUPPORT")

            # More than 1% beyond the level in the bias direction = already extended.
            if bias == "BULLISH" and close > key_level * 1.01:
                flags.append("EXTENDED_BREAKOUT")
            if bias == "BEARISH" and close < key_level * 0.99:
                flags.append("EXTENDED_BREAKDOWN")

        if atr10 is not None and atr10 <= 0:
            flags.append("BAD_ATR")

        return flags


    def _score_bullish(candles: List[Dict[str, Any]], market: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Score a bullish "early mover" setup from daily candles (0-100).

        Hard gates (any failure returns None): >= 60 candles, close above
        SMA200, not already > 2% past 20-day resistance, valid ATR(10) with
        ATR% >= MIN_ATR_PCT, today's range <= 1.8x ATR, no rejection wick near
        resistance, proximity and SMA20 distance within bounds.

        Score components: proximity to resistance (30), range contraction via
        ATR ratio (25), volume dry-up (20), relative strength vs index (15),
        SMA20 magnetism (10). Returns a diagnostics dict on success.
        """
        if len(candles) < 60:
            return None

        today = candles[-1]
        # The 20 completed sessions before today define the resistance window.
        prev_window = candles[-21:-1]
        if len(prev_window) < 20:
            return None

        close = _safe_float(today.get("close"))
        open_ = _safe_float(today.get("open"))
        high = _safe_float(today.get("high"))
        low = _safe_float(today.get("low"))
        vol = _safe_float(today.get("volume"))

        # Stage-2 health check (mandatory): bullish candidates must be above SMA200.
        # (Implies at least 200 closes; shorter histories are rejected here.)
        closes = [_safe_float(c.get("close")) for c in candles]
        sma200 = _sma(closes, 200)
        if not sma200 or sma200 <= 0:
            return None
        if close < sma200:
            return None

        resistance = max(_safe_float(c.get("high")) for c in prev_window)
        if resistance <= 0:
            return None

        # Reject: too late (already extended > 2% beyond the breakout level).
        if close > resistance * 1.02:
            return None

        atr10 = _atr(candles, 10)
        if not atr10 or atr10 <= 0:
            return None

        # Heartbeat check: extremely low ATR% instruments behave like NAV products.
        if close > 0 and MIN_ATR_PCT > 0:
            atr_pct = (atr10 / close) * 100.0
            if atr_pct < float(MIN_ATR_PCT):
                return None

        # Reject wide-range days: the setup wants contraction, not expansion.
        today_range = max(0.0, high - low)
        if today_range > 1.8 * atr10:
            return None

        # Wick rejection near resistance (long upper wick = sellers defending the level).
        body = abs(close - open_)
        upper_wick = max(0.0, high - max(open_, close))
        dist_to_res = ((resistance - close) / resistance) * 100.0  # negative when close is above the level
        if dist_to_res <= 1.5 and body > 0 and upper_wick > body * 1.5:
            return None

        # Hard scoring
        score = 0

        # 1) Proximity sweet spot (30)
        prox_pts = 0
        if close > resistance:
            prox_pts = 15
        else:
            if 0.0 <= dist_to_res <= 1.5:
                prox_pts = 30
            elif 1.5 < dist_to_res <= 3.0:
                prox_pts = 10
            else:
                return None
        score += prox_pts

        # 2) VCP via ATR ratio (25): tighter range today vs ATR10 scores higher.
        atr_ratio = today_range / atr10
        vcp_pts = 0
        if atr_ratio < 0.70:
            vcp_pts = 25
        elif atr_ratio < 0.85:
            vcp_pts = 15
        elif atr_ratio > 1.0:
            vcp_pts = 0
        else:
            vcp_pts = 5
        score += vcp_pts

        # 3) Volume dry-up (20): quiet volume ahead of the move is constructive.
        vols = [_safe_float(c.get("volume")) for c in candles[-21:-1] if _safe_float(c.get("volume")) > 0]
        avg20 = (sum(vols) / len(vols)) if vols else 0.0
        vol_pts = 0
        vol_ratio = (vol / avg20) if avg20 > 0 else None
        if vol_ratio is None:
            vol_pts = 0
        else:
            if vol_ratio < 0.8:
                vol_pts = 20
            elif vol_ratio <= 1.2:
                vol_pts = 10
            elif vol_ratio > 1.5:
                # penalty if volume spike without real move
                move = abs(close - open_) / close * 100.0 if close > 0 else 0.0
                vol_pts = -10 if move < 0.4 else 0
            else:
                vol_pts = 5
        score += vol_pts

        # 4) Relative strength vs index (15)
        rs_pts = 0
        stock_5d = None
        if len(candles) >= 6:
            stock_5d = _return_pct(_safe_float(candles[-1].get("close")), _safe_float(candles[-6].get("close")))
        nifty_5d = market.get("nifty_5d_return") if isinstance(market, dict) else None

        rs_diff = None
        if isinstance(stock_5d, (int, float)) and isinstance(nifty_5d, (int, float)):
            rs_diff = stock_5d - nifty_5d
            if nifty_5d < 0 and stock_5d > 0:
                # Strongest signal: stock up while the index is down.
                rs_pts = 15
            elif rs_diff > 0:
                rs_pts = 10
            else:
                rs_pts = 0
        score += rs_pts

        # 5) SMA20 magnetism (10): reward bases hugging the 20-day mean.
        sma20 = _sma(closes, 20)
        if not sma20 or sma20 <= 0:
            return None

        sma_dist = abs(close - sma20) / sma20 * 100.0
        sma_pts = 0
        if sma_dist < 1.0:
            sma_pts = 10
        elif sma_dist < 2.0:
            sma_pts = 5
        elif sma_dist > 3.0:
            # Too extended from the mean — disqualify entirely.
            return None
        score += sma_pts

        score = max(0, min(100, int(round(score))))

        return {
            "bias": "BULLISH",
            "score": score,
            "key_level": resistance,
            "distance_to_level_pct": round(dist_to_res, 2),
            "atr10": round(atr10, 4),
            "atr_ratio": round(atr_ratio, 3),
            "vol_ratio": round(vol_ratio, 3) if isinstance(vol_ratio, (int, float)) else None,
            "relative_strength_5d": round(rs_diff, 3) if isinstance(rs_diff, (int, float)) else None,
            "sma20_dist_pct": round(sma_dist, 2),
            "market_context": market.get("tag") if isinstance(market, dict) else None,
        }


    def _score_bearish(candles: List[Dict[str, Any]], market: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Score a bearish "early mover" setup from daily candles (0-100).

        Mirror image of _score_bullish: proximity to 20-day support (30), range
        contraction via ATR ratio (25), volume dry-up (20), relative weakness
        vs index (15), SMA20 magnetism (10), plus a +5 bonus when the close is
        below SMA200. Hard gates return None on failure; unlike the bullish
        side, SMA200 position is a bonus here rather than a mandatory gate.
        """
        if len(candles) < 60:
            return None

        today = candles[-1]
        # The 20 completed sessions before today define the support window.
        prev_window = candles[-21:-1]
        if len(prev_window) < 20:
            return None

        close = _safe_float(today.get("close"))
        open_ = _safe_float(today.get("open"))
        high = _safe_float(today.get("high"))
        low = _safe_float(today.get("low"))
        vol = _safe_float(today.get("volume"))

        support = min(_safe_float(c.get("low")) for c in prev_window)
        if support <= 0:
            return None

        # Reject: too late (already broken down > 2% past support).
        if close < support * 0.98:
            return None

        atr10 = _atr(candles, 10)
        if not atr10 or atr10 <= 0:
            return None

        # Heartbeat check: extremely low ATR% instruments behave like NAV products.
        if close > 0 and MIN_ATR_PCT > 0:
            atr_pct = (atr10 / close) * 100.0
            if atr_pct < float(MIN_ATR_PCT):
                return None

        # Reject wide-range days: the setup wants contraction first.
        today_range = max(0.0, high - low)
        if today_range > 1.8 * atr10:
            return None

        # SMA20 rule: bearish should be below SMA20
        closes = [_safe_float(c.get("close")) for c in candles]
        sma20 = _sma(closes, 20)
        if not sma20 or sma20 <= 0:
            return None
        if close >= sma20:
            return None

        # Wick rejection near support (long lower wick = buyers defending the level).
        body = abs(close - open_)
        lower_wick = max(0.0, min(open_, close) - low)
        dist_to_sup = ((close - support) / support) * 100.0  # negative when close is below the level
        if dist_to_sup <= 1.5 and body > 0 and lower_wick > body * 1.5:
            return None

        score = 0

        # 1) Proximity sweet spot (30)
        prox_pts = 0
        if close < support:
            prox_pts = 15
        else:
            if 0.0 <= dist_to_sup <= 1.5:
                prox_pts = 30
            elif 1.5 < dist_to_sup <= 3.0:
                prox_pts = 10
            else:
                return None
        score += prox_pts

        # 2) VCP via ATR ratio (25): tighter range today vs ATR10 scores higher.
        atr_ratio = today_range / atr10
        vcp_pts = 0
        if atr_ratio < 0.70:
            vcp_pts = 25
        elif atr_ratio < 0.85:
            vcp_pts = 15
        elif atr_ratio > 1.0:
            vcp_pts = 0
        else:
            vcp_pts = 5
        score += vcp_pts

        # 3) Volume dry-up (20)
        vols = [_safe_float(c.get("volume")) for c in candles[-21:-1] if _safe_float(c.get("volume")) > 0]
        avg20 = (sum(vols) / len(vols)) if vols else 0.0
        vol_pts = 0
        vol_ratio = (vol / avg20) if avg20 > 0 else None
        if vol_ratio is None:
            vol_pts = 0
        else:
            if vol_ratio < 0.8:
                vol_pts = 20
            elif vol_ratio <= 1.2:
                vol_pts = 10
            elif vol_ratio > 1.5:
                # Penalty if volume spike without a real move.
                move = abs(close - open_) / close * 100.0 if close > 0 else 0.0
                vol_pts = -10 if move < 0.4 else 0
            else:
                vol_pts = 5
        score += vol_pts

        # 4) Relative weakness vs index (15)
        rs_pts = 0
        stock_5d = None
        if len(candles) >= 6:
            stock_5d = _return_pct(_safe_float(candles[-1].get("close")), _safe_float(candles[-6].get("close")))
        nifty_5d = market.get("nifty_5d_return") if isinstance(market, dict) else None

        rs_diff = None
        if isinstance(stock_5d, (int, float)) and isinstance(nifty_5d, (int, float)):
            rs_diff = stock_5d - nifty_5d
            if nifty_5d > 0 and stock_5d < 0:
                # Strongest signal: stock down while the index is up.
                rs_pts = 15
            elif rs_diff < 0:
                rs_pts = 10
            else:
                rs_pts = 0
        score += rs_pts

        # 5) SMA20 magnetism (10)
        sma_dist = abs(close - sma20) / sma20 * 100.0
        sma_pts = 0
        if sma_dist < 1.0:
            sma_pts = 10
        elif sma_dist < 2.0:
            sma_pts = 5
        elif sma_dist > 3.0:
            # Too extended from the mean — disqualify entirely.
            return None
        score += sma_pts

        # Bonus: below SMA200
        sma200 = _sma(closes, 200)
        if sma200 and sma200 > 0 and close < sma200:
            score += 5

        score = max(0, min(100, int(round(score))))

        return {
            "bias": "BEARISH",
            "score": score,
            "key_level": support,
            "distance_to_level_pct": round(dist_to_sup, 2),
            "atr10": round(atr10, 4),
            "atr_ratio": round(atr_ratio, 3),
            "vol_ratio": round(vol_ratio, 3) if isinstance(vol_ratio, (int, float)) else None,
            "relative_strength_5d": round(rs_diff, 3) if isinstance(rs_diff, (int, float)) else None,
            "sma20_dist_pct": round(sma_dist, 2),
            "market_context": market.get("tag") if isinstance(market, dict) else None,
        }


    def _compact_candidate_payload(*, symbol: str, bias: str, algo: Dict[str, Any], daily: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create compact payload for GPT.

        Args:
            symbol: trading symbol of the candidate.
            bias: "BULLISH" or "BEARISH" (other values degrade gracefully).
            algo: output of _score_bullish/_score_bearish (optionally with a
                "risk_flags" key); all fields are read defensively via .get().
            daily: daily candles as dicts with date/open/high/low/close/volume.

        IMPORTANT: This payload is intentionally FEATURES-ONLY (no raw OHLC arrays)
        to reduce tokens and prevent the model from "reading charts".

        If you ever want to re-introduce OHLC summaries, see the optional block
        near the return dict.
        """

        INCLUDE_OHLC_IN_GPT_PAYLOAD = False  # flip to True if you want to send candle summaries again

        def _compact_candle(c: Dict[str, Any]) -> Dict[str, Any]:
            # Round a raw candle to 2-decimal OHLC + int volume for token economy.
            if not isinstance(c, dict):
                return {}
            d = c.get("date")
            if isinstance(d, datetime):
                ds = d.date().isoformat()
            else:
                ds = str(d)[:10] if d is not None else ""
            out = {
                "date": ds,
                "open": round(_safe_float(c.get("open")), 2),
                "high": round(_safe_float(c.get("high")), 2),
                "low": round(_safe_float(c.get("low")), 2),
                "close": round(_safe_float(c.get("close")), 2),
                "volume": int(round(_safe_float(c.get("volume")))),
            }
            return out

        closes = [_safe_float(c.get("close")) for c in (daily or [])]
        today = (daily[-1] if isinstance(daily, list) and daily else {}) or {}
        close = _safe_float(today.get("close"))
        open_ = _safe_float(today.get("open"))
        high = _safe_float(today.get("high"))
        low = _safe_float(today.get("low"))

        sma20 = _sma(closes, 20)
        sma50 = _sma(closes, 50)
        sma200 = _sma(closes, 200)

        def _dist_pct(x: Optional[float], y: Optional[float]) -> Optional[float]:
            # Absolute percent distance of x from y; None when either is unusable.
            if not isinstance(x, (int, float)) or not isinstance(y, (int, float)) or y == 0:
                return None
            return round(abs(float(x) - float(y)) / float(y) * 100.0, 2)

        sma20_dist = algo.get("sma20_dist_pct")
        sma50_dist = _dist_pct(close, sma50)

        atr10 = algo.get("atr10")
        atr_pct = None
        if isinstance(atr10, (int, float)) and close > 0:
            atr_pct = round(float(atr10) / float(close) * 100.0, 2)

        # Classify today's range vs ATR into a coarse volatility state.
        atr_ratio = algo.get("atr_ratio")
        vol_ratio = algo.get("vol_ratio")
        vol_state = "UNKNOWN"
        if isinstance(atr_ratio, (int, float)):
            if float(atr_ratio) < 0.80:
                vol_state = "CONTRACTING"
            elif float(atr_ratio) > 1.05:
                vol_state = "EXPANDING"
            else:
                vol_state = "NEUTRAL"

        # Volume trend from last 20 sessions: last 5 vs the preceding 15.
        vols20 = [_safe_float((c or {}).get("volume")) for c in (daily[-20:] if len(daily) >= 20 else daily)]
        v5 = sum(vols20[-5:]) / 5.0 if len(vols20) >= 5 else None
        v15 = sum(vols20[:-5]) / float(len(vols20[:-5])) if len(vols20) >= 15 else None
        vol_trend = "UNKNOWN"
        if isinstance(v5, (int, float)) and isinstance(v15, (int, float)) and v15 > 0:
            r = float(v5) / float(v15)
            if r < 0.85:
                vol_trend = "DRYING_UP"
            elif r > 1.15:
                vol_trend = "RISING"
            else:
                vol_trend = "FLAT"

        breakout_volume = "UNKNOWN"
        if isinstance(vol_ratio, (int, float)):
            breakout_volume = "CONFIRMED" if float(vol_ratio) >= 1.2 else "NOT_CONFIRMED"

        # Today's candle anatomy: is the wick against the bias small enough?
        body = abs(close - open_)
        upper_wick = max(0.0, high - max(open_, close))
        lower_wick = max(0.0, min(open_, close) - low)
        no_rejection_wicks = None
        if body > 0:
            if (bias or "").strip().upper() == "BULLISH":
                no_rejection_wicks = upper_wick <= body * 1.25
            elif (bias or "").strip().upper() == "BEARISH":
                no_rejection_wicks = lower_wick <= body * 1.25

        dist_to_level = algo.get("distance_to_level_pct")
        near_key_level = None
        if isinstance(dist_to_level, (int, float)):
            near_key_level = float(dist_to_level) <= 1.5

        # Daily trend direction from the close/SMA20/SMA50 stack.
        daily_dir = "UNKNOWN"
        if sma20 and sma50 and close > 0:
            if close > sma20 > sma50:
                daily_dir = "UP"
            elif close < sma20 < sma50:
                daily_dir = "DOWN"
            else:
                daily_dir = "BASE"

        # Lightweight higher-timeframe context via aggregated weekly/monthly closes.
        week = _aggregate_ohlcv(daily, mode="week", limit=60)
        month = _aggregate_ohlcv(daily, mode="month", limit=60)
        week_closes = [_safe_float(c.get("close")) for c in (week or []) if isinstance(c, dict)]
        month_closes = [_safe_float(c.get("close")) for c in (month or []) if isinstance(c, dict)]

        def _tf_dir(values: List[float]) -> str:
            # Same stack test as daily_dir, but on aggregated closes (SMA10/SMA20).
            if len(values) < 10:
                return "UNKNOWN"
            s10 = _sma(values, 10)
            s20 = _sma(values, 20)
            last = values[-1]
            if s10 and s20 and last > 0:
                if last > s10 > s20:
                    return "UP"
                if last < s10 < s20:
                    return "DOWN"
                return "BASE"
            return "UNKNOWN"

        weekly_dir = _tf_dir(week_closes)
        monthly_ctx = _tf_dir(month_closes)
        monthly_tag = monthly_ctx
        # If monthly is near a recent range extreme, call it RESISTANCE/SUPPORT.
        if len(month) >= 7 and isinstance(month[-1], dict):
            last_m_close = _safe_float((month[-1] or {}).get("close"))
            prev = [c for c in month[:-1] if isinstance(c, dict)]
            prev_high = max((_safe_float(c.get("high")) for c in prev), default=0.0)
            prev_low = min((_safe_float(c.get("low")) for c in prev), default=0.0)
            if prev_high > 0 and last_m_close >= prev_high * 0.99:
                monthly_tag = "RESISTANCE"
            elif prev_low > 0 and last_m_close <= prev_low * 1.01:
                monthly_tag = "SUPPORT"

        structure: List[str] = []
        if isinstance(atr_ratio, (int, float)) and float(atr_ratio) < 0.75:
            structure.append("TIGHT_RANGE")
        if no_rejection_wicks is True:
            structure.append("NO_REJECTION_WICKS")

        flags: List[str] = []
        if near_key_level is True:
            flags.append("NEAR_KEY_LEVEL")
        if isinstance(sma200, (int, float)) and sma200 > 0 and close > 0:
            # Flag when price sits on the "wrong" side of SMA200 for the bias.
            if (bias or "").strip().upper() == "BULLISH" and close < float(sma200):
                flags.append("BELOW_SMA200")
            if (bias or "").strip().upper() == "BEARISH" and close > float(sma200):
                flags.append("ABOVE_SMA200")

        out: Dict[str, Any] = {
            "symbol": symbol,
            "bias": bias,
            "algo_score": int(algo.get("score") or 0),
            "market": {
                "regime": (algo.get("market_context") or "UNKNOWN"),
                "index_5d": algo.get("relative_strength_5d"),
            },
            "mtf": {"daily": daily_dir, "weekly": weekly_dir, "monthly": monthly_tag},
            "location": {
                "key_level": float(algo.get("key_level") or 0.0),
                "distance_to_level_pct": dist_to_level,
                "near_key_level": near_key_level,
                "sma20_dist_pct": sma20_dist,
                "sma50_dist_pct": sma50_dist,
            },
            "volatility": {"atr_pct": atr_pct, "state": vol_state},
            "volume": {"trend_20d": vol_trend, "breakout_volume": breakout_volume, "vol_ratio": vol_ratio},
            "structure": structure,
            "flags": flags,
            "risk_flags": list(algo.get("risk_flags") or []),
        }

        # Optional OHLC summaries (kept for future use; disabled by default)
        # If you enable this, GPT will get some limited OHLC context (higher token cost).
        if INCLUDE_OHLC_IN_GPT_PAYLOAD:
            last_daily = daily[-10:] if len(daily) >= 10 else daily
            week_compact = [_compact_candle(c) for c in (week or []) if isinstance(c, dict)]
            month_compact = [_compact_candle(c) for c in (month or []) if isinstance(c, dict)]
            daily_compact = [_compact_candle(c) for c in (last_daily or []) if isinstance(c, dict)]
            out["daily_last10"] = daily_compact
            out["weekly_last12"] = week_compact[-12:]
            out["monthly_last12"] = month_compact[-12:]

        return out


    def build_early_movers_snapshot(*, db, zerodha_client: ZerodhaClient, ist_date: str) -> Dict[str, Any]:
        """Compute and persist the early movers snapshot for the given IST date.

        Pipeline:
          1. Load the active NSE equity universe and its daily candle history.
          2. Apply hard liquidity/price filters, then algo-score each symbol
             on both the bullish and bearish side.
          3. Optionally send the top candidates per side to GPT, blend GPT
             confidence into the algo score, subtract risk penalties, and
             pick the top N per side.
          4. Upsert the snapshot document keyed by ``ist_date`` and return a
             small summary dict of counts.

        Args:
            db: Mongo-like database handle (``db[coll].find`` / ``update_one``).
            zerodha_client: Broker client, passed through to ``_market_context``.
            ist_date: IST trading-date string; also the snapshot's upsert key.

        Returns:
            Summary dict: candidate/top counts plus GPT payload/result/error counts.
        """
        # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+; kept to
        # preserve the naive-UTC timestamps already stored in this collection.
        t_all = datetime.utcnow()
        logger.info("[EarlyMovers] snapshot start | date=%s", ist_date)
        market = _market_context(db, zerodha_client)

        # Universe: equities only (consistent with your history refresh)
        stocks = list(
            db["stocks"].find(
                {
                    "is_active": {"$ne": False},
                    "exchange": "NSE",
                    "instrument_token": {"$ne": None},
                    "stock_id": {"$ne": None},
                    "instrument_type": "EQ",
                },
                {"_id": 0, "symbol": 1, "stock_id": 1, "name": 1},
            )
        )

        logger.info("[EarlyMovers] universe loaded | stocks=%d", len(stocks))

        # Build id<->symbol lookup tables; rows missing either key are skipped.
        stock_ids: List[str] = []
        symbols_by_id: Dict[str, str] = {}
        ids_by_symbol: Dict[str, str] = {}
        names_by_symbol: Dict[str, str] = {}
        for s in stocks:
            sid = str(s.get("stock_id") or "").strip()
            sym = (s.get("symbol") or "").strip().upper()
            if not sid or not sym:
                continue
            stock_ids.append(sid)
            symbols_by_id[sid] = sym
            ids_by_symbol[sym] = sid
            nm = (s.get("name") or "").strip()
            if nm:
                names_by_symbol[sym] = nm

        # Daily candle lists per stock_id, fetched with one bulk query.
        history_by_id: Dict[str, List[Dict[str, Any]]] = {}
        if stock_ids:
            docs = list(db["stock_history"].find({"stock_id": {"$in": stock_ids}}, {"_id": 0, "stock_id": 1, "day": 1}))
            for d in docs:
                sid = str(d.get("stock_id") or "").strip()
                day = d.get("day")
                if not sid or not isinstance(day, list):
                    continue
                history_by_id[sid] = day

        logger.info("[EarlyMovers] stock_history loaded | ids=%d docs=%d", len(stock_ids), len(history_by_id))

        bullish_algo: List[Dict[str, Any]] = []
        bearish_algo: List[Dict[str, Any]] = []

        # Per-filter drop counters, reported in the summary log below.
        considered = 0
        passed_hard_filters = 0
        filtered_by_candles = 0
        filtered_by_price = 0
        filtered_by_vol = 0
        filtered_by_vol_days = 0
        filtered_by_dv = 0
        filtered_by_exclude = 0

        t_loop = datetime.utcnow()
        for sid, sym in symbols_by_id.items():
            considered += 1

            if _is_excluded_instrument(symbol=sym, name=names_by_symbol.get(sym)):
                filtered_by_exclude += 1
                continue

            candles = history_by_id.get(sid)
            if not candles or len(candles) < MIN_DAILY_CANDLES:
                filtered_by_candles += 1
                continue

            # Hard filters
            close = _safe_float((candles[-1] or {}).get("close"))
            if close < MIN_PRICE or close > MAX_PRICE:
                filtered_by_price += 1
                continue

            last20 = candles[-20:]
            vols = [_safe_float((c or {}).get("volume")) for c in last20]
            pos_vols = [v for v in vols if v > 0]
            vol_days = len(pos_vols)
            # Average over traded days only, so zero-volume days don't dilute it.
            avg20 = (sum(pos_vols) / len(pos_vols)) if pos_vols else 0.0

            if MIN_VOL_DAYS_20D > 0 and vol_days < MIN_VOL_DAYS_20D:
                filtered_by_vol_days += 1
                continue

            if avg20 < MIN_AVG_VOL_20D:
                filtered_by_vol += 1
                continue

            if MIN_AVG_DV_20D > 0:
                # Daily value (close * volume) as a rupee-liquidity proxy.
                dvs: List[float] = []
                for c in last20:
                    cl = _safe_float((c or {}).get("close"))
                    v = _safe_float((c or {}).get("volume"))
                    if cl > 0 and v > 0:
                        dvs.append(cl * v)
                avg_dv20 = (sum(dvs) / len(dvs)) if dvs else 0.0
                if avg_dv20 < MIN_AVG_DV_20D:
                    filtered_by_dv += 1
                    continue

            passed_hard_filters += 1

            # Score both sides independently; a symbol may qualify on both.
            for bias, scorer, bucket in (
                ("BULLISH", _score_bullish, bullish_algo),
                ("BEARISH", _score_bearish, bearish_algo),
            ):
                res = scorer(candles, market)
                if res and int(res.get("score") or 0) >= MIN_SCORE:
                    flags = _risk_flags_basic(
                        candles,
                        key_level=float(res.get("key_level") or 0.0),
                        bias=bias,
                        atr10=(float(res.get("atr10")) if res.get("atr10") is not None else None),
                        atr_ratio=(float(res.get("atr_ratio")) if res.get("atr_ratio") is not None else None),
                        vol_ratio=(float(res.get("vol_ratio")) if res.get("vol_ratio") is not None else None),
                    )
                    bucket.append({"symbol": sym, **res, "risk_flags": flags})

            if considered % 500 == 0:
                ms = int((datetime.utcnow() - t_loop).total_seconds() * 1000)
                logger.info(
                    "[EarlyMovers] scan progress | considered=%d passed_hard=%d bull=%d bear=%d ms=%d",
                    considered,
                    passed_hard_filters,
                    len(bullish_algo),
                    len(bearish_algo),
                    ms,
                )

        # Highest score first; symbol as deterministic tie-breaker.
        bullish_algo.sort(key=lambda x: (int(x.get("score") or 0), x.get("symbol") or ""), reverse=True)
        bearish_algo.sort(key=lambda x: (int(x.get("score") or 0), x.get("symbol") or ""), reverse=True)

        logger.info(
            "[EarlyMovers] hard filters summary | considered=%d passed=%d excluded=%d candles=%d price=%d vol=%d vol_days=%d dv=%d",
            considered,
            passed_hard_filters,
            filtered_by_exclude,
            filtered_by_candles,
            filtered_by_price,
            filtered_by_vol,
            filtered_by_vol_days,
            filtered_by_dv,
        )

        # Candidate pools for GPT (top 50 per side)
        bull_candidates = bullish_algo[: max(0, CANDIDATE_LIMIT_PER_SIDE)]
        bear_candidates = bearish_algo[: max(0, CANDIDATE_LIMIT_PER_SIDE)]

        # Call gpt_meta() once; reused for logging and the snapshot document.
        meta = gpt_meta()
        logger.info(
            "[EarlyMovers] candidates | bull=%d bear=%d gpt_mode=%s gpt_limit_per_side=%d",
            len(bull_candidates),
            len(bear_candidates),
            (meta.get("mode") if isinstance(meta, dict) else None),
            int(gpt_limit_per_side()),
        )

        # Build compact payloads for GPT (top N per side)
        gpt_limit = gpt_limit_per_side()
        gpt_payload: List[Dict[str, Any]] = []
        if gpt_limit > 0:
            for side, pool in (("BULLISH", bull_candidates), ("BEARISH", bear_candidates)):
                for it in pool[:gpt_limit]:
                    sym = it.get("symbol")
                    if not sym:
                        continue
                    # Daily history is already in memory (history_by_id).
                    sid = ids_by_symbol.get(sym)
                    d = history_by_id.get(sid or "") if sid else None
                    if isinstance(d, list) and d:
                        gpt_payload.append(_compact_candidate_payload(symbol=sym, bias=side, algo=it, daily=d))

        logger.info("[EarlyMovers] gpt call start | payload=%d", len(gpt_payload))
        t_gpt = datetime.utcnow()
        gpt_items, gpt_errors = gpt_evaluate_candidates(market=market, candidates=gpt_payload)
        logger.info(
            "[EarlyMovers] gpt call done | results=%d errors=%d ms=%d",
            len(gpt_items),
            (len(gpt_errors) if isinstance(gpt_errors, list) else 0),
            int((datetime.utcnow() - t_gpt).total_seconds() * 1000),
        )
        # Index GPT results by (symbol, bias). Keys are normalized the same way
        # as the placeholder loop below, so case/whitespace drift in the GPT
        # response cannot cause a spurious NO_GPT_OUTPUT placeholder.
        gpt_by_symbol: Dict[Tuple[str, str], Dict[str, Any]] = {}
        for gi in gpt_items:
            g_sym = (gi.get("symbol") or "").strip().upper()
            g_bias = (gi.get("bias") or "").strip().upper()
            if g_sym and g_bias:
                gpt_by_symbol[(g_sym, g_bias)] = gi

        # If GPT returned fewer items than requested, keep snapshots explainable:
        # attach an explicit placeholder per missing symbol (so UI shows notes).
        missing = 0
        for p in gpt_payload:
            if not isinstance(p, dict):
                continue
            sym = (p.get("symbol") or "").strip().upper()
            b = (p.get("bias") or "").strip().upper()
            if not sym or b not in ("BULLISH", "BEARISH"):
                continue
            k = (sym, b)
            if k in gpt_by_symbol:
                continue
            gpt_by_symbol[k] = {
                "symbol": sym,
                "bias": b,
                "confidence": None,  # do not blend when missing
                "bias_ok": False,
                "setup_quality": "BAD",
                "risk_flags": ["NO_GPT_OUTPUT"],
                "notes": "GPT did not return an analysis item for this symbol in the batch response.",
            }
            missing += 1
        if missing and isinstance(gpt_errors, list):
            gpt_errors.append(f"missing_items_filled={missing}")

        # Blend final score + attach GPT notes, then pick top N
        def _blend_int(algo_score: int, gpt_conf: Optional[float]) -> int:
            # Weighted average of algo score and GPT confidence, rounded to int.
            if gpt_conf is None:
                return int(algo_score)
            w = max(0.0, min(1.0, float(GPT_BLEND_WEIGHT)))
            return int(round(algo_score * (1.0 - w) + float(gpt_conf) * w))

        def _blend_float(algo_score: int, gpt_conf: Optional[float]) -> float:
            # Same blend, unrounded (kept alongside the int for display precision).
            if gpt_conf is None:
                return float(algo_score)
            w = max(0.0, min(1.0, float(GPT_BLEND_WEIGHT)))
            return float(algo_score) * (1.0 - w) + float(gpt_conf) * w

        def _risk_penalty(flags: List[str]) -> int:
            # Sum the configured penalty points for each (normalized) risk flag.
            pts = 0
            for f in flags or []:
                pts += int(RISK_PENALTY_POINTS.get(str(f).strip().upper(), 0))
            return max(0, int(pts))

        def _finalize_side(candidates: List[Dict[str, Any]], side: str) -> List[Dict[str, Any]]:
            # Attach GPT output, apply the optional confidence veto, merge risk
            # flags, and compute penalty-adjusted final scores for one side.
            final: List[Dict[str, Any]] = []
            for it in candidates:
                sym = it.get("symbol")
                algo_score = int(it.get("score") or 0)
                gi = gpt_by_symbol.get((sym, side))
                conf = float(gi.get("confidence")) if isinstance(gi, dict) and gi.get("confidence") is not None else None

                # Confidence veto only applies when GPT produced an item for this symbol.
                if isinstance(gi, dict) and float(GPT_MIN_CONFIDENCE or 0) > 0:
                    if conf is None or conf < float(GPT_MIN_CONFIDENCE):
                        continue

                # Merge algo + GPT risk flags (order-preserving, de-duplicated).
                risk = list(it.get("risk_flags") or [])
                if isinstance(gi, dict):
                    for rf in gi.get("risk_flags") or []:
                        if rf not in risk:
                            risk.append(rf)

                penalty = _risk_penalty(risk)
                final.append(
                    {
                        **it,
                        "algo_score": algo_score,
                        "gpt": gi if isinstance(gi, dict) else None,
                        "final_score": max(0, int(_blend_int(algo_score, conf)) - int(penalty)),
                        "final_score_float": round(max(0.0, float(_blend_float(algo_score, conf)) - float(penalty)), 2),
                        "risk_flags": risk,
                        "risk_penalty": int(penalty),
                    }
                )
            return final

        bullish_final = _finalize_side(bull_candidates, "BULLISH")
        bearish_final = _finalize_side(bear_candidates, "BEARISH")

        bullish_final.sort(key=lambda x: (int(x.get("final_score") or 0), x.get("symbol") or ""), reverse=True)
        bearish_final.sort(key=lambda x: (int(x.get("final_score") or 0), x.get("symbol") or ""), reverse=True)

        def _strategy_for(it: Dict[str, Any]) -> str:
            # Map bias to the strategy label the frontend expects.
            b = (it.get("bias") or "").strip().upper()
            if b == "BULLISH":
                return "BREAKOUT"
            if b == "BEARISH":
                return "BREAKDOWN"
            return "UNKNOWN"

        # Add convenience fields expected by frontend; does not change scoring.
        for it in (*bullish_final, *bearish_final):
            it.setdefault("strategy", _strategy_for(it))
            it.setdefault("overall_score", int(it.get("final_score") or 0))
            it.setdefault("overall_score_float", float(it.get("final_score_float")) if it.get("final_score_float") is not None else float(it.get("overall_score") or 0))
            it.setdefault("early_mover_score", int(it.get("algo_score") or 0))

        bullish_top = bullish_final[: max(1, TOP_N)]
        bearish_top = bearish_final[: max(1, TOP_N)]

        snap: Dict[str, Any] = {
            "date": ist_date,
            "generated_at": datetime.utcnow(),
            "market": market,
            "universe": {
                "considered": considered,
                "passed_hard_filters": passed_hard_filters,
                "min_daily_candles": MIN_DAILY_CANDLES,
                "min_price": MIN_PRICE,
                "max_price": MAX_PRICE,
                "min_avg_vol_20d": MIN_AVG_VOL_20D,
                "min_avg_dv_20d": MIN_AVG_DV_20D,
                "min_vol_days_20d": MIN_VOL_DAYS_20D,
                "min_atr_pct": MIN_ATR_PCT,
                "excluded_keywords": _EXCLUDE_KEYWORDS,
                "excluded_suffixes": _EXCLUDE_SUFFIXES,
                "gpt_min_confidence": GPT_MIN_CONFIDENCE,
            },
            "algo": {
                "candidate_limit_per_side": CANDIDATE_LIMIT_PER_SIDE,
                "bullish_candidates": len(bull_candidates),
                "bearish_candidates": len(bear_candidates),
            },
            "gpt": {
                **meta,
                "payload_count": len(gpt_payload),
                "result_count": len(gpt_items),
                "errors": gpt_errors,
            },
            "top": {
                "bullish": bullish_top,
                "bearish": bearish_top,
            },
        }

        # Idempotent upsert keyed by date; created_at only set on first insert.
        db[EARLY_MOVERS_SNAPSHOT_COLLECTION].update_one(
            {"date": ist_date},
            {"$set": snap, "$setOnInsert": {"created_at": datetime.utcnow()}},
            upsert=True,
        )

        logger.info(
            "[EarlyMovers] snapshot saved | date=%s top_bull=%d top_bear=%d total_ms=%d",
            ist_date,
            len(bullish_top),
            len(bearish_top),
            int((datetime.utcnow() - t_all).total_seconds() * 1000),
        )

        return {
            "ok": True,
            "date": ist_date,
            "market": market,
            "bullish_candidates": len(bull_candidates),
            "bearish_candidates": len(bear_candidates),
            "bullish_top": len(snap.get("top", {}).get("bullish") or []),
            "bearish_top": len(snap.get("top", {}).get("bearish") or []),
            "gpt_payload": len(gpt_payload),
            "gpt_results": len(gpt_items),
            # Guard like the logging above: gpt_errors may not be a list, and an
            # unguarded len() here would crash AFTER the snapshot was persisted.
            "gpt_errors": (len(gpt_errors) if isinstance(gpt_errors, list) else 0),
        }
