reduced slack functionality, higher ease of use. Database migration wip
This commit is contained in:
238
news_fetch/utils_slack/runner.py
Normal file
238
news_fetch/utils_slack/runner.py
Normal file
@@ -0,0 +1,238 @@
|
||||
from slack_bolt import App
|
||||
from slack_bolt.adapter.socket_mode import SocketModeHandler
|
||||
from slack_sdk.errors import SlackApiError
|
||||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
import configuration
|
||||
config = configuration.main_config["SLACK"]
|
||||
models = configuration.models
|
||||
|
||||
class MessageIsUnwanted(Exception):
|
||||
# This exception is triggered when the message is either threaded (reply to another message) or weird (like an edit, a deletion, etc)
|
||||
pass
|
||||
|
||||
class Message:
|
||||
ts = str
|
||||
user_id = str
|
||||
text = str
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def __init__(self, message_dict):
|
||||
if message_dict.get("subtype", "not bad") == "message_changed":
|
||||
raise MessageIsUnwanted()
|
||||
if message_dict["type"] == "message":
|
||||
if "thread_ts" in message_dict and (message_dict["thread_ts"] != message_dict["ts"]): # meaning it's a reply to another message
|
||||
raise MessageIsUnwanted()
|
||||
|
||||
self.user_id = message_dict.get("user", "BAD USER")
|
||||
# self.channel_id = config["archive_id"] # by construction, other messages are not intercepted
|
||||
self.ts = message_dict["ts"]
|
||||
self.text = message_dict["text"]
|
||||
|
||||
else:
|
||||
self.logger.warning(f"What should I do of {message_dict}")
|
||||
raise MessageIsUnwanted()
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"MSG [{self.text}]"
|
||||
|
||||
|
||||
@property
|
||||
def urls(self):
|
||||
pattern = r"<(.*?)>"
|
||||
matches = re.findall(pattern, self.text)
|
||||
matches = [m for m in matches if "." in m] # must contain a tld, right?
|
||||
|
||||
new_matches = []
|
||||
for m in matches:
|
||||
# further complication: slack automatically abreviates urls in the format:
|
||||
# <url|link preview>. Lucky for us, "|" is a character derecommended in urls, meaning we can "safely" split for it and retain the first half
|
||||
if "|" in m:
|
||||
keep = m.split("|")[0]
|
||||
else:
|
||||
keep = m
|
||||
new_matches.append(keep)
|
||||
return new_matches
|
||||
|
||||
@property
|
||||
def is_by_human(self):
|
||||
return self.user.user_id != config["bot_id"]
|
||||
|
||||
|
||||
@property
|
||||
def has_single_url(self):
|
||||
return len(self.urls) == 1
|
||||
|
||||
|
||||
|
||||
class BotApp(App):
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, callback, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.callback = callback
|
||||
|
||||
|
||||
def pre_start(self):
|
||||
missed_messages = self.fetch_missed_channel_messages()
|
||||
|
||||
[self.handle_incoming_message(m) for m in missed_messages]
|
||||
self.startup_status()
|
||||
|
||||
|
||||
def say_substitute(self, *args, **kwargs):
|
||||
self.client.chat_postMessage(
|
||||
channel=config["archive_id"],
|
||||
text=" - ".join(args),
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def fetch_missed_channel_messages(self):
|
||||
# latest processed message_ts is:
|
||||
presaved = models.ArticleDownload.select().order_by(models.ArticleDownload.slack_ts.desc()).get_or_none()
|
||||
if presaved is None:
|
||||
last_ts = 0
|
||||
else:
|
||||
last_ts = presaved.slack_ts_full
|
||||
|
||||
result = self.client.conversations_history(
|
||||
channel=config["archive_id"],
|
||||
oldest=last_ts
|
||||
)
|
||||
|
||||
new_messages = result.get("messages", [])
|
||||
# # filter the last one, it is a duplicate! (only if the db is not empty!)
|
||||
# if last_ts != 0 and len(new_messages) != 0:
|
||||
# new_messages.pop(-1)
|
||||
|
||||
return_messages = [Message(m) for m in new_messages]
|
||||
|
||||
refetch = result.get("has_more", False)
|
||||
while refetch: # we have not actually fetched them all
|
||||
try:
|
||||
result = self.client.conversations_history(
|
||||
channel = config["archive_id"],
|
||||
cursor = result["response_metadata"]["next_cursor"],
|
||||
oldest = last_ts
|
||||
) # fetches 100 messages, older than the [-1](=oldest) element of new_fetches
|
||||
refetch = result.get("has_more", False)
|
||||
|
||||
new_messages = result.get("messages", [])
|
||||
for m in new_messages:
|
||||
return_messages.append(Message(m))
|
||||
except SlackApiError: # Most likely a rate-limit
|
||||
self.logger.error("Error while fetching channel messages. (likely rate limit) Retrying in {} seconds...".format(config["api_wait_time"]))
|
||||
time.sleep(config["api_wait_time"])
|
||||
refetch = True
|
||||
|
||||
self.logger.info(f"Fetched {len(return_messages)} new channel messages.")
|
||||
return return_messages
|
||||
|
||||
|
||||
|
||||
def handle_incoming_message(self, message, say=None):
|
||||
"""Reacts to all messages inside channel archiving. This either gets called when catching up on missed messages (by pre_start()) or by the SocketModeHandler in 'live' mode"""
|
||||
if isinstance(message, dict):
|
||||
try:
|
||||
message = Message(message)
|
||||
except MessageIsUnwanted:
|
||||
return False
|
||||
|
||||
|
||||
self.logger.info(f"Handling message {message} ({len(message.urls)} urls)")
|
||||
|
||||
|
||||
if len(message.urls) > 1:
|
||||
self.say_substitute("Only the first url is being handled. Please send any subsequent url as a separate message", thread_ts=message.thread.slack_ts)
|
||||
|
||||
self.callback(message = message)
|
||||
|
||||
|
||||
def respond_channel_message(self, article, say=None):
|
||||
if say is None:
|
||||
say = self.say_substitute
|
||||
answers = article.slack_info
|
||||
for a in answers:
|
||||
if a["file_path"]:
|
||||
try:
|
||||
self.client.files_upload(
|
||||
channels = config["archive_id"],
|
||||
initial_comment = f"{a['reply_text']}",
|
||||
file = a["file_path"],
|
||||
thread_ts = article.slack_ts_full
|
||||
)
|
||||
status = True
|
||||
except SlackApiError as e: # upload resulted in an error
|
||||
say(
|
||||
"File {} could not be uploaded.".format(a),
|
||||
thread_ts = article.slack_ts_full
|
||||
)
|
||||
status = False
|
||||
self.logger.error(f"File upload failed: {e}")
|
||||
else: # anticipated that there is no file!
|
||||
say(
|
||||
f"{a['reply_text']}",
|
||||
thread_ts = article.slack_ts_full
|
||||
)
|
||||
status = True
|
||||
|
||||
|
||||
def startup_status(self):
|
||||
"""Prints an overview of the articles. This needs to be called here because it should run after having fetched the newly sent messages"""
|
||||
total = models.ArticleDownload.select().count()
|
||||
to_be_processed = models.ArticleDownload.select().where(models.ArticleDownload.title == "").count()
|
||||
unchecked = models.ArticleDownload.select().where(models.ArticleDownload.verified == 0).count()
|
||||
bad = models.ArticleDownload.select().where(models.ArticleDownload.verified == -1).count()
|
||||
not_uploaded = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").count()
|
||||
self.logger.info(
|
||||
f"[bold]NEWS-FETCH DATABASE STATUS[/bold]: Total entries: {total}; Not yet downloaded: {to_be_processed}; Not yet checked: {unchecked}; Not yet uploaded to archive: {not_uploaded}; Marked as bad: {bad}",
|
||||
extra={"markup": True}
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class BotRunner():
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
"""Stupid encapsulation so that we can apply the slack decorators to the BotApp"""
|
||||
def __init__(self, callback, *args, **kwargs) -> None:
|
||||
self.bot_worker = BotApp(callback, token=config["auth_token"])
|
||||
|
||||
@self.bot_worker.event(event="message", matchers=[is_message_in_archiving])
|
||||
def handle_incoming_message(message, say):
|
||||
return self.bot_worker.handle_incoming_message(message, say)
|
||||
|
||||
# @self.bot_worker.event(event="reaction_added", matchers=[is_reaction_in_archiving])
|
||||
# def handle_incoming_reaction(event, say):
|
||||
# return self.bot_worker.handle_incoming_reaction(event)
|
||||
|
||||
@self.bot_worker.event(event="event")
|
||||
def handle_all_other_reactions(event, say):
|
||||
self.logger.log("Ignoring slack event that isn't a message")
|
||||
|
||||
self.handler = SocketModeHandler(self.bot_worker, config["app_token"])
|
||||
|
||||
|
||||
def start(self):
|
||||
self.bot_worker.pre_start()
|
||||
self.handler.start()
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.handler.close()
|
||||
self.logger.info("Closed Slack-Socketmodehandler")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def is_message_in_archiving(message) -> bool:
|
||||
return message["channel"] == config["archive_id"]
|
||||
|
Reference in New Issue
Block a user