Add backend support for notifications
This commit is contained in:
107
backend/get_notifications.py
Normal file
107
backend/get_notifications.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import os
|
||||
import requests
|
||||
from datetime import datetime
|
||||
from model import SessionLocal, Subscription, Settings, Notification
|
||||
import json
|
||||
|
||||
# Constants
|
||||
|
||||
NTFY_TOKEN = os.getenv("NTFY_TOKEN") or "tk_cdmwd6ix255g3qgo4dx3r0gakw4y3"
|
||||
|
||||
|
||||
def fetch_ntfy_notifications(base_url, subscriptions):
|
||||
"""Fetch notifications from the ntfy.sh server for the given subscriptions using streaming."""
|
||||
headers = {
|
||||
"Accept": "application/json",
|
||||
"Authorization": f"Bearer {NTFY_TOKEN}" if NTFY_TOKEN else None,
|
||||
}
|
||||
|
||||
notifications = []
|
||||
for subscription in subscriptions:
|
||||
topic = subscription.topic
|
||||
last_message_id = subscription.last_message_id
|
||||
since_param = "all" if last_message_id is None else last_message_id
|
||||
url = f"{base_url}/{topic}/json?poll=1&since={since_param}"
|
||||
response = requests.get(url, headers=headers, stream=True)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
for line in response.iter_lines():
|
||||
if line:
|
||||
notification = json.loads(line)
|
||||
if notification.get("event") == "message":
|
||||
notifications.append(notification)
|
||||
|
||||
print(f"Fetched {len(notifications)} notifications")
|
||||
print(notifications)
|
||||
|
||||
return notifications
|
||||
|
||||
|
||||
def save_notifications_to_db(notifications, topic_to_subscription, db):
|
||||
"""Save the fetched notifications to the database and update last_message_id."""
|
||||
db = SessionLocal()
|
||||
last_message_ids = {}
|
||||
for notification in notifications:
|
||||
topic = notification["topic"]
|
||||
last_message_ids[topic] = notification["id"]
|
||||
subscription_id = topic_to_subscription.get(notification["topic"])
|
||||
if subscription_id:
|
||||
new_notification = Notification(
|
||||
title=notification.get("title", "No Title"),
|
||||
message=notification.get("message", ""),
|
||||
priority=notification.get("priority", 3),
|
||||
created_at=datetime.fromtimestamp(notification["time"]),
|
||||
subscription_id=subscription_id,
|
||||
)
|
||||
db.add(new_notification)
|
||||
for topic, message_id in last_message_ids.items():
|
||||
subscription_id = topic_to_subscription.get(topic)
|
||||
if subscription_id:
|
||||
subscription = (
|
||||
db.query(Subscription)
|
||||
.filter(Subscription.id == subscription_id)
|
||||
.first()
|
||||
)
|
||||
if subscription:
|
||||
subscription.last_message_id = message_id
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function to fetch and save notifications."""
|
||||
db = SessionLocal()
|
||||
|
||||
# Get the ntfy base URL from settings
|
||||
settings = db.query(Settings).filter(Settings.user == "default").first()
|
||||
if not settings:
|
||||
print("Default user settings not found.")
|
||||
return
|
||||
|
||||
ntfy_url = settings.ntfy_url
|
||||
|
||||
if not ntfy_url:
|
||||
print("Ntfy URL not found in settings.")
|
||||
return
|
||||
|
||||
# Get all subscribed topics
|
||||
subscriptions = db.query(Subscription).all()
|
||||
topic_to_subscription = {
|
||||
subscription.topic: subscription.id for subscription in subscriptions
|
||||
}
|
||||
topic_to_subscription = {
|
||||
subscription.topic: subscription.id for subscription in subscriptions
|
||||
}
|
||||
|
||||
db.close()
|
||||
|
||||
# Fetch notifications from ntfy.sh
|
||||
notifications = fetch_ntfy_notifications(ntfy_url, subscriptions)
|
||||
|
||||
# Save notifications to the database
|
||||
save_notifications_to_db(notifications, topic_to_subscription, db)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user