Initial Code

First Commit
2025-06-16 01:02:22 +02:00
commit 55d25b0eb9
19 changed files with 1444 additions and 0 deletions

BIN
__pycache__/main.cpython-310.pyc Executable file

Binary file not shown.

1
applicationServerKey Executable file

@@ -0,0 +1 @@
BNf7z00erjRX3kUBfGZK-TE1Tz6Zypb1I4aDVZkaWO1113xV_L6hDMbe_Evv8ruUiu-E88xPhRNIEL4ayjqcL5o

BIN
audio/morning_briefing.wav Executable file

Binary file not shown.

15
cert.conf Executable file

@@ -0,0 +1,15 @@
[req]
default_bits = 2048
prompt = no
default_md = sha256
distinguished_name = dn
x509_extensions = v3_req
[dn]
CN = 192.168.70.11
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 192.168.70.11
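
cert.conf is a minimal OpenSSL request config for a self-signed certificate: a 2048-bit key, SHA-256, and a single IP subject alternative name for the LAN address 192.168.70.11. The key.pem and cert.pem entries below were presumably produced from it with a one-shot "openssl req -x509" style invocation. As a rough, hedged equivalent in Python (using the cryptography package the project already depends on; the output filenames and one-year validity are assumptions inferred from cert.pem):

# Sketch: generate a self-signed cert roughly equivalent to cert.conf (filenames assumed).
import datetime
import ipaddress

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID

key = rsa.generate_private_key(public_exponent=65537, key_size=2048)  # default_bits = 2048
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "192.168.70.11")])  # CN from [dn]
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
    x509.CertificateBuilder()
    .subject_name(name)
    .issuer_name(name)  # self-signed: issuer == subject
    .public_key(key.public_key())
    .serial_number(x509.random_serial_number())
    .not_valid_before(now)
    .not_valid_after(now + datetime.timedelta(days=365))  # assumed one-year validity
    .add_extension(
        x509.SubjectAlternativeName([x509.IPAddress(ipaddress.ip_address("192.168.70.11"))]),
        critical=False,  # subjectAltName = @alt_names
    )
    .sign(key, hashes.SHA256())  # default_md = sha256
)

with open("key.pem", "wb") as f:
    f.write(key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    ))
with open("cert.pem", "wb") as f:
    f.write(cert.public_bytes(serialization.Encoding.PEM))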

18
cert.pem Executable file

@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC8DCCAdigAwIBAgIUfYhDachzIQ0cUwg6udGQupwWqPswDQYJKoZIhvcNAQEL
BQAwGDEWMBQGA1UEAwwNMTkyLjE2OC43MC4xMTAeFw0yNTA2MTMxNzU5MDlaFw0y
NjA2MTMxNzU5MDlaMBgxFjAUBgNVBAMMDTE5Mi4xNjguNzAuMTEwggEiMA0GCSqG
SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCcRXDC71Qgz5zEauV5ZwzdJCRGDv226b3s
0iQwVOPGBWRUX6ov7X4Q3cGCPWs4Qs6b7KE794l5gH4+GOKrrMEOSXV9pUjWO7pA
9ON0dr5iW1EFyjJiHmc5xPhuBKCGwZB21FEwAV4Jsbmn9MJPH3/lviWWBZeLgRTF
x1Ov/L/Hvb6Dpz5w84zfdeCw7PBiyZ6I4yvTBP6gTV/gj/TSR925Udnf0xP+a4+y
dhipweVhQuErsoQlwDVs12Hqvjfbv/DhKwGgAM+Az7TFg1zCvnkuBVgMGU/7Xo+u
enb6WDXfQFb1P9Hu5ldP4+ABgih0pQA2VkD3com/DkRYj4IL3bFzAgMBAAGjMjAw
MA8GA1UdEQQIMAaHBMCoRgswHQYDVR0OBBYEFLdoqA0m9lSxiOZT1RlKkxl3Phcr
MA0GCSqGSIb3DQEBCwUAA4IBAQBkQ6NIW16AFAf/+xPAJOP3zL1IDN3PKTW4FIq8
JWzBLSv0xYQG2VzXqX04fUfFs5UGyTWQuDDlpWhzNvLp12mdjQnu+bnKSo3xZXOQ
NT4euxDba8R2AT1/j9WhfaWJqe/Xo3IWhx2XupyNTOduvX7DnWqmVgsRO+bNeb6g
+uE35ayEyQKEavZjTrnrUpxCp2LqwHTqFLBDelyNJWTVB9fDxpfDFU6ipqkSvoq7
cJ9RcwK+be9cg9xImrEPkJ7NX3o86aMhJIPNBlDNOen0+WteQJd+cgpM3JsQ1E5F
rKBLWSGgns1sjbUhqZKZPKPxE/LI8HeGrRBaS84H7+/TEafZ
-----END CERTIFICATE-----
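
A quick, hedged way to confirm what this certificate actually contains (subject, SAN, and validity window) without reaching for openssl; it only assumes the cryptography package:

# Sketch: inspect cert.pem and print the fields that matter for the dashboard's HTTPS setup.
from cryptography import x509

with open("cert.pem", "rb") as f:
    cert = x509.load_pem_x509_certificate(f.read())

print(cert.subject.rfc4514_string())   # expected: CN=192.168.70.11
san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
print(san.value.get_values_for_type(x509.IPAddress))  # expected: [IPv4Address('192.168.70.11')]
print(cert.not_valid_before, "->", cert.not_valid_after)  # appears to be a one-year window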

109
data/surprises.json Executable file

@@ -0,0 +1,109 @@
{
"surprises": [
"Did you know? The first computer virus was created in 1983 and was called 'Elk Cloner.'",
"Fun fact: The first video game, 'Tennis for Two,' was made in 1958 on an oscilloscope.",
"Tech tip: Using a dark mode UI can reduce eye strain and save battery life on OLED screens.",
"Gaming trivia: The longest game of chess ever played lasted 20 hours and 15 minutes!",
"Movie fact: 'Blade Runner' inspired the look of many future cyberpunk stories and films.",
"Design tip: White space is not empty space — it helps create focus and balance in layouts.",
"Programming trivia: The name 'Python' comes from 'Monty Pythons Flying Circus,' not the snake.",
"Entertainment fact: The lightsaber sound effect was created by mixing humming from an old film projector and interference from a TV set.",
"Sci-fi lore: The word 'robot' was first introduced in Karel Čapeks 1920 play 'R.U.R.'",
"Did you know? The first webcam was used to monitor a coffee pot at Cambridge University.",
"Tech tip: Git was created by Linus Torvalds to help develop the Linux kernel efficiently.",
"Gaming fact: The original 'Doom' popularized 3D graphics in video games in 1993.",
"Movie trivia: 'The Matrix' was inspired by Japanese anime like 'Ghost in the Shell.'",
"Design tip: Using a grid system can improve consistency in UI and graphic designs.",
"Programming fun: The first computer bug was an actual moth stuck in a relay!",
"Entertainment fact: 'Star Wars' stormtroopers' armor was inspired by samurai armor.",
"Sci-fi fact: Isaac Asimovs Three Laws of Robotics influence many AI ethics discussions today.",
"Did you know? The worlds first website is still online at info.cern.ch.",
"Tech tip: Using meaningful variable names can save debugging time later.",
"Gaming trivia: 'Minecraft' was created by Markus Persson in just six days.",
"Movie fact: The 'Alien' xenomorph design was inspired by insect anatomy.",
"Design tip: Contrast is key for readability—make sure text stands out against backgrounds.",
"Programming fun: 'Hello, World!' is traditionally the first program learned in many languages.",
"Entertainment fact: 'The Big Lebowski' became a cult classic despite poor initial reviews.",
"Sci-fi lore: The phrase 'May the Force be with you' is one of the most quoted movie lines ever.",
"Did you know? The first emoji was created in 1999 by Shigetaka Kurita in Japan.",
"Tech tip: Keyboard shortcuts can drastically improve coding efficiency.",
"Gaming fact: Speedrunning communities can complete games in minutes by exploiting glitches.",
"Movie trivia: The sound of the T-800 endoskeleton in 'Terminator' was created with a human skull and a hammer.",
"Design tip: Consistency in iconography helps users navigate software intuitively.",
"Programming trivia: The first computer programmer was Ada Lovelace in the 1800s.",
"Entertainment fact: 'Fight Club' was adapted from Chuck Palahniuks novel of the same name.",
"Sci-fi fact: 'Neuromancer' by William Gibson is credited with popularizing the cyberpunk genre.",
"Did you know? The Linux mascot, Tux the penguin, was chosen by Linus Torvalds himself.",
"Tech tip: Using version control like Git prevents lost code and eases collaboration.",
"Gaming trivia: 'Pac-Man' was originally called 'Pakkuman' in Japan.",
"Movie fact: '2001: A Space Odyssey' influenced visual effects in countless sci-fi movies.",
"Design tip: Use a limited color palette to create a cohesive visual identity.",
"Programming fun: The term 'bug' in software predates computers and was used in hardware.",
"Entertainment fact: 'The Rocky Horror Picture Show' holds the record for longest-running theatrical release.",
"Sci-fi lore: The phrase 'Ill be back' was improvised by Arnold Schwarzenegger in 'The Terminator.'",
"Did you know? CAPTCHA tests are used to differentiate humans from bots online.",
"Tech tip: Writing modular code improves maintainability and scalability.",
"Gaming fact: 'The Legend of Zelda' was one of the first open-world games.",
"Movie trivia: 'The Fifth Element' costumes were designed by Jean-Paul Gaultier.",
"Design tip: Negative space can form interesting shapes that add meaning to your design.",
"Programming trivia: Java was originally called Oak.",
"Entertainment fact: 'The Princess Bride' is famous for its witty dialogue and quotable lines.",
"Sci-fi fact: The 'Tricorder' device from Star Trek inspired real-world portable diagnostic tools.",
"Did you know? The first computer mouse was made of wood.",
"Tech tip: Automated testing helps catch bugs before deployment.",
"Gaming trivia: The Konami Code (↑↑↓↓←→←→BA) unlocks cheats in many classic games.",
"Movie fact: 'Mad Max: Fury Road' used practical effects instead of CGI for most stunts.",
"Design tip: Typography affects mood—choose fonts that reflect your brands personality.",
"Programming fun: The emoji 😄 has a Unicode codepoint of U+1F604.",
"Entertainment fact: 'Donnie Darko' became a cult hit after its DVD release.",
"Sci-fi lore: The Prime Directive in Star Trek forbids interference with alien civilizations.",
"Did you know? The first 1GB hard drive weighed over 500 pounds.",
"Tech tip: Refactoring code regularly keeps your project clean and efficient.",
"Gaming fact: 'Half-Life' was praised for integrating story and gameplay seamlessly.",
"Movie trivia: 'Inception' used rotating sets to create zero-gravity effects.",
"Design tip: Use color psychology to evoke emotions in your users.",
"Programming trivia: The first email was sent in 1971 by Ray Tomlinson.",
"Entertainment fact: 'The Shawshank Redemption' initially underperformed but became iconic over time.",
"Sci-fi fact: The movie 'Her' explores themes of AI-human relationships.",
"Did you know? Early video game graphics used only a few pixels per character.",
"Tech tip: Keyboard-driven development can speed up your workflow.",
"Gaming trivia: The famous 'Zelda' chest opening sound is from an old cassette tape.",
"Movie fact: The voice of HAL 9000 in '2001: A Space Odyssey' is Douglas Rain.",
"Design tip: Use alignment to create order and organization in layouts.",
"Programming fun: The Python logo is inspired by the Monty Python comedy troupe.",
"Entertainment fact: 'The Room' is famous for its bizarre dialogue and cult following.",
"Sci-fi lore: 'Dune' features one of the most complex fictional universes ever created.",
"Did you know? The QWERTY keyboard was designed to prevent typewriter jams.",
"Tech tip: Continuous integration automates testing and deployment.",
"Gaming fact: 'Portal' combines puzzle-solving with storytelling in a unique way.",
"Movie trivia: 'Ghostbusters' proton packs were inspired by particle accelerators.",
"Design tip: Responsive design ensures your site looks good on all devices.",
"Programming trivia: The first computer bug was actually a dead moth found in a Harvard Mark II.",
"Entertainment fact: 'Pulp Fiction' changed the narrative style of modern movies.",
"Sci-fi fact: 'Star Wars' lightsabers were inspired by samurai swords.",
"Did you know? The first website was published in 1991 by Tim Berners-Lee.",
"Tech tip: Use linters to catch syntax errors before running code.",
"Gaming trivia: 'Final Fantasy' was named because it was supposed to be the last game by its creator.",
"Movie fact: 'The Lord of the Rings' trilogy was filmed over 8 years.",
"Design tip: Use hierarchy to guide users through your content.",
"Programming fun: The JavaScript language was created in just 10 days.",
"Entertainment fact: 'Back to the Future' used a DeLorean as a time machine.",
"Sci-fi lore: The 'Matrix' movie introduced 'bullet time' visual effects.",
"Did you know? The term 'debugging' was popularized by Grace Hopper.",
"Tech tip: Profiling tools help optimize your programs performance.",
"Gaming fact: 'The Witcher 3' features over 100 hours of gameplay content.",
"Movie trivia: 'Jurassic Park' was groundbreaking in CGI dinosaur effects.",
"Design tip: Use consistent spacing to create a balanced layout.",
"Programming trivia: The first computer game was 'Spacewar!' created in 1962.",
"Entertainment fact: 'Fight Club' explores themes of identity and consumerism.",
"Sci-fi fact: The 'Star Trek' communicator inspired early mobile phones.",
"Did you know? The first emoji was just 176 pixels.",
"Tech tip: Keeping dependencies updated prevents security vulnerabilities.",
"Gaming trivia: 'Tetris' was originally developed in the Soviet Union.",
"Movie fact: 'The Godfather' features an iconic opening with a wedding scene.",
"Design tip: Minimalism often leads to clearer communication.",
"Programming fun: The C programming language was created in 1972.",
"Entertainment fact: 'Stranger Things' pays homage to 80s sci-fi and horror.",
"Sci-fi lore: The 'Doctor Who' TARDIS is bigger on the inside."
]
}
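
The file is just a flat list of strings under a single "surprises" key; update_daily_surprise in scheduler/jobs.py picks one at random each morning. A small hedged check that the data stays well-formed after hand edits (the path relative to the repo root is an assumption):

# Sketch: sanity-check data/surprises.json before the scheduler consumes it.
import json

with open("data/surprises.json", encoding="utf-8") as f:
    surprises = json.load(f).get("surprises", [])

assert surprises, "expected a non-empty 'surprises' list"
assert all(isinstance(s, str) and s.strip() for s in surprises), "every entry should be a non-empty string"
print(f"{len(surprises)} surprises loaded")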

23
generate.py Executable file

@@ -0,0 +1,23 @@
from cryptography.hazmat.primitives import serialization
import base64
def base64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode('utf-8').rstrip('=')
# Load PEM public key
with open("vapid_public.pem", "rb") as f:
pem_data = f.read()
public_key = serialization.load_pem_public_key(pem_data)
# Extract the raw uncompressed public key bytes (0x04 + X + Y)
public_numbers = public_key.public_numbers()
x_bytes = public_numbers.x.to_bytes(32, 'big')
y_bytes = public_numbers.y.to_bytes(32, 'big')
uncompressed_key = b'\x04' + x_bytes + y_bytes
# Base64url encode the uncompressed key bytes
public_key_b64url = base64url_encode(uncompressed_key)
print("Base64url-encoded public key (use in frontend):")
print(public_key_b64url)
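
generate.py flattens the P-256 public key into the uncompressed 0x04 || X || Y form the browser expects as applicationServerKey, then base64url-encodes it without padding; that output appears to be what the applicationServerKey file above holds. Note the script reads vapid_public.pem while the committed key is public_key.pem, so the filenames in this hedged round-trip check are assumptions:

# Sketch: decode the stored applicationServerKey and verify it matches the PEM public key.
import base64

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec

def base64url_decode(data: str) -> bytes:
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))  # restore stripped '=' padding

with open("applicationServerKey") as f:
    raw = base64url_decode(f.read().strip())
assert len(raw) == 65 and raw[0] == 0x04, "expected an uncompressed P-256 point"

decoded = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), raw)

with open("public_key.pem", "rb") as f:  # assumed to be the matching VAPID public key
    pem_key = serialization.load_pem_public_key(f.read())

assert decoded.public_numbers() == pem_key.public_numbers(), "keys do not match"
print("applicationServerKey matches the PEM public key")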

BIN
jobs.sqlite Executable file

Binary file not shown.

28
key.pem Executable file

@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCcRXDC71Qgz5zE
auV5ZwzdJCRGDv226b3s0iQwVOPGBWRUX6ov7X4Q3cGCPWs4Qs6b7KE794l5gH4+
GOKrrMEOSXV9pUjWO7pA9ON0dr5iW1EFyjJiHmc5xPhuBKCGwZB21FEwAV4Jsbmn
9MJPH3/lviWWBZeLgRTFx1Ov/L/Hvb6Dpz5w84zfdeCw7PBiyZ6I4yvTBP6gTV/g
j/TSR925Udnf0xP+a4+ydhipweVhQuErsoQlwDVs12Hqvjfbv/DhKwGgAM+Az7TF
g1zCvnkuBVgMGU/7Xo+uenb6WDXfQFb1P9Hu5ldP4+ABgih0pQA2VkD3com/DkRY
j4IL3bFzAgMBAAECggEAEC1uIXgGNQlZGMpaLCm7Xq9NpAUMReHHvBr7DRfUppzL
SKLIqwKCgPt7+2AhV+cunVyvSlZzYGUpbINF//C34aNAKvwv4KG5Q902KMncJrF+
X0OR2K9+DzBxaW5IMfsa2fpb10Tk98ryxczkSq2fn2iCWpgEmrgHrvvWEYtwTRyR
kVhyaQuHet3xqlgNt5SLDNbLILq5vYX6imenzHIrlqqhgUWPu6I+c5tfebThE8Ch
jDnYdzLKub5XOvFEYW33W+CKhQiRMumkUZmnT89kJVFpqmhaeMVi+MoHPi77haQu
2Xlj3ITtKGCUc1/FW318Di0EFfzK59bhw8akfrKwgQKBgQDHOAswy8162Hep/tn4
B751eeMeuj2P/yCQwCvDziHb5UL7JoEi/gG0tYAWVwTDbgW7v8zw92bcJir8LDgH
iruMvmjvQJSdVAd53K7AFytCD+15PjODgqQjZFBW7C/FaoJuP6YhJmUUFLEdO4Sr
lP4uja22miaLR7Fh0j4nT5J78wKBgQDIz7s64ypQf5JouAJAiLDXY8rUdWZ/Pdp0
Sd8ZaQxAxfTdjs+vyqA5vdJA/qzNZ3yQa0JkG33bvJ/e/R+ZZUkcbhO64sSnBCF6
nNZQNJ0o3Yxyf5AmMjXgXm1dROtEJxkqstY4eYmh969S/a0DZqqg2FTWssRPV5Sr
VYhNa/jUgQKBgCueNK0JYkLsVD7vIrAwmd3Ewxp+Mm5tsKagvLRRVI59ws2UX5/W
t27mclXx1YQoea7izq9Pq8C1xqGR2O6lzGyDYBxKKn/L3xsqW0SJjhoXvKnYGZVI
rEFnfEOb0NXLfSVAYFJPwr/DzAeXeXPL2l88+/TICXpjRw4g5yTaQjtLAoGAeCMP
OGO/ZPltdgjgEAu3AKVLK2AxOd6fWwL23hTip73GiOZvihQHV4QgzZGPgDNxalVB
GvL3kQZ//sAgr3zRCJkyZJRWYjUuclRyQvm3LEOfSsDWVIOlh932P96DgfqCK/Fi
B9duZ9/unA/3+hjp6+CAoRHCMFWNNBOvv+HgtAECgYBLSKqvN6ZG+C7IQ/ywRvM2
+dufxvAZEeqhBchp/vrmZ0fU4IMNFOVyIkGibkm4LTbDufSCe/TjzOvM01PsE/FL
NTaN+03tkIethDJqE1yOMkd5weL0bN3FSyhzIjiQddWqIZxAglRT3Nzx3He2DKIv
zsEGzlQlJzNuNkwUjk4QOw==
-----END PRIVATE KEY-----

514
main.py Executable file

@@ -0,0 +1,514 @@
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi import HTTPException
from pathlib import Path
from scheduler.scheduler import start_scheduler, scheduler, get_jobs_info, my_job
from scheduler.cache import cache
from scheduler.jobs import update_daily_tasks, update_weather, refresh_meme, update_current_weather, update_daily_surprise, update_dressing_advice, update_morning_briefing_transcript, update_news, get_relevant_news_titles, update_quick_insight
from webpush import WebPush, WebPushSubscription
from cryptography.hazmat.primitives import serialization
import json
import uvicorn
import requests
import random
import os
import base64
from datetime import datetime, timezone
from dateutil import parser
from urllib.parse import unquote
from pydantic import BaseModel
from typing import Optional, Dict
app = FastAPI()
# Allow frontend requests (adjust origin for production)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Or ["http://localhost:5173"] for Vite
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
TODOIST_API_TOKEN = "c2233236d19d56128c89ed6b0a9d10a9e7b287f1"
ACCUWEATHER_API_KEY = "YHeMcr9Aa96Goer8CANIB2E6QIbr5Dp0"
LOCATION_KEY = "251518"
job_functions = {
"daily_quick_insight": update_quick_insight,
"select_relevant_news": get_relevant_news_titles,
"top_news": update_news,
"morning_briefing_transcript": update_morning_briefing_transcript,
"daily_tasks": update_daily_tasks,
"daily_weather": update_weather,
"current_weather": update_current_weather,
"daily_dressing_advice": update_dressing_advice,
"daily_surprise": update_daily_surprise,
"daily_meme": refresh_meme,
"test_job": my_job,
}
wp = WebPush(
public_key=Path("./public_key.pem"),
private_key=Path("./private_key.pem"),
subscriber="admin@mail.com",
)
class ParcelAddRequest(BaseModel):
nickname: str
tracking_code: str
postal_code: str
async def get_parcel_cache() -> Dict[str, dict]:
return cache.get("parcel_data") or {}
async def save_parcel_cache(data: Dict[str, dict]):
cache.set("parcel_data", data)
@app.delete("/api/parcels/remove/{tracking_code}")
async def remove_parcel(tracking_code: str):
parcel_cache = await get_parcel_cache()
if tracking_code not in parcel_cache:
raise HTTPException(status_code=404, detail="Parcel not found")
del parcel_cache[tracking_code]
await save_parcel_cache(parcel_cache)
return {"message": f"Parcel {tracking_code} removed"}
@app.post("/api/parcels/add")
async def add_parcel(parcel: ParcelAddRequest):
parcel_cache = await get_parcel_cache()
if parcel.tracking_code in parcel_cache:
raise HTTPException(status_code=400, detail="Parcel already tracked")
parcel_cache[parcel.tracking_code] = {
"nickname": parcel.nickname,
"postal_code": parcel.postal_code,
}
await save_parcel_cache(parcel_cache)
return {"message": "Parcel added"}
@app.get("/api/parcels")
async def get_parcels():
parcel_cache = await get_parcel_cache()
parcels = []
for tracking_code, info in parcel_cache.items():
tracking_url = f"https://jouw.postnl.nl/track-and-trace/{tracking_code}-NL-{info['postal_code']}"
parcels.append({
"nickname": info["nickname"],
"tracking_code": tracking_code,
"postal_code": info["postal_code"],
"tracking_url": tracking_url,
})
return {"parcels": parcels}
def get_subscriptions() -> list[dict]:
return cache.get("push_subscriptions") or []
def save_subscriptions(subs: list[dict]):
cache.set("push_subscriptions", subs)
def add_subscription(sub_dict: dict):
current_subs = get_subscriptions()
if sub_dict not in current_subs:
current_subs.append(sub_dict)
save_subscriptions(current_subs)
print("✅ New subscription added:", sub_dict)
else:
print(" Subscription already exists:", sub_dict["endpoint"])
def remove_subscription(sub_dict: dict):
current_subs = get_subscriptions()
updated_subs = [sub for sub in current_subs if sub != sub_dict]
if len(updated_subs) != len(current_subs):
save_subscriptions(updated_subs)
print("🗑️ Subscription removed:", sub_dict)
else:
print("⚠️ Subscription not found:", sub_dict)
def subscription_exists(sub: dict) -> bool:
current_subs = get_subscriptions()
return any(sub == existing for existing in current_subs)
@app.post("/api/send-all")
async def send_push_to_all():
subscriptions = get_subscriptions()
if not subscriptions:
return JSONResponse({"message": "No subscribers yet"}, status_code=400)
payload = {
"title": "📬 Your Morning Briefing is Ready!",
"body": "Click here to read it.",
"icon": "/logo.png",
"badge": "/logo.png",
"data": {"url": "/morning-briefing"}
}
to_remove = []
for sub_dict in subscriptions:
try:
subscription = WebPushSubscription(**sub_dict)
message = wp.get(message=json.dumps(payload), subscription=subscription)
resp = requests.post(url=subscription.endpoint, data=message.encrypted, headers=message.headers)
resp.raise_for_status()
print("✅ Push sent to", subscription.endpoint)
except Exception as ex:
print("❌ Push failed for", subscription.endpoint, ":", ex)
# Mark for removal if failure indicates invalid subscription (e.g., 404 or 410)
if hasattr(ex, 'response') and ex.response is not None:
status_code = ex.response.status_code
if status_code in (404, 410): # Gone or Not Found = subscription no longer valid
to_remove.append(sub_dict)
try:
print("Details:", ex.response.json())
except Exception:
print("Response body:", ex.response.text)
if to_remove:
# Remove invalid subscriptions from cache
for bad_sub in to_remove:
if bad_sub in subscriptions:
subscriptions.remove(bad_sub)
# Update cache with cleaned list
cache.set("push_subscriptions", subscriptions)
print(f"Removed {len(to_remove)} invalid subscriptions from cache.")
return {"message": "Push sent to all subscribers (invalid subs removed)"}
@app.post("/api/send-test")
async def send_test_push(request: Request):
try:
sub_dict = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
if not isinstance(sub_dict, dict) or "endpoint" not in sub_dict:
raise HTTPException(status_code=400, detail="Missing or invalid subscription data")
try:
subscription = WebPushSubscription(**sub_dict)
payload = {
"title": "🔔 Test Notification",
"body": "This is a test push just for you!",
"icon": "/logo.png",
"badge": "/logo.png",
"data": {"url": "/settings"}
}
message = wp.get(message=json.dumps(payload), subscription=subscription)
resp = requests.post(
url=subscription.endpoint,
data=message.encrypted,
headers=message.headers
)
resp.raise_for_status()
print("✅ Test push sent to", subscription.endpoint)
return {"message": "Test push sent successfully"}
except Exception as ex:
print("❌ Test push failed:", ex)
if hasattr(ex, 'response') and ex.response is not None:
status_code = ex.response.status_code
# Remove the subscription if invalid
if status_code in (404, 410):
subscriptions = get_subscriptions()
if sub_dict in subscriptions:
subscriptions.remove(sub_dict)
cache.set("push_subscriptions", subscriptions)
print("Removed invalid subscription from cache due to failure.")
try:
print("Details:", ex.response.json())
except Exception:
print("Response body:", ex.response.text)
raise HTTPException(status_code=500, detail="Failed to send test push")
@app.post("/api/subscribe")
async def subscribe(sub: dict):
if subscription_exists(sub):
return {"message": "Already subscribed"}
add_subscription(sub)
return {"message": "Subscription stored"}
@app.post("/api/unsubscribe")
async def unsubscribe(sub: dict) -> JSONResponse:
#sub_dict = subscription.model_dump()
remove_subscription(sub)
return JSONResponse(content={"status": "unsubscribed"})
@app.post("/api/check-subscription")
async def check_subscription_exists(sub: dict):
if subscription_exists(sub):
return {"exists": True}
return {"exists": False}
@app.get("/insight/daily")
def get_daily_quick_insight():
insight = cache.get("daily_quick_insight_data")
if insight:
return {"source": "cache", "data": insight}
else:
return {"error": "No insight available yet"}
@app.get("/news/relevant")
def get_relevant_news_articles():
articles = cache.get("top_news_data")
selected_titles = cache.get("select_relevant_news_data")
if not articles:
return {"error": "No news data available yet"}
if not selected_titles:
return {"error": "No selected relevant news titles available yet"}
# Normalize titles for matching
title_set = set(t.strip() for t in selected_titles)
filtered_articles = [
article for article in articles
if article.get("title", "").strip() in title_set
]
return {
"source": "cache",
"data": filtered_articles
}
@app.post("/audio/{filename}/get")
async def get_audio_file(filename: str):
AUDIO_DIR = "audio"
# Sanitize filename to prevent path traversal
safe_filename = os.path.basename(filename).replace("..", "").replace("/", "")
file_path = os.path.join(AUDIO_DIR, safe_filename)
if not os.path.isfile(file_path):
raise HTTPException(status_code=404, detail="Audio file not found.")
return FileResponse(
path=file_path,
media_type="audio/wav",
filename=safe_filename
)
@app.get("/advice/dressing")
def get_daily_dressing_advice():
advice = cache.get("daily_dressing_advice_data")
if advice:
return {"source": "cache", "data": advice}
else:
return {"error": "No advice available yet"}
@app.post("/todo/{task_title}/{due_datetime}/complete")
def complete_todoist_task(task_title: str, due_datetime: str):
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}",
"Content-Type": "application/json"
}
# Decode and normalize inputs
task_title = unquote(task_title)
due_datetime = unquote(due_datetime).replace("Z", "+00:00")
try:
target_dt = datetime.fromisoformat(due_datetime)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid datetime format. Use ISO format like 2025-06-13T08:00:00")
# Fetch all open tasks to find the matching one
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=500, detail="Failed to fetch tasks from Todoist")
tasks = response.json()
# Match task by title and due datetime
matching_task = next((
t for t in tasks
if t["content"] == task_title and
"due" in t and
"datetime" in t["due"] and
datetime.fromisoformat(t["due"]["datetime"].replace("Z", "+00:00")) == target_dt
), None)
if not matching_task:
raise HTTPException(status_code=404, detail="Task not found")
# Mark the task as complete
task_id = matching_task["id"]
complete_url = f"https://api.todoist.com/rest/v2/tasks/{task_id}/close"
complete_response = requests.post(complete_url, headers=headers)
if complete_response.status_code in [200, 204]:
return {"message": "Task marked as complete", "task_id": task_id}
else:
raise HTTPException(
status_code=500,
detail={
"error": "Failed to complete task in Todoist",
"details": complete_response.json()
}
)
@app.post("/todo/{task_title}/{due_datetime}/create")
def create_todoist_task(task_title: str, due_datetime: str):
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}",
"Content-Type": "application/json"
}
# Decode the task title in case it has URL-encoded characters
task_title = unquote(task_title)
due_datetime = unquote(due_datetime)
due_datetime = due_datetime.replace("Z", "+00:00")
# Validate the datetime format
try:
dt = datetime.fromisoformat(due_datetime)
except ValueError:
return {"error": "Invalid datetime format. Use ISO format like 2025-06-13T08:00:00"}
payload = {
"content": task_title,
"due_datetime": due_datetime
}
response = requests.post(
"https://api.todoist.com/rest/v2/tasks",
headers=headers,
json=payload
)
if response.status_code == 200 or response.status_code == 204:
return {"message": "Task created", "task": payload}
else:
return {
"error": "Failed to create task in Todoist",
"details": response.json()
}
@app.get("/todo/today")
def get_tasks_today():
tasks = cache.get("daily_tasks_data")
if tasks:
return {"source": "cache", "data": tasks}
else:
return {"error": "No tasks available yet"}
def get_due_datetime(task):
try:
dt_str = task.get('due', {}).get('datetime')
if dt_str:
dt = parser.isoparse(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
return datetime.max.replace(tzinfo=timezone.utc)
@app.get("/todo/all")
def get_todoist_tasks(project_id=None, filter_query=None):
"""
Fetches tasks from Todoist.
- project_id: optional ID of a specific project.
- filter_query: Todoist filter string, e.g. "today | overdue".
"""
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}"
}
params = {}
if project_id:
params['project_id'] = project_id
if filter_query:
params['filter'] = filter_query
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers, params=params)
data = response.json()
data.sort(key=get_due_datetime)
if response.status_code == 200:
return {"source": "todoist", "data": data}
else:
return {"error": "Something went wrong trying to contact the Todoist API"}
@app.get("/surprise/daily")
def get_daily_surprise():
surprise = cache.get("daily_surprise_data")
if surprise:
return {"source": "cache", "data": surprise}
else:
return {"error": "No surprise available yet"}
@app.get("/meme/daily")
def get_daily_meme():
meme = cache.get("daily_meme_data")
if meme:
return {"source": "cache", "data": meme}
else:
return {"source": "none", "data": None}
@app.get("/weather/daily")
def get_daily_forecast():
forecast = cache.get("daily_forecast_data")
if forecast:
return {"source": "cache", "data": forecast}
else:
return {"source": "none", "data": None}
@app.get("/weather/current")
def get_current_weather():
current_data = cache.get("current_weather_data")
if current_data:
return {"source": "cache", "data": current_data}
else:
return {"source": "none", "data": None}
@app.get("/admin/jobs")
def list_jobs():
return get_jobs_info()
@app.post("/admin/jobs/{job_id}/run")
def run_job_now(job_id: str):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
fn = job_functions.get(job_id)
if not fn:
raise HTTPException(status_code=400, detail="No callable associated with this job")
try:
fn() # Directly call the job function
return {"status": "triggered"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.on_event("startup")
async def on_startup():
start_scheduler()
@app.on_event("shutdown")
async def on_shutdown():
scheduler.shutdown(wait=False)
# Run the app directly with uvicorn over HTTPS using the self-signed cert
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, ssl_keyfile="key.pem", ssl_certfile="cert.pem", reload=True)

5
private_key.pem Executable file

@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgmkI00EjnLa7dGWzm
FOVUTxljJH93jixCbPmrzSFVMRChRANCAATX+89NHq40V95FAXxmSvkxNU8+mcqW
9SOGg1WZGljtddd8Vfy+oQzG3vxL7/K7lIrvhPPMT4UTSBC+Gso6nC+a
-----END PRIVATE KEY-----

4
public_key.pem Executable file

@@ -0,0 +1,4 @@
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE1/vPTR6uNFfeRQF8Zkr5MTVPPpnK
lvUjhoNVmRpY7XXXfFX8vqEMxt78S+/yu5SK74TzzE+FE0gQvhrKOpwvmg==
-----END PUBLIC KEY-----

Binary file not shown.

Binary file not shown.

Binary file not shown.

3
scheduler/cache.py Executable file

@@ -0,0 +1,3 @@
from diskcache import Cache
cache = Cache(".cache") # Creates a folder named 'cache'

570
scheduler/jobs.py Executable file

@@ -0,0 +1,570 @@
from datetime import datetime, timezone
from dateutil import parser
from pathlib import Path
from .cache import cache
from openai import OpenAI
import os
import requests
import random
import json
OPENAI_API_KEY = "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"
TODOIST_API_TOKEN = "c2233236d19d56128c89ed6b0a9d10a9e7b287f1"
ACCUWEATHER_API_KEY = "YHeMcr9Aa96Goer8CANIB2E6QIbr5Dp0"
LOCATION_KEY = "251518"
# Setup OpenAI client
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
client = OpenAI()
def update_quick_insight():
job_id = "daily_quick_insight"
try:
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
# Grab cached news
news_items = cache.get("top_news_data", [])
if not news_items:
raise ValueError("No news data available in cache")
# Prepare the cached headlines
titles = [item["title"] for item in news_items if "title" in item]
titles_text = "\n".join(f"- {title}" for title in titles)
messages = [
{
"role": "system",
"content": (
"You are a smart assistant that reads all today's headlines and generates one sharp, short insight. Focus on current trends, tech, business, or social issues. Keep it under 40 words."
)
},
{
"role": "user",
"content": f"Here are today's headlines:\n{titles_text}\n\nGive me one smart, timely insight."
}
]
# Request insight from ChatGPT
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
temperature=0.7
)
insight = response.choices[0].message.content.strip()
cache.set("daily_quick_insight_data", insight)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": insight,
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def get_relevant_news_titles():
job_id = "select_relevant_news"
try:
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
articles = cache.get("top_news_data", [])
if not articles:
raise ValueError("No articles found in cache")
titles = [article.get("title", "") for article in articles if article.get("title")]
if not titles:
raise ValueError("No valid titles extracted from articles")
prompt = (
"Here are today's news headlines:\n\n"
+ "\n".join(f"- {t}" for t in titles)
+ "\n\nBased on my interests (e.g., AI, technology, programming, games, movies, entertainment), "
"please pick the 5 most relevant headlines and respond with ONLY a JSON array of strings. "
"Example format: [\"Title 1\", \"Title 2\", ...]"
)
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant that filters relevant news."},
{"role": "user", "content": prompt}
],
temperature=0.7
)
raw_output = response.choices[0].message.content.strip()
# Try to parse the response as JSON
try:
selected_titles = json.loads(raw_output)
if not isinstance(selected_titles, list):
raise ValueError("Parsed output is not a list.")
except Exception as parse_err:
raise ValueError(f"Failed to parse response as JSON: {parse_err}\nResponse: {raw_output}")
cache.set("select_relevant_news_data", selected_titles)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": selected_titles
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e)
}
def update_news():
job_id = "top_news"
try:
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
today = datetime.now().date().isoformat()
response = requests.get(
"https://newsapi.org/v2/everything",
params={
"apiKey": "55678d36d7bd45ea849943ba88dcc899",
"language": "en",
"sortBy": "publishedAt",
"pageSize": 100,
"q": "*" # Using a dash to match all articles (NewsAPI requires a `q`)
}
)
if response.status_code != 200:
raise Exception(f"NewsAPI error: {response.status_code} - {response.text}")
data = response.json()
articles = data.get("articles", [])
if not articles:
raise ValueError("No news articles found for today")
cache.set("top_news_data", articles)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": articles
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def generate_tts(text, filename):
# Ensure the audio/ output directory exists
output_dir = "audio"
os.makedirs(output_dir, exist_ok=True)
# Full path to output file
output_path = os.path.join(output_dir, f"{filename}.wav")
url = "http://192.168.70.5:8880/v1/audio/speech"
headers = {
"accept": "application/json",
"x-raw-response": "test",
"Content-Type": "application/json",
}
payload = {
"model": "kokoro",
"input": text,
"voice": "af_heart",
"response_format": "wav",
"download_format": "wav",
"speed": 1,
"return_download_link": True
}
r = requests.post(url, headers=headers, json=payload)
if r.status_code == 200:
with open(output_path, "wb") as f:
f.write(r.content)
print(f"TTS audio saved to {output_path}")
return output_path
else:
raise Exception(f"Failed to generate TTS. Status code: {r.status_code}")
def update_morning_briefing_transcript():
job_id = "morning_briefing_transcript"
try:
# Mark job as running
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
# Load all required data from cache
tasks = cache.get("daily_tasks_data", [])
forecast = cache.get("daily_forecast_data", {})
dressing_advice = cache.get("daily_dressing_advice_data", "")
if not tasks or not forecast or not dressing_advice:
raise ValueError("Missing required data in cache")
# Extract forecast details
date_str = forecast.get("Date", "")
date_obj = datetime.fromisoformat(date_str)
date_formatted = date_obj.strftime("%A, %B %d, %Y")
temp_min = forecast["Temperature"]["Minimum"]["Value"]
temp_max = forecast["Temperature"]["Maximum"]["Value"]
day_phrase = forecast["Day"]["IconPhrase"]
night_phrase = forecast["Night"]["IconPhrase"]
# Build input task summary
task_lines = []
for task in tasks:
due_time = task.get("due", {}).get("datetime")
task_time = ""
if due_time:
try:
dt_obj = datetime.fromisoformat(due_time.replace("Z", "+00:00"))
task_time = dt_obj.strftime("%H:%M")
task_lines.append(f"- At {task_time}, {task['content']}.")
except Exception:
task_lines.append(f"- {task['content']} (time format error).")
else:
task_lines.append(f"- {task['content']} (no specific time).")
tasks_summary = "\n".join(task_lines)
# Construct the GPT prompt
prompt = (
f"Today is {date_formatted}.\n\n"
f"Here are the tasks for today:\n{tasks_summary}\n\n"
f"The weather today will be {day_phrase} during the day and {night_phrase} at night. "
f"Temperatures range from {temp_min}°C to {temp_max}°C.\n\n"
f"Clothing advice: {dressing_advice}\n\n"
f"Write a friendly and concise morning briefing script in natural spoken English. "
f"Start with a brief greeting and mention the date. Then summarize the tasks, the weather, and the clothing advice. "
f"Make the tone encouraging and warm, as if you're helping someone start their day."
)
# Send to GPT
chat_response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant that creates a morning briefing."},
{"role": "system", "content": "Your name is Eira and the users name is Collin. Start your briefing off with a variation on 'Hey Eira here'."},
{
"role": "system",
"content": (
"You are a helpful assistant that writes a spoken transcript for a daily morning briefing. "
"The transcript must be suitable for text-to-speech. Use complete sentences and natural language. "
"Do not use any special characters or markdown. Avoid line breaks or paragraph breaks—write as a single continuous paragraph."
)
},
{"role": "user", "content": prompt}
]
)
transcript = chat_response.choices[0].message.content.strip()
# Generate TTS Audio File
success = generate_tts(transcript, "morning_briefing")
if not success:
raise RuntimeError("TTS audio generation failed.")
# Store transcript in cache
cache.set("morning_briefing_transcript_data", transcript)
# Mark job as completed
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": transcript,
}
except Exception as e:
# Mark job as failed
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def get_due_datetime(task):
try:
dt_str = task.get('due', {}).get('datetime')
if dt_str:
dt = parser.isoparse(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
return datetime.max.replace(tzinfo=timezone.utc)
def update_daily_tasks(project_id=None, filter_query="today"):
job_id = "daily_tasks"
try:
# Mark job as running
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
headers = {
"Authorization": f"Bearer {TODOIST_API_TOKEN}"
}
params = {}
if project_id:
params['project_id'] = project_id
if filter_query:
params['filter'] = filter_query
# Fetch from Todoist API
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers, params=params)
if response.status_code != 200:
raise Exception(f"Todoist API error: {response.status_code} - {response.text}")
data = response.json()
# Sort tasks by due datetime (handle tasks without due date by putting them last)
data.sort(key=get_due_datetime)
# Cache the task list itself (separately from job status)
cache.set("daily_tasks_data", data)
# Mark job as completed
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": data,
}
except Exception as e:
# Mark job as failed
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def update_weather():
job_id = "daily_weather"
try:
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
response = requests.get(
f"http://dataservice.accuweather.com/forecasts/v1/daily/5day/{LOCATION_KEY}",
params={"apikey": ACCUWEATHER_API_KEY, "metric": "true"},
)
if response.status_code != 200:
raise Exception(f"AccuWeather API error: {response.status_code} - {response.text}")
data = response.json()
today_str = datetime.now().date().isoformat()
daily_forecasts = data.get("DailyForecasts", [])
today_forecast = next(
(f for f in daily_forecasts if f.get("Date", "").startswith(today_str)),
None
)
if not today_forecast:
raise ValueError(f"No forecast found for today ({today_str})")
cache.set("daily_forecast_data", today_forecast)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": today_forecast,
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def update_current_weather():
job_id = "current_weather"
try:
# Mark job as running
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
response = requests.get(
f"http://dataservice.accuweather.com/currentconditions/v1/{LOCATION_KEY}",
params={"apikey": ACCUWEATHER_API_KEY, "details": "true"},
)
if response.status_code != 200:
raise Exception(f"AccuWeather API error: {response.status_code} - {response.text}")
data = response.json()
if isinstance(data, list):
data = data[0] # AccuWeather returns a list
cache.set("current_weather_data", data)
# Mark job as completed
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": data,
}
except Exception as e:
# Mark job as failed
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def update_dressing_advice():
job_id = "daily_dressing_advice"
try:
# Mark job as running
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
# Load cached forecast data
forecast = cache.get("daily_forecast_data")
if not forecast:
raise ValueError("No forecast data found in cache.")
# Extract relevant weather info
temp_min = forecast.get("Temperature", {}).get("Minimum", {}).get("Value")
temp_max = forecast.get("Temperature", {}).get("Maximum", {}).get("Value")
phrase_day = forecast.get("Day", {}).get("IconPhrase", "")
phrase_night = forecast.get("Night", {}).get("IconPhrase", "")
date_str = forecast.get("Date", "")
# Build prompt for GPT
prompt = (
f"Today's weather forecast for {date_str}:\n"
f"- Minimum Temperature: {temp_min}°C\n"
f"- Maximum Temperature: {temp_max}°C\n"
f"- Daytime: {phrase_day}\n"
f"- Nighttime: {phrase_night}\n\n"
f"Based on this forecast, what clothing should someone wear today? Provide practical and sensible advice."
)
# Send prompt to OpenAI
chat_response = client.chat.completions.create(
model="gpt-4.1", # or "gpt-4o" if available
messages=[
{"role": "system", "content": "You are a helpful assistant that gives dressing advice based on weather."},
{"role": "system", "content": "Respond with one paragraph of readable text. No markup or special characters please."},
{"role": "system", "content": "Don't include actual weather data in your advice."},
{"role": "user", "content": prompt}
]
)
advice = chat_response.choices[0].message.content.strip()
# Cache the advice
cache.set("daily_dressing_advice_data", advice)
# Mark job as completed
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": advice,
}
except Exception as e:
# Mark job as failed
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def update_daily_surprise():
job_id = "daily_surprise"
try:
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
# Adjusted path to project root /data/surprises.json
file_path = Path(__file__).parent.parent / "data" / "surprises.json"
with open(file_path, "r", encoding="utf-8") as f:
surprises_data = json.load(f)
surprises = surprises_data.get("surprises", [])
if not surprises:
raise Exception("No surprises found in the JSON file.")
selected = random.choice(surprises)
cache.set("daily_surprise_data", selected)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": selected,
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
def refresh_meme():
job_id = "daily_meme"
try:
cache[job_id] = {
"status": "running",
"started_at": datetime.now().isoformat(),
}
headers = {"User-Agent": "EiraAI/1.0"}
response = requests.get("https://www.reddit.com/r/dankmemes/top.json?limit=20&t=day", headers=headers)
if response.status_code != 200:
raise Exception(f"Reddit API error: {response.status_code} - {response.text}")
memes = response.json()["data"]["children"]
meme = random.choice(memes)["data"]
meme_data = {
"title": meme["title"],
"image": meme["url"],
"permalink": f"https://reddit.com{meme['permalink']}"
}
cache.set("daily_meme_data", meme_data)
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": meme_data,
}
except Exception as e:
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e),
}
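
Every job here follows the same contract: write a "running" status dict to cache[job_id], store the useful result under "<job_id>_data", then overwrite the status with "completed" or "failed" plus a timestamp. That makes it easy to run any job by hand and inspect what it produced, without involving the scheduler; a hedged example:

# Sketch: run one job directly and read back its cached status and payload.
from scheduler.cache import cache
from scheduler.jobs import update_daily_surprise

update_daily_surprise()

status = cache.get("daily_surprise")         # e.g. {"status": "completed", "last_run": ..., "data": ...}
payload = cache.get("daily_surprise_data")   # the randomly chosen surprise string
print(status.get("status"), "->", payload)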

144
scheduler/scheduler.py Executable file

@@ -0,0 +1,144 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from .jobs import update_daily_tasks, update_weather, refresh_meme, update_current_weather, update_daily_surprise, update_dressing_advice, update_morning_briefing_transcript, update_news, get_relevant_news_titles, update_quick_insight
import asyncio
import atexit
from datetime import datetime
from .cache import cache
scheduler = AsyncIOScheduler()
def get_jobs_info():
jobs_info = []
for job in scheduler.get_jobs():
job_data = cache.get(job.id, {})
jobs_info.append({
"id": job.id,
"status": job_data.get("status", "unknown"),
"last_run": job_data.get("last_run"),
"next_run": job.next_run_time.isoformat() if job.next_run_time else None,
"error": job_data.get("error"),
"data": job_data.get("data"),
})
return jobs_info
def my_job():
job_id = "test_job"
try:
# Mark job as running
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
# Simulate work
print(f"Job ran at {datetime.now().isoformat()}")
# Store success result
cache[job_id] = {
"status": "completed",
"last_run": datetime.now().isoformat(),
"data": {"note": "Test job executed successfully."}
}
except Exception as e:
# Store failure result
cache[job_id] = {
"status": "failed",
"last_run": datetime.now().isoformat(),
"error": str(e)
}
def start_scheduler():
scheduler.configure(
jobstores={
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
},
job_defaults={
'coalesce': False,
'misfire_grace_time': 300 # 5 min grace period for missed jobs
}
)
scheduler.start()
# Only add jobs if they don't already exist
if not scheduler.get_job("daily_quick_insight"):
scheduler.add_job(
update_quick_insight,
trigger=CronTrigger(hour=11, minute=5),
id="daily_quick_insight",
)
if not scheduler.get_job("select_relevant_news"):
scheduler.add_job(
get_relevant_news_titles,
trigger=CronTrigger(hour=11, minute=5),
id="select_relevant_news",
)
if not scheduler.get_job("top_news"):
scheduler.add_job(
update_news,
trigger=CronTrigger(hour=11, minute=0),
id="top_news",
)
if not scheduler.get_job("morning_briefing_transcript"):
scheduler.add_job(
update_morning_briefing_transcript,
trigger=CronTrigger(hour=6, minute=10),
id="morning_briefing_transcript",
)
if not scheduler.get_job("daily_tasks"):
scheduler.add_job(
update_daily_tasks,
trigger=CronTrigger(hour=6, minute=0),
id="daily_tasks",
)
if not scheduler.get_job("daily_weather"):
scheduler.add_job(
update_weather,
trigger=CronTrigger(hour=6, minute=0),
id="daily_weather",
)
if not scheduler.get_job("current_weather"):
scheduler.add_job(
update_current_weather,
trigger=IntervalTrigger(hours=1),
id="current_weather",
)
if not scheduler.get_job("daily_dressing_advice"):
scheduler.add_job(
update_dressing_advice,
trigger=CronTrigger(hour=6, minute=5),
id="daily_dressing_advice",
)
if not scheduler.get_job("daily_surprise"):
scheduler.add_job(
update_daily_surprise,
trigger=CronTrigger(hour=6, minute=0),
id="daily_surprise",
)
if not scheduler.get_job("daily_meme"):
scheduler.add_job(
refresh_meme,
trigger=CronTrigger(hour=6, minute=0),
id="daily_meme",
)
if not scheduler.get_job("test_job"):
scheduler.add_job(
my_job,
trigger=IntervalTrigger(seconds=30),
id="test_job",
)
atexit.register(lambda: scheduler.shutdown(wait=False))
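
Because the default job store is an SQLAlchemyJobStore backed by jobs.sqlite, job definitions persist across restarts; the "if not scheduler.get_job(...)" guards exist so a restart does not register duplicates. A hedged alternative (not what this file does) is to always call add_job with APScheduler's replace_existing flag, which overwrites the persisted definition in place:

# Sketch: equivalent registration inside start_scheduler() without the get_job() guard.
scheduler.add_job(
    update_daily_surprise,
    trigger=CronTrigger(hour=6, minute=0),
    id="daily_surprise",
    replace_existing=True,  # overwrite any persisted copy instead of skipping
)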

10
test.py Executable file

@@ -0,0 +1,10 @@
from cryptography.hazmat.primitives import serialization
with open("vapid_private.pem", "rb") as f:
key_data = f.read()
try:
private_key = serialization.load_pem_private_key(key_data, password=None)
print("PEM loaded successfully!")
except Exception as e:
print("Failed to load PEM:", e)