From 2e65828bbb21406fe796e7c459de6d5cd5391020 Mon Sep 17 00:00:00 2001 From: Remy Moll Date: Mon, 5 Sep 2022 16:29:19 +0200 Subject: [PATCH] reduced slack functionality, higher ease of use. Database migration wip --- docker-compose.yaml | 73 ++-- env/debug | 2 - launch | 44 +++ misc/gather_media_files.py | 2 +- news_check/Dockerfile | 22 +- news_check/client/public/index.html | 7 +- news_check/requirements.txt | 2 +- news_check/server/README.md | 10 - news_check/server/{main.py => app.py} | 7 +- news_check/server/package-lock.json | 3 - news_check/test.py | 20 ++ .dockerignore => news_fetch/.dockerignore | 2 +- news_fetch/Dockerfile | 15 +- news_fetch/app/configuration.py | 59 ---- news_fetch/app/utils_slack/message_helpers.py | 285 --------------- news_fetch/app/utils_slack/runner.py | 189 ---------- news_fetch/app/utils_storage/models.py | 331 ------------------ news_fetch/configuration.py | 66 ++++ news_fetch/{app => }/runner.py | 65 ++-- news_fetch/{app => }/utils_check/runner.py | 4 +- news_fetch/{app => }/utils_mail/runner.py | 2 +- news_fetch/utils_slack/runner.py | 238 +++++++++++++ news_fetch/utils_storage/helpers.py | 10 + .../utils_storage/migrations/migration.001.py | 0 news_fetch/utils_storage/models.py | 297 ++++++++++++++++ news_fetch/{app => }/utils_worker/_init__.py | 0 .../{app => }/utils_worker/compress/runner.py | 2 +- .../utils_worker/download/__init__.py | 0 .../utils_worker/download/browser.py | 23 +- .../{app => }/utils_worker/download/runner.py | 0 .../utils_worker/download/youtube.py | 0 .../{app => }/utils_worker/fetch/runner.py | 7 +- .../{app => }/utils_worker/upload/runner.py | 0 .../{app => }/utils_worker/worker_template.py | 0 news_fetch/{app => }/utils_worker/workers.py | 0 35 files changed, 789 insertions(+), 998 deletions(-) create mode 100644 launch delete mode 100644 news_check/server/README.md rename news_check/server/{main.py => app.py} (84%) delete mode 100644 news_check/server/package-lock.json create mode 100644 news_check/test.py rename .dockerignore => news_fetch/.dockerignore (52%) delete mode 100644 news_fetch/app/configuration.py delete mode 100644 news_fetch/app/utils_slack/message_helpers.py delete mode 100644 news_fetch/app/utils_slack/runner.py delete mode 100644 news_fetch/app/utils_storage/models.py create mode 100644 news_fetch/configuration.py rename news_fetch/{app => }/runner.py (79%) rename news_fetch/{app => }/utils_check/runner.py (97%) rename news_fetch/{app => }/utils_mail/runner.py (97%) create mode 100644 news_fetch/utils_slack/runner.py create mode 100644 news_fetch/utils_storage/helpers.py rename news_fetch/{app => }/utils_storage/migrations/migration.001.py (100%) create mode 100644 news_fetch/utils_storage/models.py rename news_fetch/{app => }/utils_worker/_init__.py (100%) rename news_fetch/{app => }/utils_worker/compress/runner.py (96%) rename news_fetch/{app => }/utils_worker/download/__init__.py (100%) rename news_fetch/{app => }/utils_worker/download/browser.py (87%) rename news_fetch/{app => }/utils_worker/download/runner.py (100%) rename news_fetch/{app => }/utils_worker/download/youtube.py (100%) rename news_fetch/{app => }/utils_worker/fetch/runner.py (92%) rename news_fetch/{app => }/utils_worker/upload/runner.py (100%) rename news_fetch/{app => }/utils_worker/worker_template.py (100%) rename news_fetch/{app => }/utils_worker/workers.py (100%) diff --git a/docker-compose.yaml b/docker-compose.yaml index eafadab..9c91dcc 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,25 +1,8 @@ -# Usage: -# 
docker compose --env-file env/ run news_fetch && docker-compose --env-file env/production down - version: "3.9" services: - geckodriver: - image: selenium/standalone-firefox:103.0 - volumes: - - ${XSOCK-/dev/null}:${XSOCK-/tmp/sock} - - ${XAUTHORITY-/dev/null}:/home/auto_news/.Xauthority - environment: - - DISPLAY=$DISPLAY - - START_VNC=false - - START_XVFB=false - user: 1001:1001 - expose: # exposed to other docker-compose services only - - "4444" - - - vpn: + vpn: # Creates a connection behind the ETH Firewall to access NAS and Postgres image: wazum/openconnect-proxy:latest env_file: - ${CONTAINER_DATA}/config/vpn.config @@ -28,13 +11,14 @@ services: volumes: - /dev/net/tun:/dev/net/tun # alternative to cap_add & volumes: specify privileged: true + expose: ["5432"] # exposed here because db_passhtrough uses this network. See below for more details + - - nas_sync: + nas_sync: # Syncs locally downloaded files with the NAS-share on nas22.ethz.ch/... depends_on: - - vpn # used to establish a connection to the SMB server - network_mode: "service:vpn" - build: nas_sync + - vpn + network_mode: "service:vpn" # used to establish a connection to the SMB server from inside ETH network + build: nas_sync # local folder to build image: nas_sync:latest cap_add: # capabilities needed for mounting the SMB share - SYS_ADMIN @@ -49,33 +33,56 @@ services: - /sync/nas_sync.config - news_fetch: + geckodriver: # separate docker container for pdf-download. This hugely improves stability (and creates shorter build times for the containers) + image: selenium/standalone-firefox:103.0 # latest version because it mirrors the locally installed version (which is automatically updated) + environment: + - START_VNC=${HEADFULL-false} # as opposed to headless, used when requiring supervision (eg. for websites that crash) + - START_XVFB=${HEADFULL-false} + expose: ["4444"] # exposed to other docker-compose services only + ports: + - 7900:7900 # port for webvnc + + + db_passthrough: # Allows a container on the local network to connect to a service (here postgres) through the vpn + network_mode: "service:vpn" + image: alpine/socat:latest + command: ["tcp-listen:5432,reuseaddr,fork", "tcp-connect:id-hdb-psgr-cp48.ethz.ch:5432"] + # expose: ["5432"] We would want this passthrough to expose its ports to the other containers + # BUT since it uses the same network as the vpn-service, it can't expose ports on its own. 5432 is therefore exposed under service.vpn.expose + + + news_fetch: # Orchestration of the automatic download. 
It generates pdfs (via the geckodriver container), fetches descriptions, triggers a snapshot (on archive.org) and writes to a db
     build: news_fetch
     image: news_fetch:latest
     depends_on: # when using docker compose run news_fetch, the dependencies are started as well
       - nas_sync
       - geckodriver
+      - db_passthrough
     volumes:
       - ${CONTAINER_DATA}:/app/containerdata # always set
       - ${CODE:-/dev/null}:/code # not set in prod, defaults to /dev/null
     environment:
       - DEBUG=${DEBUG}
-      - HEADLESS=${HEADLESS}
-      - REDUCEDFETCH=${REDUCEDFETCH}
-    entrypoint: ${ENTRYPOINT:-python3 runner.py} # by default launch workers as defined in the Dockerfile
-    stdin_open: ${INTERACTIVE:-false} # docker run -i
-    tty: ${INTERACTIVE:-false} # docker run -t
+      - UNAME=${UNAME}
+    entrypoint: ${ENTRYPOINT:-python runner.py} # by default launch workers as defined in the Dockerfile
+    # stdin_open: ${INTERACTIVE:-false} # docker run -i
+    # tty: ${INTERACTIVE:-false} # docker run -t
 
 
-  news_check:
+  news_check: # Creates a small webapp on http://localhost:8080 to check previously generated pdfs (some of which are unusable and must be marked as such)
     build: news_check
     image: news_check:latest
-
+    # user: 1001:1001 # since the app writes files to the local filesystem, it must be run as the current user
+    depends_on:
+      - db_passthrough
     volumes:
       - ${CONTAINER_DATA}:/app/containerdata # always set
       - ${CODE:-/dev/null}:/code # not set in prod, defaults to /dev/null
-
+    environment:
+      - UNAME=${UNAME}
     ports:
-      - 8080:80 # 80 inside container
\ No newline at end of file
+      - "8080:80" # 80 inside container
+    entrypoint: ${ENTRYPOINT:-python app.py} # by default launch workers as defined in the Dockerfile
+    tty: true
diff --git a/env/debug b/env/debug
index ec1170f..1efd28c 100644
--- a/env/debug
+++ b/env/debug
@@ -3,8 +3,6 @@
 CONTAINER_DATA=~/Bulk/COSS/Downloads/coss_archiving
 CODE=./
 
-XAUTHORTIY=$XAUTHORTIY
-XSOCK=/tmp/.X11-unix
 
 DEBUG=true
 CHECK=false
diff --git a/launch b/launch
new file mode 100644
index 0000000..c50d61c
--- /dev/null
+++ b/launch
@@ -0,0 +1,44 @@
+#!/bin/bash
+set -e
+set -o ignoreeof
+
+echo "Bash script launching COSS_ARCHIVING..."
+
+
+# CHANGE ME!
+export CONTAINER_DATA=~/Bulk/COSS/Downloads/coss_archiving
+export UNAME=remy
+
+
+if [[ $1 == "debug" ]]
+then
+    export DEBUG=true
+    export HEADFULL=true
+    export CODE=./
+    export ENTRYPOINT=/bin/bash
+    # 'docker compose run --service-ports' alone is not enough here, so also execute 'up -d', which starts the remaining services in the background
+    docker compose up -d
+elif [[ $1 == "production" ]]
+then
+    export DEBUG=false
+elif [[ $1 == "build" ]]
+then
+    export DEBUG=false
+    docker compose build
+    exit 0
+elif [[ $1 == "down" ]]
+then
+    docker compose stop
+    exit 0
+else
+    echo "Please specify the execution mode (debug/production/build/down) as the first argument"
+    exit 1
+fi
+
+shift # consumes the variable set in $1 so that $@ only contains the remaining arguments
+
+docker compose run -it --service-ports "$@"
+
+echo "Docker run finished, shutting down containers..."
+docker compose stop
+echo "Bye!"
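
Usage sketch for the new launch script (assumed invocations, derived from the case statement above; service names come from docker-compose.yaml). Note the file is added with mode 100644, so it needs a chmod +x, or must be invoked via bash launch, before the following works:

    ./launch debug news_fetch        # bring the stack up, then open /bin/bash inside news_fetch
    ./launch production news_fetch   # run the fetcher against the production database
    ./launch build                   # rebuild all images, then exit
    ./launch down                    # stop all running containers
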
diff --git a/misc/gather_media_files.py b/misc/gather_media_files.py index 3b376ad..d60a4b8 100644 --- a/misc/gather_media_files.py +++ b/misc/gather_media_files.py @@ -15,7 +15,7 @@ runner.configuration.models.set_db( runner.configuration.SqliteDatabase("../.dev/media_message_dummy.db"), # chat_db (not needed here) runner.configuration.SqliteDatabase("../.dev/media_downloads.db") ) -runner.configuration.parsed["DOWNLOADS"]["local_storage_path"] = "../.dev/" +runner.configuration.main_config["DOWNLOADS"]["local_storage_path"] = "../.dev/" def fetch(): diff --git a/news_check/Dockerfile b/news_check/Dockerfile index 652622a..25df400 100644 --- a/news_check/Dockerfile +++ b/news_check/Dockerfile @@ -1,25 +1,25 @@ FROM node:18.8 as build-deps -WORKDIR /app +WORKDIR /app/client COPY client/package.json ./ COPY client/package-lock.json ./ COPY client/rollup.config.js ./ COPY client/src ./src/ +RUN npm install RUN npm run build FROM python:latest ENV TZ Europe/Zurich -RUN apt-get update && apt-get install -y postgresql - -RUN mkdir -p /app/news_check - -COPY requirements.txt /app/requirements.txt -RUN python3 -m pip install -r /app/requirements.txt -COPY --from=build-deps /app/public /app/news_check/public - -COPY app /app/news_check WORKDIR /app/news_check -CMD gunicorn app:app -w 1 --threads 2 -b 0.0.0.0:80 \ No newline at end of file +COPY requirements.txt requirements.txt +RUN python3 -m pip install -r requirements.txt + +COPY client/public/index.html client/public/index.html +COPY --from=build-deps /app/client/public client/public/ +COPY server server/ + +WORKDIR /app/news_check/server +# CMD python app.py \ No newline at end of file diff --git a/news_check/client/public/index.html b/news_check/client/public/index.html index fa7c103..871f31e 100644 --- a/news_check/client/public/index.html +++ b/news_check/client/public/index.html @@ -4,10 +4,9 @@ - Svelte app + NEWS CHECK - - + @@ -16,7 +15,7 @@ - + diff --git a/news_check/requirements.txt b/news_check/requirements.txt index 2629d7b..62abb13 100644 --- a/news_check/requirements.txt +++ b/news_check/requirements.txt @@ -1,4 +1,4 @@ flask peewee markdown -gunicorn \ No newline at end of file +psycopg2 \ No newline at end of file diff --git a/news_check/server/README.md b/news_check/server/README.md deleted file mode 100644 index b07f0e6..0000000 --- a/news_check/server/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# Svelte.js + Flask - -A super simple example of using Flask to serve a Svelte app and use it as a backend server. - -Run the following for development: - -- `python server.py` to start the Flask server. -- `cd client; npm install; npm run autobuild` to automatically build and reload the Svelte frontend when it's changed. - -This example just queries the Flask server for a random number. diff --git a/news_check/server/main.py b/news_check/server/app.py similarity index 84% rename from news_check/server/main.py rename to news_check/server/app.py index 2ae4116..39dda9d 100644 --- a/news_check/server/main.py +++ b/news_check/server/app.py @@ -1,4 +1,3 @@ -from crypt import methods import json from flask import Flask, send_from_directory, jsonify import random @@ -7,7 +6,8 @@ app = Flask(__name__) ############################################################################### -# SVELTE BACKEND. Always send index.html and the requested js-files. (compiled by npm) +# SVELTE 'STATIC' BACKEND. Always send index.html and the requested js-files. 
(compiled by npm)
+
 @app.route("/") #index.html
 def base():
     return send_from_directory('../client/public', 'index.html')
@@ -17,6 +17,7 @@ def home(path):
 
 
 
+
 ###############################################################################
 # API for news_check.
 
@@ -34,4 +35,4 @@ def set_article(id):
 
 
 if __name__ == "__main__":
-    app.run(debug=True)
+    app.run(host="0.0.0.0", port=80)
diff --git a/news_check/server/package-lock.json b/news_check/server/package-lock.json
deleted file mode 100644
index 48e341a..0000000
--- a/news_check/server/package-lock.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-  "lockfileVersion": 1
-}
diff --git a/news_check/test.py b/news_check/test.py
new file mode 100644
index 0000000..cd3fdcf
--- /dev/null
+++ b/news_check/test.py
@@ -0,0 +1,20 @@
+import peewee
+
+db = peewee.PostgresqlDatabase('coss_archiving', user='ca_rw', password='pleasechangeit', host='vpn', port=5432)
+# db.connect()
+
+
+class Pet(peewee.Model):
+    name = peewee.CharField()
+    animal_type = peewee.CharField()
+
+    class Meta:
+        database = db # this model uses the "coss_archiving" database defined above
+with db:
+    db.create_tables([Pet])
+db.get_tables()
+
+t = Pet.create(name="Test", animal_type="test")
+
+for pet in Pet.select():
+    print(pet.name)
diff --git a/.dockerignore b/news_fetch/.dockerignore
similarity index 52%
rename from .dockerignore
rename to news_fetch/.dockerignore
index 1726aa3..2bf9536 100644
--- a/.dockerignore
+++ b/news_fetch/.dockerignore
@@ -1,2 +1,2 @@
-.dev/
+Dockerfile
 __pycache__/
\ No newline at end of file
diff --git a/news_fetch/Dockerfile b/news_fetch/Dockerfile
index 8cd350a..92a5134 100644
--- a/news_fetch/Dockerfile
+++ b/news_fetch/Dockerfile
@@ -3,25 +3,18 @@ FROM python:latest
 
 ENV TZ Europe/Zurich
 
-RUN apt-get update && apt-get install -y \
-evince \
-# for checking
-xauth \
-#for gui
-ghostscript
-# for compression
-
-
 RUN useradd --create-home --shell /bin/bash --uid 1001 autonews
 # id mapped to local user
 # home directory needed for pip package installation
+ENV PATH=/home/autonews/.local/bin:$PATH
+# a RUN export would only affect its own layer, so set PATH via ENV instead
+
 RUN mkdir -p /app/auto_news
 RUN chown -R autonews:autonews /app
 USER autonews
-RUN export PATH=/home/autonews/.local/bin:$PATH
 
 COPY requirements.txt /app/requirements.txt
 RUN python3 -m pip install -r /app/requirements.txt
 
-COPY app /app/auto_news
+COPY . 
/app/auto_news WORKDIR /app/auto_news diff --git a/news_fetch/app/configuration.py b/news_fetch/app/configuration.py deleted file mode 100644 index 5ee9503..0000000 --- a/news_fetch/app/configuration.py +++ /dev/null @@ -1,59 +0,0 @@ -from dataclasses import dataclass -import os -import shutil -import configparser -import logging -from datetime import datetime -from peewee import SqliteDatabase -from rich.logging import RichHandler - -# first things first: logging -logging.basicConfig( - format='%(message)s', - level=logging.INFO, - datefmt='%H:%M:%S', # add %Y-%m-%d if needed - handlers=[RichHandler()] - ) -logger = logging.getLogger(__name__) - - -# load config file containing constants and secrets -parsed = configparser.ConfigParser() -parsed.read("/app/containerdata/config/news_fetch.config.ini") - -if os.getenv("DEBUG", "false") == "true": - logger.warning("Found 'DEBUG=true', setting up dummy databases") - - db_base_path = parsed["DATABASE"]["db_path_dev"] - parsed["SLACK"]["archive_id"] = parsed["SLACK"]["debug_id"] - parsed["MAIL"]["recipient"] = parsed["MAIL"]["sender"] - parsed["DOWNLOADS"]["local_storage_path"] = parsed["DATABASE"]["db_path_dev"] -else: - logger.warning("Found 'DEBUG=false' and running on production databases, I hope you know what you're doing...") - db_base_path = parsed["DATABASE"]["db_path_prod"] - logger.info("Backing up databases") - backup_dst = parsed["DATABASE"]["db_backup"] - today = datetime.today().strftime("%Y.%m.%d") - shutil.copyfile( - os.path.join(db_base_path, parsed["DATABASE"]["chat_db_name"]), - os.path.join(backup_dst, today + "." + parsed["DATABASE"]["chat_db_name"]), - ) - shutil.copyfile( - os.path.join(db_base_path, parsed["DATABASE"]["download_db_name"]), - os.path.join(backup_dst, today + "." + parsed["DATABASE"]["download_db_name"]), - ) - - -from utils_storage import models - -# Set up the database -models.set_db( - SqliteDatabase( - os.path.join(db_base_path, parsed["DATABASE"]["chat_db_name"]), - pragmas = {'journal_mode': 'wal'} # mutliple threads can read at once - ), - SqliteDatabase( - os.path.join(db_base_path, parsed["DATABASE"]["download_db_name"]), - pragmas = {'journal_mode': 'wal'} # mutliple threads can read at once - ) -) \ No newline at end of file diff --git a/news_fetch/app/utils_slack/message_helpers.py b/news_fetch/app/utils_slack/message_helpers.py deleted file mode 100644 index f0ca785..0000000 --- a/news_fetch/app/utils_slack/message_helpers.py +++ /dev/null @@ -1,285 +0,0 @@ -import logging -import configuration -import requests -import os -import time -from threading import Thread -from slack_sdk.errors import SlackApiError - -logger = logging.getLogger(__name__) -config = configuration.parsed["SLACK"] -models = configuration.models -slack_client = "dummy" -LATEST_RECORDED_REACTION = 0 - - -def init(client) -> None: - """Starts fetching past messages and returns the freshly launched thread""" - global slack_client - slack_client = client - - global LATEST_RECORDED_REACTION - try: - LATEST_RECORDED_REACTION = models.Reaction.select(models.Reaction.id).order_by("id")[-1] - except IndexError: #query is actually empty, we have never fetched any messages until now - LATEST_RECORDED_REACTION = 0 - - # fetch all te messages we could have possibly missed - logger.info("Querying missed messages, threads and reactions. 
This can take some time.") - fetch_missed_channel_messages() # not threaded - t = Thread(target = fetch_missed_channel_reactions, daemon=True) # threaded, runs in background (usually takes a long time) - t.start() - - if os.getenv("REDUCEDFETCH", "false") == "true": - logger.warning("Only fetching empty threads for bot messages because 'REDUCEDFETCH=true'") - fetch_missed_thread_messages(reduced=True) - else: # perform both asyncronously - fetch_missed_thread_messages() - - - -def get_unhandled_messages(): - """Gets all messages that have not yet been handled, be it by mistake or by downtime - As the message handler makes no distinction between channel messages and thread messages, - we don't have to worry about them here. - """ - - threaded_objects = [] - for t in models.Thread.select(): - if t.message_count > 1: # if only one message was written, it is the channel message - msg = t.last_message - if msg.is_by_human: - threaded_objects.append(msg) - # else don't, nothing to process - logger.info(f"Set {len(threaded_objects)} thread-messages as not yet handled.") - - - channel_objects = [t.initiator_message for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)] - logger.info(f"Set {len(channel_objects)} channel-messages as not yet handled.") - - reaction_objects = list(models.Reaction.select().where(models.Reaction.id > LATEST_RECORDED_REACTION)) - logger.info(f"Set {len(reaction_objects)} reactions as not yet handled.") - # the ones newer than the last before the fetch - - all_messages = channel_objects + threaded_objects - return all_messages, reaction_objects - - -def fetch_missed_channel_messages(): - # latest processed message_ts is: - presaved = models.Message.select().order_by(models.Message.ts) - if not presaved: - last_ts = 0 - else: - last_message = presaved[-1] - last_ts = last_message.slack_ts - - result = slack_client.conversations_history( - channel=config["archive_id"], - oldest=last_ts - ) - - new_messages = result.get("messages", []) - # # filter the last one, it is a duplicate! (only if the db is not empty!) - # if last_ts != 0 and len(new_messages) != 0: - # new_messages.pop(-1) - - new_fetches = 0 - for m in new_messages: - # print(m) - message_dict_to_model(m) - new_fetches += 1 - - refetch = result.get("has_more", False) - while refetch: # we have not actually fetched them all - try: - result = slack_client.conversations_history( - channel = config["archive_id"], - cursor = result["response_metadata"]["next_cursor"], - oldest = last_ts - ) # fetches 100 messages, older than the [-1](=oldest) element of new_fetches - refetch = result.get("has_more", False) - - new_messages = result.get("messages", []) - for m in new_messages: - message_dict_to_model(m) - new_fetches += 1 - except SlackApiError: # Most likely a rate-limit - logger.error("Error while fetching channel messages. 
(likely rate limit) Retrying in {} seconds...".format(config["api_wait_time"])) - time.sleep(config["api_wait_time"]) - refetch = True - - logger.info(f"Fetched {new_fetches} new channel messages.") - - -def fetch_missed_thread_messages(reduced=False): - """After having gotten all base-threads, we need to fetch all their replies""" - # I don't know of a better way: we need to fetch this for each and every thread (except if it is marked as permanently solved) - logger.info("Starting fetch of thread messages...") - if reduced: - threads = [t for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)] - # this only fetches completely empty threads, which might be because the bot-message was not yet saved to the db. - # once we got all the bot-messages the remaining empty threads will be the ones we need to process. - else: - threads = [t for t in models.Thread.select() if not t.is_fully_processed] - logger.info(f"Fetching history for {len(threads)} empty threads") - new_messages = [] - for i,t in enumerate(threads): - try: - messages = slack_client.conversations_replies( - channel = config["archive_id"], - ts = t.slack_ts, - oldest = t.messages[-1].slack_ts - )["messages"] - except SlackApiError: - logger.error("Hit rate limit while querying threaded messages, retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads))) - time.sleep(int(config["api_wait_time"])) - messages = slack_client.conversations_replies( - channel = config["archive_id"], - ts = t.slack_ts, - oldest = t.messages[-1].slack_ts - )["messages"] - - messages.pop(0) # the first message is the one posted in the channel. We already processed it! - - for m in messages: - # only append *new* messages - res = message_dict_to_model(m) - if res: - new_messages.append(res) - logger.info("Fetched {} new threaded messages.".format(len(new_messages))) - - -def fetch_missed_channel_reactions(): - logger.info("Starting background fetch of channel reactions...") - threads = [t for t in models.Thread.select() if not t.is_fully_processed] - for i,t in enumerate(threads): - reactions = [] - try: - query = slack_client.reactions_get( - channel = config["archive_id"], - timestamp = t.slack_ts - ) - reactions = query.get("message", []).get("reactions", []) # default = [] - except SlackApiError as e: - if e.response.get("error", "") == "message_not_found": - m = t.initiator_message - logger.warning(f"Message (id={m.id}) not found. Skipping and saving...") - # this usually means the message is past the 1000 message limit imposed by slack. Mark it as processed in the db - m.is_processed_override = True - m.save() - else: # probably a rate_limit: - logger.error("Hit rate limit while querying reactions. 
retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads))) - time.sleep(int(config["api_wait_time"])) - - for r in reactions: - reaction_dict_to_model(r, t) - - - - -# Helpers for message conversion to db-objects -def reaction_dict_to_model(reaction, thread=None): - if thread is None: - m_ts = reaction["item"]["ts"] - message = models.Message.get(ts = float(m_ts)) - thread = message.thread - if "name" in reaction.keys(): # fetched through manual api query - content = reaction["name"] - elif "reaction" in reaction.keys(): # fetched through events - content = reaction["reaction"] - else: - logger.error(f"Weird reaction received: {reaction}") - return None - - r, _ = models.Reaction.get_or_create( - type = content, - message = thread.initiator_message - ) - logger.info("Saved reaction [{}]".format(content)) - return r - - -def message_dict_to_model(message): - if message["type"] == "message": - thread_ts = message["thread_ts"] if "thread_ts" in message else message["ts"] - uid = message.get("user", "BAD USER") - if uid == "BAD USER": - logger.critical("Message has no user?? {}".format(message)) - return None - - user, _ = models.User.get_or_create(user_id = uid) - thread, _ = models.Thread.get_or_create(thread_ts = thread_ts) - m, new = models.Message.get_or_create( - user = user, - thread = thread, - ts = message["ts"], - channel_id = config["archive_id"], - text = message["text"] - ) - logger.info(f"Saved: {m} ({'new' if new else 'old'})") - - files = message.get("files", []) - if len(files) >= 1: - f = files[0] #default: [] - m.file_type = f["filetype"] - m.perma_link = f["url_private_download"] - m.save() - logger.info(f"Saved {m.file_type}-file for message (id={m.id})") - if new: - return m - else: - return None - else: - logger.warning("What should I do of {}".format(message)) - return None - - -def say_substitute(*args, **kwargs): - logger.info("Now sending message through say-substitute: {}".format(" - ".join(args))) - slack_client.chat_postMessage( - channel=config["archive_id"], - text=" - ".join(args), - **kwargs - ) - - -def save_as_related_file(url, article_object): - r = requests.get(url, headers={"Authorization": "Bearer {}".format(slack_client.token)}) - saveto = article_object.save_path - ftype = url[url.rfind(".") + 1:] - fname = "{} - related no {}.{}".format( - article_object.file_name.replace(".pdf",""), - len(article_object.related) + 1, - ftype - ) - with open(os.path.join(saveto, fname), "wb") as f: - f.write(r.content) - article_object.set_related([fname]) - logger.info("Added {} to model {}".format(fname, article_object)) - return fname - - -def react_file_path_message(fname, article_object): - saveto = article_object.save_path - file_path = os.path.join(saveto, fname) - if os.path.exists(file_path): - article_object.set_related([fname]) - logger.info("Added {} to model {}".format(fname, article_object)) - return True - else: - return False - - -def is_message_in_archiving(message) -> bool: - if isinstance(message, dict): - return message["channel"] == config["archive_id"] - else: - return message.channel_id == config["archive_id"] - - -def is_reaction_in_archiving(event) -> bool: - if isinstance(event, dict): - return event["item"]["channel"] == config["archive_id"] - else: - return event.message.channel_id == config["archive_id"] diff --git a/news_fetch/app/utils_slack/runner.py b/news_fetch/app/utils_slack/runner.py deleted file mode 100644 index e35117a..0000000 --- a/news_fetch/app/utils_slack/runner.py +++ /dev/null @@ -1,189 
+0,0 @@ -from slack_bolt import App -from slack_bolt.adapter.socket_mode import SocketModeHandler -from slack_sdk.errors import SlackApiError - -import logging -import configuration - -from . import message_helpers - - -config = configuration.parsed["SLACK"] -models = configuration.models - -class BotApp(App): - logger = logging.getLogger(__name__) - - def __init__(self, callback, *args, **kwargs): - - super().__init__(*args, **kwargs) - self.callback = callback - - def pre_start(self): - message_helpers.init(self.client) - missed_messages, missed_reactions = message_helpers.get_unhandled_messages() - - [self.handle_incoming_message(m) for m in missed_messages] - [self.handle_incoming_reaction(r) for r in missed_reactions] - - # self.react_missed_reactions(missed_reactions) - # self.react_missed_messages(missed_messages) - self.startup_status() - - - - def handle_incoming_reaction(self, reaction): - if isinstance(reaction, dict): #else: the reaction is already being passed as a model - # CAUTION: filter for 'changed reactions' those are nasty (usually when adding an url) - reaction = message_helpers.reaction_dict_to_model(reaction) - - thread = reaction.message.thread - article_object = thread.article - if not article_object is None: - reaction = reaction.type - status = 1 if reaction == "white_check_mark" else -1 - - # self.logger.info(f"Applying reaction {reaction} to its root message.") - article_object.verified = status - article_object.save() - - - def handle_incoming_message(self, message): - """Reacts to all messages inside channel archiving. Must then - distinguish between threaded replies and new requests - and react accordingly""" - if isinstance(message, dict): #else: the message is already being passed as a model - # CAUTION: filter for 'changed messages' those are nasty (usually when adding an url) - if message.get("subtype", "not bad") == "message_changed": - return False - message = message_helpers.message_dict_to_model(message) - - # First check: belongs to thread? - is_threaded = message.thread.message_count > 1 and message != message.thread.initiator_message - if is_threaded: - self.incoming_thread_message(message) - else: - self.incoming_channel_message(message) - - - def incoming_thread_message(self, message): - if message.user.user_id == config["bot_id"]: - return True # ignore the files uploaded by the bot. We handled them already! - - thread = message.thread - if thread.is_fully_processed: - return True - - self.logger.info("Receiving thread-message") - self.respond_thread_message(message) - - - def incoming_channel_message(self, message): - self.logger.info(f"Handling message {message} ({len(message.urls)} urls)") - - if not message.urls: # no urls in a root-message => IGNORE - message.is_processed_override = True - message.save() - return - - # ensure thread is still empty, this is a scenario encountered only in testing, but let's just filter it - if message.thread.message_count > 1: - self.logger.info("Discarded message because it is actually processed.") - return - - if len(message.urls) > 1: - message_helpers.say_substitute("Only the first url is being handled. Please send any subsequent url as a separate message", thread_ts=message.thread.slack_ts) - - self.callback(message) - # for url in message.urls: - # self.callback(url, message) - # stop here! 
- - - - def respond_thread_message(self, message, say=message_helpers.say_substitute): - thread = message.thread - article = thread.article - if message.perma_link: # file upload means new data - fname = message_helpers.save_as_related_file(message.perma_link, article) - say("File was saved as 'related file' under `{}`.".format(fname), - thread_ts=thread.slack_ts - ) - else: # either a pointer to a new file (too large to upload), or trash - success = message_helpers.react_file_path_message(message.text, article) - if success: - say("File was saved as 'related file'", thread_ts=thread.slack_ts) - else: - self.logger.error("User replied to thread {} but the response did not contain a file/path".format(thread)) - say("Cannot process response without associated file.", - thread_ts=thread.slack_ts - ) - - - def respond_channel_message(self, thread, say=message_helpers.say_substitute): - article = thread.article - answers = article.slack_info - for a in answers: - if a["file_path"]: - try: # upload resulted in an error - self.client.files_upload( - channels = config["archive_id"], - initial_comment = f"<@{config['responsible_id']}> \n {a['reply_text']}", - file = a["file_path"], - thread_ts = thread.slack_ts - ) - status = True - except SlackApiError as e: - say( - "File {} could not be uploaded.".format(a), - thread_ts=thread.slack_ts - ) - status = False - self.logger.error(f"File upload failed: {e}") - else: # anticipated that there is no file! - say( - f"<@{config['responsible_id']}> \n {a['reply_text']}", - thread_ts=thread.slack_ts - ) - status = True - - - def startup_status(self): - threads = [t for t in models.Thread.select()] - all_threads = len(threads) - fully_processed = len([t for t in threads if t.is_fully_processed]) - fully_unprocessed = len([t for t in threads if t.message_count == 1]) - articles_unprocessed = len(models.ArticleDownload.select().where(models.ArticleDownload.verified < 1)) - self.logger.info(f"[bold]STATUS[/bold]: Fully processed {fully_processed}/{all_threads} threads. {fully_unprocessed} threads have 0 replies. 
Article-objects to verify: {articles_unprocessed}", extra={"markup": True}) - - - - - -class BotRunner(): - """Stupid encapsulation so that we can apply the slack decorators to the BotApp""" - def __init__(self, callback, *args, **kwargs) -> None: - self.bot_worker = BotApp(callback, token=config["auth_token"]) - - @self.bot_worker.event(event="message", matchers=[message_helpers.is_message_in_archiving]) - def handle_incoming_message(message, say): - return self.bot_worker.handle_incoming_message(message) - - @self.bot_worker.event(event="reaction_added", matchers=[message_helpers.is_reaction_in_archiving]) - def handle_incoming_reaction(event, say): - return self.bot_worker.handle_incoming_reaction(event) - - self.handler = SocketModeHandler(self.bot_worker, config["app_token"]) - - - def start(self): - self.bot_worker.pre_start() - self.handler.start() - - - def stop(self): - self.handler.close() - print("Bye handler!") - - # def respond_to_message(self, message): - # self.bot_worker.handle_incoming_message(message) \ No newline at end of file diff --git a/news_fetch/app/utils_storage/models.py b/news_fetch/app/utils_storage/models.py deleted file mode 100644 index 06739b0..0000000 --- a/news_fetch/app/utils_storage/models.py +++ /dev/null @@ -1,331 +0,0 @@ -import logging -logger = logging.getLogger(__name__) - -from peewee import * -import os -import markdown -import re -import configuration -import datetime - -config = configuration.parsed["DOWNLOADS"] -slack_config = configuration.parsed["SLACK"] - -## Helpers -chat_db = DatabaseProxy() -download_db = DatabaseProxy() - -# set the nature of the db at runtime - -class DownloadBaseModel(Model): - class Meta: - database = download_db - -class ChatBaseModel(Model): - class Meta: - database = chat_db - - - -## == Article related models == ## -class ArticleDownload(DownloadBaseModel): - title = CharField(default='') - pub_date = DateField(default = '') - download_date = DateField(default = datetime.date.today) - source_name = CharField(default = '') - article_url = TextField(default = '', unique=True) - archive_url = TextField(default = '') - file_name = TextField(default = '') - language = CharField(default = '') - summary = TextField(default = '') - comment = TextField(default = '') - verified = IntegerField(default = False) - # authors - # keywords - # ... are added through foreignkeys - - def __str__(self) -> str: - if self.title != '' and self.source_name != '': - desc = f"{shorten_name(self.title)} -- {self.source_name}" - else: - desc = f"{self.article_url}" - return f"ART [{desc}]" - - ## Useful Properties - @property - def save_path(self): - return f"{config['local_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/" - - def fname_nas(self, file_name=""): - if self.download_date: - if file_name: - return "NAS: {}/{}/{}/{}".format(config["remote_storage_path"], self.download_date.year, self.download_date.strftime("%B"), file_name) - else: # return the self. 
name - return "NAS: {}/{}/{}/{}".format(config["remote_storage_path"], self.download_date.year, self.download_date.strftime("%B"), self.file_name) - else: - return None - - @property - def fname_template(self): - if "youtube.com" in self.source_name or "youtu.be" in self.source_name: - fname = "{} -- {}".format(self.source_name, self.title) - else: - fname = "{} -- {}.pdf".format(self.source_name, self.title) - return clear_path_name(fname) - - @property - def is_title_bad(self): # add incrementally - return "PUR-Abo" in self.title \ - or "Redirecting" in self.title \ - or "Error while running fetch" in self.title - - @property - def slack_info(self): - status = [":x: No better version available", ":gear: Verification pending", ":white_check_mark: Verified by human"][self.verified + 1] - content = "\n>" + "\n>".join(self.summary.split("\n")) - file_status, msg = self.file_status() - if not file_status: - return [msg] - - # everything alright: generate real content - # first the base file - if self.file_name[-4:] == ".pdf": - answer = [{ # main reply with the base pdf - "reply_text" : f"*{self.title}*\n{status}\n{content}", - "file_path" : self.save_path + self.file_name - }] - else: # don't upload if the file is too big! - location = "Not uploaded to slack, but the file will be on the NAS:\n`{}`".format(self.fname_nas()) - answer = [{ # main reply with the base pdf - "reply_text" : "*{}*\n{}\n{}\n{}".format(self.title, status, content, location), - "file_path" : None - }] - - # then the related files - rel_text = "" - for r in self.related: - fname = r.related_file_name - lentry = "\n• `{}` ".format(self.fname_nas(fname)) - if fname[-4:] == ".pdf": # this is a manageable file, directly upload - f_ret = self.save_path + fname - answer.append({"reply_text":"", "file_path" : f_ret}) - else: # not pdf <=> too large. Don't upload but mention its existence - lentry += "(not uploaded to slack, but the file will be on the NAS)" - - rel_text += lentry - - if rel_text: - rel_text = answer[0]["reply_text"] = answer[0]["reply_text"] + "\nRelated files:\n" + rel_text - - return answer - - @property - def mail_info(self): - base = [{"reply_text": "[{}]({})\n".format(self.article_url, self.article_url), "file_path":None}] + self.slack_info - return [{"reply_text": markdown.markdown(m["reply_text"]), "file_path": m["file_path"]} for m in base] - - - ## Helpers - def set_keywords(self, keywords): - for k in keywords: - ArticleKeyword.create( - article = self, - keyword = k - ) - - def set_authors(self, authors): - for a in authors: - ArticleAuthor.create( - article = self, - author = a - ) - - def set_references(self, references): - for r in references: - ArticleReference.create( - article = self, - reference_url = r - ) - - def set_related(self, related): - for r in related: - ArticleRelated.create( - article = self, - related_file_name = r - ) - - def file_status(self): - if not self.file_name: - logger.error("Article {} has no filename!".format(self)) - return False, {"reply_text": "Download failed, no file was saved.", "file_path": None} - - file_path_abs = self.save_path + self.file_name - if not os.path.exists(file_path_abs): - logger.error("Article {} has a filename, but the file does not exist at that location!".format(self)) - return False, {"reply_text": "Can't find file. 
Either the download failed or the file was moved.", "file_path": None} - - return True, {} - - -class ArticleKeyword(DownloadBaseModel): - # instance gets created for every one keyword -> flexible in size - article = ForeignKeyField(ArticleDownload, backref='keywords') - keyword = CharField() - - -class ArticleAuthor(DownloadBaseModel): - article = ForeignKeyField(ArticleDownload, backref='authors') - author = CharField() - - -class ArticleReference(DownloadBaseModel): - article = ForeignKeyField(ArticleDownload, backref='references') - reference_url = TextField(default = '') - - -class ArticleRelated(DownloadBaseModel): - article = ForeignKeyField(ArticleDownload, backref='related') - related_file_name = TextField(default = '') - - - - -## == Slack-thread related models == ## -class User(ChatBaseModel): - user_id = CharField(default='', unique=True) - # messages - - -class Thread(ChatBaseModel): - """The threads that concern us are only created if the base massage contains a url""" - thread_ts = FloatField(default = 0) - article = ForeignKeyField(ArticleDownload, backref="slack_thread", null=True, default=None) - # provides, ts, user, models - # messages - - @property - def slack_ts(self): - str_ts = str(self.thread_ts) - cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there a 6 decimals. If there are less, problem! - return "{}{}".format(str_ts, cut_zeros*"0") - - @property - def initiator_message(self): - try: - return self.messages[0] # TODO check if this needs sorting - except IndexError: - logger.warning(f"Thread {self} is empty. How can that be?") - return None - - @property - def message_count(self): - # logger.warning("message_count was called") - return self.messages.count() - - @property - def last_message(self): - messages = Message.select().where(Message.thread == self).order_by(Message.ts) # can't be empty by definition/creation - return messages[-1] - - @property - def is_fully_processed(self) -> bool: - init_message = self.initiator_message - if init_message is None: - return False - - if init_message.is_processed_override: - return True - # this override is set for instance, when no url was sent at all. Then set this thread to be ignored - - reactions = init_message.reaction - if not reactions: - return False - else: - r = reactions[0].type # can and should only have one reaction - return r == "white_check_mark" \ - or r == "x" - - - -class Message(ChatBaseModel): - ts = FloatField(unique=True) #for sorting - channel_id = CharField(default='') - user = ForeignKeyField(User, backref="messages") - text = TextField(default='') - thread = ForeignKeyField(Thread, backref="messages", default=None) - file_type = CharField(default='') - perma_link = CharField(default='') - is_processed_override = BooleanField(default=False) - # reaction - - def __str__(self) -> str: - return "MSG [{}]".format(shorten_name(self.text).replace('\n','/')) - - @property - def slack_ts(self): - str_ts = str(self.ts) - cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there a 6 decimals. If there are less, problem! - return "{}{}".format(str_ts, cut_zeros * "0") - - - @property - def urls(self): - pattern = r"<(.*?)>" - matches = re.findall(pattern, self.text) - matches = [m for m in matches if "." in m] - - new_matches = [] - for m in matches: - if "." in m: # must contain a tld, right? - # further complication: slack automatically abreviates urls in the format: - # . 
Lucky for us, "|" is a character derecommended in urls, meaning we can "safely" split for it and retain the first half - if "|" in m: - keep = m.split("|")[0] - else: - keep = m - new_matches.append(keep) - return new_matches - - @property - def is_by_human(self): - return self.user.user_id != slack_config["bot_id"] - - - @property - def has_single_url(self): - return len(self.urls) == 1 - - -class Reaction(ChatBaseModel): - type = CharField(default = "") - message = ForeignKeyField(Message, backref="reaction") - - - - - - - - -def create_tables(): - with download_db: - download_db.create_tables([ArticleDownload, ArticleKeyword, ArticleAuthor, ArticleReference, ArticleRelated]) - with chat_db: - chat_db.create_tables([User, Message, Thread, Reaction]) - - -def set_db(chat_db_object, download_db_object): - chat_db.initialize(chat_db_object) - download_db.initialize(download_db_object) - create_tables() - -def clear_path_name(path): - keepcharacters = (' ','.','_', '-') - converted = "".join([c if (c.isalnum() or c in keepcharacters) else "_" for c in path]).rstrip() - return converted - -def shorten_name(name, offset = 50): - if len(name) > offset: - return name[:offset] + "..." - else: - return name \ No newline at end of file diff --git a/news_fetch/configuration.py b/news_fetch/configuration.py new file mode 100644 index 0000000..f31ae3e --- /dev/null +++ b/news_fetch/configuration.py @@ -0,0 +1,66 @@ +import os +import shutil +import configparser +import logging +from datetime import datetime +from peewee import SqliteDatabase, PostgresqlDatabase +from rich.logging import RichHandler + + +# first things first: logging +logging.basicConfig( + format='%(message)s', + level=logging.INFO, + datefmt='%H:%M:%S', # add %Y-%m-%d if needed + handlers=[RichHandler()] + ) +logger = logging.getLogger(__name__) + + +# load config file containing constants and secrets +main_config = configparser.ConfigParser() +main_config.read("/app/containerdata/config/news_fetch.config.ini") +db_config = configparser.ConfigParser() +db_config.read("/app/containerdata/config/db.config.ini") + + +# DEBUG MODE: +if os.getenv("DEBUG", "false") == "true": + logger.warning("Found 'DEBUG=true', setting up dummy databases") + + main_config["SLACK"]["archive_id"] = main_config["SLACK"]["debug_id"] + main_config["MAIL"]["recipient"] = main_config["MAIL"]["sender"] + main_config["DOWNLOADS"]["local_storage_path"] = main_config["DOWNLOADS"]["debug_storage_path"] + + download_db = SqliteDatabase( + main_config["DATABASE"]["download_db_debug"], + pragmas = {'journal_mode': 'wal'} # mutliple threads can read at once + ) + +# PRODUCTION MODE: +else: + logger.warning("Found 'DEBUG=false' and running on production databases, I hope you know what you're doing...") + + cred = db_config["DATABASE"] + download_db = PostgresqlDatabase( + cred["db_name"], user=cred["user_name"], password=cred["password"], host="vpn", port=5432 + ) + # TODO Reimplement backup/printout + # logger.info("Backing up databases") + # backup_dst = main_config["DATABASE"]["db_backup"] + # today = datetime.today().strftime("%Y.%m.%d") + # shutil.copyfile( + # os.path.join(db_base_path, main_config["DATABASE"]["chat_db_name"]), + # os.path.join(backup_dst, today + "." + main_config["DATABASE"]["chat_db_name"]), + # ) + # shutil.copyfile( + # os.path.join(db_base_path, main_config["DATABASE"]["download_db_name"]), + # os.path.join(backup_dst, today + "." 
+ main_config["DATABASE"]["download_db_name"]), + # ) + + + +from utils_storage import models + +# Set up the database +models.set_db(download_db) diff --git a/news_fetch/app/runner.py b/news_fetch/runner.py similarity index 79% rename from news_fetch/app/runner.py rename to news_fetch/runner.py index b33fb2f..263dea1 100644 --- a/news_fetch/app/runner.py +++ b/news_fetch/runner.py @@ -3,7 +3,6 @@ import configuration models = configuration.models from threading import Thread import logging -import os import sys logger = logging.getLogger(__name__) @@ -14,10 +13,9 @@ from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, Up class ArticleWatcher: """Wrapper for a newly created article object. Notifies the coordinator upon change/completition""" - def __init__(self, article, thread, **kwargs) -> None: + def __init__(self, article, **kwargs) -> None: self.article_id = article.id # in case article becomes None at any point, we can still track the article self.article = article - self.thread = thread self.completition_notifier = kwargs.get("notifier") self.fetch = kwargs.get("worker_fetch", None) @@ -50,7 +48,7 @@ class ArticleWatcher: elif completed_action == "download": self.compress.process(self) elif completed_action == "compress": # last step - self.completition_notifier(self.article, self.thread) + self.completition_notifier(self.article) # triggers action in Coordinator elif completed_action == "upload": # this case occurs when upload was faster than compression @@ -118,17 +116,34 @@ class Coordinator(Thread): def launch(self) -> None: for w in [self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress]: - if not w is None: + if not w is None: # for reduced operations such as upload, some workers are set to None w.start() + # if past messages have not been sent, they must be reevaluated + unsent = models.ArticleDownload.filter(sent = False) + # .objects.filter(sent = False) + for a in unsent: + print(a) + self.incoming_request(article=a) + + + def incoming_request(self, message=None, article=None): + """This method is passed onto the slack worker. It then is called when a new message is received.""" + + if message is not None: + try: + url = message.urls[0] # ignore all the other ones + except IndexError: + return + article, is_new = models.ArticleDownload.get_or_create(article_url=url) + article.slack_ts = message.ts # either update the timestamp (to the last reference to the article) or set it for the first time + elif article is not None: + is_new = False + logger.info(f"Received article {article} in incoming_request") + else: + logger.error("Coordinator.incoming_request called with no arguments") + return - def incoming_request(self, message): - """This method is passed onto the slack worker. 
It gets triggered when a new message is received.""" - url = message.urls[0] # ignore all the other ones - article, is_new = models.ArticleDownload.get_or_create(article_url=url) - thread = message.thread - thread.article = article - thread.save() self.kwargs.update({"notifier" : self.article_complete_notifier}) if is_new or (article.file_name == "" and article.verified == 0): @@ -136,7 +151,6 @@ class Coordinator(Thread): # this overwrites previously set information, but that should not be too important ArticleWatcher( article, - thread, **self.kwargs ) @@ -146,7 +160,7 @@ class Coordinator(Thread): # the watcher will notify once it is sufficiently populated else: # manually trigger notification immediatly logger.info(f"Found existing article {article}. Now sending") - self.article_complete_notifier(article, thread) + self.article_complete_notifier(article) @@ -155,18 +169,21 @@ class Coordinator(Thread): w.start() for article in articles: - notifier = lambda article: print(f"Completed manual actions for {article}") + notifier = lambda article: logger.info(f"Completed manual actions for {article}") ArticleWatcher(article, None, workers_manual = workers, notifier = notifier) # Article watcher wants a thread to link article to TODO: handle threads as a kwarg - def article_complete_notifier(self, article, thread): + def article_complete_notifier(self, article): if self.worker_slack is None: - logger.warning("Not sending slack notifier") + logger.warning("Skipping slack notification because worker is None") else: - self.worker_slack.bot_worker.respond_channel_message(thread) + self.worker_slack.bot_worker.respond_channel_message(article) if self.worker_mail is None: - logger.warning("Not sending mail notifier") + logger.warning("Skipping mail notification because worker is None") else: self.worker_mail.send(article) + + article.sent = True + article.save() @@ -174,15 +191,11 @@ if __name__ == "__main__": coordinator = Coordinator() - if os.getenv("UPLOAD", "false") == "true": + if "upload" in sys.argv: articles = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").execute() logger.info(f"Launching upload to archive for {len(articles)} articles.") coordinator.manual_processing(articles, [UploadWorker()]) - elif os.getenv("CHECK", "false") == "true": - from utils_check import runner as check_runner - check_runner.verify_unchecked() - else: # launch with full action slack_runner = slack_runner.BotRunner(coordinator.incoming_request) kwargs = { @@ -196,10 +209,10 @@ if __name__ == "__main__": try: coordinator.add_workers(**kwargs) coordinator.start() - slack_runner.start() + slack_runner.start() # last one to start, inside the main thread except KeyboardInterrupt: logger.info("Keyboard interrupt. 
Stopping Slack and Coordinator") slack_runner.stop() - print("BYE!") + logger.info("BYE!") # coordinator was set as a daemon thread, so it will be stopped automatically sys.exit(0) \ No newline at end of file diff --git a/news_fetch/app/utils_check/runner.py b/news_fetch/utils_check/runner.py similarity index 97% rename from news_fetch/app/utils_check/runner.py rename to news_fetch/utils_check/runner.py index 2325dd6..7d305bf 100644 --- a/news_fetch/app/utils_check/runner.py +++ b/news_fetch/utils_check/runner.py @@ -23,7 +23,7 @@ u_options = { bot_client = WebClient( - token = configuration.parsed["SLACK"]["auth_token"] + token = configuration.main_config["SLACK"]["auth_token"] ) @@ -70,7 +70,7 @@ def send_reaction_to_slack_thread(article, reaction): else: ts = m.slack_ts bot_client.reactions_add( - channel=configuration.parsed["SLACK"]["archive_id"], + channel=configuration.main_config["SLACK"]["archive_id"], name=reaction, timestamp=ts ) diff --git a/news_fetch/app/utils_mail/runner.py b/news_fetch/utils_mail/runner.py similarity index 97% rename from news_fetch/app/utils_mail/runner.py rename to news_fetch/utils_mail/runner.py index d7e09ba..25be18e 100644 --- a/news_fetch/app/utils_mail/runner.py +++ b/news_fetch/utils_mail/runner.py @@ -7,7 +7,7 @@ import logging import configuration logger = logging.getLogger(__name__) -config = configuration.parsed["MAIL"] +config = configuration.main_config["MAIL"] def send(article_model): mail = MIMEMultipart() diff --git a/news_fetch/utils_slack/runner.py b/news_fetch/utils_slack/runner.py new file mode 100644 index 0000000..431a20a --- /dev/null +++ b/news_fetch/utils_slack/runner.py @@ -0,0 +1,238 @@ +from slack_bolt import App +from slack_bolt.adapter.socket_mode import SocketModeHandler +from slack_sdk.errors import SlackApiError + +import logging +import re +import time + +import configuration +config = configuration.main_config["SLACK"] +models = configuration.models + +class MessageIsUnwanted(Exception): + # This exception is triggered when the message is either threaded (reply to another message) or weird (like an edit, a deletion, etc) + pass + +class Message: + ts = str + user_id = str + text = str + logger = logging.getLogger(__name__) + + + def __init__(self, message_dict): + if message_dict.get("subtype", "not bad") == "message_changed": + raise MessageIsUnwanted() + if message_dict["type"] == "message": + if "thread_ts" in message_dict and (message_dict["thread_ts"] != message_dict["ts"]): # meaning it's a reply to another message + raise MessageIsUnwanted() + + self.user_id = message_dict.get("user", "BAD USER") + # self.channel_id = config["archive_id"] # by construction, other messages are not intercepted + self.ts = message_dict["ts"] + self.text = message_dict["text"] + + else: + self.logger.warning(f"What should I do of {message_dict}") + raise MessageIsUnwanted() + + + def __str__(self) -> str: + return f"MSG [{self.text}]" + + + @property + def urls(self): + pattern = r"<(.*?)>" + matches = re.findall(pattern, self.text) + matches = [m for m in matches if "." in m] # must contain a tld, right? + + new_matches = [] + for m in matches: + # further complication: slack automatically abreviates urls in the format: + # . 
Lucky for us, "|" is a character derecommended in urls, meaning we can "safely" split for it and retain the first half + if "|" in m: + keep = m.split("|")[0] + else: + keep = m + new_matches.append(keep) + return new_matches + + @property + def is_by_human(self): + return self.user.user_id != config["bot_id"] + + + @property + def has_single_url(self): + return len(self.urls) == 1 + + + +class BotApp(App): + logger = logging.getLogger(__name__) + + def __init__(self, callback, *args, **kwargs): + super().__init__(*args, **kwargs) + self.callback = callback + + + def pre_start(self): + missed_messages = self.fetch_missed_channel_messages() + + [self.handle_incoming_message(m) for m in missed_messages] + self.startup_status() + + + def say_substitute(self, *args, **kwargs): + self.client.chat_postMessage( + channel=config["archive_id"], + text=" - ".join(args), + **kwargs + ) + + def fetch_missed_channel_messages(self): + # latest processed message_ts is: + presaved = models.ArticleDownload.select().order_by(models.ArticleDownload.slack_ts.desc()).get_or_none() + if presaved is None: + last_ts = 0 + else: + last_ts = presaved.slack_ts_full + + result = self.client.conversations_history( + channel=config["archive_id"], + oldest=last_ts + ) + + new_messages = result.get("messages", []) + # # filter the last one, it is a duplicate! (only if the db is not empty!) + # if last_ts != 0 and len(new_messages) != 0: + # new_messages.pop(-1) + + return_messages = [Message(m) for m in new_messages] + + refetch = result.get("has_more", False) + while refetch: # we have not actually fetched them all + try: + result = self.client.conversations_history( + channel = config["archive_id"], + cursor = result["response_metadata"]["next_cursor"], + oldest = last_ts + ) # fetches 100 messages, older than the [-1](=oldest) element of new_fetches + refetch = result.get("has_more", False) + + new_messages = result.get("messages", []) + for m in new_messages: + return_messages.append(Message(m)) + except SlackApiError: # Most likely a rate-limit + self.logger.error("Error while fetching channel messages. (likely rate limit) Retrying in {} seconds...".format(config["api_wait_time"])) + time.sleep(config["api_wait_time"]) + refetch = True + + self.logger.info(f"Fetched {len(return_messages)} new channel messages.") + return return_messages + + + + def handle_incoming_message(self, message, say=None): + """Reacts to all messages inside channel archiving. This either gets called when catching up on missed messages (by pre_start()) or by the SocketModeHandler in 'live' mode""" + if isinstance(message, dict): + try: + message = Message(message) + except MessageIsUnwanted: + return False + + + self.logger.info(f"Handling message {message} ({len(message.urls)} urls)") + + + if len(message.urls) > 1: + self.say_substitute("Only the first url is being handled. 
+        self.callback(message = message)
+
+
+    def respond_channel_message(self, article, say=None):
+        if say is None:
+            say = self.say_substitute
+        answers = article.slack_info
+        for a in answers:
+            if a["file_path"]:
+                try:
+                    self.client.files_upload(
+                        channels = config["archive_id"],
+                        initial_comment = f"{a['reply_text']}",
+                        file = a["file_path"],
+                        thread_ts = article.slack_ts_full
+                    )
+                    status = True
+                except SlackApiError as e: # upload resulted in an error
+                    say(
+                        "File {} could not be uploaded.".format(a["file_path"]),
+                        thread_ts = article.slack_ts_full
+                    )
+                    status = False
+                    self.logger.error(f"File upload failed: {e}")
+            else: # anticipated that there is no file!
+                say(
+                    f"{a['reply_text']}",
+                    thread_ts = article.slack_ts_full
+                )
+                status = True
+
+
+    def startup_status(self):
+        """Logs an overview of the articles. This needs to be called here because it should run after having fetched the newly sent messages"""
+        total = models.ArticleDownload.select().count()
+        to_be_processed = models.ArticleDownload.select().where(models.ArticleDownload.title == "").count()
+        unchecked = models.ArticleDownload.select().where(models.ArticleDownload.verified == 0).count()
+        bad = models.ArticleDownload.select().where(models.ArticleDownload.verified == -1).count()
+        not_uploaded = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").count()
+        self.logger.info(
+            f"[bold]NEWS-FETCH DATABASE STATUS[/bold]: Total entries: {total}; Not yet downloaded: {to_be_processed}; Not yet checked: {unchecked}; Not yet uploaded to archive: {not_uploaded}; Marked as bad: {bad}",
+            extra={"markup": True}
+        )
+
+
+
+
+
+class BotRunner():
+    """Stupid encapsulation so that we can apply the slack decorators to the BotApp"""
+    logger = logging.getLogger(__name__)
+
+    def __init__(self, callback, *args, **kwargs) -> None:
+        self.bot_worker = BotApp(callback, token=config["auth_token"])
+
+        @self.bot_worker.event(event="message", matchers=[is_message_in_archiving])
+        def handle_incoming_message(message, say):
+            return self.bot_worker.handle_incoming_message(message, say)
+
+        # @self.bot_worker.event(event="reaction_added", matchers=[is_reaction_in_archiving])
+        # def handle_incoming_reaction(event, say):
+        #     return self.bot_worker.handle_incoming_reaction(event)
+
+        @self.bot_worker.event(event="event")
+        def handle_all_other_reactions(event, say):
+            self.logger.info("Ignoring slack event that isn't a message") # logger.log requires a level argument, so use logger.info
+
+        self.handler = SocketModeHandler(self.bot_worker, config["app_token"])
+
+
+    def start(self):
+        self.bot_worker.pre_start()
+        self.handler.start()
+
+
+    def stop(self):
+        self.handler.close()
+        self.logger.info("Closed Slack-Socketmodehandler")
+
+
+
+
+
+def is_message_in_archiving(message) -> bool:
+    return message["channel"] == config["archive_id"]
+
diff --git a/news_fetch/utils_storage/helpers.py b/news_fetch/utils_storage/helpers.py
new file mode 100644
index 0000000..b15592e
--- /dev/null
+++ b/news_fetch/utils_storage/helpers.py
@@ -0,0 +1,10 @@
+def clear_path_name(path):
+    keepcharacters = (' ','.','_', '-')
+    converted = "".join([c if (c.isalnum() or c in keepcharacters) else "_" for c in path]).rstrip()
+    return converted
+
+def shorten_name(name, offset = 50):
+    if len(name) > offset:
+        return name[:offset] + "..."
+    else:
+        return name
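+
+# Illustrative examples (an assumption, not part of the original commit):
+#   clear_path_name("source: report?.pdf")  ->  "source_ report_.pdf"
+#   shorten_name("x" * 60)                  ->  "x" * 50 + "..."
\ No newline at end of file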
diff --git a/news_fetch/app/utils_storage/migrations/migration.001.py b/news_fetch/utils_storage/migrations/migration.001.py
similarity index 100%
rename from news_fetch/app/utils_storage/migrations/migration.001.py
rename to news_fetch/utils_storage/migrations/migration.001.py
diff --git a/news_fetch/utils_storage/models.py b/news_fetch/utils_storage/models.py
new file mode 100644
index 0000000..937b381
--- /dev/null
+++ b/news_fetch/utils_storage/models.py
@@ -0,0 +1,297 @@
+import logging
+logger = logging.getLogger(__name__)
+
+from peewee import *
+import os
+import markdown
+import re
+import configuration
+import datetime
+
+from . import helpers
+config = configuration.main_config["DOWNLOADS"]
+slack_config = configuration.main_config["SLACK"]
+
+# set the nature of the db at runtime
+download_db = DatabaseProxy()
+
+
+class DownloadBaseModel(Model):
+    class Meta:
+        database = download_db
+
+
+
+## == Article related models == ##
+class ArticleDownload(DownloadBaseModel):
+    # in the beginning this is all we have
+    article_url = TextField(default = '', unique=True)
+
+    # fetch then fills in the metadata
+    title = CharField(default='')
+    @property
+    def is_title_bad(self): # add incrementally
+        return "PUR-Abo" in self.title \
+            or "Redirecting" in self.title \
+            or "Error while running fetch" in self.title
+
+    summary = TextField(default = '')
+    source_name = CharField(default = '')
+    language = CharField(default = '')
+
+
+    file_name = TextField(default = '')
+    @property
+    def save_path(self):
+        return f"{config['local_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/"
+
+    # note: not a @property, because it also gets called with an argument (see slack_info below)
+    def fname_nas(self, file_name=""):
+        if self.download_date:
+            if file_name:
+                return f"NAS: {config['remote_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/{file_name}"
+            else: # return the stored file name instead
+                return f"NAS: {config['remote_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/{self.file_name}"
+        else:
+            return None
+
+    @property
+    def fname_template(self):
+        if "youtube.com" in self.source_name or "youtu.be" in self.source_name:
+            fname = f"{self.source_name} -- {self.title}"
+        else:
+            fname = f"{self.source_name} -- {self.title}.pdf"
+        return helpers.clear_path_name(fname)
+
+
+    archive_url = TextField(default = '')
+    pub_date = DateField(default = '')
+    download_date = DateField(default = datetime.date.today)
+
+    slack_ts = FloatField(default = 0) # should be a fixed-length string but float is easier to sort by
+    @property
+    def slack_ts_full(self):
+        str_ts = str(self.slack_ts)
+        cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # there are usually 6 decimals; pad with zeros if fewer (e.g. 1661945102.42 -> "1661945102.420000")
+        return f"{str_ts}{cut_zeros * '0'}"
+
+    sent = BooleanField(default = False)
+
+    archived_by = CharField(default = os.getenv("UNAME"))
+    # need to know who saved the message because the file needs to be on their computer in order to get verified
+    # verification happens in a different app, but the model has the fields here as well
+    comment = TextField(default = '')
+    verified = IntegerField(default = 0) # 0 = not verified, 1 = verified, -1 = marked as bad
+
+    # authors
+    # keywords
+    # ... are added through foreignkeys
+    # we will also add an attribute named message, to reference which message should be replied to. This attribute does not need to be saved in the db
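+
+    # Illustrative lifecycle sketch (an assumption, not part of the original commit):
+    #   art = ArticleDownload.create(article_url="https://example.com/story")
+    #   art.title = "Example Story"; art.source_name = "example.com"
+    #   art.file_name = art.fname_template   # -> "example.com -- Example Story.pdf"
+    #   art.save()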
+
+    ## Helpers specific to a single article
+    def __str__(self) -> str:
+        if self.title != '' and self.source_name != '':
+            desc = f"{helpers.shorten_name(self.title)} -- {self.source_name}"
+        else:
+            desc = f"{self.article_url}"
+        return f"ART [{desc}]"
+
+    @property
+    def slack_info(self):
+        status = [":x: No better version available", ":gear: Verification pending", ":white_check_mark: Verified by human"][self.verified + 1]
+        content = "\n>" + "\n>".join(self.summary.split("\n"))
+        file_status, msg = self.file_status()
+        if not file_status:
+            return [msg]
+
+        # everything alright: generate real content
+        # first the base file
+        if self.file_name[-4:] == ".pdf":
+            answer = [{ # main reply with the base pdf
+                "reply_text" : f"*{self.title}*\n{status}\n{content}",
+                "file_path" : self.save_path + self.file_name
+            }]
+        else: # don't upload if the file is too big!
+            location = f"Not uploaded to slack, but the file will be on the NAS:\n`{self.fname_nas()}`"
+            answer = [{ # main reply with the base pdf
+                "reply_text" : f"*{self.title}*\n{status}\n{content}\n{location}",
+                "file_path" : None
+            }]
+
+        # then the related files
+        rel_text = ""
+        for r in self.related:
+            fname = r.related_file_name
+            lentry = "\n• `{}` ".format(self.fname_nas(fname))
+            if fname[-4:] == ".pdf": # this is a manageable file, directly upload
+                f_ret = self.save_path + fname
+                answer.append({"reply_text":"", "file_path" : f_ret})
+            else: # not pdf <=> too large. Don't upload but mention its existence
+                lentry += "(not uploaded to slack, but the file will be on the NAS)"
+
+            rel_text += lentry
+
+        if rel_text:
+            answer[0]["reply_text"] += "\nRelated files:\n" + rel_text
+
+        return answer
+
+    @property
+    def mail_info(self):
+        base = [{"reply_text": f"[{self.article_url}]({self.article_url})\n", "file_path":None}] + self.slack_info
+        return [{"reply_text": markdown.markdown(m["reply_text"]), "file_path": m["file_path"]} for m in base]
+
+
+    def set_authors(self, authors):
+        for a in authors:
+            ArticleAuthor.create(
+                article = self,
+                author = a
+            )
+
+    def set_related(self, related):
+        for r in related:
+            ArticleRelated.create(
+                article = self,
+                related_file_name = r
+            )
+
+    def file_status(self):
+        if not self.file_name:
+            logger.error(f"Article {self} has no filename!")
+            return False, {"reply_text": "Download failed, no file was saved.", "file_path": None}
+
+        file_path_abs = self.save_path + self.file_name
+        if not os.path.exists(file_path_abs):
+            logger.error(f"Article {self} has a filename, but the file does not exist at that location!")
+            return False, {"reply_text": "Can't find file. Either the download failed or the file was moved.", "file_path": None}
+
+        return True, {}
+
+
+class ArticleAuthor(DownloadBaseModel):
+    article = ForeignKeyField(ArticleDownload, backref='authors')
+    author = CharField()
+
+
+class ArticleRelated(DownloadBaseModel):
+    # Related files, such as the full text of a paper, audio files, etc.
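+    # Illustrative usage (an assumption, not part of the original commit):
+    #   article.set_related(["full_paper.pdf", "interview.mp3"]) creates one row per file name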
+    article = ForeignKeyField(ArticleDownload, backref='related')
+    related_file_name = TextField(default = '')
+
+
+
+
+
+# class Thread(ChatBaseModel):
+#     """The threads that concern us are only created if the base message contains a url"""
+#     thread_ts = FloatField(default = 0)
+#     article = ForeignKeyField(ArticleDownload, backref="slack_thread", null=True, default=None)
+#     # provides, ts, user, models
+#     # messages
+
+#     @property
+#     def slack_ts(self):
+#         str_ts = str(self.thread_ts)
+#         cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # there are usually 6 decimals. If there are fewer, problem!
+#         return "{}{}".format(str_ts, cut_zeros*"0")

+#     @property
+#     def initiator_message(self):
+#         try:
+#             return self.messages[0] # TODO check if this needs sorting
+#         except IndexError:
+#             logger.warning(f"Thread {self} is empty. How can that be?")
+#             return None

+#     @property
+#     def message_count(self):
+#         # logger.warning("message_count was called")
+#         return self.messages.count()

+#     @property
+#     def last_message(self):
+#         messages = Message.select().where(Message.thread == self).order_by(Message.ts) # can't be empty by definition/creation
+#         return messages[-1]

+#     @property
+#     def is_fully_processed(self) -> bool:
+#         init_message = self.initiator_message
+#         if init_message is None:
+#             return False

+#         if init_message.is_processed_override:
+#             return True
+#             # this override is set, for instance, when no url was sent at all. Then set this thread to be ignored

+#         reactions = init_message.reaction
+#         if not reactions:
+#             return False
+#         else:
+#             r = reactions[0].type # can and should only have one reaction
+#             return r == "white_check_mark" \
+#                 or r == "x"



+# class Message(ChatBaseModel):
+#     ts = FloatField(unique=True) # for sorting
+#     channel_id = CharField(default='')
+#     user = ForeignKeyField(User, backref="messages")
+#     text = TextField(default='')
+#     thread = ForeignKeyField(Thread, backref="messages", default=None)
+#     file_type = CharField(default='')
+#     perma_link = CharField(default='')
+#     is_processed_override = BooleanField(default=False)
+#     # reaction

+#     def __str__(self) -> str:
+#         return "MSG [{}]".format(shorten_name(self.text).replace('\n','/'))

+#     @property
+#     def slack_ts(self):
+#         str_ts = str(self.ts)
+#         cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # there are usually 6 decimals. If there are fewer, problem!
+#         return "{}{}".format(str_ts, cut_zeros * "0")


+#     @property
+#     def urls(self):
+#         pattern = r"<(.*?)>"
+#         matches = re.findall(pattern, self.text)
+#         matches = [m for m in matches if "." in m]

+#         new_matches = []
+#         for m in matches:
+#             if "." in m: # must contain a tld, right?
+#                 # further complication: slack automatically abbreviates urls in the format:
+#                 # <url|readable-name>. Lucky for us, "|" is a character discouraged in urls, meaning we can "safely" split on it and retain the first half
+#                 if "|" in m:
+#                     keep = m.split("|")[0]
+#                 else:
+#                     keep = m
+#                 new_matches.append(keep)
+#         return new_matches

+#     @property
+#     def is_by_human(self):
+#         return self.user.user_id != slack_config["bot_id"]


+#     @property
+#     def has_single_url(self):
+#         return len(self.urls) == 1
+
+
+
+
+
+
+
+
+
+def set_db(download_db_object):
+    download_db.initialize(download_db_object)
+    with download_db: # create tables (does nothing if they exist already)
+        download_db.create_tables([ArticleDownload, ArticleAuthor, ArticleRelated])
+    # e.g. (illustrative): set_db(SqliteDatabase("downloads.db")); a PostgresqlDatabase instance works as well
+
+
diff --git a/news_fetch/app/utils_worker/_init__.py b/news_fetch/utils_worker/_init__.py
similarity index 100%
rename from news_fetch/app/utils_worker/_init__.py
rename to news_fetch/utils_worker/_init__.py
diff --git a/news_fetch/app/utils_worker/compress/runner.py b/news_fetch/utils_worker/compress/runner.py
similarity index 96%
rename from news_fetch/app/utils_worker/compress/runner.py
rename to news_fetch/utils_worker/compress/runner.py
index 385785f..fe7cfb6 100644
--- a/news_fetch/app/utils_worker/compress/runner.py
+++ b/news_fetch/utils_worker/compress/runner.py
@@ -5,7 +5,7 @@ from pathlib import Path
 import logging
 logger = logging.getLogger(__name__)
 import configuration
-config = configuration.parsed["DOWNLOADS"]
+config = configuration.main_config["DOWNLOADS"]
 
 shrink_sizes = []
 
diff --git a/news_fetch/app/utils_worker/download/__init__.py b/news_fetch/utils_worker/download/__init__.py
similarity index 100%
rename from news_fetch/app/utils_worker/download/__init__.py
rename to news_fetch/utils_worker/download/__init__.py
diff --git a/news_fetch/app/utils_worker/download/browser.py b/news_fetch/utils_worker/download/browser.py
similarity index 87%
rename from news_fetch/app/utils_worker/download/browser.py
rename to news_fetch/utils_worker/download/browser.py
index 8693e39..068f16c 100644
--- a/news_fetch/app/utils_worker/download/browser.py
+++ b/news_fetch/utils_worker/download/browser.py
@@ -8,7 +8,7 @@ from selenium import webdriver
 import configuration
 import json
 
-config = configuration.parsed["DOWNLOADS"]
+config = configuration.main_config["DOWNLOADS"]
 blacklisted = json.loads(config["blacklisted_href_domains"])
 
@@ -25,10 +25,10 @@ class PDFDownloader:
         options.profile = config["browser_profile_path"]
         # should be options.set_preference("profile", config["browser_profile_path"]) as of selenium 4 but that doesn't work
 
-        if os.getenv("HEADLESS", "false") == "true":
-            options.add_argument('--headless')
+        if os.getenv("DEBUG", "false") == "true":
+            self.logger.warning("Opening browser GUI because of 'DEBUG=true'")
         else:
-            self.logger.warning("Opening browser GUI because of 'HEADLESS=false'")
+            options.add_argument('--headless')
 
         options.set_preference('print.save_as_pdf.links.enabled', True)
         # Just save if the filetype is pdf already
@@ -92,7 +92,7 @@ class PDFDownloader:
 
         # in the mean time, get a page title if required
         if article_object.is_title_bad:
-            article_object.title = self.driver.title.replace(".pdf", "")
+            article_object.title = self.driver.title.replace(".pdf", "") # some titles end with .pdf
             # will be propagated to the saved file (dst) as well
 
         fname = article_object.fname_template
@@ -112,7 +112,6 @@ class PDFDownloader:
 
         if success:
             article_object.file_name = fname
-            article_object.set_references(self.get_references())
         else:
             article_object.file_name = ""
 
@@ -150,18 +149,6 @@ class PDFDownloader:
             return False
 
-
-    def get_references(self):
-        try:
-            hrefs = [e.get_attribute("href") for e in self.driver.find_elements_by_xpath("//a[@href]")]
-        except:
-            hrefs = []
-        # len_old = len(hrefs)
-        hrefs = [h for h in hrefs \
-            if not sum([(domain in h) for domain in blacklisted]) # sum([True, False, False, False]) == 1 (esp. not 0)
-            ] # filter a tiny bit at least
-        # self.logger.info(f"Hrefs filtered (before: {len_old}, after: {len(hrefs)})")
-        return hrefs
-
diff --git a/news_fetch/app/utils_worker/download/runner.py b/news_fetch/utils_worker/download/runner.py
similarity index 100%
rename from news_fetch/app/utils_worker/download/runner.py
rename to news_fetch/utils_worker/download/runner.py
diff --git a/news_fetch/app/utils_worker/download/youtube.py b/news_fetch/utils_worker/download/youtube.py
similarity index 100%
rename from news_fetch/app/utils_worker/download/youtube.py
rename to news_fetch/utils_worker/download/youtube.py
diff --git a/news_fetch/app/utils_worker/fetch/runner.py b/news_fetch/utils_worker/fetch/runner.py
similarity index 92%
rename from news_fetch/app/utils_worker/fetch/runner.py
rename to news_fetch/utils_worker/fetch/runner.py
index 06b2cbe..a67262c 100644
--- a/news_fetch/app/utils_worker/fetch/runner.py
+++ b/news_fetch/utils_worker/fetch/runner.py
@@ -53,10 +53,5 @@ def get_description(article_object):
         article_object.set_authors(news_article.authors)
     except AttributeError:
         pass # list would have been empty anyway
-
-    try:
-        article_object.set_keywords(news_article.keywords)
-    except AttributeError:
-        pass # list would have been empty anyway
-
+
     return article_object
diff --git a/news_fetch/app/utils_worker/upload/runner.py b/news_fetch/utils_worker/upload/runner.py
similarity index 100%
rename from news_fetch/app/utils_worker/upload/runner.py
rename to news_fetch/utils_worker/upload/runner.py
diff --git a/news_fetch/app/utils_worker/worker_template.py b/news_fetch/utils_worker/worker_template.py
similarity index 100%
rename from news_fetch/app/utils_worker/worker_template.py
rename to news_fetch/utils_worker/worker_template.py
diff --git a/news_fetch/app/utils_worker/workers.py b/news_fetch/utils_worker/workers.py
similarity index 100%
rename from news_fetch/app/utils_worker/workers.py
rename to news_fetch/utils_worker/workers.py