switched to uv and gitea-actions-based pipeline
All checks were successful
Build container / Build (pull_request) Successful in 43s

This commit is contained in:
2025-07-29 14:28:07 +02:00
parent b12eb62b41
commit 29d951427d
18 changed files with 279 additions and 194 deletions

View File

@@ -38,7 +38,7 @@ class JournalHandler(BaseHandler):
async def entry_point(self, update, context):
await super().entry_point(update, context)
if os.getenv("DOCKERIZED", "false") == "true" and os.getenv("CHAT_ID") != str(update.message.chat_id):
if models.IS_PRODUCTION and os.getenv("CHAT_ID") != str(update.message.chat_id):
await update.message.reply_text("You are not authorized to use this bot")
return ConversationHandler.END
@@ -79,7 +79,7 @@ class JournalHandler(BaseHandler):
else:
await query.edit_message_text(text="An entry already exists for this date")
return ConversationHandler.END
return CONTENT_ENTRY
@@ -113,13 +113,13 @@ class JournalHandler(BaseHandler):
async def date_entry(self, update, context):
date = update.message.text
try:
date = datetime.datetime.strptime(date, "%d%m%Y").date()
except ValueError:
await update.message.reply_text("Please enter the date in the format _DDMMYYYY_", parse_mode=ParseMode.MARKDOWN_V2)
return ENTRY_OPTIONS
if context.chat_data.get("delete", False): # if not set, delete was not chosen
with models.db:
self.current_model = models.JournalEntry.get_or_none(
@@ -158,7 +158,7 @@ class JournalHandler(BaseHandler):
file = await update.message.effective_attachment[-1].get_file()
else:
file = await update.message.effective_attachment.get_file()
file_bytes = await file.download_as_bytearray()
file_path = file.file_path
self.current_model.save_media(file_bytes, file_path)
@@ -211,7 +211,7 @@ def get_names(dates: list):
suffix = ""
if models.JournalEntry.get_or_none(date = d):
suffix = ""
if d == datetime.datetime.now().date():
names.append("Today" + suffix)
elif d == datetime.datetime.now().date() - datetime.timedelta(days = 1):

View File

@@ -3,7 +3,7 @@ from telegram.ext import ConversationHandler, CommandHandler, MessageHandler, fi
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, InputMediaPhoto
import models
from telegram.constants import ParseMode
# ACTION_CHOICE, DATE_ENTRY, ADD_CONTENT = range(3)
MEMORY_CHOICE = range(1)
@@ -28,7 +28,7 @@ class MemoryHandler(BaseHandler):
async def entry_point(self, update: Update, context: CallbackContext):
await super().entry_point(update, context)
if os.getenv("DOCKERIZED", "false") == "true" and os.getenv("CHAT_ID") != str(update.message.chat_id):
if models.IS_PRODUCTION and os.getenv("CHAT_ID") != str(update.message.chat_id):
await update.message.reply_text("You are not authorized to use this bot")
return ConversationHandler.END
@@ -56,9 +56,9 @@ class MemoryHandler(BaseHandler):
await update.message.reply_text(
f"Which moment would you like to remember?", reply_markup=keyboard
)
context.chat_data["kept_matches"] = list(matching_models)
return MEMORY_CHOICE
async def choose_memory(self, update: Update, context: CallbackContext):

View File

@@ -13,8 +13,8 @@ class SetChatPhotoJob():
def __init__(self, bot: ExtBot, job_queue):
self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__)
if os.getenv("DOCKERIZED", "false") != "true":
if models.IS_PRODUCTION:
# when running locally, annoy the programmer every 60 seconds <3
job_queue.run_repeating(self.callback_photo, interval=60)
else:
@@ -24,13 +24,13 @@ class SetChatPhotoJob():
async def callback_photo(self, context):
# last_seen of memory must be older than 10 days in past or None
with models.db:
possible_photos = models.JournalEntry.select().where(
models.JournalEntry.media_path != None
).order_by(fn.Random())
try:
chosen_entry = possible_photos.get()
except:

View File

@@ -4,13 +4,14 @@ import os
from peewee import fn
import logging
import models
from telegram.ext import ExtBot
class RandomMemoryJob():
def __init__(self, bot, job_queue):
def __init__(self, bot: ExtBot, job_queue):
self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__)
if os.getenv("DOCKERIZED", "false") != "true":
if models.IS_PRODUCTION:
# when running locally, annoy the programmer every 60 seconds <3
job_queue.run_repeating(self.callback_memory, interval=3600)
self.min_age = 0 # do not filter messages: show them all
@@ -22,14 +23,14 @@ class RandomMemoryJob():
async def callback_memory(self, context):
# last_seen of memory must be older than 10 days in past or None
with models.db:
possible_entries = models.JournalEntry.select().where(
(models.JournalEntry.last_shown <= datetime.today().date() - timedelta(days=self.min_age)) | \
(models.JournalEntry.last_shown == None)
).order_by(fn.Random())
try:
chosen_entry = possible_entries.get()
except:

View File

@@ -2,7 +2,7 @@ from peewee import *
from pathlib import Path
import re
import os
import datetime
import socket
ID_MAPPINGS = {
"Lia": 5603036217,
@@ -11,9 +11,9 @@ ID_MAPPINGS = {
ID_MAPPINGS_REV = dict((v, k) for k, v in ID_MAPPINGS.items())
RATING_MAPPING = {
1: "😵",
2: "☹️",
3: "😐",
1: "🙁",
2: "😐",
3: "🙂",
4: "😃",
5: "🥰"
}
@@ -21,6 +21,11 @@ RATING_MAPPING = {
MEDIA_DIR = Path(os.getenv("MEDIA_DIR"))
MEDIA_DIR.mkdir(parents=True, exist_ok=True)
# check if we are running on a cluster
IS_PRODUCTION = os.getenv('KUBERNETES_SERVICE_HOST') is not None
db = DatabaseProxy()
class BaseModel(Model):
@@ -37,11 +42,11 @@ class JournalEntry(BaseModel):
media_path = TextField(null=True)
last_shown = DateField(null=True)
rating = IntegerField(null=True) # mapped by RATING_MAPPING
@property
def media(self):
return Path(self.media_path).open('rb')
def save_media(self, media: bytearray, file_name: str):
ext = Path(file_name).suffix
file_name = f"{self.date.isoformat()}-media{ext}"
@@ -87,7 +92,7 @@ class JournalEntry(BaseModel):
group_to_replace = match[0]
new_text = new_text.replace(group_to_replace, f"<tg-spoiler>{group_to_replace}</tg-spoiler>")
return new_text
def set_db(db_path):
db.initialize(SqliteDatabase(db_path))