Files
Project-Monitor/backend/get_notifications.py
2025-11-01 16:05:34 +01:00

114 lines
3.7 KiB
Python

import os
import requests
from datetime import datetime
from model import SessionLocal, Subscription, Settings, Notification
import json
# Constants
NTFY_TOKEN = os.getenv("NTFY_TOKEN")
def fetch_ntfy_notifications(base_url, subscriptions):
"""Fetch notifications from the ntfy.sh server for the given subscriptions using streaming."""
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {NTFY_TOKEN}" if NTFY_TOKEN else None,
}
notifications = []
for subscription in subscriptions:
print(f"Fetching notifications for {subscription.topic}")
topic = subscription.topic
last_message_id = subscription.last_message_id
since_param = "all" if last_message_id is None else last_message_id
url = f"{base_url}/{topic}/json?poll=1&since={since_param}"
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
for line in response.iter_lines():
if line:
notification = json.loads(line)
if notification.get("event") == "message":
notifications.append(notification)
print(f"Fetched {len(notifications)} notifications")
return notifications
def save_notifications_to_db(notifications, topic_to_subscription, db):
"""Save the fetched notifications to the database and update last_message_id."""
last_message_ids = {}
for notification in notifications:
topic = notification["topic"]
last_message_ids[topic] = notification["id"]
subscription_id = topic_to_subscription.get(notification["topic"])
if subscription_id:
new_notification = Notification(
title=notification.get("title", "No Title"),
message=notification.get("message", ""),
priority=notification.get("priority", 3),
created_at=datetime.fromtimestamp(notification["time"]),
subscription_id=subscription_id,
)
db.add(new_notification)
for topic, message_id in last_message_ids.items():
subscription_id = topic_to_subscription.get(topic)
if subscription_id:
subscription = (
db.query(Subscription)
.filter(Subscription.id == subscription_id)
.first()
)
if subscription:
subscription.last_message_id = message_id
db.commit()
def process_user_notifications(user_settings, db):
"""Process notifications for a specific user's subscriptions."""
ntfy_url = user_settings.ntfy_url
if not ntfy_url:
print(f"Ntfy URL not found for user ID {user_settings.user_id}. Skipping...")
return
# Get all subscriptions for the user
subscriptions = (
db.query(Subscription)
.filter(Subscription.user_id == user_settings.user_id)
.all()
)
topic_to_subscription = {
subscription.topic: subscription.id for subscription in subscriptions
}
# Fetch notifications from ntfy.sh
notifications = fetch_ntfy_notifications(ntfy_url, subscriptions)
# Save notifications to the database
save_notifications_to_db(notifications, topic_to_subscription, db)
def main():
"""Main function to fetch and save notifications for all users."""
db = SessionLocal()
# Get all user settings
user_settings_list = db.query(Settings).all()
if not user_settings_list:
print("No user settings found.")
return
# Process notifications for each user
for user_settings in user_settings_list:
print(f"Processing notifications for user ID {user_settings.user_id}")
process_user_notifications(user_settings, db)
db.close()
if __name__ == "__main__":
main()