139 lines
4.2 KiB
Python
139 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import requests
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
CommandHandler,
|
|
CallbackQueryHandler,
|
|
ContextTypes,
|
|
)
|
|
|
|
from config import BOT_TOKEN, PORTAINER_API_KEY
|
|
|
|
PORTAINER_URL = "https://192.168.1.17:9443/api"
|
|
STACKS_PER_PAGE = 5
|
|
user_page = 0 # Tracks current page per user
|
|
|
|
|
|
def get_stacks():
|
|
headers = {"X-API-Key": f"{PORTAINER_API_KEY}"}
|
|
response = requests.get(
|
|
f"{PORTAINER_URL}/stacks?filters=" + '{"EndpointID":2}',
|
|
headers=headers,
|
|
verify=False,
|
|
)
|
|
|
|
return response.json() if response.ok else []
|
|
|
|
|
|
def get_stack_status(stack_id):
|
|
"""Fetch the stack status based on its ID."""
|
|
headers = {"X-API-Key": f"{PORTAINER_API_KEY}"}
|
|
response = requests.get(
|
|
f"{PORTAINER_URL}/stacks/{stack_id}?endpointId=2", headers=headers, verify=False
|
|
)
|
|
if response.ok:
|
|
stack_info = response.json()
|
|
return stack_info.get(
|
|
"Status", ""
|
|
) # Assuming the status is returned in lowercase
|
|
return None
|
|
|
|
|
|
async def start_portainer(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
global user_page
|
|
|
|
user_page = 0
|
|
await send_stack_list(update, context)
|
|
|
|
|
|
def build_stack_markup(stacks, page):
|
|
start_idx = page * STACKS_PER_PAGE
|
|
end_idx = start_idx + STACKS_PER_PAGE
|
|
sliced = stacks[start_idx:end_idx]
|
|
|
|
keyboard = []
|
|
for stack in sliced:
|
|
stack_name = stack["Name"]
|
|
stack_id = stack["Id"]
|
|
status = get_stack_status(stack_id)
|
|
|
|
# Add smiley based on stack status
|
|
if status == 1:
|
|
smiley = "🟢" # Stack is up
|
|
else:
|
|
smiley = "🔴" # Stack is down
|
|
|
|
# Button labels: Stack name with status smiley
|
|
row = [
|
|
InlineKeyboardButton(
|
|
f"{smiley} {stack_name}",
|
|
callback_data=f"portainer_select_{stack_id}",
|
|
),
|
|
InlineKeyboardButton(
|
|
f"▶️ Start", callback_data=f"portainer_start_{stack_id}"
|
|
),
|
|
InlineKeyboardButton(f"⏹ Stop", callback_data=f"portainer_stop_{stack_id}"),
|
|
]
|
|
keyboard.append(row)
|
|
|
|
nav_row = []
|
|
if page > 0:
|
|
nav_row.append(InlineKeyboardButton("⬅️ Prev", callback_data="portainer_prev"))
|
|
if end_idx < len(stacks):
|
|
nav_row.append(InlineKeyboardButton("➡️ Next", callback_data="portainer_next"))
|
|
keyboard.append(nav_row)
|
|
|
|
keyboard.append([InlineKeyboardButton("🔙 Main Menu", callback_data="menu_main")])
|
|
return InlineKeyboardMarkup(keyboard)
|
|
|
|
|
|
async def send_stack_list(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
global user_page
|
|
|
|
page = user_page
|
|
stacks = get_stacks()
|
|
|
|
if not stacks:
|
|
await update.message.reply_text(
|
|
"No stacks found or unable to connect to Portainer."
|
|
)
|
|
return
|
|
|
|
if update.callback_query:
|
|
await update.callback_query.answer()
|
|
await update.callback_query.edit_message_text(
|
|
text=f"Select a stack (page {page + 1}):",
|
|
reply_markup=build_stack_markup(stacks, page),
|
|
)
|
|
else:
|
|
await update.message.reply_text(
|
|
text=f"Select a stack (page {page + 1}):",
|
|
reply_markup=build_stack_markup(stacks, page),
|
|
)
|
|
|
|
|
|
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
global user_page
|
|
|
|
query = update.callback_query
|
|
data = query.data
|
|
|
|
if data == "portainer_next":
|
|
user_page += 1
|
|
elif data == "portainer_prev":
|
|
user_page = max(0, user_page - 1)
|
|
elif data.startswith("portainer_start_") or data.startswith("portainer_stop_"):
|
|
_, action, stack_id = data.split("_")
|
|
await handle_stack_action(query, action, stack_id)
|
|
await send_stack_list(update, context)
|
|
|
|
|
|
async def handle_stack_action(query, action, stack_id):
|
|
endpoint = f"{PORTAINER_URL}/stacks/{stack_id}/{action}?endpointId=2"
|
|
headers = {"X-API-Key": f"{PORTAINER_API_KEY}"}
|
|
response = requests.post(endpoint, headers=headers, verify=False)
|
|
msg = "✅ Success!" if response.ok else f"❌ Failed: {response.text}"
|
|
await query.answer(text=msg, show_alert=True)
|