import json
from datetime import datetime, timezone

from aiokafka import AIOKafkaConsumer

from app.db import database

async def consume_tracking_data():
    """Consume vehicle tracking events, persist them, and broadcast them.

    Subscribes to the ``vehicle_tracking`` Kafka topic on ``localhost:9092``
    as part of the ``tracking-group`` consumer group. For every message:

    1. The value is decoded as a JSON object; malformed or non-object
       payloads are reported and skipped so one bad message does not stop
       the consumer.
    2. A copy stamped with ``received_at`` (timezone-aware UTC) is inserted
       into the ``tracking_logs`` collection.
    3. A JSON-safe copy (``received_at`` as an ISO-8601 string) is sent to
       every client in ``connected_clients``; clients whose send fails are
       dropped from that collection.

    The Kafka consumer is always stopped and the Mongo generator is always
    advanced to completion on exit, even if setup or processing fails.

    NOTE(review): ``connected_clients`` is not defined or imported in the
    visible part of this module; it must exist as a module-level collection
    of WebSocket-like objects (supporting ``copy``/``remove``/``len``) —
    confirm where it is supposed to come from.
    """
    consumer = AIOKafkaConsumer(
        "vehicle_tracking",
        bootstrap_servers='localhost:9092',
        group_id="tracking-group"
    )
    await consumer.start()

    mongo_gen = None
    try:
        # Acquire the DB inside the try so a failure here still stops the
        # consumer that was started above.
        mongo_gen = database.get_mongo_db()
        mongo = next(mongo_gen)  # first yield of the generator is the db
        collection = mongo["tracking_logs"]

        async for msg in consumer:
            try:
                data = json.loads(msg.value)
            except (ValueError, TypeError) as exc:
                # ValueError: invalid JSON / bad encoding.
                # TypeError: value is None (e.g. a message with no payload).
                print("Skipping malformed tracking message:", exc)
                continue
            if not isinstance(data, dict):
                print("Skipping non-object tracking message:", type(data).__name__)
                continue

            received_at = datetime.now(timezone.utc)

            # Insert a copy: if this is a PyMongo collection, insert_one adds
            # an ObjectId "_id" to the dict it is given, which would make the
            # broadcast payload non-JSON-serializable.
            # NOTE(review): this call is not awaited; if the driver is
            # synchronous it blocks the event loop for the duration of the
            # write — confirm which driver database.get_mongo_db() yields.
            collection.insert_one({**data, "received_at": received_at})

            # datetime objects are not JSON-serializable, so send ISO-8601.
            payload = {**data, "received_at": received_at.isoformat()}

            # Iterate over a snapshot so removals don't disturb the loop.
            for ws in connected_clients.copy():
                try:
                    await ws.send_json(payload)
                except Exception:
                    # Best-effort delivery: a failed send drops the client.
                    # It may already have been removed by another task while
                    # we were awaiting, so guard the removal.
                    if ws in connected_clients:
                        connected_clients.remove(ws)

            print("Broadcasting to", len(connected_clients), "clients")

    finally:
        try:
            await consumer.stop()
        finally:
            # Always finalize the DB generator, even if stopping the
            # consumer raised.
            if mongo_gen is not None:
                try:
                    next(mongo_gen)  # Trigger `finally` block to close the DB
                except StopIteration:
                    pass
