All checks were successful
		
		
	
	Build container / Build (pull_request) Successful in 1m10s
				
			
		
			
				
	
	
		
			101 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			101 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import html
import os
import re
import socket
from pathlib import Path

from peewee import *
 | 
						|
 | 
						|
# Author display name -> numeric ID.
# NOTE(review): presumably these are messenger user IDs; verify against callers.
ID_MAPPINGS = {"Lia": 5603036217, "Rémy": 364520272}

# Inverse lookup: numeric ID -> author display name.
ID_MAPPINGS_REV = {user_id: name for name, user_id in ID_MAPPINGS.items()}
 | 
						|
 | 
						|
# Emoji shown for each numeric rating, ordered from worst (1) to best (5).
_RATING_EMOJIS = ("🙁", "😐", "🙂", "😃", "🥰")
RATING_MAPPING = dict(enumerate(_RATING_EMOJIS, start=1))
 | 
						|
 | 
						|
# Directory in which media files of journal entries are stored. It is taken
# from the MEDIA_DIR environment variable and created on import if missing.
_media_dir_env = os.getenv("MEDIA_DIR")
if _media_dir_env is None:
    # Path(None) used to fail here with an opaque TypeError; the exception
    # type is kept, but the message now says what has to be configured.
    raise TypeError(
        "The MEDIA_DIR environment variable must be set to the directory "
        "used for storing journal media"
    )
MEDIA_DIR = Path(_media_dir_env)
MEDIA_DIR.mkdir(parents=True, exist_ok=True)
 | 
						|
 | 
						|
# Cluster detection: the mere presence of KUBERNETES_SERVICE_HOST in the
# environment (whatever its value) is treated as "running in production".
IS_PRODUCTION = 'KUBERNETES_SERVICE_HOST' in os.environ
 | 
						|
 | 
						|
 | 
						|
 | 
						|
# Placeholder database handle; it is bound to a concrete SQLite database
# later, when set_db() calls db.initialize().
db = DatabaseProxy()


class BaseModel(Model):
    """Common base class that attaches models to the module-level ``db``."""

    class Meta:
        # NOTE(review): `db_table` looks like the legacy spelling of peewee's
        # `table_name` option, and the table name is listed among the Meta
        # options that subclasses do not inherit -- confirm which table
        # JournalEntry actually ends up using before changing this.
        db_table = 'journal'
        database = db
 | 
						|
 | 
						|
 | 
						|
# model for a single journal entry
class JournalEntry(BaseModel):
    """A single journal entry; ``date`` is unique, so one entry per day."""

    date = DateField(unique=True)
    author = TextField(null=True)  # a key of ID_MAPPINGS, or NULL
    text = TextField(null=True)
    media_path = TextField(null=True)  # file written by save_media(), or NULL
    last_shown = DateField(null=True)
    rating = IntegerField(null=True)  # mapped by RATING_MAPPING

    # Matches a whole sentence that contains one of the trigger tokens.
    # Built once for the class instead of on every spoiler_text access.
    _SPOILER_PATTERN = re.compile(
        r"("
        r"(((?<=(\.|\!|\?)\s)[A-Z])|(^[A-Z]))" # beginning of a sentence
        r"([^\.\!\?])+" # any character being part of a sentence
        r"((\:\))|😇|😈|[Ss]ex)" # the smiley
        r"([^\.\!\?])*" # continuation of sentence
        r"(\.|\!|\?|\,|$)" # end of the sentence
        r")"
    )

    @property
    def media(self):
        """Open the stored media file for binary reading.

        The caller owns the returned file object and must close it.
        Raises ``TypeError`` when ``media_path`` is ``None`` and ``OSError``
        when the file cannot be opened.
        """
        return Path(self.media_path).open('rb')

    def save_media(self, media: bytearray, file_name: str):
        """Write *media* into ``MEDIA_DIR`` and persist its path on the entry.

        Only the extension of *file_name* is used; the stored file is named
        ``<ISO date>-media<ext>``, so saving again with the same extension
        overwrites the previous file of this entry.
        """
        ext = Path(file_name).suffix
        target = MEDIA_DIR / f"{self.date.isoformat()}-media{ext}"
        target.write_bytes(media)
        # Keep the in-memory value a plain string, the same type a TextField
        # yields when the row is loaded from the database.
        self.media_path = str(target)
        self.save()

    @property
    def author_id(self):
        """Numeric ID of the author, or ``None`` when no author is set.

        Raises ``KeyError`` if ``author`` is not a key of ``ID_MAPPINGS``.
        """
        return None if self.author is None else ID_MAPPINGS[self.author]

    @author_id.setter
    def author_id(self, author_id):
        """Set ``author`` from a numeric ID and save the entry immediately.

        Raises ``KeyError`` if *author_id* is not in ``ID_MAPPINGS_REV``.
        """
        self.author = ID_MAPPINGS_REV[author_id]
        self.save()

    @property
    def date_pretty(self) -> str:
        """Human-readable form of ``date``, e.g. ``Monday, 1. January 2024``."""
        try:
            # "%-d" (day without zero padding) is not accepted by every
            # platform's strftime.
            return self.date.strftime('%A, %-d. %B %Y')
        except ValueError:
            # Fallback for platforms (per the original comment: Windows)
            # that reject "%-d"; note it uses abbreviated names and a
            # zero-padded day.
            return self.date.strftime('%a, %d. %b %Y')

    @property
    def spoiler_text(self) -> str:
        """Returns the text with all the frisky details hidden away.

        The text is HTML-escaped first, then every sentence matched by
        ``_SPOILER_PATTERN`` is wrapped in ``<tg-spoiler>`` tags.
        Returns an empty string when the entry has no text.
        """
        if self.text is None:
            return ""
        # Escape before adding markup so user text can never be read as tags.
        # html.escape handles "&" first, so nothing gets escaped twice.
        escaped = html.escape(self.text, quote=False)
        # A single sub() pass wraps each match exactly once. The previous
        # findall() + str.replace() loop wrapped repeated sentences twice.
        return self._SPOILER_PATTERN.sub(
            r"<tg-spoiler>\g<0></tg-spoiler>", escaped
        )
 | 
						|
 | 
						|
 | 
						|
def set_db(db_path):
    """Point the module-level ``db`` proxy at the SQLite file *db_path*.

    After binding, the ``JournalEntry`` table is created inside a
    ``with db:`` block; ``safe=True`` makes that a no-op when the table
    already exists.
    """
    sqlite_db = SqliteDatabase(db_path)
    db.initialize(sqlite_db)
    with db:
        db.create_tables([JournalEntry], safe=True)
 |