from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from fastapi import HTTPException from pathlib import Path from scheduler.scheduler import start_scheduler, scheduler, get_jobs_info, my_job from scheduler.cache import cache from scheduler.jobs import update_daily_tasks, update_weather, refresh_meme, update_current_weather, update_daily_surprise, update_dressing_advice, update_morning_briefing_transcript, update_news, get_relevant_news_titles, update_quick_insight from webpush import WebPush, WebPushSubscription from cryptography.hazmat.primitives import serialization import json import uvicorn import requests import random import os import base64 from datetime import datetime, timezone from dateutil import parser from urllib.parse import unquote from pydantic import BaseModel from typing import Optional, Dict app = FastAPI() # Allow frontend requests (adjust origin for production) app.add_middleware( CORSMiddleware, allow_origins=["*"], # Or ["http://localhost:5173"] for Vite allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) TODOIST_API_TOKEN = "c2233236d19d56128c89ed6b0a9d10a9e7b287f1" ACCUWEATHER_API_KEY = "YHeMcr9Aa96Goer8CANIB2E6QIbr5Dp0" LOCATION_KEY = "251518" job_functions = { "daily_quick_insight": update_quick_insight, "select_relevant_news": get_relevant_news_titles, "top_news": update_news, "morning_briefing_transcript": update_morning_briefing_transcript, "daily_tasks": update_daily_tasks, "daily_weather": update_weather, "current_weather": update_current_weather, "daily_dressing_advice": update_dressing_advice, "daily_surprise": update_daily_surprise, "daily_meme": refresh_meme, "test_job": my_job, } wp = WebPush( public_key=Path("./public_key.pem"), private_key=Path("./private_key.pem"), subscriber="admin@mail.com", ) class ParcelAddRequest(BaseModel): nickname: str tracking_code: str postal_code: str async def get_parcel_cache() -> Dict[str, dict]: return cache.get("parcel_data") or {} async def save_parcel_cache(data: Dict[str, dict]): cache.set("parcel_data", data) @app.delete("/api/parcels/remove/{tracking_code}") async def remove_parcel(tracking_code: str): parcel_cache = await get_parcel_cache() if tracking_code not in parcel_cache: raise HTTPException(status_code=404, detail="Parcel not found") del parcel_cache[tracking_code] await save_parcel_cache(parcel_cache) return {"message": f"Parcel {tracking_code} removed"} @app.post("/api/parcels/add") async def add_parcel(parcel: ParcelAddRequest): parcel_cache = await get_parcel_cache() if parcel.tracking_code in parcel_cache: raise HTTPException(status_code=400, detail="Parcel already tracked") parcel_cache[parcel.tracking_code] = { "nickname": parcel.nickname, "postal_code": parcel.postal_code, } await save_parcel_cache(parcel_cache) return {"message": "Parcel added"} @app.get("/api/parcels") async def get_parcels(): parcel_cache = await get_parcel_cache() parcels = [] for tracking_code, info in parcel_cache.items(): tracking_url = f"https://jouw.postnl.nl/track-and-trace/{tracking_code}-NL-{info['postal_code']}" parcels.append({ "nickname": info["nickname"], "tracking_code": tracking_code, "postal_code": info["postal_code"], "tracking_url": tracking_url, }) return {"parcels": parcels} def get_subscriptions() -> list[dict]: return cache.get("push_subscriptions") or [] def save_subscriptions(subs: list[dict]): cache.set("push_subscriptions", subs) def add_subscription(sub_dict: dict): current_subs = get_subscriptions() if sub_dict not in current_subs: current_subs.append(sub_dict) save_subscriptions(current_subs) print("✅ New subscription added:", sub_dict) else: print("â„šī¸ Subscription already exists:", sub_dict["endpoint"]) def remove_subscription(sub_dict: dict): current_subs = get_subscriptions() updated_subs = [sub for sub in current_subs if sub != sub_dict] if len(updated_subs) != len(current_subs): save_subscriptions(updated_subs) print("đŸ—‘ī¸ Subscription removed:", sub_dict) else: print("âš ī¸ Subscription not found:", sub_dict) def subscription_exists(sub: dict) -> bool: current_subs = get_subscriptions() return any(sub == existing for existing in current_subs) @app.post("/api/send-all") async def send_push_to_all(): subscriptions = get_subscriptions() if not subscriptions: return JSONResponse({"message": "No subscribers yet"}, status_code=400) payload = { "title": "đŸ“Ŧ Your Morning Briefing is Ready!", "body": "Click here to read it.", "icon": "/logo.png", "badge": "/logo.png", "data": {"url": "/morning-briefing"} } to_remove = [] for sub_dict in subscriptions: try: subscription = WebPushSubscription(**sub_dict) message = wp.get(message=json.dumps(payload), subscription=subscription) resp = requests.post(url=subscription.endpoint, data=message.encrypted, headers=message.headers) resp.raise_for_status() print("✅ Push sent to", subscription.endpoint) except Exception as ex: print("❌ Push failed for", subscription.endpoint, ":", ex) # Mark for removal if failure indicates invalid subscription (e.g., 404 or 410) if hasattr(ex, 'response') and ex.response is not None: status_code = ex.response.status_code if status_code in (404, 410): # Gone or Not Found = subscription no longer valid to_remove.append(sub_dict) try: print("Details:", ex.response.json()) except: print("Response body:", ex.response.text) if to_remove: # Remove invalid subscriptions from cache for bad_sub in to_remove: if bad_sub in subscriptions: subscriptions.remove(bad_sub) # Update cache with cleaned list cache.set("push_subscriptions", subscriptions) print(f"Removed {len(to_remove)} invalid subscriptions from cache.") return {"message": "Push sent to all subscribers (invalid subs removed)"} @app.post("/api/send-test") async def send_test_push(request: Request): try: sub_dict = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON body") if not isinstance(sub_dict, dict) or "endpoint" not in sub_dict: raise HTTPException(status_code=400, detail="Missing or invalid subscription data") try: subscription = WebPushSubscription(**sub_dict) payload = { "title": "🔔 Test Notification", "body": "This is a test push just for you!", "icon": "/logo.png", "badge": "/logo.png", "data": {"url": "/settings"} } message = wp.get(message=json.dumps(payload), subscription=subscription) resp = requests.post( url=subscription.endpoint, data=message.encrypted, headers=message.headers ) resp.raise_for_status() print("✅ Test push sent to", subscription.endpoint) return {"message": "Test push sent successfully"} except Exception as ex: print("❌ Test push failed:", ex) if hasattr(ex, 'response') and ex.response is not None: status_code = ex.response.status_code # Remove the subscription if invalid if status_code in (404, 410): subscriptions = get_subscriptions() if sub_dict in subscriptions: subscriptions.remove(sub_dict) cache.set("push_subscriptions", subscriptions) print("Removed invalid subscription from cache due to failure.") try: print("Details:", ex.response.json()) except: print("Response body:", ex.response.text) raise HTTPException(status_code=500, detail="Failed to send test push") @app.post("/api/subscribe") async def subscribe(sub: dict): if subscription_exists(sub): return {"message": "Already subscribed"} add_subscription(sub) return {"message": "Subscription stored"} @app.post("/api/unsubscribe") async def unsubscribe(sub: dict) -> JSONResponse: #sub_dict = subscription.model_dump() remove_subscription(sub) return JSONResponse(content={"status": "unsubscribed"}) @app.post("/api/check-subscription") async def check_subscription_exists(sub: dict): if subscription_exists(sub): return {"exists": True} return {"exists": False} @app.get("/insight/daily") def get_daily_quick_insight(): insight = cache.get("daily_quick_insight_data") if insight: return {"source": "cache", "data": insight} else: return {"error": "No insight available yet"} @app.get("/news/relevant") def get_relevant_news_articles(): articles = cache.get("top_news_data") selected_titles = cache.get("select_relevant_news_data") if not articles: return {"error": "No news data available yet"} if not selected_titles: return {"error": "No selected relevant news titles available yet"} # Normalize titles for matching title_set = set(t.strip() for t in selected_titles) filtered_articles = [ article for article in articles if article.get("title", "").strip() in title_set ] return { "source": "cache", "data": filtered_articles } @app.post("/audio/{filename}/get") async def get_audio_file(filename: str): AUDIO_DIR = "audio" # Sanitize filename to prevent path traversal safe_filename = os.path.basename(filename).replace("..", "").replace("/", "") file_path = os.path.join(AUDIO_DIR, safe_filename) if not os.path.isfile(file_path): raise HTTPException(status_code=404, detail="Audio file not found.") return FileResponse( path=file_path, media_type="audio/wav", filename=safe_filename ) @app.get("/advice/dressing") def get_daily_dressing_advice(): advice = cache.get("daily_dressing_advice_data") if advice: return {"source": "cache", "data": advice} else: return {"error": "No advice available yet"} @app.post("/todo/{task_title}/{due_datetime}/complete") def complete_todoist_task(task_title: str, due_datetime: str): headers = { "Authorization": f"Bearer {TODOIST_API_TOKEN}", "Content-Type": "application/json" } # Decode and normalize inputs task_title = unquote(task_title) due_datetime = unquote(due_datetime).replace("Z", "+00:00") try: target_dt = datetime.fromisoformat(due_datetime) except ValueError: raise HTTPException(status_code=400, detail="Invalid datetime format. Use ISO format like 2025-06-13T08:00:00") # Fetch all open tasks to find the matching one response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers) if response.status_code != 200: raise HTTPException(status_code=500, detail="Failed to fetch tasks from Todoist") tasks = response.json() # Match task by title and due datetime matching_task = next(( t for t in tasks if t["content"] == task_title and "due" in t and "datetime" in t["due"] and datetime.fromisoformat(t["due"]["datetime"].replace("Z", "+00:00")) == target_dt ), None) if not matching_task: raise HTTPException(status_code=404, detail="Task not found") # Mark the task as complete task_id = matching_task["id"] complete_url = f"https://api.todoist.com/rest/v2/tasks/{task_id}/close" complete_response = requests.post(complete_url, headers=headers) if complete_response.status_code in [200, 204]: return {"message": "Task marked as complete", "task_id": task_id} else: raise HTTPException( status_code=500, detail={ "error": "Failed to complete task in Todoist", "details": complete_response.json() } ) @app.post("/todo/{task_title}/{due_datetime}/create") def create_todoist_task(task_title: str, due_datetime: str): headers = { "Authorization": f"Bearer {TODOIST_API_TOKEN}", "Content-Type": "application/json" } # Decode the task title in case it has URL-encoded characters task_title = unquote(task_title) due_datetime = unquote(due_datetime) due_datetime = due_datetime.replace("Z", "+00:00") # Validate the datetime format try: dt = datetime.fromisoformat(due_datetime) except ValueError: return {"error": "Invalid datetime format. Use ISO format like 2025-06-13T08:00:00"} payload = { "content": task_title, "due_datetime": due_datetime } response = requests.post( "https://api.todoist.com/rest/v2/tasks", headers=headers, json=payload ) if response.status_code == 200 or response.status_code == 204: return {"message": "Task created", "task": payload} else: return { "error": "Failed to create task in Todoist", "details": response.json() } @app.get("/todo/today") def get_tasks_today(): tasks = cache.get("daily_tasks_data") if tasks: return {"source": "cache", "data": tasks} else: return {"error": "No tasks available yet"} def get_due_datetime(task): try: dt_str = task.get('due', {}).get('datetime') if dt_str: dt = parser.isoparse(dt_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except Exception: pass return datetime.max.replace(tzinfo=timezone.utc) @app.get("/todo/all") def get_todoist_tasks(project_id=None, filter_query=None): """ Fetches tasks from Todoist. - project_id: optional ID of a specific project. - filter_query: Todoist filter string, e.g. "today | overdue". """ headers = { "Authorization": f"Bearer {TODOIST_API_TOKEN}" } params = {} if project_id: params['project_id'] = project_id if filter_query: params['filter'] = filter_query response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers, params=params) data = response.json() data.sort(key=get_due_datetime) if response.status_code == 200: return {"source": "todoist", "data": data} else: return {"error": "Something went wrong trying to contact the Todoist API"} @app.get("/surprise/daily") def get_daily_surprise(): surprise = cache.get("daily_surprise_data") if surprise: return {"source": "cache", "data": surprise} else: return {"error": "No surprise available yet"} @app.get("/meme/daily") def get_daily_meme(): meme = cache.get("daily_meme_data") if meme: return {"source": "cache", "data": meme} else: return {"source": "none", "data": None} @app.get("/weather/daily") def get_daily_forecast(): forecast = cache.get("daily_forecast_data") if forecast: return {"source": "cache", "data": forecast} else: return {"source": "none", "data": None} @app.get("/weather/current") def get_current_weather(): current_data = cache.get("current_weather_data") if current_data: return {"source": "cache", "data": current_data} else: return {"source": "none", "data": None} @app.get("/admin/jobs") def list_jobs(): return get_jobs_info() @app.post("/admin/jobs/{job_id}/run") def run_job_now(job_id: str): job = scheduler.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") fn = job_functions.get(job_id) if not fn: raise HTTPException(status_code=400, detail="No callable associated with this job") try: fn() # Directly call the job function return {"status": "triggered"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.on_event("startup") async def on_startup(): start_scheduler() @app.on_event("shutdown") async def on_shutdown(): scheduler.shutdown(wait=False) # Hardcoded run block if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, ssl_keyfile="key.pem", ssl_certfile="cert.pem", reload=True)