Reduced Slack functionality, higher ease of use. Database migration WIP.

Remy Moll 2022-09-05 16:29:19 +02:00
parent 60c9e88c7b
commit 2e65828bbb
35 changed files with 789 additions and 998 deletions

View File

@@ -1,25 +1,8 @@
+# Usage:
+# docker compose --env-file env/<mode> run <args> news_fetch && docker-compose --env-file env/production down
version: "3.9"
services:
-  geckodriver:
-    image: selenium/standalone-firefox:103.0
-    volumes:
-      - ${XSOCK-/dev/null}:${XSOCK-/tmp/sock}
-      - ${XAUTHORITY-/dev/null}:/home/auto_news/.Xauthority
-    environment:
-      - DISPLAY=$DISPLAY
-      - START_VNC=false
-      - START_XVFB=false
-    user: 1001:1001
-    expose: # exposed to other docker-compose services only
-      - "4444"
-  vpn:
+  vpn: # Creates a connection behind the ETH firewall to access NAS and Postgres
    image: wazum/openconnect-proxy:latest
    env_file:
      - ${CONTAINER_DATA}/config/vpn.config
@@ -28,13 +11,14 @@ services:
    volumes:
      - /dev/net/tun:/dev/net/tun
    # alternative to cap_add & volumes: specify privileged: true
+    expose: ["5432"] # exposed here because db_passthrough uses this network. See below for more details.
-  nas_sync:
+  nas_sync: # Syncs locally downloaded files with the NAS share on nas22.ethz.ch/...
    depends_on:
-      - vpn # used to establish a connection to the SMB server
+      - vpn
-    network_mode: "service:vpn"
+    network_mode: "service:vpn" # used to establish a connection to the SMB server from inside the ETH network
-    build: nas_sync
+    build: nas_sync # local folder to build
    image: nas_sync:latest
    cap_add: # capabilities needed for mounting the SMB share
      - SYS_ADMIN
@@ -49,33 +33,56 @@ services:
      - /sync/nas_sync.config
-  news_fetch:
+  geckodriver: # separate docker container for pdf-download. This hugely improves stability (and shortens build times for the other containers)
+    image: selenium/standalone-firefox:103.0 # latest version, because it mirrors the locally installed version (which is automatically updated)
+    environment:
+      - START_VNC=${HEADFULL-false} # as opposed to headless; used when supervision is required (e.g. for websites that crash)
+      - START_XVFB=${HEADFULL-false}
+    expose: ["4444"] # exposed to other docker-compose services only
+    ports:
+      - 7900:7900 # port for webvnc
+  db_passthrough: # allows a container on the local network to connect to a service (here: postgres) through the vpn
+    network_mode: "service:vpn"
+    image: alpine/socat:latest
+    command: ["tcp-listen:5432,reuseaddr,fork", "tcp-connect:id-hdb-psgr-cp48.ethz.ch:5432"]
+    # expose: ["5432"] We would want this passthrough to expose its ports to the other containers,
+    # BUT since it shares the vpn service's network, it cannot expose ports of its own. 5432 is therefore exposed under services.vpn.expose.
+  news_fetch: # orchestrates the automatic download. It generates pdfs (via the geckodriver container), fetches descriptions, triggers a snapshot (on archive.org) and writes to a db
    build: news_fetch
    image: news_fetch:latest
    depends_on: # when using docker compose run news_fetch, the dependencies are started as well
      - nas_sync
      - geckodriver
+      - db_passthrough
    volumes:
      - ${CONTAINER_DATA}:/app/containerdata # always set
      - ${CODE:-/dev/null}:/code # not set in prod, defaults to /dev/null
    environment:
      - DEBUG=${DEBUG}
-      - HEADLESS=${HEADLESS}
-      - REDUCEDFETCH=${REDUCEDFETCH}
+      - UNAME=${UNAME}
-    entrypoint: ${ENTRYPOINT:-python3 runner.py} # by default launch workers as defined in the Dockerfile
+    entrypoint: ${ENTRYPOINT:-python runner.py} # by default launch workers as defined in the Dockerfile
-    stdin_open: ${INTERACTIVE:-false} # docker run -i
-    tty: ${INTERACTIVE:-false} # docker run -t
+    # stdin_open: ${INTERACTIVE:-false} # docker run -i
+    # tty: ${INTERACTIVE:-false} # docker run -t
-  news_check:
+  news_check: # creates a small webapp on http://localhost:8080 to check previously generated pdfs (some of which are unusable and must be marked as such)
    build: news_check
    image: news_check:latest
+    # user: 1001:1001 # since the app writes files to the local filesystem, it must be run as the current user
+    depends_on:
+      - db_passthrough
    volumes:
      - ${CONTAINER_DATA}:/app/containerdata # always set
      - ${CODE:-/dev/null}:/code # not set in prod, defaults to /dev/null
+    environment:
+      - UNAME=${UNAME}
    ports:
-      - 8080:80 # 80 inside container
+      - "8080:80" # 80 inside the container
+    entrypoint: ${ENTRYPOINT:-python app.py} # by default launch workers as defined in the Dockerfile
+    tty: true
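For reference, a minimal sketch (not part of this commit) of how a sibling container reaches Postgres through the db_passthrough service; the credentials are placeholders standing in for the values in db.config.ini:

import peewee

# "vpn" resolves to the vpn service, whose network namespace the socat passthrough shares;
# socat then forwards port 5432 on to the actual postgres host.
db = peewee.PostgresqlDatabase(
    "coss_archiving",   # db_name, as used elsewhere in this commit
    user="ca_rw",       # placeholder
    password="...",     # placeholder
    host="vpn",
    port=5432
)
db.connect()
print(db.get_tables())  # should list the tables created by models.set_db()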

env/debug vendored (2 lines changed)
View File

@@ -3,8 +3,6 @@
CONTAINER_DATA=~/Bulk/COSS/Downloads/coss_archiving
CODE=./
-XAUTHORTIY=$XAUTHORTIY
-XSOCK=/tmp/.X11-unix
DEBUG=true
CHECK=false

launch (new file, 44 lines)
View File

@ -0,0 +1,44 @@
#!/bin/bash
set -e
set -o ignoreeof
echo "Bash script launching COSS_ARCHIVING..."
# CHANGE ME!
export CONTAINER_DATA=~/Bulk/COSS/Downloads/coss_archiving
export UNAME=remy
if [[ $1 == "debug" ]]
then
export DEBUG=true
export HEADFULL=true
export CODE=./
export ENTRYPOINT=/bin/bash
# `run --service-ports` alone is not enough here, so also execute `up -d`, which starts the remaining services in the background
docker compose up -d
elif [[ $1 == "production" ]]
then
export DEBUG=false
elif [[ $1 == "build" ]]
then
export DEBUG=false
docker compose build
exit 0
elif [[ $1 == "down" ]]
then
docker compose stop
exit 0
else
echo "Please specify the execution mode (debug/production/build) as the first argument"
exit 1
fi
shift # consumes the variable set in $1 so that $@ only contains the remaining arguments
docker compose run -it --service-ports "$@"
echo "Docker run finished, shutting down containers..."
docker compose stop
echo "Bye!"

View File

@@ -15,7 +15,7 @@ runner.configuration.models.set_db(
    runner.configuration.SqliteDatabase("../.dev/media_message_dummy.db"), # chat_db (not needed here)
    runner.configuration.SqliteDatabase("../.dev/media_downloads.db")
)
-runner.configuration.parsed["DOWNLOADS"]["local_storage_path"] = "../.dev/"
+runner.configuration.main_config["DOWNLOADS"]["local_storage_path"] = "../.dev/"

def fetch():

View File

@@ -1,25 +1,25 @@
FROM node:18.8 as build-deps
-WORKDIR /app
+WORKDIR /app/client
COPY client/package.json ./
COPY client/package-lock.json ./
COPY client/rollup.config.js ./
COPY client/src ./src/
+RUN npm install
RUN npm run build

FROM python:latest
ENV TZ Europe/Zurich
+RUN apt-get update && apt-get install -y postgresql
-RUN mkdir -p /app/news_check
-COPY requirements.txt /app/requirements.txt
-RUN python3 -m pip install -r /app/requirements.txt
-COPY --from=build-deps /app/public /app/news_check/public
-COPY app /app/news_check
WORKDIR /app/news_check
-CMD gunicorn app:app -w 1 --threads 2 -b 0.0.0.0:80
+COPY requirements.txt requirements.txt
+RUN python3 -m pip install -r requirements.txt
+COPY client/public/index.html client/public/index.html
+COPY --from=build-deps /app/client/public client/public/
+COPY server server/
+WORKDIR /app/news_check/server
+# CMD python app.py

View File

@@ -4,10 +4,9 @@
<meta charset='utf-8'>
<meta name='viewport' content='width=device-width,initial-scale=1'>
-<title>Svelte app</title>
+<title>NEWS CHECK</title>
-<link rel='icon' type='image/png' href='/favicon.png'>
+<link rel='icon' type='image/png' href='https://ethz.ch/etc/designs/ethz/img/icons/ETH-APP-Icons-Theme-white/192-xxxhpdi.png'>
-<link rel='stylesheet' href='/global.css'>
<link rel='stylesheet' href='/build/bundle.css'>
<script defer src='/build/bundle.js'></script>

@@ -16,7 +15,7 @@
<script src="https://cdn.tailwindcss.com"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/pdf.js/2.0.943/pdf.min.js"></script>
-<html data-theme="light"></html>
+<html data-theme="light"></html> <!-- Daisy-UI theme -->
</head>

View File

@@ -1,4 +1,4 @@
flask
peewee
markdown
-gunicorn
+psycopg2

View File

@ -1,10 +0,0 @@
# Svelte.js + Flask
A super simple example of using Flask to serve a Svelte app and use it as a backend server.
Run the following for development:
- `python server.py` to start the Flask server.
- `cd client; npm install; npm run autobuild` to automatically build and reload the Svelte frontend when it's changed.
This example just queries the Flask server for a random number.

View File

@@ -1,4 +1,3 @@
-from crypt import methods
import json
from flask import Flask, send_from_directory, jsonify
import random

@@ -7,7 +6,8 @@ app = Flask(__name__)
###############################################################################
-# SVELTE BACKEND. Always send index.html and the requested js-files. (compiled by npm)
+# SVELTE 'STATIC' BACKEND. Always send index.html and the requested js-files. (compiled by npm)
@app.route("/") # index.html
def base():
    return send_from_directory('../client/public', 'index.html')

@@ -17,6 +17,7 @@ def home(path):
###############################################################################
# API for news_check.

@@ -34,4 +35,4 @@ def set_article(id):
if __name__ == "__main__":
-    app.run(debug=True)
+    app.run(host="0.0.0.0", port="80")

View File

@ -1,3 +0,0 @@
{
"lockfileVersion": 1
}

news_check/test.py (new file, 20 lines)
View File

@ -0,0 +1,20 @@
import peewee
db = peewee.PostgresqlDatabase('coss_archiving', user='ca_rw', password='pleasechangeit', host='vpn', port=5432)
# db.connect()
class Pet(peewee.Model):
name = peewee.CharField()
animal_type = peewee.CharField()
class Meta:
database = db # this model uses the 'coss_archiving' postgres database
with db:
db.create_tables([Pet])
db.get_tables()
t = Pet.create(name="Test", animal_type="test")
for pet in Pet.select():
print(pet.name)

View File

@@ -1,2 +1,2 @@
-.dev/
+Dockerfile
__pycache__/

View File

@ -3,25 +3,18 @@ FROM python:latest
ENV TZ Europe/Zurich ENV TZ Europe/Zurich
RUN apt-get update && apt-get install -y \
evince \
# for checking
xauth \
#for gui
ghostscript
# for compression
RUN useradd --create-home --shell /bin/bash --uid 1001 autonews RUN useradd --create-home --shell /bin/bash --uid 1001 autonews
# id mapped to local user # id mapped to local user
# home directory needed for pip package installation # home directory needed for pip package installation
RUN export PATH=/home/autonews/.local/bin:$PATH
RUN mkdir -p /app/auto_news RUN mkdir -p /app/auto_news
RUN chown -R autonews:autonews /app RUN chown -R autonews:autonews /app
USER autonews USER autonews
RUN export PATH=/home/autonews/.local/bin:$PATH
COPY requirements.txt /app/requirements.txt COPY requirements.txt /app/requirements.txt
RUN python3 -m pip install -r /app/requirements.txt RUN python3 -m pip install -r /app/requirements.txt
COPY app /app/auto_news COPY . /app/auto_news
WORKDIR /app/auto_news WORKDIR /app/auto_news

View File

@ -1,59 +0,0 @@
from dataclasses import dataclass
import os
import shutil
import configparser
import logging
from datetime import datetime
from peewee import SqliteDatabase
from rich.logging import RichHandler
# first things first: logging
logging.basicConfig(
format='%(message)s',
level=logging.INFO,
datefmt='%H:%M:%S', # add %Y-%m-%d if needed
handlers=[RichHandler()]
)
logger = logging.getLogger(__name__)
# load config file containing constants and secrets
parsed = configparser.ConfigParser()
parsed.read("/app/containerdata/config/news_fetch.config.ini")
if os.getenv("DEBUG", "false") == "true":
logger.warning("Found 'DEBUG=true', setting up dummy databases")
db_base_path = parsed["DATABASE"]["db_path_dev"]
parsed["SLACK"]["archive_id"] = parsed["SLACK"]["debug_id"]
parsed["MAIL"]["recipient"] = parsed["MAIL"]["sender"]
parsed["DOWNLOADS"]["local_storage_path"] = parsed["DATABASE"]["db_path_dev"]
else:
logger.warning("Found 'DEBUG=false' and running on production databases, I hope you know what you're doing...")
db_base_path = parsed["DATABASE"]["db_path_prod"]
logger.info("Backing up databases")
backup_dst = parsed["DATABASE"]["db_backup"]
today = datetime.today().strftime("%Y.%m.%d")
shutil.copyfile(
os.path.join(db_base_path, parsed["DATABASE"]["chat_db_name"]),
os.path.join(backup_dst, today + "." + parsed["DATABASE"]["chat_db_name"]),
)
shutil.copyfile(
os.path.join(db_base_path, parsed["DATABASE"]["download_db_name"]),
os.path.join(backup_dst, today + "." + parsed["DATABASE"]["download_db_name"]),
)
from utils_storage import models
# Set up the database
models.set_db(
SqliteDatabase(
os.path.join(db_base_path, parsed["DATABASE"]["chat_db_name"]),
pragmas = {'journal_mode': 'wal'} # multiple threads can read at once
),
SqliteDatabase(
os.path.join(db_base_path, parsed["DATABASE"]["download_db_name"]),
pragmas = {'journal_mode': 'wal'} # multiple threads can read at once
)
)

View File

@ -1,285 +0,0 @@
import logging
import configuration
import requests
import os
import time
from threading import Thread
from slack_sdk.errors import SlackApiError
logger = logging.getLogger(__name__)
config = configuration.parsed["SLACK"]
models = configuration.models
slack_client = "dummy"
LATEST_RECORDED_REACTION = 0
def init(client) -> None:
"""Starts fetching past messages and returns the freshly launched thread"""
global slack_client
slack_client = client
global LATEST_RECORDED_REACTION
try:
LATEST_RECORDED_REACTION = models.Reaction.select(models.Reaction.id).order_by("id")[-1]
except IndexError: #query is actually empty, we have never fetched any messages until now
LATEST_RECORDED_REACTION = 0
# fetch all the messages we could have possibly missed
logger.info("Querying missed messages, threads and reactions. This can take some time.")
fetch_missed_channel_messages() # not threaded
t = Thread(target = fetch_missed_channel_reactions, daemon=True) # threaded, runs in background (usually takes a long time)
t.start()
if os.getenv("REDUCEDFETCH", "false") == "true":
logger.warning("Only fetching empty threads for bot messages because 'REDUCEDFETCH=true'")
fetch_missed_thread_messages(reduced=True)
else: # perform both asynchronously
fetch_missed_thread_messages()
def get_unhandled_messages():
"""Gets all messages that have not yet been handled, be it by mistake or by downtime
As the message handler makes no distinction between channel messages and thread messages,
we don't have to worry about them here.
"""
threaded_objects = []
for t in models.Thread.select():
if t.message_count > 1: # if only one message was written, it is the channel message
msg = t.last_message
if msg.is_by_human:
threaded_objects.append(msg)
# else don't, nothing to process
logger.info(f"Set {len(threaded_objects)} thread-messages as not yet handled.")
channel_objects = [t.initiator_message for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)]
logger.info(f"Set {len(channel_objects)} channel-messages as not yet handled.")
reaction_objects = list(models.Reaction.select().where(models.Reaction.id > LATEST_RECORDED_REACTION))
logger.info(f"Set {len(reaction_objects)} reactions as not yet handled.")
# the ones newer than the last before the fetch
all_messages = channel_objects + threaded_objects
return all_messages, reaction_objects
def fetch_missed_channel_messages():
# latest processed message_ts is:
presaved = models.Message.select().order_by(models.Message.ts)
if not presaved:
last_ts = 0
else:
last_message = presaved[-1]
last_ts = last_message.slack_ts
result = slack_client.conversations_history(
channel=config["archive_id"],
oldest=last_ts
)
new_messages = result.get("messages", [])
# # filter the last one, it is a duplicate! (only if the db is not empty!)
# if last_ts != 0 and len(new_messages) != 0:
# new_messages.pop(-1)
new_fetches = 0
for m in new_messages:
# print(m)
message_dict_to_model(m)
new_fetches += 1
refetch = result.get("has_more", False)
while refetch: # we have not actually fetched them all
try:
result = slack_client.conversations_history(
channel = config["archive_id"],
cursor = result["response_metadata"]["next_cursor"],
oldest = last_ts
) # fetches 100 messages, older than the [-1](=oldest) element of new_fetches
refetch = result.get("has_more", False)
new_messages = result.get("messages", [])
for m in new_messages:
message_dict_to_model(m)
new_fetches += 1
except SlackApiError: # Most likely a rate-limit
logger.error("Error while fetching channel messages. (likely rate limit) Retrying in {} seconds...".format(config["api_wait_time"]))
time.sleep(config["api_wait_time"])
refetch = True
logger.info(f"Fetched {new_fetches} new channel messages.")
def fetch_missed_thread_messages(reduced=False):
"""After having gotten all base-threads, we need to fetch all their replies"""
# I don't know of a better way: we need to fetch this for each and every thread (except if it is marked as permanently solved)
logger.info("Starting fetch of thread messages...")
if reduced:
threads = [t for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)]
# this only fetches completely empty threads, which might be because the bot-message was not yet saved to the db.
# once we got all the bot-messages the remaining empty threads will be the ones we need to process.
else:
threads = [t for t in models.Thread.select() if not t.is_fully_processed]
logger.info(f"Fetching history for {len(threads)} empty threads")
new_messages = []
for i,t in enumerate(threads):
try:
messages = slack_client.conversations_replies(
channel = config["archive_id"],
ts = t.slack_ts,
oldest = t.messages[-1].slack_ts
)["messages"]
except SlackApiError:
logger.error("Hit rate limit while querying threaded messages, retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
time.sleep(int(config["api_wait_time"]))
messages = slack_client.conversations_replies(
channel = config["archive_id"],
ts = t.slack_ts,
oldest = t.messages[-1].slack_ts
)["messages"]
messages.pop(0) # the first message is the one posted in the channel. We already processed it!
for m in messages:
# only append *new* messages
res = message_dict_to_model(m)
if res:
new_messages.append(res)
logger.info("Fetched {} new threaded messages.".format(len(new_messages)))
def fetch_missed_channel_reactions():
logger.info("Starting background fetch of channel reactions...")
threads = [t for t in models.Thread.select() if not t.is_fully_processed]
for i,t in enumerate(threads):
reactions = []
try:
query = slack_client.reactions_get(
channel = config["archive_id"],
timestamp = t.slack_ts
)
reactions = query.get("message", []).get("reactions", []) # default = []
except SlackApiError as e:
if e.response.get("error", "") == "message_not_found":
m = t.initiator_message
logger.warning(f"Message (id={m.id}) not found. Skipping and saving...")
# this usually means the message is past the 1000 message limit imposed by slack. Mark it as processed in the db
m.is_processed_override = True
m.save()
else: # probably a rate_limit:
logger.error("Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
time.sleep(int(config["api_wait_time"]))
for r in reactions:
reaction_dict_to_model(r, t)
# Helpers for message conversion to db-objects
def reaction_dict_to_model(reaction, thread=None):
if thread is None:
m_ts = reaction["item"]["ts"]
message = models.Message.get(ts = float(m_ts))
thread = message.thread
if "name" in reaction.keys(): # fetched through manual api query
content = reaction["name"]
elif "reaction" in reaction.keys(): # fetched through events
content = reaction["reaction"]
else:
logger.error(f"Weird reaction received: {reaction}")
return None
r, _ = models.Reaction.get_or_create(
type = content,
message = thread.initiator_message
)
logger.info("Saved reaction [{}]".format(content))
return r
def message_dict_to_model(message):
if message["type"] == "message":
thread_ts = message["thread_ts"] if "thread_ts" in message else message["ts"]
uid = message.get("user", "BAD USER")
if uid == "BAD USER":
logger.critical("Message has no user?? {}".format(message))
return None
user, _ = models.User.get_or_create(user_id = uid)
thread, _ = models.Thread.get_or_create(thread_ts = thread_ts)
m, new = models.Message.get_or_create(
user = user,
thread = thread,
ts = message["ts"],
channel_id = config["archive_id"],
text = message["text"]
)
logger.info(f"Saved: {m} ({'new' if new else 'old'})")
files = message.get("files", [])
if len(files) >= 1:
f = files[0] #default: []
m.file_type = f["filetype"]
m.perma_link = f["url_private_download"]
m.save()
logger.info(f"Saved {m.file_type}-file for message (id={m.id})")
if new:
return m
else:
return None
else:
logger.warning("What should I do of {}".format(message))
return None
def say_substitute(*args, **kwargs):
logger.info("Now sending message through say-substitute: {}".format(" - ".join(args)))
slack_client.chat_postMessage(
channel=config["archive_id"],
text=" - ".join(args),
**kwargs
)
def save_as_related_file(url, article_object):
r = requests.get(url, headers={"Authorization": "Bearer {}".format(slack_client.token)})
saveto = article_object.save_path
ftype = url[url.rfind(".") + 1:]
fname = "{} - related no {}.{}".format(
article_object.file_name.replace(".pdf",""),
len(article_object.related) + 1,
ftype
)
with open(os.path.join(saveto, fname), "wb") as f:
f.write(r.content)
article_object.set_related([fname])
logger.info("Added {} to model {}".format(fname, article_object))
return fname
def react_file_path_message(fname, article_object):
saveto = article_object.save_path
file_path = os.path.join(saveto, fname)
if os.path.exists(file_path):
article_object.set_related([fname])
logger.info("Added {} to model {}".format(fname, article_object))
return True
else:
return False
def is_message_in_archiving(message) -> bool:
if isinstance(message, dict):
return message["channel"] == config["archive_id"]
else:
return message.channel_id == config["archive_id"]
def is_reaction_in_archiving(event) -> bool:
if isinstance(event, dict):
return event["item"]["channel"] == config["archive_id"]
else:
return event.message.channel_id == config["archive_id"]

View File

@ -1,189 +0,0 @@
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk.errors import SlackApiError
import logging
import configuration
from . import message_helpers
config = configuration.parsed["SLACK"]
models = configuration.models
class BotApp(App):
logger = logging.getLogger(__name__)
def __init__(self, callback, *args, **kwargs):
super().__init__(*args, **kwargs)
self.callback = callback
def pre_start(self):
message_helpers.init(self.client)
missed_messages, missed_reactions = message_helpers.get_unhandled_messages()
[self.handle_incoming_message(m) for m in missed_messages]
[self.handle_incoming_reaction(r) for r in missed_reactions]
# self.react_missed_reactions(missed_reactions)
# self.react_missed_messages(missed_messages)
self.startup_status()
def handle_incoming_reaction(self, reaction):
if isinstance(reaction, dict): #else: the reaction is already being passed as a model
# CAUTION: filter for 'changed reactions' those are nasty (usually when adding an url)
reaction = message_helpers.reaction_dict_to_model(reaction)
thread = reaction.message.thread
article_object = thread.article
if not article_object is None:
reaction = reaction.type
status = 1 if reaction == "white_check_mark" else -1
# self.logger.info(f"Applying reaction {reaction} to its root message.")
article_object.verified = status
article_object.save()
def handle_incoming_message(self, message):
"""Reacts to all messages inside channel archiving. Must then
distinguish between threaded replies and new requests
and react accordingly"""
if isinstance(message, dict): #else: the message is already being passed as a model
# CAUTION: filter for 'changed messages' those are nasty (usually when adding an url)
if message.get("subtype", "not bad") == "message_changed":
return False
message = message_helpers.message_dict_to_model(message)
# First check: belongs to thread?
is_threaded = message.thread.message_count > 1 and message != message.thread.initiator_message
if is_threaded:
self.incoming_thread_message(message)
else:
self.incoming_channel_message(message)
def incoming_thread_message(self, message):
if message.user.user_id == config["bot_id"]:
return True # ignore the files uploaded by the bot. We handled them already!
thread = message.thread
if thread.is_fully_processed:
return True
self.logger.info("Receiving thread-message")
self.respond_thread_message(message)
def incoming_channel_message(self, message):
self.logger.info(f"Handling message {message} ({len(message.urls)} urls)")
if not message.urls: # no urls in a root-message => IGNORE
message.is_processed_override = True
message.save()
return
# ensure thread is still empty, this is a scenario encountered only in testing, but let's just filter it
if message.thread.message_count > 1:
self.logger.info("Discarded message because it is actually processed.")
return
if len(message.urls) > 1:
message_helpers.say_substitute("Only the first url is being handled. Please send any subsequent url as a separate message", thread_ts=message.thread.slack_ts)
self.callback(message)
# for url in message.urls:
# self.callback(url, message)
# stop here!
def respond_thread_message(self, message, say=message_helpers.say_substitute):
thread = message.thread
article = thread.article
if message.perma_link: # file upload means new data
fname = message_helpers.save_as_related_file(message.perma_link, article)
say("File was saved as 'related file' under `{}`.".format(fname),
thread_ts=thread.slack_ts
)
else: # either a pointer to a new file (too large to upload), or trash
success = message_helpers.react_file_path_message(message.text, article)
if success:
say("File was saved as 'related file'", thread_ts=thread.slack_ts)
else:
self.logger.error("User replied to thread {} but the response did not contain a file/path".format(thread))
say("Cannot process response without associated file.",
thread_ts=thread.slack_ts
)
def respond_channel_message(self, thread, say=message_helpers.say_substitute):
article = thread.article
answers = article.slack_info
for a in answers:
if a["file_path"]:
try: # upload resulted in an error
self.client.files_upload(
channels = config["archive_id"],
initial_comment = f"<@{config['responsible_id']}> \n {a['reply_text']}",
file = a["file_path"],
thread_ts = thread.slack_ts
)
status = True
except SlackApiError as e:
say(
"File {} could not be uploaded.".format(a),
thread_ts=thread.slack_ts
)
status = False
self.logger.error(f"File upload failed: {e}")
else: # anticipated that there is no file!
say(
f"<@{config['responsible_id']}> \n {a['reply_text']}",
thread_ts=thread.slack_ts
)
status = True
def startup_status(self):
threads = [t for t in models.Thread.select()]
all_threads = len(threads)
fully_processed = len([t for t in threads if t.is_fully_processed])
fully_unprocessed = len([t for t in threads if t.message_count == 1])
articles_unprocessed = len(models.ArticleDownload.select().where(models.ArticleDownload.verified < 1))
self.logger.info(f"[bold]STATUS[/bold]: Fully processed {fully_processed}/{all_threads} threads. {fully_unprocessed} threads have 0 replies. Article-objects to verify: {articles_unprocessed}", extra={"markup": True})
class BotRunner():
"""Stupid encapsulation so that we can apply the slack decorators to the BotApp"""
def __init__(self, callback, *args, **kwargs) -> None:
self.bot_worker = BotApp(callback, token=config["auth_token"])
@self.bot_worker.event(event="message", matchers=[message_helpers.is_message_in_archiving])
def handle_incoming_message(message, say):
return self.bot_worker.handle_incoming_message(message)
@self.bot_worker.event(event="reaction_added", matchers=[message_helpers.is_reaction_in_archiving])
def handle_incoming_reaction(event, say):
return self.bot_worker.handle_incoming_reaction(event)
self.handler = SocketModeHandler(self.bot_worker, config["app_token"])
def start(self):
self.bot_worker.pre_start()
self.handler.start()
def stop(self):
self.handler.close()
print("Bye handler!")
# def respond_to_message(self, message):
# self.bot_worker.handle_incoming_message(message)

View File

@ -1,331 +0,0 @@
import logging
logger = logging.getLogger(__name__)
from peewee import *
import os
import markdown
import re
import configuration
import datetime
config = configuration.parsed["DOWNLOADS"]
slack_config = configuration.parsed["SLACK"]
## Helpers
chat_db = DatabaseProxy()
download_db = DatabaseProxy()
# set the nature of the db at runtime
class DownloadBaseModel(Model):
class Meta:
database = download_db
class ChatBaseModel(Model):
class Meta:
database = chat_db
## == Article related models == ##
class ArticleDownload(DownloadBaseModel):
title = CharField(default='')
pub_date = DateField(default = '')
download_date = DateField(default = datetime.date.today)
source_name = CharField(default = '')
article_url = TextField(default = '', unique=True)
archive_url = TextField(default = '')
file_name = TextField(default = '')
language = CharField(default = '')
summary = TextField(default = '')
comment = TextField(default = '')
verified = IntegerField(default = False)
# authors
# keywords
# ... are added through foreignkeys
def __str__(self) -> str:
if self.title != '' and self.source_name != '':
desc = f"{shorten_name(self.title)} -- {self.source_name}"
else:
desc = f"{self.article_url}"
return f"ART [{desc}]"
## Useful Properties
@property
def save_path(self):
return f"{config['local_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/"
def fname_nas(self, file_name=""):
if self.download_date:
if file_name:
return "NAS: {}/{}/{}/{}".format(config["remote_storage_path"], self.download_date.year, self.download_date.strftime("%B"), file_name)
else: # return the self. name
return "NAS: {}/{}/{}/{}".format(config["remote_storage_path"], self.download_date.year, self.download_date.strftime("%B"), self.file_name)
else:
return None
@property
def fname_template(self):
if "youtube.com" in self.source_name or "youtu.be" in self.source_name:
fname = "{} -- {}".format(self.source_name, self.title)
else:
fname = "{} -- {}.pdf".format(self.source_name, self.title)
return clear_path_name(fname)
@property
def is_title_bad(self): # add incrementally
return "PUR-Abo" in self.title \
or "Redirecting" in self.title \
or "Error while running fetch" in self.title
@property
def slack_info(self):
status = [":x: No better version available", ":gear: Verification pending", ":white_check_mark: Verified by human"][self.verified + 1]
content = "\n>" + "\n>".join(self.summary.split("\n"))
file_status, msg = self.file_status()
if not file_status:
return [msg]
# everything alright: generate real content
# first the base file
if self.file_name[-4:] == ".pdf":
answer = [{ # main reply with the base pdf
"reply_text" : f"*{self.title}*\n{status}\n{content}",
"file_path" : self.save_path + self.file_name
}]
else: # don't upload if the file is too big!
location = "Not uploaded to slack, but the file will be on the NAS:\n`{}`".format(self.fname_nas())
answer = [{ # main reply with the base pdf
"reply_text" : "*{}*\n{}\n{}\n{}".format(self.title, status, content, location),
"file_path" : None
}]
# then the related files
rel_text = ""
for r in self.related:
fname = r.related_file_name
lentry = "\n• `{}` ".format(self.fname_nas(fname))
if fname[-4:] == ".pdf": # this is a manageable file, directly upload
f_ret = self.save_path + fname
answer.append({"reply_text":"", "file_path" : f_ret})
else: # not pdf <=> too large. Don't upload but mention its existence
lentry += "(not uploaded to slack, but the file will be on the NAS)"
rel_text += lentry
if rel_text:
rel_text = answer[0]["reply_text"] = answer[0]["reply_text"] + "\nRelated files:\n" + rel_text
return answer
@property
def mail_info(self):
base = [{"reply_text": "[{}]({})\n".format(self.article_url, self.article_url), "file_path":None}] + self.slack_info
return [{"reply_text": markdown.markdown(m["reply_text"]), "file_path": m["file_path"]} for m in base]
## Helpers
def set_keywords(self, keywords):
for k in keywords:
ArticleKeyword.create(
article = self,
keyword = k
)
def set_authors(self, authors):
for a in authors:
ArticleAuthor.create(
article = self,
author = a
)
def set_references(self, references):
for r in references:
ArticleReference.create(
article = self,
reference_url = r
)
def set_related(self, related):
for r in related:
ArticleRelated.create(
article = self,
related_file_name = r
)
def file_status(self):
if not self.file_name:
logger.error("Article {} has no filename!".format(self))
return False, {"reply_text": "Download failed, no file was saved.", "file_path": None}
file_path_abs = self.save_path + self.file_name
if not os.path.exists(file_path_abs):
logger.error("Article {} has a filename, but the file does not exist at that location!".format(self))
return False, {"reply_text": "Can't find file. Either the download failed or the file was moved.", "file_path": None}
return True, {}
class ArticleKeyword(DownloadBaseModel):
# instance gets created for every one keyword -> flexible in size
article = ForeignKeyField(ArticleDownload, backref='keywords')
keyword = CharField()
class ArticleAuthor(DownloadBaseModel):
article = ForeignKeyField(ArticleDownload, backref='authors')
author = CharField()
class ArticleReference(DownloadBaseModel):
article = ForeignKeyField(ArticleDownload, backref='references')
reference_url = TextField(default = '')
class ArticleRelated(DownloadBaseModel):
article = ForeignKeyField(ArticleDownload, backref='related')
related_file_name = TextField(default = '')
## == Slack-thread related models == ##
class User(ChatBaseModel):
user_id = CharField(default='', unique=True)
# messages
class Thread(ChatBaseModel):
"""The threads that concern us are only created if the base massage contains a url"""
thread_ts = FloatField(default = 0)
article = ForeignKeyField(ArticleDownload, backref="slack_thread", null=True, default=None)
# provides, ts, user, models
# messages
@property
def slack_ts(self):
str_ts = str(self.thread_ts)
cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there are 6 decimals; if there are fewer, we pad with zeros
return "{}{}".format(str_ts, cut_zeros*"0")
@property
def initiator_message(self):
try:
return self.messages[0] # TODO check if this needs sorting
except IndexError:
logger.warning(f"Thread {self} is empty. How can that be?")
return None
@property
def message_count(self):
# logger.warning("message_count was called")
return self.messages.count()
@property
def last_message(self):
messages = Message.select().where(Message.thread == self).order_by(Message.ts) # can't be empty by definition/creation
return messages[-1]
@property
def is_fully_processed(self) -> bool:
init_message = self.initiator_message
if init_message is None:
return False
if init_message.is_processed_override:
return True
# this override is set for instance, when no url was sent at all. Then set this thread to be ignored
reactions = init_message.reaction
if not reactions:
return False
else:
r = reactions[0].type # can and should only have one reaction
return r == "white_check_mark" \
or r == "x"
class Message(ChatBaseModel):
ts = FloatField(unique=True) #for sorting
channel_id = CharField(default='')
user = ForeignKeyField(User, backref="messages")
text = TextField(default='')
thread = ForeignKeyField(Thread, backref="messages", default=None)
file_type = CharField(default='')
perma_link = CharField(default='')
is_processed_override = BooleanField(default=False)
# reaction
def __str__(self) -> str:
return "MSG [{}]".format(shorten_name(self.text).replace('\n','/'))
@property
def slack_ts(self):
str_ts = str(self.ts)
cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there are 6 decimals; if there are fewer, we pad with zeros
return "{}{}".format(str_ts, cut_zeros * "0")
@property
def urls(self):
pattern = r"<(.*?)>"
matches = re.findall(pattern, self.text)
matches = [m for m in matches if "." in m]
new_matches = []
for m in matches:
if "." in m: # must contain a tld, right?
# further complication: slack automatically abbreviates urls in the format:
# <url|link preview>. Luckily for us, "|" is a character discouraged in urls, meaning we can "safely" split on it and keep the first half
if "|" in m:
keep = m.split("|")[0]
else:
keep = m
new_matches.append(keep)
return new_matches
@property
def is_by_human(self):
return self.user.user_id != slack_config["bot_id"]
@property
def has_single_url(self):
return len(self.urls) == 1
class Reaction(ChatBaseModel):
type = CharField(default = "")
message = ForeignKeyField(Message, backref="reaction")
def create_tables():
with download_db:
download_db.create_tables([ArticleDownload, ArticleKeyword, ArticleAuthor, ArticleReference, ArticleRelated])
with chat_db:
chat_db.create_tables([User, Message, Thread, Reaction])
def set_db(chat_db_object, download_db_object):
chat_db.initialize(chat_db_object)
download_db.initialize(download_db_object)
create_tables()
def clear_path_name(path):
keepcharacters = (' ','.','_', '-')
converted = "".join([c if (c.isalnum() or c in keepcharacters) else "_" for c in path]).rstrip()
return converted
def shorten_name(name, offset = 50):
if len(name) > offset:
return name[:offset] + "..."
else:
return name

View File

@ -0,0 +1,66 @@
import os
import shutil
import configparser
import logging
from datetime import datetime
from peewee import SqliteDatabase, PostgresqlDatabase
from rich.logging import RichHandler
# first things first: logging
logging.basicConfig(
format='%(message)s',
level=logging.INFO,
datefmt='%H:%M:%S', # add %Y-%m-%d if needed
handlers=[RichHandler()]
)
logger = logging.getLogger(__name__)
# load config file containing constants and secrets
main_config = configparser.ConfigParser()
main_config.read("/app/containerdata/config/news_fetch.config.ini")
db_config = configparser.ConfigParser()
db_config.read("/app/containerdata/config/db.config.ini")
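# For reference, db.config.ini is expected to provide roughly the following keys
# (a hypothetical sketch, matching the reads below):
#   [DATABASE]
#   db_name = coss_archiving
#   user_name = ca_rw
#   password = ...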
# DEBUG MODE:
if os.getenv("DEBUG", "false") == "true":
logger.warning("Found 'DEBUG=true', setting up dummy databases")
main_config["SLACK"]["archive_id"] = main_config["SLACK"]["debug_id"]
main_config["MAIL"]["recipient"] = main_config["MAIL"]["sender"]
main_config["DOWNLOADS"]["local_storage_path"] = main_config["DOWNLOADS"]["debug_storage_path"]
download_db = SqliteDatabase(
main_config["DATABASE"]["download_db_debug"],
pragmas = {'journal_mode': 'wal'} # multiple threads can read at once
)
# PRODUCTION MODE:
else:
logger.warning("Found 'DEBUG=false' and running on production databases, I hope you know what you're doing...")
cred = db_config["DATABASE"]
download_db = PostgresqlDatabase(
cred["db_name"], user=cred["user_name"], password=cred["password"], host="vpn", port=5432
)
# TODO Reimplement backup/printout
# logger.info("Backing up databases")
# backup_dst = main_config["DATABASE"]["db_backup"]
# today = datetime.today().strftime("%Y.%m.%d")
# shutil.copyfile(
# os.path.join(db_base_path, main_config["DATABASE"]["chat_db_name"]),
# os.path.join(backup_dst, today + "." + main_config["DATABASE"]["chat_db_name"]),
# )
# shutil.copyfile(
# os.path.join(db_base_path, main_config["DATABASE"]["download_db_name"]),
# os.path.join(backup_dst, today + "." + main_config["DATABASE"]["download_db_name"]),
# )
from utils_storage import models
# Set up the database
models.set_db(download_db)
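Downstream modules keep consuming this module exactly as before; a minimal sketch of the consumer side (as in the compress and mail workers):

import configuration

config = configuration.main_config["DOWNLOADS"]
models = configuration.models # already bound to the sqlite or postgres database chosen above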

View File

@@ -3,7 +3,6 @@ import configuration
models = configuration.models
from threading import Thread
import logging
-import os
import sys
logger = logging.getLogger(__name__)

@@ -14,10 +13,9 @@ from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker
class ArticleWatcher:
    """Wrapper for a newly created article object. Notifies the coordinator upon change/completion"""
-    def __init__(self, article, thread, **kwargs) -> None:
+    def __init__(self, article, **kwargs) -> None:
        self.article_id = article.id # in case article becomes None at any point, we can still track the article
        self.article = article
-        self.thread = thread
        self.completition_notifier = kwargs.get("notifier")
        self.fetch = kwargs.get("worker_fetch", None)

@@ -50,7 +48,7 @@ class ArticleWatcher:
        elif completed_action == "download":
            self.compress.process(self)
        elif completed_action == "compress": # last step
-            self.completition_notifier(self.article, self.thread)
+            self.completition_notifier(self.article)
            # triggers action in Coordinator
        elif completed_action == "upload":
            # this case occurs when upload was faster than compression

@@ -118,17 +116,34 @@ class Coordinator(Thread):
    def launch(self) -> None:
        for w in [self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress]:
-            if not w is None:
+            if not w is None: # for reduced operations such as upload, some workers are set to None
                w.start()
+        # if past messages have not been sent, they must be reevaluated
+        unsent = models.ArticleDownload.filter(sent = False)
+        # .objects.filter(sent = False)
+        for a in unsent:
+            print(a)
+            self.incoming_request(article=a)
-    def incoming_request(self, message):
-        """This method is passed onto the slack worker. It gets triggered when a new message is received."""
-        url = message.urls[0] # ignore all the other ones
-        article, is_new = models.ArticleDownload.get_or_create(article_url=url)
-        thread = message.thread
-        thread.article = article
-        thread.save()
+    def incoming_request(self, message=None, article=None):
+        """This method is passed onto the slack worker. It is then called when a new message is received."""
+        if message is not None:
+            try:
+                url = message.urls[0] # ignore all the other ones
+            except IndexError:
+                return
+            article, is_new = models.ArticleDownload.get_or_create(article_url=url)
+            article.slack_ts = message.ts # either update the timestamp (to the last reference to the article) or set it for the first time
+        elif article is not None:
+            is_new = False
+            logger.info(f"Received article {article} in incoming_request")
+        else:
+            logger.error("Coordinator.incoming_request called with no arguments")
+            return
        self.kwargs.update({"notifier" : self.article_complete_notifier})
        if is_new or (article.file_name == "" and article.verified == 0):

@@ -136,7 +151,6 @@ class Coordinator(Thread):
            # this overwrites previously set information, but that should not be too important
            ArticleWatcher(
                article,
-                thread,
                **self.kwargs
            )

@@ -146,7 +160,7 @@ class Coordinator(Thread):
            # the watcher will notify once it is sufficiently populated
        else: # manually trigger notification immediately
            logger.info(f"Found existing article {article}. Now sending")
-            self.article_complete_notifier(article, thread)
+            self.article_complete_notifier(article)

@@ -155,34 +169,33 @@ class Coordinator(Thread):
            w.start()
        for article in articles:
-            notifier = lambda article: print(f"Completed manual actions for {article}")
+            notifier = lambda article: logger.info(f"Completed manual actions for {article}")
            ArticleWatcher(article, workers_manual = workers, notifier = notifier)
-    def article_complete_notifier(self, article, thread):
+    def article_complete_notifier(self, article):
        if self.worker_slack is None:
-            logger.warning("Not sending slack notifier")
+            logger.warning("Skipping slack notification because worker is None")
        else:
-            self.worker_slack.bot_worker.respond_channel_message(thread)
+            self.worker_slack.bot_worker.respond_channel_message(article)
        if self.worker_mail is None:
-            logger.warning("Not sending mail notifier")
+            logger.warning("Skipping mail notification because worker is None")
        else:
            self.worker_mail.send(article)
+        article.sent = True
+        article.save()

if __name__ == "__main__":
    coordinator = Coordinator()
-    if os.getenv("UPLOAD", "false") == "true":
+    if "upload" in sys.argv:
        articles = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").execute()
        logger.info(f"Launching upload to archive for {len(articles)} articles.")
        coordinator.manual_processing(articles, [UploadWorker()])
-    elif os.getenv("CHECK", "false") == "true":
-        from utils_check import runner as check_runner
-        check_runner.verify_unchecked()
    else: # launch with full action
        slack_runner = slack_runner.BotRunner(coordinator.incoming_request)
        kwargs = {

@@ -196,10 +209,10 @@ if __name__ == "__main__":
    try:
        coordinator.add_workers(**kwargs)
        coordinator.start()
-        slack_runner.start()
+        slack_runner.start() # last one to start, inside the main thread
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt. Stopping Slack and Coordinator")
        slack_runner.stop()
-        print("BYE!")
+        logger.info("BYE!")
        # coordinator was set as a daemon thread, so it will be stopped automatically
        sys.exit(0)
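The coordinator can now be triggered along two paths; a short sketch (variable names here are hypothetical):

# live mode: the slack BotRunner passes each incoming message
coordinator.incoming_request(message=incoming_message)
# catch-up mode: launch() re-feeds every article whose sent flag is still False
coordinator.incoming_request(article=unsent_article)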

View File

@@ -23,7 +23,7 @@ u_options = {
bot_client = WebClient(
-    token = configuration.parsed["SLACK"]["auth_token"]
+    token = configuration.main_config["SLACK"]["auth_token"]
)

@@ -70,7 +70,7 @@ def send_reaction_to_slack_thread(article, reaction):
    else:
        ts = m.slack_ts
    bot_client.reactions_add(
-        channel=configuration.parsed["SLACK"]["archive_id"],
+        channel=configuration.main_config["SLACK"]["archive_id"],
        name=reaction,
        timestamp=ts
    )

View File

@@ -7,7 +7,7 @@ import logging
import configuration
logger = logging.getLogger(__name__)
-config = configuration.parsed["MAIL"]
+config = configuration.main_config["MAIL"]

def send(article_model):
    mail = MIMEMultipart()

View File

@ -0,0 +1,238 @@
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk.errors import SlackApiError
import logging
import re
import time
import configuration
config = configuration.main_config["SLACK"]
models = configuration.models
class MessageIsUnwanted(Exception):
# This exception is triggered when the message is either threaded (reply to another message) or weird (like an edit, a deletion, etc)
pass
class Message:
ts = str
user_id = str
text = str
logger = logging.getLogger(__name__)
def __init__(self, message_dict):
if message_dict.get("subtype", "not bad") == "message_changed":
raise MessageIsUnwanted()
if message_dict["type"] == "message":
if "thread_ts" in message_dict and (message_dict["thread_ts"] != message_dict["ts"]): # meaning it's a reply to another message
raise MessageIsUnwanted()
self.user_id = message_dict.get("user", "BAD USER")
# self.channel_id = config["archive_id"] # by construction, other messages are not intercepted
self.ts = message_dict["ts"]
self.text = message_dict["text"]
else:
self.logger.warning(f"What should I do of {message_dict}")
raise MessageIsUnwanted()
def __str__(self) -> str:
return f"MSG [{self.text}]"
@property
def urls(self):
pattern = r"<(.*?)>"
matches = re.findall(pattern, self.text)
matches = [m for m in matches if "." in m] # must contain a tld, right?
new_matches = []
for m in matches:
# further complication: slack automatically abbreviates urls in the format:
# <url|link preview>. Luckily for us, "|" is a character discouraged in urls, meaning we can "safely" split on it and keep the first half
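# e.g. (hypothetical message text) "<https://www.nzz.ch/some-article|nzz.ch>" -> keep "https://www.nzz.ch/some-article"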
if "|" in m:
keep = m.split("|")[0]
else:
keep = m
new_matches.append(keep)
return new_matches
@property
def is_by_human(self):
return self.user_id != config["bot_id"]
@property
def has_single_url(self):
return len(self.urls) == 1
class BotApp(App):
logger = logging.getLogger(__name__)
def __init__(self, callback, *args, **kwargs):
super().__init__(*args, **kwargs)
self.callback = callback
def pre_start(self):
missed_messages = self.fetch_missed_channel_messages()
[self.handle_incoming_message(m) for m in missed_messages]
self.startup_status()
def say_substitute(self, *args, **kwargs):
self.client.chat_postMessage(
channel=config["archive_id"],
text=" - ".join(args),
**kwargs
)
def fetch_missed_channel_messages(self):
# latest processed message_ts is:
presaved = models.ArticleDownload.select().order_by(models.ArticleDownload.slack_ts.desc()).get_or_none()
if presaved is None:
last_ts = 0
else:
last_ts = presaved.slack_ts_full
result = self.client.conversations_history(
channel=config["archive_id"],
oldest=last_ts
)
new_messages = result.get("messages", [])
# # filter the last one, it is a duplicate! (only if the db is not empty!)
# if last_ts != 0 and len(new_messages) != 0:
# new_messages.pop(-1)
return_messages = []
for m in new_messages:
    try:
        return_messages.append(Message(m))
    except MessageIsUnwanted: # e.g. edited or threaded messages
        pass
refetch = result.get("has_more", False)
while refetch: # we have not actually fetched them all
try:
result = self.client.conversations_history(
channel = config["archive_id"],
cursor = result["response_metadata"]["next_cursor"],
oldest = last_ts
) # fetches 100 messages, older than the [-1](=oldest) element of new_fetches
refetch = result.get("has_more", False)
new_messages = result.get("messages", [])
for m in new_messages:
    try:
        return_messages.append(Message(m))
    except MessageIsUnwanted:
        pass
except SlackApiError: # Most likely a rate-limit
self.logger.error("Error while fetching channel messages. (likely rate limit) Retrying in {} seconds...".format(config["api_wait_time"]))
time.sleep(config["api_wait_time"])
refetch = True
self.logger.info(f"Fetched {len(return_messages)} new channel messages.")
return return_messages
def handle_incoming_message(self, message, say=None):
"""Reacts to all messages inside channel archiving. This either gets called when catching up on missed messages (by pre_start()) or by the SocketModeHandler in 'live' mode"""
if isinstance(message, dict):
try:
message = Message(message)
except MessageIsUnwanted:
return False
self.logger.info(f"Handling message {message} ({len(message.urls)} urls)")
if len(message.urls) > 1:
self.say_substitute("Only the first url is being handled. Please send any subsequent url as a separate message", thread_ts=message.thread.slack_ts)
self.callback(message = message)
def respond_channel_message(self, article, say=None):
if say is None:
say = self.say_substitute
answers = article.slack_info
for a in answers:
if a["file_path"]:
try:
self.client.files_upload(
channels = config["archive_id"],
initial_comment = f"{a['reply_text']}",
file = a["file_path"],
thread_ts = article.slack_ts_full
)
status = True
except SlackApiError as e: # upload resulted in an error
say(
"File {} could not be uploaded.".format(a),
thread_ts = article.slack_ts_full
)
status = False
self.logger.error(f"File upload failed: {e}")
else: # anticipated that there is no file!
say(
f"{a['reply_text']}",
thread_ts = article.slack_ts_full
)
status = True
def startup_status(self):
"""Prints an overview of the articles. This needs to be called here because it should run after having fetched the newly sent messages"""
total = models.ArticleDownload.select().count()
to_be_processed = models.ArticleDownload.select().where(models.ArticleDownload.title == "").count()
unchecked = models.ArticleDownload.select().where(models.ArticleDownload.verified == 0).count()
bad = models.ArticleDownload.select().where(models.ArticleDownload.verified == -1).count()
not_uploaded = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").count()
self.logger.info(
f"[bold]NEWS-FETCH DATABASE STATUS[/bold]: Total entries: {total}; Not yet downloaded: {to_be_processed}; Not yet checked: {unchecked}; Not yet uploaded to archive: {not_uploaded}; Marked as bad: {bad}",
extra={"markup": True}
)
class BotRunner():
logger = logging.getLogger(__name__)
"""Stupid encapsulation so that we can apply the slack decorators to the BotApp"""
def __init__(self, callback, *args, **kwargs) -> None:
self.bot_worker = BotApp(callback, token=config["auth_token"])
@self.bot_worker.event(event="message", matchers=[is_message_in_archiving])
def handle_incoming_message(message, say):
return self.bot_worker.handle_incoming_message(message, say)
# @self.bot_worker.event(event="reaction_added", matchers=[is_reaction_in_archiving])
# def handle_incoming_reaction(event, say):
# return self.bot_worker.handle_incoming_reaction(event)
@self.bot_worker.event(event="event")
def handle_all_other_reactions(event, say):
self.logger.info("Ignoring slack event that isn't a message")
self.handler = SocketModeHandler(self.bot_worker, config["app_token"])
def start(self):
self.bot_worker.pre_start()
self.handler.start()
def stop(self):
self.handler.close()
self.logger.info("Closed Slack-Socketmodehandler")
def is_message_in_archiving(message) -> bool:
return message["channel"] == config["archive_id"]

View File

@ -0,0 +1,10 @@
def clear_path_name(path):
keepcharacters = (' ','.','_', '-')
converted = "".join([c if (c.isalnum() or c in keepcharacters) else "_" for c in path]).rstrip()
return converted
def shorten_name(name, offset = 50):
if len(name) > offset:
return name[:offset] + "..."
else:
return name
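A quick sanity check of the two helpers (hypothetical inputs, as run in a Python shell):

>>> clear_path_name("NZZ: How? -- why!")
'NZZ_ How_ -- why_'
>>> shorten_name("a" * 60)
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...'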

View File

@ -0,0 +1,297 @@
import logging
logger = logging.getLogger(__name__)
from peewee import *
import os
import markdown
import re
import configuration
import datetime
from . import helpers
config = configuration.main_config["DOWNLOADS"]
slack_config = configuration.main_config["SLACK"]
# set the nature of the db at runtime
download_db = DatabaseProxy()
class DownloadBaseModel(Model):
class Meta:
database = download_db
## == Article related models == ##
class ArticleDownload(DownloadBaseModel):
# in the beginning this is all we have
article_url = TextField(default = '', unique=True)
# fetch then fills in the metadata
title = CharField(default='')
@property
def is_title_bad(self): # add incrementally
return "PUR-Abo" in self.title \
or "Redirecting" in self.title \
or "Error while running fetch" in self.title
summary = TextField(default = '')
source_name = CharField(default = '')
language = CharField(default = '')
file_name = TextField(default = '')
@property
def save_path(self):
return f"{config['local_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/"
def fname_nas(self, file_name=""):
# deliberately not a property: slack_info also calls this with an explicit file_name for related files
if self.download_date:
if file_name:
return f"NAS: {config['remote_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/{file_name}"
else: # return the name of the article file itself
return f"NAS: {config['remote_storage_path']}/{self.download_date.year}/{self.download_date.strftime('%B')}/{self.file_name}"
else:
return None
@property
def fname_template(self):
if "youtube.com" in self.source_name or "youtu.be" in self.source_name:
fname = f"{self.source_name} -- {self.title}"
else:
fname = f"{self.source_name} -- {self.title}.pdf"
return helpers.clear_path_name(fname)
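# Illustrative fname_template results (the source_name values are made up):
#   source_name = "www.nzz.ch", title = "Ein Bericht"  ->  "www.nzz.ch -- Ein Bericht.pdf"
#   source_name = "youtube.com", title = "Ein Video"   ->  "youtube.com -- Ein Video" (no .pdf, since it is not a printable page)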
archive_url = TextField(default = '')
pub_date = DateField(default = '')
download_date = DateField(default = datetime.date.today)
slack_ts = FloatField(default = 0) # should be a fixed-length string but float is easier to sort by
@property
def slack_ts_full(self):
str_ts = str(self.slack_ts)
cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # slack uses 6 decimals; the float representation drops trailing zeros
return f"{str_ts}{cut_zeros * '0'}"
sent = BooleanField(default = False)
archived_by = CharField(default = os.getenv("UNAME"))
# need to know who saved the message because the file needs to be on their computer in order to get verified
# verification happens in a different app, but the model has the fields here as well
comment = TextField(default = '')
verified = IntegerField(default = 0) # 0 = not verified, 1 = verified, -1 = marked as bad
# authors
# keywords
# ... are added through foreignkeys
# we will also add an attribute named message, to reference which message should be replied to. This attribute does not need to be saved in the db
## Helpers specific to a single article
def __str__(self) -> str:
if self.title != '' and self.source_name != '':
desc = f"{helpers.shorten_name(self.title)} -- {self.source_name}"
else:
desc = f"{self.article_url}"
return f"ART [{desc}]"
@property
def slack_info(self):
status = [":x: No better version available", ":gear: Verification pending", ":white_check_mark: Verified by human"][self.verified + 1]
content = "\n>" + "\n>".join(self.summary.split("\n"))
file_status, msg = self.file_status()
if not file_status:
return [msg]
# everything alright: generate real content
# first the base file
if self.file_name[-4:] == ".pdf":
answer = [{ # main reply with the base pdf
"reply_text" : f"*{self.title}*\n{status}\n{content}",
"file_path" : self.save_path + self.file_name
}]
else: # don't upload if the file is too big!
location = f"Not uploaded to slack, but the file will be on the NAS:\n`{self.fname_nas}`"
answer = [{ # main reply with the base pdf
"reply_text" : f"*{self.title}*\n{status}\n{content}\n{location}",
"file_path" : None
}]
# then the related files
rel_text = ""
for r in self.related:
fname = r.related_file_name
lentry = "\n• `{}` ".format(self.fname_nas(fname))
if fname[-4:] == ".pdf": # this is a manageable file, directly upload
f_ret = self.save_path + fname
answer.append({"reply_text":"", "file_path" : f_ret})
else: # not pdf <=> too large. Don't upload but mention its existence
lentry += "(not uploaded to slack, but the file will be on the NAS)"
rel_text += lentry
if rel_text:
answer[0]["reply_text"] += "\nRelated files:\n" + rel_text
return answer
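# For reference, slack_info returns a list shaped like this (values invented):
# [{"reply_text": "*Title*\n:gear: Verification pending\n\n>summary...", "file_path": "<save_path>/src -- Title.pdf"},
#  {"reply_text": "", "file_path": "<save_path>/related_paper.pdf"}]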
@property
def mail_info(self):
base = [{"reply_text": f"[{self.article_url}]({self.article_url})\n", "file_path":None}] + self.slack_info
return [{"reply_text": markdown.markdown(m["reply_text"]), "file_path": m["file_path"]} for m in base]
def set_authors(self, authors):
for a in authors:
ArticleAuthor.create(
article = self,
author = a
)
def set_related(self, related):
for r in related:
ArticleRelated.create(
article = self,
related_file_name = r
)
def file_status(self):
if not self.file_name:
logger.error(f"Article {self} has no filename!")
return False, {"reply_text": "Download failed, no file was saved.", "file_path": None}
file_path_abs = self.save_path + self.file_name
if not os.path.exists(file_path_abs):
logger.error(f"Article {self} has a filename, but the file does not exist at that location!")
return False, {"reply_text": "Can't find file. Either the download failed or the file was moved.", "file_path": None}
return True, {}
class ArticleAuthor(DownloadBaseModel):
article = ForeignKeyField(ArticleDownload, backref='authors')
author = CharField()
class ArticleRelated(DownloadBaseModel):
# Related files, such as the full text of a paper, audio files, etc.
article = ForeignKeyField(ArticleDownload, backref='related')
related_file_name = TextField(default = '')
# class Thread(ChatBaseModel):
# """The threads that concern us are only created if the base massage contains a url"""
# thread_ts = FloatField(default = 0)
# article = ForeignKeyField(ArticleDownload, backref="slack_thread", null=True, default=None)
# # provides, ts, user, models
# # messages
# @property
# def slack_ts(self):
# str_ts = str(self.thread_ts)
# cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there are 6 decimals. If there are fewer: problem!
# return "{}{}".format(str_ts, cut_zeros*"0")
# @property
# def initiator_message(self):
# try:
# return self.messages[0] # TODO check if this needs sorting
# except IndexError:
# logger.warning(f"Thread {self} is empty. How can that be?")
# return None
# @property
# def message_count(self):
# # logger.warning("message_count was called")
# return self.messages.count()
# @property
# def last_message(self):
# messages = Message.select().where(Message.thread == self).order_by(Message.ts) # can't be empty by definition/creation
# return messages[-1]
# @property
# def is_fully_processed(self) -> bool:
# init_message = self.initiator_message
# if init_message is None:
# return False
# if init_message.is_processed_override:
# return True
# # this override is set for instance, when no url was sent at all. Then set this thread to be ignored
# reactions = init_message.reaction
# if not reactions:
# return False
# else:
# r = reactions[0].type # can and should only have one reaction
# return r == "white_check_mark" \
# or r == "x"
# class Message(ChatBaseModel):
# ts = FloatField(unique=True) #for sorting
# channel_id = CharField(default='')
# user = ForeignKeyField(User, backref="messages")
# text = TextField(default='')
# thread = ForeignKeyField(Thread, backref="messages", default=None)
# file_type = CharField(default='')
# perma_link = CharField(default='')
# is_processed_override = BooleanField(default=False)
# # reaction
# def __str__(self) -> str:
# return "MSG [{}]".format(shorten_name(self.text).replace('\n','/'))
# @property
# def slack_ts(self):
# str_ts = str(self.ts)
# cut_zeros = 6 - (len(str_ts) - str_ts.find(".") - 1) # usually there are 6 decimals. If there are fewer: problem!
# return "{}{}".format(str_ts, cut_zeros * "0")
# @property
# def urls(self):
# pattern = r"<(.*?)>"
# matches = re.findall(pattern, self.text)
# matches = [m for m in matches if "." in m]
# new_matches = []
# for m in matches:
# if "." in m: # must contain a tld, right?
# # further complication: slack automatically abbreviates urls in the format:
# # <url|link preview>. Luckily for us, "|" is a character discouraged in urls, meaning we can "safely" split on it and retain the first half
# if "|" in m:
# keep = m.split("|")[0]
# else:
# keep = m
# new_matches.append(keep)
# return new_matches
# @property
# def is_by_human(self):
# return self.user.user_id != slack_config["bot_id"]
# @property
# def has_single_url(self):
# return len(self.urls) == 1
def set_db(download_db_object):
download_db.initialize(download_db_object)
with download_db: # create tables (does nothing if they exist already)
download_db.create_tables([ArticleDownload, ArticleAuthor, ArticleRelated])
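Since the concrete database is injected through the proxy, a caller could initialize it like this (a sketch; the database name and credentials are placeholders, not taken from this commit):

from peewee import PostgresqlDatabase

db = PostgresqlDatabase(
    "downloads",                      # hypothetical database name
    user="user", password="secret",   # placeholder credentials
    host="localhost", port=5432       # e.g. reached through the vpn/socat passthrough
)
set_db(db)  # binds the proxy and creates the tables if needed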


@ -5,7 +5,7 @@ from pathlib import Path
import logging
logger = logging.getLogger(__name__)
import configuration
-config = configuration.parsed["DOWNLOADS"]
+config = configuration.main_config["DOWNLOADS"]
shrink_sizes = []


@ -8,7 +8,7 @@ from selenium import webdriver
import configuration
import json
-config = configuration.parsed["DOWNLOADS"]
+config = configuration.main_config["DOWNLOADS"]
blacklisted = json.loads(config["blacklisted_href_domains"])
@ -25,10 +25,10 @@ class PDFDownloader:
options.profile = config["browser_profile_path"]
# should be options.set_preference("profile", config["browser_profile_path"]) as of selenium 4 but that doesn't work
-if os.getenv("HEADLESS", "false") == "true":
-    options.add_argument('--headless')
-else:
-    self.logger.warning("Opening browser GUI because of 'HEADLESS=false'")
+if os.getenv("DEBUG", "false") == "true":
+    self.logger.warning("Opening browser GUI because of 'DEBUG=true'")
+else:
+    options.add_argument('--headless')
options.set_preference('print.save_as_pdf.links.enabled', True)
# Just save if the filetype is pdf already
@ -92,7 +92,7 @@ class PDFDownloader:
# in the mean time, get a page title if required
if article_object.is_title_bad:
-    article_object.title = self.driver.title.replace(".pdf", "")
+    article_object.title = self.driver.title.replace(".pdf", "") # some titles end with .pdf
# will be propagated to the saved file (dst) as well
fname = article_object.fname_template
@ -112,7 +112,6 @@ class PDFDownloader:
if success:
    article_object.file_name = fname
-    article_object.set_references(self.get_references())
else:
    article_object.file_name = ""
@ -150,18 +149,6 @@ class PDFDownloader:
    return False

-def get_references(self):
-    try:
-        hrefs = [e.get_attribute("href") for e in self.driver.find_elements_by_xpath("//a[@href]")]
-    except:
-        hrefs = []
-    # len_old = len(hrefs)
-    hrefs = [h for h in hrefs \
-        if not sum([(domain in h) for domain in blacklisted]) # sum([True, False, False, False]) == 1 (esp. not 0)
-    ] # filter a tiny bit at least
-    # self.logger.info(f"Hrefs filtered (before: {len_old}, after: {len(hrefs)})")
-    return hrefs


@ -54,9 +54,4 @@ def get_description(article_object):
except AttributeError:
    pass # list would have been empty anyway
-try:
-    article_object.set_keywords(news_article.keywords)
-except AttributeError:
-    pass # list would have been empty anyway
return article_object