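"""Background jobs for the Eira daily-briefing assistant.

Each job fetches data from an external service (NewsAPI, Todoist, AccuWeather,
Reddit, OpenAI, a local Kokoro TTS server), stores its result under a
"<job>_data" cache key, and records its own run status under the job id.
"""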
import json
import os
import random
from datetime import datetime, timezone
from pathlib import Path

import requests
from dateutil import parser
from openai import OpenAI

from .cache import cache

OPENAI_API_KEY = "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"
|
|
|
|
TODOIST_API_TOKEN = "c2233236d19d56128c89ed6b0a9d10a9e7b287f1"
|
|
|
|
ACCUWEATHER_API_KEY = "YHeMcr9Aa96Goer8CANIB2E6QIbr5Dp0"
|
|
LOCATION_KEY = "251518"
|
|
|
|
# Setup OpenAI client
|
|
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
|
|
client = OpenAI()
|
|
|
|
def update_quick_insight():
    job_id = "daily_quick_insight"
    try:
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        # Grab cached news
        news_items = cache.get("top_news_data", [])
        if not news_items:
            raise ValueError("No news data available in cache")

        # Collect every available headline title
        titles = [item["title"] for item in news_items if "title" in item]
        titles_text = "\n".join(f"- {title}" for title in titles)

        messages = [
            {
                "role": "system",
                "content": (
                    "You are a smart assistant that reads all today's headlines and "
                    "generates one sharp, short insight. Focus on current trends, tech, "
                    "business, or social issues. Keep it under 40 words."
                ),
            },
            {
                "role": "user",
                "content": f"Here are today's headlines:\n{titles_text}\n\nGive me one smart, timely insight.",
            },
        ]

        # Request the insight from ChatGPT
        response = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
        )

        insight = response.choices[0].message.content.strip()
        cache.set("daily_quick_insight_data", insight)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": insight,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def get_relevant_news_titles():
    job_id = "select_relevant_news"
    try:
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        articles = cache.get("top_news_data", [])
        if not articles:
            raise ValueError("No articles found in cache")

        titles = [article.get("title", "") for article in articles if article.get("title")]

        if not titles:
            raise ValueError("No valid titles extracted from articles")

        prompt = (
            "Here are today's news headlines:\n\n"
            + "\n".join(f"- {t}" for t in titles)
            + "\n\nBased on my interests (e.g., AI, technology, programming, games, movies, entertainment), "
            "please pick the 5 most relevant headlines and respond with ONLY a JSON array of strings. "
            "Example format: [\"Title 1\", \"Title 2\", ...]"
        )

        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that filters relevant news."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.7,
        )

        raw_output = response.choices[0].message.content.strip()

        # Try to parse the response as JSON
        try:
            selected_titles = json.loads(raw_output)
            if not isinstance(selected_titles, list):
                raise ValueError("Parsed output is not a list.")
        except Exception as parse_err:
            raise ValueError(f"Failed to parse response as JSON: {parse_err}\nResponse: {raw_output}")

        cache.set("select_relevant_news_data", selected_titles)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": selected_titles,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def update_news():
    job_id = "top_news"
    try:
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        response = requests.get(
            "https://newsapi.org/v2/everything",
            params={
                "apiKey": NEWSAPI_API_KEY,
                "language": "en",
                "sortBy": "publishedAt",
                "pageSize": 100,
                "q": "*",  # Wildcard query to match all articles (NewsAPI requires a `q` parameter)
            },
        )

        if response.status_code != 200:
            raise Exception(f"NewsAPI error: {response.status_code} - {response.text}")

        data = response.json()
        articles = data.get("articles", [])

        if not articles:
            raise ValueError("No news articles found for today")

        cache.set("top_news_data", articles)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": articles,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def generate_tts(text, filename):
    """Synthesize `text` to audio/<filename>.wav via the local Kokoro TTS server."""
    # Ensure the audio/ directory exists
    output_dir = "audio"
    os.makedirs(output_dir, exist_ok=True)

    # Full path to the output file
    output_path = os.path.join(output_dir, f"{filename}.wav")

    url = "http://192.168.70.5:8880/v1/audio/speech"
    headers = {
        "accept": "application/json",
        "x-raw-response": "test",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "kokoro",
        "input": text,
        "voice": "af_heart",
        "response_format": "wav",
        "download_format": "wav",
        "speed": 1,
        "return_download_link": True,
    }

    r = requests.post(url, headers=headers, json=payload)
    if r.status_code == 200:
        with open(output_path, "wb") as f:
            f.write(r.content)
        print(f"TTS audio saved to {output_path}")
        return output_path
    else:
        raise Exception(f"Failed to generate TTS. Status code: {r.status_code}")

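# Illustrative usage (assumes the TTS server above is reachable; the filename
# "test_clip" is a made-up example, not one the module uses):
#     path = generate_tts("Good morning!", "test_clip")
#     # -> writes audio/test_clip.wav and returns that path;
#     #    raises on any non-200 response instead of returning None.
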
def update_morning_briefing_transcript():
    job_id = "morning_briefing_transcript"
    try:
        # Mark job as running
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        # Load all required data from cache
        tasks = cache.get("daily_tasks_data", [])
        forecast = cache.get("daily_forecast_data", {})
        dressing_advice = cache.get("daily_dressing_advice_data", "")

        if not tasks or not forecast or not dressing_advice:
            raise ValueError("Missing required data in cache")

        # Extract forecast details
        date_str = forecast.get("Date", "")
        date_obj = datetime.fromisoformat(date_str)
        date_formatted = date_obj.strftime("%A, %B %d, %Y")

        temp_min = forecast["Temperature"]["Minimum"]["Value"]
        temp_max = forecast["Temperature"]["Maximum"]["Value"]
        day_phrase = forecast["Day"]["IconPhrase"]
        night_phrase = forecast["Night"]["IconPhrase"]

        # Build the task summary
        task_lines = []
        for task in tasks:
            due_time = task.get("due", {}).get("datetime")
            if due_time:
                try:
                    dt_obj = datetime.fromisoformat(due_time.replace("Z", "+00:00"))
                    task_time = dt_obj.strftime("%H:%M")
                    task_lines.append(f"- At {task_time}, {task['content']}.")
                except Exception:
                    task_lines.append(f"- {task['content']} (time format error).")
            else:
                task_lines.append(f"- {task['content']} (no specific time).")
        tasks_summary = "\n".join(task_lines)

        # Construct the GPT prompt
        prompt = (
            f"Today is {date_formatted}.\n\n"
            f"Here are the tasks for today:\n{tasks_summary}\n\n"
            f"The weather today will be {day_phrase} during the day and {night_phrase} at night. "
            f"Temperatures range from {temp_min}°C to {temp_max}°C.\n\n"
            f"Clothing advice: {dressing_advice}\n\n"
            f"Write a friendly and concise morning briefing script in natural spoken English. "
            f"Start with a brief greeting and mention the date. Then summarize the tasks, the weather, and the clothing advice. "
            f"Make the tone encouraging and warm, as if you're helping someone start their day."
        )

        # Send to GPT
        chat_response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that creates a morning briefing."},
                {"role": "system", "content": "Your name is Eira and the user's name is Collin. Start your briefing off with a variation on 'Hey Eira here'."},
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that writes a spoken transcript for a daily morning briefing. "
                        "The transcript must be suitable for text-to-speech. Use complete sentences and natural language. "
                        "Do not use any special characters or markdown. Avoid line breaks or paragraph breaks—write as a single continuous paragraph."
                    ),
                },
                {"role": "user", "content": prompt},
            ],
        )

        transcript = chat_response.choices[0].message.content.strip()

        # Generate the TTS audio file (generate_tts raises on failure)
        audio_path = generate_tts(transcript, "morning_briefing")
        if not audio_path:
            raise RuntimeError("TTS audio generation failed.")

        # Store transcript in cache
        cache.set("morning_briefing_transcript_data", transcript)

        # Mark job as completed
        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": transcript,
        }

    except Exception as e:
        # Mark job as failed
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def get_due_datetime(task):
    """Return a task's due datetime as an aware UTC datetime; tasks without one sort last."""
    try:
        dt_str = task.get('due', {}).get('datetime')
        if dt_str:
            dt = parser.isoparse(dt_str)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
    except Exception:
        pass
    return datetime.max.replace(tzinfo=timezone.utc)

def update_daily_tasks(project_id=None, filter_query="today"):
|
|
job_id = "daily_tasks"
|
|
try:
|
|
# Mark job as running
|
|
cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}
|
|
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {TODOIST_API_TOKEN}"
|
|
}
|
|
|
|
params = {}
|
|
if project_id:
|
|
params['project_id'] = project_id
|
|
if filter_query:
|
|
params['filter'] = filter_query
|
|
|
|
# Fetch from Todoist API
|
|
response = requests.get("https://api.todoist.com/rest/v2/tasks", headers=headers, params=params)
|
|
|
|
if response.status_code != 200:
|
|
raise Exception(f"Todoist API error: {response.status_code} - {response.text}")
|
|
|
|
data = response.json()
|
|
|
|
|
|
# Sort tasks by due datetime (handle tasks without due date by putting them last)
|
|
data.sort(key=get_due_datetime)
|
|
|
|
# Cache the forecast itself (separately from job status)
|
|
cache.set("daily_tasks_data", data)
|
|
|
|
# Mark job as completed
|
|
cache[job_id] = {
|
|
"status": "completed",
|
|
"last_run": datetime.now().isoformat(),
|
|
"data": data,
|
|
}
|
|
|
|
except Exception as e:
|
|
# Mark job as failed
|
|
cache[job_id] = {
|
|
"status": "failed",
|
|
"last_run": datetime.now().isoformat(),
|
|
"error": str(e),
|
|
}
|
|
|
|
|
|
def update_weather():
    job_id = "daily_weather"
    try:
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        response = requests.get(
            f"http://dataservice.accuweather.com/forecasts/v1/daily/5day/{LOCATION_KEY}",
            params={"apikey": ACCUWEATHER_API_KEY, "metric": "true"},
        )

        if response.status_code != 200:
            raise Exception(f"AccuWeather API error: {response.status_code} - {response.text}")

        data = response.json()

        today_str = datetime.now().date().isoformat()
        daily_forecasts = data.get("DailyForecasts", [])

        today_forecast = next(
            (f for f in daily_forecasts if f.get("Date", "").startswith(today_str)),
            None,
        )

        if not today_forecast:
            raise ValueError(f"No forecast found for today ({today_str})")

        cache.set("daily_forecast_data", today_forecast)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": today_forecast,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def update_current_weather():
    job_id = "current_weather"
    try:
        # Mark job as running
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        response = requests.get(
            f"http://dataservice.accuweather.com/currentconditions/v1/{LOCATION_KEY}",
            params={"apikey": ACCUWEATHER_API_KEY, "details": "true"},
        )

        if response.status_code != 200:
            raise Exception(f"AccuWeather API error: {response.status_code} - {response.text}")

        data = response.json()
        if isinstance(data, list):
            data = data[0]  # AccuWeather returns a one-element list

        cache.set("current_weather_data", data)

        # Mark job as completed
        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": data,
        }

    except Exception as e:
        # Mark job as failed
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def update_dressing_advice():
    job_id = "daily_dressing_advice"
    try:
        # Mark job as running
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        # Load cached forecast data
        forecast = cache.get("daily_forecast_data")
        if not forecast:
            raise ValueError("No forecast data found in cache.")

        # Extract relevant weather info
        temp_min = forecast.get("Temperature", {}).get("Minimum", {}).get("Value")
        temp_max = forecast.get("Temperature", {}).get("Maximum", {}).get("Value")
        phrase_day = forecast.get("Day", {}).get("IconPhrase", "")
        phrase_night = forecast.get("Night", {}).get("IconPhrase", "")
        date_str = forecast.get("Date", "")

        # Build prompt for GPT
        prompt = (
            f"Today's weather forecast for {date_str}:\n"
            f"- Minimum Temperature: {temp_min}°C\n"
            f"- Maximum Temperature: {temp_max}°C\n"
            f"- Daytime: {phrase_day}\n"
            f"- Nighttime: {phrase_night}\n\n"
            f"Based on this forecast, what clothing should someone wear today? Provide practical and sensible advice."
        )

        # Send prompt to OpenAI
        chat_response = client.chat.completions.create(
            model="gpt-4.1",  # or "gpt-4o" if available
            messages=[
                {"role": "system", "content": "You are a helpful assistant that gives dressing advice based on weather."},
                {"role": "system", "content": "Respond with one paragraph of readable text. No markup or special characters please."},
                {"role": "system", "content": "Don't include actual weather data in your advice."},
                {"role": "user", "content": prompt},
            ],
        )

        advice = chat_response.choices[0].message.content.strip()

        # Cache the advice
        cache.set("daily_dressing_advice_data", advice)

        # Mark job as completed
        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": advice,
        }

    except Exception as e:
        # Mark job as failed
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def update_daily_surprise():
    job_id = "daily_surprise"
    try:
        cache[job_id] = {"status": "running", "started_at": datetime.now().isoformat()}

        # Path relative to the project root: data/surprises.json
        file_path = Path(__file__).parent.parent / "data" / "surprises.json"
        with open(file_path, "r", encoding="utf-8") as f:
            surprises_data = json.load(f)

        surprises = surprises_data.get("surprises", [])
        if not surprises:
            raise Exception("No surprises found in the JSON file.")

        selected = random.choice(surprises)

        cache.set("daily_surprise_data", selected)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": selected,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }

def refresh_meme():
    job_id = "daily_meme"
    try:
        cache[job_id] = {
            "status": "running",
            "started_at": datetime.now().isoformat(),
        }

        headers = {"User-Agent": "EiraAI/1.0"}
        response = requests.get("https://www.reddit.com/r/dankmemes/top.json?limit=20&t=day", headers=headers)

        if response.status_code != 200:
            raise Exception(f"Reddit API error: {response.status_code} - {response.text}")

        memes = response.json()["data"]["children"]
        meme = random.choice(memes)["data"]
        meme_data = {
            "title": meme["title"],
            "image": meme["url"],
            "permalink": f"https://reddit.com{meme['permalink']}",
        }

        cache.set("daily_meme_data", meme_data)

        cache[job_id] = {
            "status": "completed",
            "last_run": datetime.now().isoformat(),
            "data": meme_data,
        }

    except Exception as e:
        cache[job_id] = {
            "status": "failed",
            "last_run": datetime.now().isoformat(),
            "error": str(e),
        }
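
# A minimal sketch of running every job in cache-dependency order, assuming the
# module is executed with `python -m <package>.<module>` so the relative
# `.cache` import resolves. The scheduler that normally triggers these jobs is
# not shown in this file; the ordering below simply follows which cache keys
# each job reads and writes.
if __name__ == "__main__":
    update_news()                         # writes top_news_data
    update_quick_insight()                # reads top_news_data
    get_relevant_news_titles()            # reads top_news_data
    update_weather()                      # writes daily_forecast_data
    update_current_weather()              # writes current_weather_data
    update_dressing_advice()              # reads daily_forecast_data
    update_daily_tasks()                  # writes daily_tasks_data
    update_daily_surprise()
    refresh_meme()
    update_morning_briefing_transcript()  # reads tasks, forecast, and advice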