import asyncio
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from pymongo.collection import Collection

from app.db.database import get_mongo_db
from app.v1.background.global_intraday import _get_global_zerodha_client
from app.v1.services.early_movers import build_early_movers_snapshot

logger = logging.getLogger(__name__)

# Every boolean switch in this section is ON only when its environment variable
# is exactly "1" once surrounding whitespace has been stripped.

# When ON, the loop also reports its "skipped" decisions through the logger.
EARLY_MOVERS_VERBOSE_LOGS = os.environ.get("EARLY_MOVERS_VERBOSE_LOGS", "0").strip() == "1"

# Schedule controls. Both gates default to OFF, so by default the job may run at any time.
EARLY_MOVERS_START_HHMM = os.environ.get("EARLY_MOVERS_START_HHMM", "20:00")
EARLY_MOVERS_END_HHMM = os.environ.get("EARLY_MOVERS_END_HHMM", "20:30")
EARLY_MOVERS_ENFORCE_WINDOW = os.environ.get("EARLY_MOVERS_ENFORCE_WINDOW", "0").strip() == "1"
EARLY_MOVERS_WEEKDAYS_ONLY = os.environ.get("EARLY_MOVERS_WEEKDAYS_ONLY", "0").strip() == "1"

# Dependency gate: when ON, the stock-history EOD marker must match the latest
# trading day (IST) before a snapshot is built. Defaults to OFF so the job is
# not blocked by the timing of the history refresh.
REQUIRE_STOCK_HISTORY_EOD_TODAY = (
    os.environ.get("EARLY_MOVERS_REQUIRE_STOCK_HISTORY_EOD_TODAY", "0").strip() == "1"
)
STOCK_HISTORY_LAST_EOD_META_KEY = os.environ.get(
    "STOCK_HISTORY_LAST_EOD_META_KEY", "stock_history_refresh_last_eod_run"
)

# system_meta key that records the target date of the last completed run.
EARLY_MOVERS_LAST_RUN_META_KEY = os.environ.get(
    "EARLY_MOVERS_LAST_RUN_META_KEY", "early_movers_last_run_ist_date"
)

# Concurrency guard: avoids duplicate runs when several app processes are alive
# (e.g. uvicorn --reload spawning reloader + worker, or multiple workers).
EARLY_MOVERS_LOCK_META_KEY = os.environ.get("EARLY_MOVERS_LOCK_META_KEY", "early_movers_snapshot_lock")
EARLY_MOVERS_LOCK_TTL_SECONDS = int(os.environ.get("EARLY_MOVERS_LOCK_TTL_SECONDS", "1800"))
EARLY_MOVERS_ALLOW_RERUN_IF_SNAPSHOT_EXISTS = (
    os.environ.get("EARLY_MOVERS_ALLOW_RERUN_IF_SNAPSHOT_EXISTS", "0").strip() == "1"
)

# Loop pacing (seconds).
SLEEP_SECONDS_IDLE = int(os.environ.get("EARLY_MOVERS_LOOP_SLEEP_SECONDS", "30"))
SLEEP_SECONDS_AFTER_RUN = int(os.environ.get("EARLY_MOVERS_SLEEP_AFTER_RUN_SECONDS", "600"))

# Fixed UTC+05:30 offset used for all "IST" datetimes in this module.
IST = timezone(timedelta(hours=5, minutes=30))


def _parse_hhmm(v: str) -> Optional[tuple[int, int]]:
    """Parse a 24-hour ``"HH:MM"`` string into an ``(hour, minute)`` pair.

    Args:
        v: Text such as ``"20:30"``; surrounding whitespace is ignored.

    Returns:
        ``(hour, minute)`` on success, or ``None`` when *v* is empty/None, is
        not exactly two colon-separated integers, or names a time outside
        ``00:00``-``23:59``.
    """
    try:
        s = (v or "").strip()
        if not s:
            return None
        hh, mm = s.split(":")
        hour, minute = int(hh), int(mm)
    except (AttributeError, TypeError, ValueError):
        # Non-string input, wrong number of fields, or non-numeric fields.
        return None
    # Reject values that datetime.replace(hour=..., minute=...) would refuse;
    # otherwise a bad env value would raise ValueError wherever the window
    # bounds are computed instead of simply disabling the window.
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        return None
    return hour, minute


def _ist_now() -> datetime:
    """Return the current moment as a timezone-aware datetime in ``IST``."""
    return datetime.now(IST)


def _ist_date_str(dt: Optional[datetime] = None) -> str:
    """Format *dt* as ``YYYY-MM-DD``, using the current IST time when *dt* is falsy.

    A supplied *dt* is formatted as-is; it is not converted to IST first.
    """
    moment = dt if dt else _ist_now()
    return f"{moment:%Y-%m-%d}"


def _is_weekday_ist(dt: datetime) -> bool:
    """Return True when *dt* falls on Monday through Friday."""
    # weekday() numbers Monday as 0 and Sunday as 6, so 5 and 6 are the weekend.
    return dt.weekday() < 5


def _latest_trading_day_key_ist(dt: datetime) -> str:
    """Return the ISO date of *dt*, rolled back to Friday when *dt* is on a weekend."""
    # Saturday (weekday 5) steps back one day, Sunday (6) two; weekdays stay put.
    days_back = {5: 1, 6: 2}.get(dt.weekday(), 0)
    return (dt.date() - timedelta(days=days_back)).isoformat()


def _next_trading_day_key(date_key: str) -> Optional[str]:
    """Return the ISO date of the first Mon-Fri day after *date_key*.

    *date_key* is stringified, stripped and parsed with
    ``datetime.fromisoformat``; ``None`` is returned when it cannot be parsed.
    """
    try:
        current = datetime.fromisoformat(str(date_key).strip()).date()
    except Exception:
        return None
    # From Friday the next weekday is 3 days away, from Saturday 2, otherwise 1.
    step = {4: 3, 5: 2}.get(current.weekday(), 1)
    return (current + timedelta(days=step)).isoformat()


def _in_window(now_ist: datetime) -> bool:
    """Return True when *now_ist* lies inside today's configured window.

    Both ends are inclusive. The result is False when either configured bound
    fails to parse or when the end is not strictly after the start (such a
    configuration is treated as "no window").
    """
    bounds = _window_bounds(now_ist)
    if bounds is None:
        return False
    opens, closes = bounds
    return opens <= now_ist <= closes


def _window_bounds(now_ist: datetime) -> Optional[tuple[datetime, datetime]]:
    """Return ``(start, end)`` of the configured window on *now_ist*'s date.

    ``None`` is returned when either ``EARLY_MOVERS_START_HHMM`` or
    ``EARLY_MOVERS_END_HHMM`` fails to parse, or when the end is not strictly
    after the start.
    """
    parsed_start = _parse_hhmm(EARLY_MOVERS_START_HHMM)
    parsed_end = _parse_hhmm(EARLY_MOVERS_END_HHMM)
    if not (parsed_start and parsed_end):
        return None

    def _at(hhmm: tuple[int, int]) -> datetime:
        # Same calendar day and tzinfo as now_ist, pinned to HH:MM:00.000000.
        return now_ist.replace(hour=hhmm[0], minute=hhmm[1], second=0, microsecond=0)

    opens, closes = _at(parsed_start), _at(parsed_end)
    return (opens, closes) if closes > opens else None


def _seconds_until(now_ist: datetime, target_ist: datetime) -> int:
    """Return whole seconds from *now_ist* to *target_ist*, never less than 1.

    Fractions are truncated. If the difference cannot be computed (for
    example the two values cannot be subtracted), 1 is returned.
    """
    try:
        gap = target_ist - now_ist
        remaining = int(gap.total_seconds())
    except Exception:
        return 1
    return remaining if remaining > 1 else 1


def _next_window_start(now_ist: datetime) -> Optional[datetime]:
    """Return the next window start at or after *now_ist*.

    That is today's start while *now_ist* has not passed it, otherwise the
    same clock time on the following calendar day. ``None`` is returned when
    no valid window is configured.
    """
    bounds = _window_bounds(now_ist)
    if not bounds:
        return None
    opens_today = bounds[0]
    if now_ist > opens_today:
        # Today's start is already behind us: same HH:MM, one day later.
        return opens_today + timedelta(days=1)
    return opens_today


def _next_weekday_window_start(now_ist: datetime) -> Optional[datetime]:
    """Return the next window start that falls on Monday through Friday.

    ``None`` is returned when no valid window is configured.
    """
    candidate = _next_window_start(now_ist)
    if not candidate:
        return None
    # A Saturday start (weekday 5) moves forward 2 days, a Sunday start (6) 1 day.
    weekend_shift = {5: 2, 6: 1}.get(candidate.weekday(), 0)
    if weekend_shift:
        candidate = candidate + timedelta(days=weekend_shift)
    return candidate


def _get_system_meta_value(db, key: str) -> Optional[str]:
    """Return the ``value`` stored under *key* in ``system_meta`` as a string.

    ``None`` is returned when no document is found or when its ``value``
    field is missing or ``None``.
    """
    record = db["system_meta"].find_one({"key": key})
    raw = record.get("value") if record else None
    return None if raw is None else str(raw)


def _set_system_meta_value(db, key: str, value: str) -> None:
    """Upsert *value* under *key* in ``system_meta`` and stamp ``updated_at``.

    Args:
        db: Database handle; only ``db["system_meta"].update_one`` is used.
        key: Metadata key to write.
        value: Value to store under that key.
    """
    # Timezone-aware UTC "now": datetime.utcnow() is deprecated and yields a
    # naive value. The driver stores aware datetimes as the same UTC instant.
    stamped_at = datetime.now(timezone.utc)
    db["system_meta"].update_one(
        {"key": key},
        {"$set": {"key": key, "value": value, "updated_at": stamped_at}},
        upsert=True,
    )


def _snapshot_collection(db) -> Collection:
    """Return the snapshot collection of *db*.

    The collection name is read from ``EARLY_MOVERS_SNAPSHOT_COLLECTION`` on
    every call and defaults to ``"early_movers_snapshots"``.
    """
    collection_name = os.environ.get("EARLY_MOVERS_SNAPSHOT_COLLECTION", "early_movers_snapshots")
    return db[collection_name]


def _snapshot_exists(db, target_date: str) -> bool:
    """Return True when the snapshot collection holds a document whose ``date`` equals *target_date*."""
    snapshots = _snapshot_collection(db)
    # Only _id/date are projected; the lookup is just an existence probe.
    hit = snapshots.find_one({"date": str(target_date)}, {"_id": 1, "date": 1})
    return True if hit else False


def _acquire_lock(db, *, owner: str) -> bool:
    """Acquire a best-effort distributed lock stored in ``system_meta``.

    The lock is one document keyed by ``EARLY_MOVERS_LOCK_META_KEY``. It can be
    taken when it has no ``expires_at``, when ``expires_at`` is in the past, or
    when it is already held by *owner* (which refreshes the expiry).

    Args:
        db: Database handle; only ``db["system_meta"].update_one`` is used.
        owner: Identifier of the caller requesting the lock.

    Returns:
        True if the lock is now held by *owner*, False if someone else holds
        an unexpired lock.
    """
    now = datetime.now(timezone.utc)
    # The TTL is floored at 30 seconds regardless of configuration.
    expires = now + timedelta(seconds=max(30, int(EARLY_MOVERS_LOCK_TTL_SECONDS)))
    meta = db["system_meta"]

    # Step 1: make sure the lock document exists. The filter is the key alone,
    # so an existing document always matches and nothing new is inserted.
    # A freshly seeded document has no expires_at and is therefore free.
    meta.update_one(
        {"key": EARLY_MOVERS_LOCK_META_KEY},
        {"$setOnInsert": {"updated_at": now}},
        upsert=True,
    )

    # Step 2: atomic compare-and-set WITHOUT upsert. Combining upsert with this
    # conditional filter would, while another owner holds the lock, match
    # nothing and insert a second lock document (or raise a duplicate-key
    # error if "key" is uniquely indexed) instead of reporting "busy".
    # NOTE(review): a unique index on system_meta.key is presumably intended;
    # without one, two processes seeding the lock for the very first time at
    # the same moment could still create two documents - confirm.
    res = meta.update_one(
        {
            "key": EARLY_MOVERS_LOCK_META_KEY,
            "$or": [
                {"expires_at": {"$lt": now}},
                {"expires_at": {"$exists": False}},
                {"owner": owner},
            ],
        },
        {
            "$set": {
                "owner": owner,
                "expires_at": expires,
                "updated_at": now,
            }
        },
    )
    # matched_count (not modified_count): a match means we own the lock even
    # if the written values happen to equal the stored ones.
    return res.matched_count == 1


def _release_lock(db, *, owner: str) -> None:
    """Release the lock held by *owner* by expiring it immediately.

    Only a lock document whose ``owner`` equals *owner* is touched. Any error
    is logged at debug level and swallowed, so release never raises.
    """
    # One timezone-aware UTC timestamp for both fields (datetime.utcnow() is
    # deprecated, and reading the clock twice gave two slightly different values).
    now = datetime.now(timezone.utc)
    try:
        db["system_meta"].update_one(
            {"key": EARLY_MOVERS_LOCK_META_KEY, "owner": owner},
            {"$set": {"expires_at": now, "updated_at": now}},
        )
    except Exception:
        logger.debug("Early movers: error releasing lock", exc_info=True)


def _already_ran_for_target(db, target_date: str) -> bool:
    """Return True when the last-run marker in ``system_meta`` equals *target_date*."""
    return _get_system_meta_value(db, EARLY_MOVERS_LAST_RUN_META_KEY) == target_date


def _make_zerodha_client(db):
    """Return the shared Zerodha client obtained via ``_get_global_zerodha_client``.

    Raises:
        RuntimeError: If that helper returns a falsy value.
    """
    # Same client wiring as the global intraday background job this helper is imported from.
    client = _get_global_zerodha_client(db)
    if client:
        return client
    raise RuntimeError("Global Zerodha client unavailable")


def early_movers_refresh_loop(*, app=None):
    """Create the coroutine that runs the early-movers snapshot job forever.

    Each iteration of the loop:

    1. Applies the optional schedule gates. With ``EARLY_MOVERS_ENFORCE_WINDOW``
       the loop sleeps until the next window start instead of polling; with
       ``EARLY_MOVERS_WEEKDAYS_ONLY`` Saturdays and Sundays are skipped.
    2. Reads the last stock-history EOD key from ``system_meta`` and derives
       the target date as the next trading day after it.
    3. Skips if that target was already processed or a snapshot already
       exists; otherwise takes the lock, calls ``build_early_movers_snapshot``
       and records the target date as the last run.

    Any exception inside an iteration is logged and the loop continues after
    ``SLEEP_SECONDS_IDLE`` seconds.

    Args:
        app: Currently unused.

    Returns:
        An un-awaited coroutine object; the caller must schedule it.
    """

    async def _loop():
        start_fmt = (
            "Early movers loop starting. enforce_window=%s window=%s-%s weekdays_only=%s require_history=%s"
        )
        start_args = (
            EARLY_MOVERS_ENFORCE_WINDOW,
            EARLY_MOVERS_START_HHMM,
            EARLY_MOVERS_END_HHMM,
            EARLY_MOVERS_WEEKDAYS_ONLY,
            REQUIRE_STOCK_HISTORY_EOD_TODAY,
        )
        # Announce on stdout as well as through logging (same text in both).
        print(f"[EarlyMovers] {start_fmt % start_args}")
        logger.info(start_fmt, *start_args)

        while True:
            try:
                now_ist = _ist_now()

                if EARLY_MOVERS_ENFORCE_WINDOW:
                    # With an enforced window we do not poll every N seconds:
                    # sleep straight through to the next window start.
                    bounds = _window_bounds(now_ist)
                    if not bounds:
                        # Window is misconfigured (unparseable or end <= start).
                        await asyncio.sleep(SLEEP_SECONDS_IDLE)
                        continue
                    start, end = bounds

                    # Weekday-only shortcut: sleep until the next weekday window.
                    if EARLY_MOVERS_WEEKDAYS_ONLY and not _is_weekday_ist(now_ist):
                        nxt = _next_weekday_window_start(now_ist)
                        if nxt:
                            await asyncio.sleep(_seconds_until(now_ist, nxt))
                            continue
                        await asyncio.sleep(SLEEP_SECONDS_IDLE)
                        continue

                    # Outside today's window: sleep until the next appropriate start.
                    if now_ist < start:
                        await asyncio.sleep(_seconds_until(now_ist, start))
                        continue
                    if now_ist > end:
                        nxt = _next_window_start(now_ist)
                        if nxt:
                            await asyncio.sleep(_seconds_until(now_ist, nxt))
                            continue
                        await asyncio.sleep(SLEEP_SECONDS_IDLE)
                        continue
                elif EARLY_MOVERS_WEEKDAYS_ONLY and not _is_weekday_ist(now_ist):
                    # No enforced window: on weekends just poll at the idle interval.
                    if EARLY_MOVERS_VERBOSE_LOGS:
                        logger.info("Early movers skipped: not a trading weekday | now_ist=%s", now_ist.isoformat())
                    await asyncio.sleep(SLEEP_SECONDS_IDLE)
                    continue

                # NOTE(review): the DB calls and build_early_movers_snapshot below are
                # invoked synchronously inside this coroutine; if they block, the event
                # loop is blocked for that long - confirm this is acceptable.
                db_gen = get_mongo_db()
                db = next(db_gen)
                result = None
                ran = False
                sleep_seconds = SLEEP_SECONDS_IDLE
                owner = f"pid={os.getpid()}"
                done_for_target = False
                try:
                    # Early movers is intended to be created from the latest available EOD history,
                    # and stored for the NEXT trading day (EOD previous day of the trade day).
                    last_eod_key = _get_system_meta_value(db, STOCK_HISTORY_LAST_EOD_META_KEY)
                    if not last_eod_key:
                        # This is the #1 reason the job appears "not running".
                        # Print on every loop tick so it is visible in uvicorn logs.
                        print(f"[EarlyMovers] Skipped: missing system_meta.{STOCK_HISTORY_LAST_EOD_META_KEY}")
                        if EARLY_MOVERS_VERBOSE_LOGS:
                            logger.info("Early movers skipped: stock history EOD meta missing (%s)", STOCK_HISTORY_LAST_EOD_META_KEY)
                        sleep_seconds = SLEEP_SECONDS_IDLE
                    else:
                        # Optional gate: require EOD marker to match latest trading day (weekends roll back to Fri).
                        if REQUIRE_STOCK_HISTORY_EOD_TODAY:
                            expected = _latest_trading_day_key_ist(now_ist)
                            if str(last_eod_key) != str(expected):
                                print(f"[EarlyMovers] Skipped: stale EOD history | last_eod={last_eod_key} expected={expected}")
                                if EARLY_MOVERS_VERBOSE_LOGS:
                                    logger.info(
                                        "Early movers skipped: stale EOD history | last_eod=%s expected=%s",
                                        last_eod_key,
                                        expected,
                                    )
                                sleep_seconds = SLEEP_SECONDS_IDLE
                                # Clearing the key makes the block below a no-op for this tick.
                                last_eod_key = None

                        if last_eod_key:
                            target_date = _next_trading_day_key(str(last_eod_key))
                            if not target_date:
                                # EOD key could not be parsed as a date.
                                sleep_seconds = SLEEP_SECONDS_IDLE
                            elif _already_ran_for_target(db, target_date):
                                print(f"[EarlyMovers] Skipped: already ran | target_date={target_date}")
                                if EARLY_MOVERS_VERBOSE_LOGS:
                                    logger.info("Early movers skipped: already ran | target_date=%s", target_date)
                                done_for_target = True
                                sleep_seconds = SLEEP_SECONDS_AFTER_RUN
                            elif (not EARLY_MOVERS_ALLOW_RERUN_IF_SNAPSHOT_EXISTS) and _snapshot_exists(db, target_date):
                                # If a snapshot already exists for this target date, treat it as done.
                                _set_system_meta_value(db, EARLY_MOVERS_LAST_RUN_META_KEY, target_date)
                                print(f"[EarlyMovers] Skipped: snapshot exists | target_date={target_date}")
                                if EARLY_MOVERS_VERBOSE_LOGS:
                                    logger.info("Early movers skipped: snapshot already exists | target_date=%s", target_date)
                                done_for_target = True
                                sleep_seconds = SLEEP_SECONDS_AFTER_RUN
                            else:
                                if not _acquire_lock(db, owner=owner):
                                    print(f"[EarlyMovers] Skipped: lock busy | lock_key={EARLY_MOVERS_LOCK_META_KEY}")
                                    if EARLY_MOVERS_VERBOSE_LOGS:
                                        logger.info("Early movers skipped: lock busy (%s)", EARLY_MOVERS_LOCK_META_KEY)
                                    sleep_seconds = SLEEP_SECONDS_IDLE
                                else:
                                    try:
                                        # Re-check after acquiring lock (another process may have finished).
                                        if (not EARLY_MOVERS_ALLOW_RERUN_IF_SNAPSHOT_EXISTS) and _snapshot_exists(db, target_date):
                                            _set_system_meta_value(db, EARLY_MOVERS_LAST_RUN_META_KEY, target_date)
                                            print(f"[EarlyMovers] Skipped (post-lock): snapshot exists | target_date={target_date}")
                                            done_for_target = True
                                            sleep_seconds = SLEEP_SECONDS_AFTER_RUN
                                        else:
                                            zerodha_client = _make_zerodha_client(db)
                                            print(
                                                f"[EarlyMovers] Running snapshot | owner={owner} target_date={target_date} based_on_eod={last_eod_key}"
                                            )
                                            result = build_early_movers_snapshot(
                                                db=db,
                                                zerodha_client=zerodha_client,
                                                ist_date=target_date,
                                            )
                                            # Mark the target as done only after the build returned.
                                            _set_system_meta_value(db, EARLY_MOVERS_LAST_RUN_META_KEY, target_date)
                                            ran = True
                                            done_for_target = True
                                            sleep_seconds = SLEEP_SECONDS_AFTER_RUN
                                    finally:
                                        # Always hand the lock back, even if the build raised.
                                        _release_lock(db, owner=owner)
                finally:
                    try:
                        db_gen.close()
                    except Exception:
                        logger.debug("Early movers: error closing DB generator", exc_info=True)

                if ran:
                    print(f"[EarlyMovers] Snapshot done: {result}")
                    logger.info("Early movers snapshot done: %s", result)

                # If we enforce the schedule and we're done for this target date,
                # do not keep looping within the window. Sleep until the next day's window.
                if EARLY_MOVERS_ENFORCE_WINDOW and done_for_target:
                    nxt = _next_window_start(_ist_now())
                    if nxt:
                        await asyncio.sleep(_seconds_until(_ist_now(), nxt))
                        continue

                await asyncio.sleep(sleep_seconds)

            except Exception:
                # Keep the loop alive: log the failure and retry after the idle interval.
                logger.exception("Early movers loop error")
                await asyncio.sleep(SLEEP_SECONDS_IDLE)

    return _loop()
