All checks were successful
		
		
	
	Build container / Build (pull_request) Successful in 43s
				
			
		
			
				
	
	
		
			222 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			222 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import datetime
 | 
						|
import os
 | 
						|
from telegram.ext import ConversationHandler, CommandHandler, MessageHandler, filters, CallbackQueryHandler
 | 
						|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
 | 
						|
from telegram.constants import ParseMode
 | 
						|
import models
 | 
						|
 | 
						|
# Conversation states used as keys of the ConversationHandler `states` mapping:
#   ENTRY_OPTIONS - a date is being picked (button, typed date, or delete request)
#   CONTENT_ENTRY - waiting for the message that becomes the entry's content
#   DAY_RATING    - waiting for the rating button press
ENTRY_OPTIONS = 0
CONTENT_ENTRY = 1
DAY_RATING = 2
 | 
						|
# Number of day buttons shown per page of the date picker; the first page
# (built in `entry_point`) shows BUTTON_COUNT + 2 days.
BUTTON_COUNT = 5
 | 
						|
 | 
						|
 | 
						|
from .basehandler import BaseHandler
 | 
						|
 | 
						|
class JournalHandler(BaseHandler):
    """Telegram conversation for creating, rating and deleting journal entries.

    The flow is: pick a date (inline button or typed ``DDMMYYYY``) -> send the
    entry content (text, or media with optional caption) -> rate the day.
    Choosing "Delete" instead asks for a date and removes that entry.

    NOTE: the entry being edited is kept in ``self.current_model``, i.e. on
    the handler instance, so it is shared by every chat served by this
    handler object.
    """

    # MarkdownV2 requires parentheses to be escaped with a backslash. A raw
    # string keeps the backslashes without relying on Python's (deprecated)
    # pass-through of unknown escape sequences.
    _DATE_PROMPT = r"Please choose a date \(or type it in the format _DDMMYYYY_\)"
    _DATE_FORMAT_HINT = "Please enter the date in the format _DDMMYYYY_"

    # Wire format of dates in callback data and typed input.
    _DATE_FORMAT = "%d%m%Y"

    # Largest day offset the `date_custom` pattern (one to three digits) accepts.
    _MAX_OFFSET = 999

    # Ratings offered after an entry was saved (keys into models.RATING_MAPPING).
    _RATINGS = (1, 2, 3, 4, 5)

    def __init__(self, entry_string):
        """Build the conversation handler.

        Args:
            entry_string: command name (without slash) that starts the
                conversation.
        """
        self.entry_string = entry_string
        self.handler = ConversationHandler(
            entry_points=[CommandHandler(entry_string, self.entry_point)],
            states={
                ENTRY_OPTIONS: [
                    # a serialized date (DDMMYYYY)
                    CallbackQueryHandler(self.date_button, pattern=r"^\d{8}$"),
                    # a small day offset, requesting another page of dates
                    CallbackQueryHandler(self.date_custom, pattern=r"^\d{1,3}$"),
                    CallbackQueryHandler(self.option_delete, pattern="delete"),
                    MessageHandler(filters.ALL, self.date_entry),
                ],
                CONTENT_ENTRY: [
                    MessageHandler(filters.ALL, self.content_save),
                ],
                DAY_RATING: [
                    CallbackQueryHandler(self.day_rating_save),
                ],
            },
            fallbacks=[],
        )

        # Entry currently being created or deleted; set by the date handlers.
        self.current_model = None

    # ------------------------------------------------------------------ helpers

    def _date_keyboard(self, delta):
        """Return the date-picker keyboard starting ``delta`` days in the past.

        Days are listed newest first. The first page (``delta == 0``) has no
        ">>" button and shows two extra days instead; "<<" is omitted once its
        offset would no longer fit the callback pattern.
        """
        count = BUTTON_COUNT + 2 if delta == 0 else BUTTON_COUNT
        today = datetime.datetime.now().date()
        dates = [today - datetime.timedelta(days=delta + i) for i in range(count)]
        with models.db:
            names = get_names(dates)

        rows = []
        if delta > 0:
            # Clamp at 0: a negative offset would match no handler pattern and
            # leave the user with a dead button.
            newer = max(delta - BUTTON_COUNT, 0)
            rows.append([InlineKeyboardButton(">>", callback_data=str(newer))])

        rows.extend(
            [InlineKeyboardButton(name, callback_data=day.strftime(self._DATE_FORMAT))]
            for name, day in zip(names, dates)
        )

        older = delta + count
        if older <= self._MAX_OFFSET:
            rows.append([InlineKeyboardButton("<<", callback_data=str(older))])
        rows.append([InlineKeyboardButton("Delete", callback_data="delete")])
        return InlineKeyboardMarkup(rows)

    def _begin_entry(self, date):
        """Create the entry for ``date`` and return ``(reply text, next state)``.

        If an entry already exists the conversation is ended instead.
        """
        with models.db:
            self.current_model, created = models.JournalEntry.get_or_create(date=date)
            count = models.JournalEntry.select().count() if created else None

        if not created:
            return "An entry already exists for this date", ConversationHandler.END

        prompt = f"Journal entry no. {count}. What happened on {self.current_model.date_pretty}?"
        return prompt, CONTENT_ENTRY

    # ----------------------------------------------------------------- handlers

    async def entry_point(self, update, context):
        """Start the conversation and show the first page of the date picker."""
        await super().entry_point(update, context)
        if models.IS_PRODUCTION and os.getenv("CHAT_ID") != str(update.message.chat_id):
            await update.message.reply_text("You are not authorized to use this bot")
            return ConversationHandler.END

        # A delete request left over from an earlier conversation must not
        # turn the next typed date into a deletion.
        context.chat_data["delete"] = False

        # since this page has no ">>" button, it shows two more days
        await update.message.reply_text(
            self._DATE_PROMPT,
            reply_markup=self._date_keyboard(0),
            parse_mode=ParseMode.MARKDOWN_V2,
        )
        return ENTRY_OPTIONS

    async def date_button(self, update, context):
        """Handle a press on a date button and start the entry for that day."""
        query = update.callback_query
        await query.answer()
        date = datetime.datetime.strptime(query.data, self._DATE_FORMAT).date()

        text, next_state = self._begin_entry(date)
        await query.edit_message_text(text=text)
        return next_state

    async def date_custom(self, update, context):
        """Handle "<<" / ">>": redraw the picker at the requested day offset."""
        query = update.callback_query
        await query.answer()
        delta = int(query.data)

        await query.edit_message_text(
            self._DATE_PROMPT,
            parse_mode=ParseMode.MARKDOWN_V2,
            reply_markup=self._date_keyboard(delta),
        )
        return ENTRY_OPTIONS

    async def date_entry(self, update, context):
        """Handle a typed date: delete that entry if requested, else create it."""
        try:
            date = datetime.datetime.strptime(update.message.text, self._DATE_FORMAT).date()
        except (TypeError, ValueError):
            # ValueError: text is not a DDMMYYYY date.
            # TypeError: the message carried no text at all (e.g. a photo).
            await update.message.reply_text(self._DATE_FORMAT_HINT, parse_mode=ParseMode.MARKDOWN_V2)
            return ENTRY_OPTIONS

        if context.chat_data.get("delete", False):  # if not set, delete was not chosen
            with models.db:
                self.current_model = models.JournalEntry.get_or_none(date=date)
            if self.current_model:
                await self.delete_entry(update, context)
            else:
                await update.message.reply_text("No entry found for this date")
                context.chat_data["delete"] = False
            return ConversationHandler.END

        text, next_state = self._begin_entry(date)
        await update.message.reply_text(text=text)
        return next_state

    async def content_save(self, update, context):
        """Store the received message as entry content and ask for a rating.

        Text messages are stored as text; otherwise the attachment is
        downloaded and handed to ``save_media`` and the caption becomes the
        text. Messages whose attachment cannot be downloaded are rejected and
        the conversation stays in ``CONTENT_ENTRY``.
        """
        message = update.message

        attachment = None
        if not message.text:
            attachment = message.effective_attachment
            if message.photo:
                # photos arrive as a list of sizes; the last one is used
                attachment = attachment[-1]
            if not hasattr(attachment, "get_file"):
                # e.g. contacts, locations or polls: nothing to download
                await message.reply_text("Please send text, a photo or a file")
                return CONTENT_ENTRY

        # Download before opening the database context so the network
        # round-trips do not happen while it is held open.
        file_bytes = file_path = None
        if attachment is not None:
            file = await attachment.get_file()
            file_bytes = await file.download_as_bytearray()
            file_path = file.file_path

        with models.db:
            self.current_model.author_id = message.from_user.id
            if attachment is None:
                self.current_model.text = message.text
            else:
                self.current_model.save_media(file_bytes, file_path)
                self.current_model.text = message.caption
            self.current_model.save()

        options = [
            [
                InlineKeyboardButton(models.RATING_MAPPING[idx], callback_data=str(idx))
                for idx in self._RATINGS
            ]
        ]
        await update.message.reply_text(
            "Saved entry ✅. How was the day?",
            reply_markup=InlineKeyboardMarkup(options),
        )
        return DAY_RATING

    async def day_rating_save(self, update, context):
        """Store the pressed rating on the current entry and end the conversation."""
        query = update.callback_query
        await query.answer()

        # This state accepts every callback query, so a press on a stale
        # keyboard (date button, "delete", ...) also ends up here. Ignore
        # anything that is not one of the offered ratings.
        try:
            rating = int(query.data)
        except (TypeError, ValueError):
            rating = None
        if rating not in self._RATINGS:
            return DAY_RATING

        with models.db:
            self.current_model.rating = rating
            self.current_model.save()
        await query.edit_message_text(text="Rating saved ✅")
        return ConversationHandler.END

    async def option_delete(self, update, context):
        """Switch to delete mode and ask for the date of the entry to remove."""
        query = update.callback_query
        await query.answer()
        await query.edit_message_text(text=self._DATE_FORMAT_HINT, parse_mode=ParseMode.MARKDOWN_V2)
        # Read by `date_entry` to decide between creating and deleting.
        context.chat_data["delete"] = True
        return ENTRY_OPTIONS

    async def delete_entry(self, update, context):
        """Delete ``self.current_model`` and leave delete mode."""
        with models.db:
            self.current_model.delete_instance()
        # Do not keep a reference to a row that no longer exists.
        self.current_model = None
        context.chat_data["delete"] = False
        await update.message.reply_text(text="Entry deleted ✅")
 | 
						|
 | 
						|
 | 
						|
 | 
						|
### HELPERS
 | 
						|
 | 
						|
def get_names(dates: list) -> list:
    """Return one button label per date, in the same order as ``dates``.

    Today and yesterday are labelled "Today" / "Yesterday", every other day
    as ``DD.MM.``. Days that already have a journal entry get a " ✅" suffix.

    Args:
        dates: ``datetime.date`` objects to label.

    Returns:
        List of label strings, one per input date.
    """
    # Evaluate "now" once so every label is relative to the same day, even if
    # the call happens to straddle midnight.
    today = datetime.datetime.now().date()
    yesterday = today - datetime.timedelta(days=1)

    names = []
    # Run the lookups inside the database context, like the other queries in
    # this module.
    with models.db:
        for d in dates:
            if d == today:
                label = "Today"
            elif d == yesterday:
                label = "Yesterday"
            else:
                label = d.strftime("%d.%m.")

            if models.JournalEntry.get_or_none(date=d):
                label += " ✅"
            names.append(label)
    return names
 |