chat photo improvements.

This commit is contained in:
Lia Schöneweiß 2023-05-24 15:52:33 +02:00
parent 0ef3371f3a
commit ffea11a6ef
3 changed files with 66 additions and 49 deletions

View File

@ -1,22 +1,45 @@
import os import os
from pathlib import Path
from telegram.ext import ExtBot from telegram.ext import ExtBot
import random from telegram.error import BadRequest
import logging
from datetime import time, timedelta, timezone, datetime, date
from peewee import fn
MEDIA_DIR = Path(os.getenv("MEDIA_DIR"))
CHAT_ID = os.getenv("CHAT_ID") CHAT_ID = os.getenv("CHAT_ID")
async def set_random(bot: ExtBot) -> None: class SetChatPhotoJob():
"""Set a random chat photo.""" def __init__(self, models, bot: ExtBot, job_queue):
if os.getenv("DOCKERIZED", "false") == "false": self.models = models
# only change image on prod self.bot = bot
return self.logger = logging.getLogger(self.__class__.__name__)
photos = list(MEDIA_DIR.glob("*.jpg")) + list(MEDIA_DIR.glob("*.png")) + list(MEDIA_DIR.glob("*.jpeg")) if os.getenv("DOCKERIZED", "false") != "true":
# when running locally, annoy the programmer every 60 seconds <3
if len(photos) == 0: job_queue.run_repeating(self.callback_photo, interval=60)
return else:
# set the message sending time; include UTC shift +2
sending_time = time(hour=12, minute=0, second=0, tzinfo=timezone(timedelta(hours=2)))
job_queue.run_monthly(self.callback_photo, when=sending_time, day=-1)
photo = random.choice(photos)
await bot.set_chat_photo(CHAT_ID, photo) async def callback_photo(self, context):
# last_seen of memory must be older than 10 days in past or None
with self.models.db:
possible_photos = self.models.JournalEntry.select().where(
self.models.JournalEntry.media_path != None
).order_by(fn.Random())
try:
chosen_entry = possible_photos.get()
except:
self.logger.warning("No photos available.")
return
chat_id = os.getenv("CHAT_ID")
try:
await self.bot.set_chat_photo(chat_id, chosen_entry.media_path)
except BadRequest:
self.logger.error("This is a private chat!")
return

View File

@ -1,41 +1,43 @@
from datetime import time, timedelta, timezone, datetime, date from datetime import time, timedelta, timezone, datetime, date
import os import os
from peewee import fn from peewee import fn
import logging
class RandomMemoryJob(): class RandomMemoryJob():
def __init__(self, models, bot, job_queue): def __init__(self, models, bot, job_queue):
self.models = models self.models = models
self.bot = bot self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__)
if os.getenv("DOCKERIZED", "false") != "true":
# set the message sending time; include UTC shift +2 # when running locally, annoy the programmer every 60 seconds <3
sending_time = time(hour=12, minute=00, second=0, tzinfo=timezone(timedelta(hours=2))) job_queue.run_repeating(self.callback_memory, interval=60)
self.min_age = 0 # do not filter messages: show them all
# context.job_queue.run_daily(self.callback_memory, sending_time, chat_id=chat_id) else:
# job_queue.run_repeating(self.callback_memory, interval=10, first=sending_time) # set the message sending time; include UTC shift +2
job_queue.run_once(self.callback_memory, timedelta(seconds=2)) sending_time = time(hour=12, minute=0, second=0, tzinfo=timezone(timedelta(hours=2)))
job_queue.run_daily(self.callback_memory, sending_time)
self.min_age = 30 # days
async def callback_memory(self, context): async def callback_memory(self, context):
# last_seen of memory must be older than 10 days in past or None
possible_entries = self.models.JournalEntry.select().where(
(datetime.today().date().year - self.models.JournalEntry.last_shown.year >= 0) | (self.models.JournalEntry.last_shown == None)).where(
(datetime.today().date().month - self.models.JournalEntry.last_shown.month >= 0) | (self.models.JournalEntry.last_shown == None)).where(
(datetime.today().date().day - self.models.JournalEntry.last_shown.day >= 0) | (self.models.JournalEntry.last_shown == None)).order_by(fn.Random())
# returns if all entries have been seen recently # last_seen of memory must be older than 10 days in past or None
if len(possible_entries) == 0: with self.models.db:
print("Come back later for another memory.") possible_entries = self.models.JournalEntry.select().where(
return (self.models.JournalEntry.last_shown <= datetime.today().date() - timedelta(days=self.min_age)) | \
(self.models.JournalEntry.last_shown == None)
).order_by(fn.Random())
try:
chosen_entry = possible_entries.get()
except:
self.logger.warning("Come back later for another memory.")
return
# update the last_shown of the chosen entry # update the last_shown of the chosen entry
chosen_entry = possible_entries.get() chosen_entry.last_shown = datetime.today().date()
# chosen_entry = self.models.JournalEntry.select().get() chosen_entry.save()
# chosen_entry.last_shown = date(year=2023, month=5, day=3)
chosen_entry.last_shown = datetime.today().date()
chosen_entry.save()
chat_id = os.getenv("CHAT_ID") chat_id = os.getenv("CHAT_ID")

View File

@ -11,8 +11,7 @@ logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO level=logging.INFO
) )
import asyncio logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,15 +30,8 @@ def main() -> None:
application.add_handler(memory.MemoryHandler("memory", models).handler) application.add_handler(memory.MemoryHandler("memory", models).handler)
random_memory.RandomMemoryJob(models, application.bot, application.job_queue) random_memory.RandomMemoryJob(models, application.bot, application.job_queue)
chat_photo.SetChatPhotoJob(models, application.bot, application.job_queue)
# application.add_handler(CommandHandler("help", help_command))
# on non command i.e message - echo the message on Telegram
# application.add_handler(InlineQueryHandler(inline_query))
# on every start set a new chat photo
# loop = asyncio.get_event_loop()
asyncio.ensure_future(chat_photo.set_random(application.bot))
# Run the bot until the user presses Ctrl-C # Run the bot until the user presses Ctrl-C
application.run_polling() application.run_polling()