import json
from datetime import datetime
from bson import json_util
from aiokafka import AIOKafkaConsumer  # ✅ this line is essential

from app.utils.connections import connected_clients


async def consume_tracking_data():
    consumer = AIOKafkaConsumer(
        "vehicle_tracking",
        bootstrap_servers='localhost:9092',
        group_id="tracking-group"
    )
    await consumer.start()
    try:
        async for msg in consumer:
            data = json.loads(msg.value.decode("utf-8"))
            print("📦 Received from Kafka:", data)
            print("Broadcasting to", len(connected_clients), "clients")

            for ws in connected_clients.copy():
                try:
                    await ws.send_text(json.dumps(data, default=json_util.default))
                except Exception as e:
                    connected_clients.remove(ws)
                    print(f"❌ Error sending to client: {e}")
    finally:
        await consumer.stop()
