114 lines
3.7 KiB
Python
114 lines
3.7 KiB
Python
import os
|
|
import requests
|
|
from datetime import datetime
|
|
from model import SessionLocal, Subscription, Settings, Notification
|
|
import json
|
|
|
|
# Constants
|
|
NTFY_TOKEN = os.getenv("NTFY_TOKEN")
|
|
|
|
|
|
def fetch_ntfy_notifications(base_url, subscriptions):
|
|
"""Fetch notifications from the ntfy.sh server for the given subscriptions using streaming."""
|
|
headers = {
|
|
"Accept": "application/json",
|
|
"Authorization": f"Bearer {NTFY_TOKEN}" if NTFY_TOKEN else None,
|
|
}
|
|
|
|
notifications = []
|
|
for subscription in subscriptions:
|
|
print(f"Fetching notifications for {subscription.topic}")
|
|
topic = subscription.topic
|
|
last_message_id = subscription.last_message_id
|
|
since_param = "all" if last_message_id is None else last_message_id
|
|
url = f"{base_url}/{topic}/json?poll=1&since={since_param}"
|
|
response = requests.get(url, headers=headers, stream=True)
|
|
|
|
response.raise_for_status()
|
|
|
|
for line in response.iter_lines():
|
|
if line:
|
|
notification = json.loads(line)
|
|
if notification.get("event") == "message":
|
|
notifications.append(notification)
|
|
|
|
print(f"Fetched {len(notifications)} notifications")
|
|
return notifications
|
|
|
|
|
|
def save_notifications_to_db(notifications, topic_to_subscription, db):
|
|
"""Save the fetched notifications to the database and update last_message_id."""
|
|
last_message_ids = {}
|
|
for notification in notifications:
|
|
topic = notification["topic"]
|
|
last_message_ids[topic] = notification["id"]
|
|
subscription_id = topic_to_subscription.get(notification["topic"])
|
|
if subscription_id:
|
|
new_notification = Notification(
|
|
title=notification.get("title", "No Title"),
|
|
message=notification.get("message", ""),
|
|
priority=notification.get("priority", 3),
|
|
created_at=datetime.fromtimestamp(notification["time"]),
|
|
subscription_id=subscription_id,
|
|
)
|
|
db.add(new_notification)
|
|
for topic, message_id in last_message_ids.items():
|
|
subscription_id = topic_to_subscription.get(topic)
|
|
if subscription_id:
|
|
subscription = (
|
|
db.query(Subscription)
|
|
.filter(Subscription.id == subscription_id)
|
|
.first()
|
|
)
|
|
if subscription:
|
|
subscription.last_message_id = message_id
|
|
db.commit()
|
|
|
|
|
|
def process_user_notifications(user_settings, db):
|
|
"""Process notifications for a specific user's subscriptions."""
|
|
ntfy_url = user_settings.ntfy_url
|
|
|
|
if not ntfy_url:
|
|
print(f"Ntfy URL not found for user ID {user_settings.user_id}. Skipping...")
|
|
return
|
|
|
|
# Get all subscriptions for the user
|
|
subscriptions = (
|
|
db.query(Subscription)
|
|
.filter(Subscription.user_id == user_settings.user_id)
|
|
.all()
|
|
)
|
|
topic_to_subscription = {
|
|
subscription.topic: subscription.id for subscription in subscriptions
|
|
}
|
|
|
|
# Fetch notifications from ntfy.sh
|
|
notifications = fetch_ntfy_notifications(ntfy_url, subscriptions)
|
|
|
|
# Save notifications to the database
|
|
save_notifications_to_db(notifications, topic_to_subscription, db)
|
|
|
|
|
|
def main():
|
|
"""Main function to fetch and save notifications for all users."""
|
|
db = SessionLocal()
|
|
|
|
# Get all user settings
|
|
user_settings_list = db.query(Settings).all()
|
|
|
|
if not user_settings_list:
|
|
print("No user settings found.")
|
|
return
|
|
|
|
# Process notifications for each user
|
|
for user_settings in user_settings_list:
|
|
print(f"Processing notifications for user ID {user_settings.user_id}")
|
|
process_user_notifications(user_settings, db)
|
|
|
|
db.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|