Merge pull request 'Lists with more functionality' (#3) from checklist into main
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #3
This commit is contained in:
Remy Moll 2023-06-23 11:02:23 +00:00
commit 86a9762f39
2 changed files with 126 additions and 69 deletions

View File

@ -1,16 +1,13 @@
import os
from pathlib import Path
from telegram.ext import ConversationHandler, CommandHandler, MessageHandler, filters, CallbackQueryHandler
from telegram.ext import ConversationHandler, CommandHandler, MessageHandler, filters, CallbackQueryHandler, CallbackContext
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from .models import ListModel, set_db, db
MEDIA_DIR = Path(os.getenv("MEDIA_DIR"))
DB_DIR = MEDIA_DIR / "lists_db"
DB_DIR.mkdir(parents=True, exist_ok=True)
PERSISTENCE_DIR = Path(os.getenv("PERSISTENCE_DIR"))
NAME, NEW, ACTION, ITEMADD, ITEMREMOVE = range(5)
NAME, NEW, ACTION, ITEMADD, ITEMREMOVE, ITEMTOGGLE = range(6)
from ..basehandler import BaseHandler
@ -19,8 +16,20 @@ class ListHandler(BaseHandler):
"""Create and edit lists"""
def __init__(self, entry_string, models):
self.journal_models = models # not needed here
del models # not needed here, but part of the template
self.entry_string = entry_string
set_db(PERSISTENCE_DIR / "lists.sqlite")
self.list_overview_keyboard = [
[InlineKeyboardButton("Add item", callback_data="add")],
[InlineKeyboardButton("Toggle item", callback_data="toggle")],
[InlineKeyboardButton("Remove item", callback_data="remove")],
[InlineKeyboardButton("Clear list", callback_data="clear")],
[InlineKeyboardButton("Print list", callback_data="print")],
[InlineKeyboardButton("Delete list", callback_data="delete")],
]
self.handler = ConversationHandler(
entry_points=[CommandHandler(entry_string, self.entry_point)],
states={
@ -31,6 +40,7 @@ class ListHandler(BaseHandler):
NEW : [MessageHandler(filters.TEXT, callback=self.new_listname)],
ACTION: [
CallbackQueryHandler(self.list_add, pattern="^add$"),
CallbackQueryHandler(self.list_toggle, pattern="^toggle$"),
CallbackQueryHandler(self.list_remove, pattern="^remove$"),
CallbackQueryHandler(self.list_clear, pattern="^clear$"),
CallbackQueryHandler(self.list_delete, pattern="^delete$"),
@ -38,6 +48,7 @@ class ListHandler(BaseHandler):
CallbackQueryHandler(self.list_menu, pattern="^overview$"),
],
ITEMADD : [MessageHandler(filters.TEXT, callback=self.list_add_item)],
ITEMTOGGLE: [CallbackQueryHandler(self.list_toggle_index)],
ITEMREMOVE : [CallbackQueryHandler(self.list_remove_index)]
},
fallbacks=[CommandHandler('list', self.entry_point)],
@ -46,10 +57,9 @@ class ListHandler(BaseHandler):
async def entry_point(self, update, context) -> None:
await super().entry_point(update, context)
set_db(DB_DIR / f"chat_{update.message.chat_id}.db")
with db:
lists = ListModel.select()
keyboard = [[InlineKeyboardButton(k.name, callback_data=f"list-{k.name}")] for k in lists] + \
lists = ListModel.select().where(ListModel.chat_id == update.effective_chat.id)
keyboard = [[InlineKeyboardButton(k.name, callback_data=f"list-{k.id}")] for k in lists] + \
[[InlineKeyboardButton("New list", callback_data="new")]]
reply_markup = InlineKeyboardMarkup(keyboard)
@ -57,23 +67,16 @@ class ListHandler(BaseHandler):
return NAME
async def choose_list(self, update, context) -> None:
async def choose_list(self, update, context: CallbackContext) -> None:
query = update.callback_query
data = query.data
name = data.replace("list-","")
id = data.replace("list-","")
await query.answer()
self.current_name = name
context.user_data["current_list"] = ListModel.get(id = id)
keyboard = [
[InlineKeyboardButton("Add item", callback_data="add")],
[InlineKeyboardButton("Remove item", callback_data="remove")],
[InlineKeyboardButton("Clear list", callback_data="clear")],
[InlineKeyboardButton("Print list", callback_data="print")],
[InlineKeyboardButton("Delete list", callback_data="delete")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
reply_markup = InlineKeyboardMarkup(self.list_overview_keyboard)
await query.edit_message_text("Very well. For " + name + " the following actions are available:", reply_markup=reply_markup)
await query.edit_message_text(f"Using {context.user_data['current_list'].name}. Available actions:", reply_markup=reply_markup)
return ACTION
@ -81,16 +84,9 @@ class ListHandler(BaseHandler):
query = update.callback_query
await query.answer()
keyboard = [
[InlineKeyboardButton("Add item", callback_data="add")],
[InlineKeyboardButton("Remove item", callback_data="remove")],
[InlineKeyboardButton("Clear list", callback_data="clear")],
[InlineKeyboardButton("Print list", callback_data="print")],
[InlineKeyboardButton("Delete list", callback_data="delete")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
reply_markup = InlineKeyboardMarkup(self.list_overview_keyboard)
await query.edit_message_text("Very well. For " + self.current_name + " the following actions are available:", reply_markup=reply_markup)
await query.edit_message_text(f"Using {context.user_data['current_list'].name}. Available actions:", reply_markup=reply_markup)
return ACTION
@ -105,10 +101,9 @@ class ListHandler(BaseHandler):
name = update.message.text
try:
with db:
ListModel.create(name = name)
context.user_data["current_list"] = ListModel.create(name = name, chat_id=update.effective_chat.id)
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("To the menu!", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
self.current_name = name
await update.message.reply_text("Thanks. List " + name + " was successfully created.", reply_markup=reply_markup)
return ACTION
except Exception as e:
@ -123,13 +118,24 @@ class ListHandler(BaseHandler):
return ITEMADD
async def list_toggle(self, update, context) -> None:
query = update.callback_query
await query.answer()
list_object = context.user_data["current_list"]
keyboard = [[InlineKeyboardButton(v, callback_data=k)] for k,v in list_object.content.items()]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Which item would you like to toggle?", reply_markup = reply_markup)
return ITEMTOGGLE
async def list_remove(self, update, context) -> None:
query = update.callback_query
await query.answer()
with db:
list_object = ListModel.get(name = self.current_name)
keyboard = [[InlineKeyboardButton(k, callback_data=i)] for i,k in enumerate(list_object.content_list)]
list_object = context.user_data["current_list"]
keyboard = [[InlineKeyboardButton(v, callback_data=k)] for k,v in list_object.content.items()]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Which item would you like to remove?", reply_markup = reply_markup)
@ -139,47 +145,77 @@ class ListHandler(BaseHandler):
async def list_clear(self, update, context) -> None:
query = update.callback_query
await query.answer()
with db:
ListModel.get(name = self.current_name).content_list = []
list_object = context.user_data["current_list"]
list_object.content = {}
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("List " + self.current_name + " cleared", reply_markup=reply_markup)
await query.edit_message_text(f"List {list_object.name} cleared", reply_markup=reply_markup)
return ACTION
async def list_delete(self, update, context) -> None:
query = update.callback_query
await query.answer()
with db:
ListModel.get(name = self.current_name).delete_instance()
await query.edit_message_text("List " + self.current_name + " deleted")
list_object = context.user_data["current_list"]
list_object.delete_instance()
await query.edit_message_text(f"List {list_object.name} deleted")
return ConversationHandler.END
async def list_print(self, update, context) -> None:
query = update.callback_query
await query.answer()
with db:
it = ListModel.get(name = self.current_name).content_list
if it:
content = "·" + "\n· ".join(it)
list_object = context.user_data["current_list"]
content_it = list_object.content.values()
done_it = [
"· " if e is None \
else "" if e \
else "" \
for e in list_object.done_dict.values()]
if content_it:
msg_content = "\n".join([f"{d} {c}" for d, c in zip(done_it, content_it)])
else:
content = "List empty"
msg_content = "List empty"
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Content of " + self.current_name + ":\n" + content, reply_markup=reply_markup)
await query.edit_message_text(f"Content of {list_object.name}:\n{msg_content}", reply_markup=reply_markup)
return ACTION
async def list_add_item(self, update, context) -> None:
item = update.message.text
with db:
ListModel.get(name = self.current_name).content_list = ListModel.get(name = self.current_name).content_list + [item]
list_object = context.user_data["current_list"]
new = list_object.content
new.update({"random_key": item})
list_object.content = new
# TODO test me!
keyboard = [[InlineKeyboardButton("Add some more", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("Added " + item, reply_markup=reply_markup)
await update.message.reply_text(f"Added {item}", reply_markup=reply_markup)
return ACTION
async def list_toggle_index(self, update, context) -> None:
query = update.callback_query
toggle_key = int(query.data)
await query.answer()
list_object = context.user_data["current_list"]
old = list_object.done_dict[toggle_key]
# if all None or all False (first toggle or all false) then set all dones to False
if not any(list_object.done_dict.values()):
new_done_dict = dict.fromkeys(list_object.done_dict, False)
else: new_done_dict = list_object.done_dict
new_done_dict[toggle_key] = not old
list_object.done_dict = new_done_dict
keyboard = [[InlineKeyboardButton("Toggle another", callback_data="toggle"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(f"Toggled {list_object.content[toggle_key]}", reply_markup=reply_markup)
return ACTION
@ -188,14 +224,13 @@ class ListHandler(BaseHandler):
ind = int(query.data)
await query.answer()
with db:
list_object = ListModel.get(name = self.current_name)
old = list_object.content_list
list_object = context.user_data["current_list"]
old = list_object.content
name = old.pop(ind)
list_object.content_list = old
list_object.content = old
keyboard = [[InlineKeyboardButton("Remove another", callback_data="remove"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Removed " + name, reply_markup=reply_markup)
await query.edit_message_text(f"Removed {name}", reply_markup=reply_markup)
return ACTION

View File

@ -1,30 +1,52 @@
from peewee import *
import json
db = DatabaseProxy()
class BaseModel(Model):
class Meta:
database = db
db_table = 'lists'
class ListModel(BaseModel):
name = CharField(unique=True)
content = TextField(default="") # unlimited length, use to serialise list into
name = CharField(default="")
chat_id = IntegerField()
@property
def content_list(self):
return json.loads(self.content or '[]')
def content(self) -> dict:
return {e.id: e.entry for e in self.entries}
@content_list.setter
def content_list(self, list_content):
self.content = json.dumps(list_content)
with db:
self.save()
@content.setter
def content(self, new_content: dict):
old_content = self.content
if len(old_content) < len(new_content):
# we assume: only 1 item added (last item)
new_item = list(new_content.values())[-1]
ListEntryModel.create(list_model=self, entry = new_item)
elif len(old_content) > len(new_content):
to_delete_ids = set(old_content.keys()) - set(new_content.keys())
ListEntryModel.delete().where(ListEntryModel.id.in_(list(to_delete_ids))).execute()
@property
def done_dict(self):
return {e.id: e.done for e in self.entries}
@done_dict.setter
def done_dict(self, new_done: dict):
old_done_dict = self.done_dict
for k,d in new_done.items():
if d != old_done_dict[k]:
ListEntryModel.update(done = d).where(ListEntryModel.id == k).execute()
class ListEntryModel(BaseModel):
list_model = ForeignKeyField(ListModel, backref="entries", on_delete="CASCADE")
entry = TextField(default="")
done = BooleanField(default=None, null=True)
def set_db(db_path):
db.initialize(SqliteDatabase(db_path))
with db:
db.create_tables([ListModel], safe=True)
db.create_tables([ListModel, ListEntryModel], safe=True)