74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
import datetime
|
|
import httpx
|
|
import socket
|
|
from telegram.ext import ConversationHandler, CommandHandler, CallbackQueryHandler
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
|
from telegram.constants import ParseMode
|
|
import os
|
|
|
|
|
|
FIRST = 1
|
|
from .basehandler import BaseHandler
|
|
class StatusHandler(BaseHandler):
|
|
"""Shows a short status of the program."""
|
|
|
|
def __init__(self, entry_string, models):
|
|
self.start_time = datetime.datetime.now()
|
|
self.entry_string = entry_string
|
|
self.models = models
|
|
self.handler = ConversationHandler(
|
|
entry_points=[CommandHandler(self.entry_string, self.entry_point)],
|
|
states={
|
|
FIRST: [
|
|
CallbackQueryHandler(self.send_log, pattern="^full$"),
|
|
]
|
|
},
|
|
fallbacks=[CommandHandler('status', self.entry_point)],
|
|
)
|
|
|
|
|
|
async def entry_point(self, update, context) -> None:
|
|
await super().entry_point(update, context)
|
|
keyboard = [
|
|
[
|
|
InlineKeyboardButton("And the log?", callback_data="full"),
|
|
]
|
|
]
|
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
|
|
|
delta = str(datetime.datetime.now() - self.start_time)
|
|
message = "BeebBop, this is Norbit\n"
|
|
|
|
try:
|
|
ip = httpx.get('https://api.ipify.org').text
|
|
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
|
s.connect(('8.8.8.8', 80))
|
|
(addr, port) = s.getsockname()
|
|
local_ips = addr
|
|
except:
|
|
ip = "not fetchable"
|
|
local_ips = "not fetchable"
|
|
|
|
message += "Status: Running 🟢\n"
|
|
message += f"Version: `{os.getenv('BOT_VERSION', 'dev')}`\n"
|
|
message += f"Uptime: `{delta[:delta.rfind('.')]}`\n"
|
|
message += f"IP \(public\): `{ip}`\n"
|
|
message += f"IP \(private\): `{local_ips}`\n"
|
|
message += f"Chat ID: `{update.effective_chat.id}`\n"
|
|
|
|
if update.message:
|
|
await update.message.reply_text(message, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN_V2)
|
|
else:
|
|
await update._effective_chat.send_message(message, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN_V2)
|
|
|
|
return FIRST
|
|
|
|
|
|
async def send_log(self, update, context) -> None:
|
|
query = update.callback_query
|
|
wanted = query.data.replace("status-","")
|
|
await query.answer()
|
|
await query.edit_message_text("Here you go: https://portainer.kluster.moll.re/#!/1/kubernetes/applications/journal/journal-botte")
|
|
return ConversationHandler.END
|
|
|