Files
Eira-Back-End/main.py
2025-06-16 01:02:22 +02:00

514 lines
16 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi import HTTPException
from pathlib import Path
from scheduler.scheduler import start_scheduler, scheduler, get_jobs_info, my_job
from scheduler.cache import cache
from scheduler.jobs import update_daily_tasks, update_weather, refresh_meme, update_current_weather, update_daily_surprise, update_dressing_advice, update_morning_briefing_transcript, update_news, get_relevant_news_titles, update_quick_insight
from webpush import WebPush, WebPushSubscription
from cryptography.hazmat.primitives import serialization
import json
import uvicorn
import requests
import random
import os
import base64
from datetime import datetime, timezone
from dateutil import parser
from urllib.parse import unquote
from pydantic import BaseModel
from typing import Optional, Dict
app = FastAPI()
# Allow frontend requests (adjust origin for production)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Or ["http://localhost:5173"] for Vite
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
TODOIST_API_TOKEN = "c2233236d19d56128c89ed6b0a9d10a9e7b287f1"
ACCUWEATHER_API_KEY = "YHeMcr9Aa96Goer8CANIB2E6QIbr5Dp0"
LOCATION_KEY = "251518"
job_functions = {
"daily_quick_insight": update_quick_insight,
"select_relevant_news": get_relevant_news_titles,
"top_news": update_news,
"morning_briefing_transcript": update_morning_briefing_transcript,
"daily_tasks": update_daily_tasks,
"daily_weather": update_weather,
"current_weather": update_current_weather,
"daily_dressing_advice": update_dressing_advice,
"daily_surprise": update_daily_surprise,
"daily_meme": refresh_meme,
"test_job": my_job,
}
wp = WebPush(
public_key=Path("./public_key.pem"),
private_key=Path("./private_key.pem"),
subscriber="admin@mail.com",
)
class ParcelAddRequest(BaseModel):
nickname: str
tracking_code: str
postal_code: str
async def get_parcel_cache() -> Dict[str, dict]:
return cache.get("parcel_data") or {}
async def save_parcel_cache(data: Dict[str, dict]):
cache.set("parcel_data", data)
@app.delete("/api/parcels/remove/{tracking_code}")
async def remove_parcel(tracking_code: str):
parcel_cache = await get_parcel_cache()
if tracking_code not in parcel_cache:
raise HTTPException(status_code=404, detail="Parcel not found")
del parcel_cache[tracking_code]
await save_parcel_cache(parcel_cache)
return {"message": f"Parcel {tracking_code} removed"}
@app.post("/api/parcels/add")
async def add_parcel(parcel: ParcelAddRequest):
parcel_cache = await get_parcel_cache()
if parcel.tracking_code in parcel_cache:
raise HTTPException(status_code=400, detail="Parcel already tracked")
parcel_cache[parcel.tracking_code] = {
"nickname": parcel.nickname,
"postal_code": parcel.postal_code,
}
await save_parcel_cache(parcel_cache)
return {"message": "Parcel added"}
@app.get("/api/parcels")
async def get_parcels():
parcel_cache = await get_parcel_cache()
parcels = []
for tracking_code, info in parcel_cache.items():
tracking_url = f"https://jouw.postnl.nl/track-and-trace/{tracking_code}-NL-{info['postal_code']}"
parcels.append({
"nickname": info["nickname"],
"tracking_code": tracking_code,
"postal_code": info["postal_code"],
"tracking_url": tracking_url,
})
return {"parcels": parcels}
def get_subscriptions() -> list[dict]:
return cache.get("push_subscriptions") or []
def save_subscriptions(subs: list[dict]):
cache.set("push_subscriptions", subs)
def add_subscription(sub_dict: dict):
current_subs = get_subscriptions()
if sub_dict not in current_subs:
current_subs.append(sub_dict)
save_subscriptions(current_subs)
print("✅ New subscription added:", sub_dict)
else:
print(" Subscription already exists:", sub_dict["endpoint"])
def remove_subscription(sub_dict: dict):
current_subs = get_subscriptions()
updated_subs = [sub for sub in current_subs if sub != sub_dict]
if len(updated_subs) != len(current_subs):
save_subscriptions(updated_subs)
print("🗑️ Subscription removed:", sub_dict)
else:
print("⚠️ Subscription not found:", sub_dict)
def subscription_exists(sub: dict) -> bool:
current_subs = get_subscriptions()
return any(sub == existing for existing in current_subs)
@app.post("/api/send-all")
async def send_push_to_all():
subscriptions = get_subscriptions()
if not subscriptions:
return JSONResponse({"message": "No subscribers yet"}, status_code=400)
payload = {
"title": "📬 Your Morning Briefing is Ready!",
"body": "Click here to read it.",
"icon": "/logo.png",
"badge": "/logo.png",
"data": {"url": "/morning-briefing"}
}
to_remove = []
for sub_dict in subscriptions:
try:
subscription = WebPushSubscription(**sub_dict)
message = wp.get(message=json.dumps(payload), subscription=subscription)
resp = requests.post(url=subscription.endpoint, data=message.encrypted, headers=message.headers)
resp.raise_for_status()
print("✅ Push sent to", subscription.endpoint)
except Exception as ex:
print("❌ Push failed for", subscription.endpoint, ":", ex)
# Mark for removal if failure indicates invalid subscription (e.g., 404 or 410)
if hasattr(ex, 'response') and ex.response is not None:
status_code = ex.response.status_code
if status_code in (404, 410): # Gone or Not Found = subscription no longer valid
to_remove.append(sub_dict)
try:
print("Details:", ex.response.json())
except:
print("Response body:", ex.response.text)
if to_remove:
# Remove invalid subscriptions from cache
for bad_sub in to_remove:
if bad_sub in subscriptions:
subscriptions.remove(bad_sub)
# Update cache with cleaned list
cache.set("push_subscriptions", subscriptions)
print(f"Removed {len(to_remove)} invalid subscriptions from cache.")
return {"message": "Push sent to all subscribers (invalid subs removed)"}
@app.post("/api/send-test")
async def send_test_push(request: Request):
try:
sub_dict = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
if not isinstance(sub_dict, dict) or "endpoint" not in sub_dict:
raise HTTPException(status_code=400, detail="Missing or invalid subscription data")
try:
subscription = WebPushSubscription(**sub_dict)
payload = {
"title": "🔔 Test Notification",
"body": "This is a test push just for you!",
"icon": "/logo.png",
"badge": "/logo.png",
"data": {"url": "/settings"}
}
message = wp.get(message=json.dumps(payload), subscription=subscription)
resp = requests.post(
url=subscription.endpoint,
data=message.encrypted,
headers=message.headers
)
resp.raise_for_status()
print("✅ Test push sent to", subscription.endpoint)
return {"message": "Test push sent successfully"}
except Exception as ex:
print("❌ Test push failed:", ex)
if hasattr(ex, 'response') and ex.response is not None:
status_code = ex.response.status_code
# Remove the subscription if invalid
if status_code in (404, 410):
subscriptions = get_subscriptions()
if sub_dict in subscriptions:
subscriptions.remove(sub_dict)
cache.set("push_subscriptions", subscriptions)
print("Removed invalid subscription from cache due to failure.")
try:
print("Details:", ex.response.json())
except:
print("Response body:", ex.response.text)
raise HTTPException(status_code=500, detail="Failed to send test push")
@app.post("/api/subscribe")
async def subscribe(sub: dict):
if subscription_exists(sub):
return {"message": "Already subscribed"}
add_subscription(sub)
return {"message": "Subscription stored"}
@app.post("/api/unsubscribe")
async def unsubscribe(sub: dict) -> JSONResponse:
#sub_dict = subscription.model_dump()
remove_subscription(sub)
return JSONResponse(content={"status": "unsubscribed"})
@app.post("/api/check-subscription")
async def check_subscription_exists(sub: dict):
if subscription_exists(sub):
return {"exists": True}
return {"exists": False}
@app.get("/insight/daily")
def get_daily_quick_insight():
insight = cache.get("daily_quick_insight_data")
if insight:
return {"source": "cache", "data": insight}
else:
return {"error": "No insight available yet"}
@app.get("/news/relevant")
def get_relevant_news_articles():
articles = cache.get("top_news_data")
selected_titles = cache.get("select_relevant_news_data")
if not articles:
return {"error": "No news data available yet"}
if not selected_titles:
return {"error": "No selected relevant news titles available yet"}
# Normalize titles for matching
title_set = set(t.strip() for t in selected_titles)
filtered_articles = [
article for article in articles
if article.get("title", "").strip() in title_set
]
return {
"source": "cache",
"data": filtered_articles
}
@app.post("/audio/{filename}/get")
async def get_audio_file(filename: str):
AUDIO_DIR = "audio"
# Sanitize filename to prevent path traversal
safe_filename = os.path.basename(filename).replace("..", "").replace("/", "")
file_path = os.path.join(AUDIO_DIR, safe_filename)
if not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail="Audio file not found.")
return FileResponse(
path=file_path,
media_type="audio/wav",
filename=safe_filename
)
@app.get("/advice/dressing")
def get_daily_dressing_advice():
advice = cache.get("daily_dressing_advice_data")
if advice:
return {"source": "cache", "data": advice}
else:
return {"error": "No advice available yet"}
@app.post("/todo/{task_title}/{due_datetime}/complete")
def complete_todoist_task(task_title: str, due_datetime: str):
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}",
"Content-Type": "application/json"
}
# Decode and normalize inputs
task_title = unquote(task_title)
due_datetime = unquote(due_datetime).replace("Z", "+00:00")
try:
target_dt = datetime.fromisoformat(due_datetime)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid datetime format. Use ISO format like 2025-06-13T08:00:00")
# Fetch all open tasks to find the matching one
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=500, detail="Failed to fetch tasks from Todoist")
tasks = response.json()
# Match task by title and due datetime
matching_task = next((
t for t in tasks
if t["content"] == task_title and
"due" in t and
"datetime" in t["due"] and
datetime.fromisoformat(t["due"]["datetime"].replace("Z", "+00:00")) == target_dt
), None)
if not matching_task:
raise HTTPException(status_code=404, detail="Task not found")
# Mark the task as complete
task_id = matching_task["id"]
complete_url = f"https://api.todoist.com/rest/v2/tasks/{task_id}/close"
complete_response = requests.post(complete_url, headers=headers)
if complete_response.status_code in [200, 204]:
return {"message": "Task marked as complete", "task_id": task_id}
else:
raise HTTPException(
status_code=500,
detail={
"error": "Failed to complete task in Todoist",
"details": complete_response.json()
}
)
@app.post("/todo/{task_title}/{due_datetime}/create")
def create_todoist_task(task_title: str, due_datetime: str):
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}",
"Content-Type": "application/json"
}
# Decode the task title in case it has URL-encoded characters
task_title = unquote(task_title)
due_datetime = unquote(due_datetime)
due_datetime = due_datetime.replace("Z", "+00:00")
# Validate the datetime format
try:
dt = datetime.fromisoformat(due_datetime)
except ValueError:
return {"error": "Invalid datetime format. Use ISO format like 2025-06-13T08:00:00"}
payload = {
"content": task_title,
"due_datetime": due_datetime
}
response = requests.post(
"https://api.todoist.com/rest/v2/tasks",
headers=headers,
json=payload
)
if response.status_code == 200 or response.status_code == 204:
return {"message": "Task created", "task": payload}
else:
return {
"error": "Failed to create task in Todoist",
"details": response.json()
}
@app.get("/todo/today")
def get_tasks_today():
tasks = cache.get("daily_tasks_data")
if tasks:
return {"source": "cache", "data": tasks}
else:
return {"error": "No tasks available yet"}
def get_due_datetime(task):
try:
dt_str = task.get('due', {}).get('datetime')
if dt_str:
dt = parser.isoparse(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
return datetime.max.replace(tzinfo=timezone.utc)
@app.get("/todo/all")
def get_todoist_tasks(project_id=None, filter_query=None):
"""
Fetches tasks from Todoist.
- project_id: optional ID of a specific project.
- filter_query: Todoist filter string, e.g. "today | overdue".
"""
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}"
}
params = {}
if project_id:
params['project_id'] = project_id
if filter_query:
params['filter'] = filter_query
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers, params=params)
data = response.json()
data.sort(key=get_due_datetime)
if response.status_code == 200:
return {"source": "todoist", "data": data}
else:
return {"error": "Something went wrong trying to contact the Todoist API"}
@app.get("/surprise/daily")
def get_daily_surprise():
surprise = cache.get("daily_surprise_data")
if surprise:
return {"source": "cache", "data": surprise}
else:
return {"error": "No surprise available yet"}
@app.get("/meme/daily")
def get_daily_meme():
meme = cache.get("daily_meme_data")
if meme:
return {"source": "cache", "data": meme}
else:
return {"source": "none", "data": None}
@app.get("/weather/daily")
def get_daily_forecast():
forecast = cache.get("daily_forecast_data")
if forecast:
return {"source": "cache", "data": forecast}
else:
return {"source": "none", "data": None}
@app.get("/weather/current")
def get_current_weather():
current_data = cache.get("current_weather_data")
if current_data:
return {"source": "cache", "data": current_data}
else:
return {"source": "none", "data": None}
@app.get("/admin/jobs")
def list_jobs():
return get_jobs_info()
@app.post("/admin/jobs/{job_id}/run")
def run_job_now(job_id: str):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
fn = job_functions.get(job_id)
if not fn:
raise HTTPException(status_code=400, detail="No callable associated with this job")
try:
fn() # Directly call the job function
return {"status": "triggered"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.on_event("startup")
async def on_startup():
start_scheduler()
@app.on_event("shutdown")
async def on_shutdown():
scheduler.shutdown(wait=False)
# Hardcoded run block
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, ssl_keyfile="key.pem", ssl_certfile="cert.pem", reload=True)