Better folder structure, added a few ambient sensors

This commit is contained in:
Remy Moll
2021-04-15 19:50:37 +02:00
parent c61ee3ea72
commit 03d419f8f7
45 changed files with 711 additions and 1285 deletions

View File

@@ -1 +1,6 @@
# Placeholder
from . import keys
from . import reddit
from . import weather
from . import reddit
from . import search
from . import metmuseum

View File

@@ -1,20 +0,0 @@
import googlesearch
def query(params):
param_string = ""
for word in params:
param_string += word + "+"
param_string = param_string[:-1]
search_url = "https://google.com/search?q=" + param_string
try:
res = googlesearch.search(param_string.replace("+"," ") ,num=5,start=0,stop=5)
send_string = "Results for <b>" + param_string.replace("+"," ") + "</b>:\n\n"
for url in res:
send_string += url + "\n\n"
send_string += "Search url:\n" + search_url
except:
send_string = "Search url:\n" + search_url
return send_string

39
bot/api/metmuseum.py Normal file
View File

@@ -0,0 +1,39 @@
import requests
import random
from PIL import Image
import io
class ArtFetch:
def __init__(self):
self.base_url = "https://collectionapi.metmuseum.org/"
self.objects = self.fetch_objects() # chosen set of images to select randomly
def fetch_objects(self):
"""We restrict ourselves to a few domains."""
# fetch all departements
t = requests.get(self.base_url + "public/collection/v1/departments").json()
deps = t["departments"]
keep_id = []
for d in deps:
name = d["displayName"]
if name == "American Decorative Arts" or name == "Arts of Africa, Oceania, and the Americas" or name == "Asian Art" or name == "European Paintings":
keep_id.append(str(d["departmentId"]))
# fetch artworks listed under these departments
data = {"departmentIds" : "|".join(keep_id)}
t = requests.get(self.base_url + "public/collection/v1/objects",params=data).json()
# num = t["total"]
ids = t["objectIDs"]
return ids
def get_random_art(self):
"""Returns an image object of a randomly selected artwork"""
# fetch the artwork's url
r_id = self.objects[random.randint(0,len(self.objects))]
t = requests.get(self.base_url + "public/collection/v1/objects/" + str(r_id)).json()
im_url = t["primaryImageSmall"]
# download the image
resp = requests.get(im_url)
img = Image.open(io.BytesIO(resp.content))
return img

View File

@@ -1,50 +1,56 @@
import praw
try:
import bot.api.keys as keys
except:
import keys
stream = praw.Reddit(client_id = keys.reddit_id, client_secret = keys.reddit_secret, user_agent=keys.reddit_user_agent)
def get_top(subreddit, number, return_type="text"):
if return_type == "text":
message = ""
try:
for submission in stream.subreddit(subreddit).top(limit=number):
if not submission.stickied:
message += "<b>" + submission.title + "</b>" + "\n" + submission.selftext + "\n\n\n"
return message
except:
return "Api call failed, sorry"
else:
images = []
try:
for submission in stream.subreddit(subreddit).top(limit=number):
if not submission.stickied:
t = {"image": submission.url, "caption": submission.title}
images.append(t)
return images
except:
return ["Api call failed, sorry"]
def get_random_rising(subreddit, number, return_type="text"):
if return_type == "text":
message = ""
try:
for submission in stream.subreddit(subreddit).random_rising(limit=number):
if not submission.stickied:
message += "<b>" + submission.title + "</b>" + "\n" + submission.selftext + "\n\n\n"
return message
except:
return "Api call failed, sorry"
else:
images = []
try:
for submission in stream.subreddit(subreddit).random_rising(limit=number):
if not submission.stickied:
t = {"image": submission.url, "caption": submission.title}
images.append(t)
return images
except:
return ["Api call failed, sorry"]
class RedditFetch():
def __init__(self, key):
self.stream = praw.Reddit(client_id = key["id"], client_secret = key["secret"], user_agent=key["user_agent"])
def get_top(self, subreddit, number, return_type="text"):
if return_type == "text":
posts = []
try:
for submission in self.stream.subreddit(subreddit).top(limit=number):
p = {}
if not submission.stickied:
p["title"] = submission.title
p["content"] = submission.selftext
posts.append(p)
return posts
except:
return []
else:
images = []
try:
for submission in self.stream.subreddit(subreddit).top(limit=number):
if not submission.stickied:
t = {"image": submission.url, "caption": submission.title}
images.append(t)
return images
except:
return []
def get_random_rising(self, subreddit, number, return_type="text"):
if return_type == "text":
posts = []
try:
for submission in self.stream.subreddit(subreddit).random_rising(limit=number):
p = {}
if not submission.stickied:
p["title"] = submission.title
p["content"] = submission.selftext
posts.append(p)
return posts
except:
return []
else:
images = []
try:
for submission in self.stream.subreddit(subreddit).random_rising(limit=number):
if not submission.stickied:
t = {"image": submission.url, "caption": submission.title}
images.append(t)
return images
except:
return []

21
bot/api/search.py Normal file
View File

@@ -0,0 +1,21 @@
import duckduckpy
class WebSearch():
def __init__(self):
self.search = duckduckpy.query
def get_result(self, query):
try:
res = []
response = self.search(query, container = "dict")["related_topics"]
for r in response:
if "text" in r:
res.append({
"text" : r["text"],
"url": r["first_url"]
})
except:
res = ["Connection error"]
return res
# TODO: this api has more potential. Extract images or quick facts!

View File

@@ -1,150 +0,0 @@
import emoji
import requests
import datetime
import bot.api.keys
class TelegramIO():
def __init__(self, persistence):
"""Inits the Telegram-Interface
"""
self.base_url = "https://api.telegram.org/bot" + bot.api.keys.telegram_api + "/"
self.persistence = persistence
# Dynamic variables for answering
self.chat_id = ""
self.offset = 0
self.message_id = ""
self.message_queue = []
def update_commands(self,commands):
self.commands = commands
########################################################################
"""Helper-Functions"""
def fetch_updates(self):
""""""
update_url = self.base_url + "getUpdates"
data = {"offset":self.offset}
try:
result = requests.post(update_url,data=data)
result = result.json()["result"]
self.message_queue = result
except:
result = []
return len(result)
def process_message(self):
"""Inspects the first message from self.message_queue and reacts accordingly."""
message_data = self.message_queue.pop(0)
self.increase_counter("receive_activity")
self.offset = message_data["update_id"] + 1
if "edited_message" in message_data:
return
message = message_data["message"]
self.message_id = message["message_id"]
self.chat_id = message["chat"]["id"]
author = message["from"]
if str(author["id"]) not in self.persistence["bot"]["chat_members"]:
name = ""
if "first_name" in author:
name += author["first_name"] + " "
if "last_name" in author:
name += author["last_name"]
if len(name) == 0:
name = "anonymous"
self.persistence["bot"]["chat_members"][str(author["id"])] = name # seems like the conversion to string is handled implicitly, but it got me really confused
self.send_message("Welcome to this chat " + name + "!")
if "text" in message:
print("Chat said: ", emoji.demojize(message["text"]))
if "entities" in message:
for entry in message["entities"]:
if entry["type"] == "bot_command":
return message["text"] #self.handle_command(message["text"][1:])
elif "photo" in message:
print("Photo received, what do I do?")
return
def send_thinking_note(self):
data = {
"chat_id" : self.chat_id,
"action" : "typing",
}
send_url = self.base_url + "sendChatAction"
try:
r = requests.post(send_url, data=data)
except:
print("Could not show that I'm thinking =(")
def send_message(self, message):
if message == "" or message == None:
return
print("SENDING: " + message)
# message = message.replace("<","&lt;").replace(">", "&gt;")
# TODO: sanitize input but keep relevant tags
data = {
'chat_id': self.chat_id,
'text': emoji.emojize(message),
"parse_mode": "HTML",
"reply_to_message_id" : self.message_id,
}
send_url = self.base_url + "sendMessage"
try:
r = requests.post(send_url, data=data)
if (r.status_code != 200):
raise Exception
self.increase_counter("send_activity")
except:
out = datetime.datetime.now().strftime("%d.%m.%y - %H:%M")
out += " @ " + "telegram.send_message"
out += " --> " + "did not send:\n" + message
self.persistence["bot"]["log"] += [out]
def send_photo(self, url, caption):
print("SENDING PHOTO: " + url)
data = {
'chat_id': self.chat_id,
'photo': url,
"parse_mode": "HTML",
"reply_to_message_id" : self.message_id,
'caption' : caption,
}
send_url = self.base_url + "sendPhoto"
try:
r = requests.post(send_url, data=data)
self.increase_counter("send_activity")
except:
out = datetime.datetime.now().strftime("%d.%m.%y - %H:%M")
out += " @ " + "telegram.send_photo"
out += " --> " + "did not send:\n" + url
self.persistence["bot"]["log"] += [out]
def increase_counter(self, counter_name):
current_hour = int(datetime.datetime.now().timestamp() // 3600)
if len(self.persistence["bot"][counter_name]["hour"]) == 0 or current_hour != self.persistence["bot"][counter_name]["hour"][-1]:
self.persistence["bot"][counter_name]["hour"].append(current_hour)
self.persistence["bot"][counter_name]["count"].append(1)
else:
self.persistence["bot"][counter_name]["count"][-1] += 1

View File

@@ -1,14 +1,13 @@
import requests
import bot.api.keys
import datetime
class WeatherFetch():
def __init__(self):
def __init__(self, key):
self.last_fetch = datetime.datetime.fromtimestamp(0)
self.last_weather = ""
self.url = "https://api.openweathermap.org/data/2.5/onecall?"
self.key = bot.api.keys.weather_api
self.key = key
def show_weather(self, location):
delta = datetime.datetime.now() - self.last_fetch
@@ -59,3 +58,25 @@ class WeatherFetch():
ret_weather = self.last_weather
return ret_weather
# def get_weather_by_city(self, city):
# loc = get_coords_from_city(self, city)
# weather = self.show_weather(loc)
# return weather
# def get_coords_from_city(self, city):
# url = "https://devru-latitude-longitude-find-v1.p.rapidapi.com/latlon.php"
# data = {"location": city}
# headers = {
# "x-rapidapi-key" : "d4e0ab7ab3mshd5dde5a282649e0p11fd98jsnc93afd98e3aa",
# "x-rapidapi-host" : "devru-latitude-longitude-find-v1.p.rapidapi.com",
# }
# #try:
# resp = requests.request("GET", url, headers=headers, params=data)
# result = resp.text
# #except:
# # result = "???"
# return result

2
bot/commands/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# Placeholder
from . import clock, help, weather, status, zvv, lists, alias, plaintext, reddit, search

65
bot/commands/alias.py Normal file
View File

@@ -0,0 +1,65 @@
from .template import *
FIRST = range(1)
class Alias(BotFunc):
"""create a new command for command-paths you often use"""
def __init__(self, dispatcher, prst):
super().__init__(prst)
self.dispatcher = dispatcher
# do not interact with him yet!
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('alias', self.entry_point)],
states={
FIRST: [
CallbackQueryHandler(self.print_all, pattern="^all$"),
CallbackQueryHandler(self.create_alias, pattern="^new$"),
CallbackQueryHandler(self.delete_alias, pattern='^delete$'),
]
},
fallbacks=[CommandHandler('alias', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
test = self.dispatcher
print(self.dispatcher.handlers[0])
keyboard = [
[InlineKeyboardButton("All aliases", callback_data="all")],
[InlineKeyboardButton("Create new alias", callback_data="new")],
[InlineKeyboardButton("Delete alias", callback_data="delete")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("What exactly do you want?", reply_markup=reply_markup)
return FIRST
def print_all(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
all_alias = ""
for k in self.persistence["bot"]["aliases"]:
all_alias += k + " - " + self.persistence["bot"]["aliases"] +"\n"
query.edit_message_text(text="List of all commands:\n" + all_alias)
return ConversationHandler.END
def create_alias(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
all_alias = ""
for k in self.persistence["bot"]["aliases"]:
all_alias += k + " - " + self.persistence["bot"]["aliases"] +"\n"
query.edit_message_text(text="List of all commands:\n" + all_alias)
return ConversationHandler.END
def delete_alias(self, update: Update, context: CallbackContext) -> None:
return ConversationHandler.END

211
bot/commands/clock.py Normal file
View File

@@ -0,0 +1,211 @@
from .template import *
import time
import numpy
from PIL import Image
import io
CHOOSE, ADDARG = range(2)
MESSAGE, WAKE, ALARM, IMAGE, ART = range(3,8)
class Clock(BotFunc):
"""pass on commands to clock-module"""
def __init__(self, prst, clock_module, art_api):
super().__init__(prst)
self.clock = clock_module
self.api_art = art_api
def create_handler(self):
handler = ConversationHandler(
entry_points=[CommandHandler("clock", self.entry_point)],
states={
CHOOSE : [
CallbackQueryHandler(self.wake_light, pattern="^wake$"),
CallbackQueryHandler(self.alarm_blink, pattern="^alarm$"),
CallbackQueryHandler(self.show_message, pattern="^message$"),
CallbackQueryHandler(self.show_image, pattern="^image$"),
CallbackQueryHandler(self.art_gallery, pattern="^gallery$"),
],
ADDARG : [MessageHandler(Filters.text, callback=self.get_arg1)],
MESSAGE: [MessageHandler(Filters.text, callback=self.exec_show_message)],
WAKE : [MessageHandler(Filters.text, callback=self.exec_wake_light)],
ALARM : [MessageHandler(Filters.text, callback=self.exec_alarm_blink)],
IMAGE : [MessageHandler(Filters.photo, callback=self.exec_show_image)],
ART : [MessageHandler(Filters.text, callback=self.exec_art_gallery)],
},
fallbacks=[CommandHandler('clock', self.entry_point)],
)
return handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [
[InlineKeyboardButton("Make a wake-light", callback_data="wake")],
[InlineKeyboardButton("Blink as alarm", callback_data="alarm")],
[InlineKeyboardButton("Show a message", callback_data="message")],
[InlineKeyboardButton("Show an image", callback_data="image")],
[InlineKeyboardButton("Art gallery!", callback_data="gallery")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("What exactly do you want?", reply_markup=reply_markup)
return CHOOSE
def wake_light(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should the color cycle last? (In seconds)")
return WAKE
def alarm_blink(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should it blink? (In seconds)")
self.next_state = {ALARM : "What frequency (Hertz)"}
return ADDARG
def show_message(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. What message will I show?")
return MESSAGE
def show_image(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("How long (in minutes) should the image be displayed?")
self.next_state = {IMAGE : "Please send me the photo to display."}
return ADDARG
def art_gallery(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should we display art? (in hours")
self.next_state = {ART : "And how many artworks would you like to see during that time?"}
return ADDARG
def get_arg1(self, update: Update, context: CallbackContext) -> None:
a = update.message.text
self.additional_argument = a
update.message.reply_text("Furthermore: "+ list(self.next_state.values())[0])
return list(self.next_state.keys())[0]
###### actually running clock actions
def exec_wake_light(self, update: Update, context: CallbackContext) -> None:
duration = update.message.text
def output(duration):
self.clock.set_brightness(value=1)
start_color = numpy.array([153, 0, 51])
end_color = numpy.array([255, 255, 0])
col_show = numpy.zeros((*self.clock.shape, 3))
col_show[:,:,...] = start_color
gradient = end_color - start_color
# 20 steps should be fine => sleep_time = duration / 20
for i in range(20):
ct = i/20 * gradient
col_show[:,:,...] = [int(x) for x in ct+start_color]
self.clock.IO.put(col_show)
time.sleep(int(duration) / 20)
self.clock.run(output,(duration,))
return ConversationHandler.END
def exec_alarm_blink(self, update: Update, context: CallbackContext) -> None:
duration = self.additional_argument
frequency = update.message.text
def output(duration, frequency):
self.clock.set_brightness(value=1)
duration = int(duration)
frequency = int(frequency)
n = duration * frequency / 2
empty = numpy.zeros((*self.clock.shape,3))
red = empty.copy()
red[...,0] = 255
for i in range(int(n)):
self.clock.IO.put(red)
time.sleep(1/frequency)
self.clock.IO.put(empty)
time.sleep(1/frequency)
if not(duration == 0 or frequency == 0):
update.message.reply_text("Now blinking")
self.clock.run(output,(duration, frequency))
return ConversationHandler.END
def exec_show_image(self, update: Update, context: CallbackContext) -> None:
duration = self.additional_argument
i = update.message.photo
img = update.message.photo[0]
bot = img.bot
id = img.file_id
file = bot.getFile(id).download_as_bytearray()
width = self.clock.shape[1]
height = self.clock.shape[0]
img = Image.open(io.BytesIO(file))
im_height = img.height
im_width = img.width
scalex = im_width // width
scaley = im_height // height
scale = min(scalex, scaley)
t = img.resize((width, height),box=(0,0,width*scale,height*scale))
a = numpy.asarray(t)
def output(image, duration):
self.clock.IO.put(image)
time.sleep(int(duration) * 60)
self.clock.run(output,(a, duration))
return ConversationHandler.END
def exec_show_message(self, update: Update, context: CallbackContext) -> None:
message_str = update.message.text
update.message.reply_text("Now showing: " + message_str)
self.clock.run(self.clock.text_scroll,(message_str,))
return ConversationHandler.END
def exec_art_gallery(self, update: Update, context: CallbackContext) -> None:
duration = float(self.additional_argument)
number = int(update.message.text)
def output(number, duration):
for i in range(number):
img = self.api_art.get_random_art() # returns an PIL.Image object
im_height = img.height
im_width = img.width
width = self.clock.shape[1]
height = self.clock.shape[0]
scalex = im_width // width
scaley = im_height // height
scale = min(scalex, scaley)
t = img.resize((width, height),box=(0,0,width*scale,height*scale))
a = numpy.asarray(t)
self.clock.IO.put(a)
time.sleep(duration*3600 / number)
update.message.reply_text("Ok. Showing art for the next "+ str(duration) + " hours.")
self.clock.run(output,(number, duration))
return ConversationHandler.END

126
bot/commands/help.py Normal file
View File

@@ -0,0 +1,126 @@
from .template import *
FIRST, EXECUTE = range(2)
class Help(BotFunc):
"""Shows the functions and their usage"""
def __init__(self, prst):
super().__init__(prst)
self.available_commands = {}
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('help', self.entry_point)],
states={
FIRST: [
CallbackQueryHandler(self.print_all, pattern="^all$"),
CallbackQueryHandler(self.choose_specific, pattern="^specific$"),
CallbackQueryHandler(self.print_one, pattern='func-'),
],
EXECUTE :[CallbackQueryHandler(self.execute_now)],
# ConversationHandler.TIMEOUT : [
# CallbackQueryHandler(self.timeout)
# ]
},
fallbacks=[CommandHandler('help', self.entry_point)],
conversation_timeout=15,
)
return conv_handler
def add_commands(self, commands):
# commands is a dict {"name": class}
for k in commands:
if k != "plaintext":
self.available_commands[k] = commands[k].__doc__
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [
[
InlineKeyboardButton("All commands", callback_data="all"),
InlineKeyboardButton("Just one", callback_data="specific"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if update.message:
update.message.reply_text("What exactly do you want?", reply_markup=reply_markup)
else:
update._effective_chat.send_message("What exactly do you want?", reply_markup=reply_markup)
return FIRST
def print_all(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
all_cmd = ""
for h in self.available_commands:
all_cmd += h + " - `" + self.available_commands[h] + "`\n"
query.edit_message_text(text="List of all commands:\n" + all_cmd, parse_mode = ParseMode.MARKDOWN)
return ConversationHandler.END
def choose_specific(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
keyboard = [[InlineKeyboardButton(k, callback_data="func-" + k)] for k in self.available_commands]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text(
text="What command should be printed?", reply_markup=reply_markup
)
return FIRST
def print_one(self, update: Update, context: CallbackContext) -> None:
"""Show new choice of buttons"""
query = update.callback_query
name = query.data.replace("func-", "")
query.answer()
message = name + ": `" + self.available_commands[name] + "`"
keyboard = [[InlineKeyboardButton("Call " + name + " now", callback_data=name),]]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text(
text= message,
reply_markup = reply_markup,
parse_mode = ParseMode.MARKDOWN_V2
)
return EXECUTE
def execute_now(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
name = query.data
query.answer()
funcs = context.dispatcher.handlers[0]
for func in funcs:
if name == func.entry_points[0].command[0]:
break
callback = func.entry_points[0].handle_update
callback(update, context.dispatcher, check_result=True, context=context)
return ConversationHandler.END
def timeout(self, update: Update, context: CallbackContext) -> None:
"""For dying conversation. Currently unused."""
query = update.callback_query
name = query.data.replace("func-", "")
query.answer()
message = name + ": `" + self.available_commands[name] + "`"
query.edit_message_text(
text= "EHHHHH",
parse_mode = ParseMode.MARKDOWN_V2
)
return ConversationHandler.END

177
bot/commands/lists.py Normal file
View File

@@ -0,0 +1,177 @@
from .template import *
import datetime
import requests
NAME, NEW, ACTION, ITEMADD, ITEMREMOVE = range(5)
class Lists(BotFunc):
"""Create and edit lists"""
def __init__(self, prst):
super().__init__(prst)
self.current_name = ""
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('list', self.entry_point)],
states={
NAME: [
CallbackQueryHandler(self.choose_list, pattern="^list-"),
CallbackQueryHandler(self.new_list, pattern="^new$"),
],
NEW : [MessageHandler(Filters.text, callback=self.new_listname)],
ACTION: [
CallbackQueryHandler(self.list_add, pattern="^add$"),
CallbackQueryHandler(self.list_remove, pattern="^remove$"),
CallbackQueryHandler(self.list_clear, pattern="^clear$"),
CallbackQueryHandler(self.list_delete, pattern="^delete$"),
CallbackQueryHandler(self.list_print, pattern="^print$"),
CallbackQueryHandler(self.list_menu, pattern="^overview$"),
],
ITEMADD : [MessageHandler(Filters.text, callback=self.list_add_item)],
ITEMREMOVE : [CallbackQueryHandler(self.list_remove_index)]
},
fallbacks=[CommandHandler('list', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [[InlineKeyboardButton(k, callback_data="list-"+k)] for k in self.persistence["global"]["lists"]] + [[InlineKeyboardButton("New list", callback_data="new")]]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text(text="Here are the existing lists. You can also create a new one:", reply_markup=reply_markup)
return NAME
def choose_list(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
data = query.data
name = data.replace("list-","")
query.answer()
self.current_name = name
keyboard = [
[InlineKeyboardButton("Add item", callback_data="add")],
[InlineKeyboardButton("Remove item", callback_data="remove")],
[InlineKeyboardButton("Clear list", callback_data="clear")],
[InlineKeyboardButton("Print list", callback_data="print")],
[InlineKeyboardButton("Delete list", callback_data="delete")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("Very well. For " + name + " the following actions are available:", reply_markup=reply_markup)
return ACTION
def list_menu(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
keyboard = [
[InlineKeyboardButton("Add item", callback_data="add")],
[InlineKeyboardButton("Remove item", callback_data="remove")],
[InlineKeyboardButton("Clear list", callback_data="clear")],
[InlineKeyboardButton("Print list", callback_data="print")],
[InlineKeyboardButton("Delete list", callback_data="delete")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("Very well. For " + self.current_name + " the following actions are available:", reply_markup=reply_markup)
return ACTION
def new_list(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("What's the name of the new list?")
return NEW
def new_listname(self, update: Update, context: CallbackContext) -> None:
name = update.message.text
if name not in self.persistence["global"]["lists"]:
self.persistence["global"]["lists"][name] = []
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("To the menu!", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
self.current_name = name
update.message.reply_text("Thanks. List " + name + " was successfully created.", reply_markup=reply_markup)
return ACTION
else:
update.message.reply_text("Oh no! That list already exists")
return ConversationHandler.END
def list_add(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("What would you like to add?")
return ITEMADD
def list_remove(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
keyboard = [[InlineKeyboardButton(k, callback_data=i)] for i,k in enumerate(self.persistence["global"]["lists"][self.current_name])]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("Which item would you like to remove?", reply_markup = reply_markup)
return ITEMREMOVE
def list_clear(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
self.persistence["global"]["lists"][self.current_name] = []
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("List " + self.current_name + " cleared", reply_markup=reply_markup)
return ACTION
def list_delete(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
self.persistence["global"]["lists"].pop(self.current_name, None)
query.edit_message_text("List " + self.current_name + " deleted")
return ConversationHandler.END
def list_print(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
content = "\n".join(self.persistence["global"]["lists"][self.current_name])
keyboard = [[InlineKeyboardButton("Add an item", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("Content of " + self.current_name + ":\n" + content, reply_markup=reply_markup)
return ACTION
def list_add_item(self, update: Update, context: CallbackContext) -> None:
name = update.message.text
self.persistence["global"]["lists"][self.current_name] += [name]
keyboard = [[InlineKeyboardButton("Add some more", callback_data="add"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("Added " + name, reply_markup=reply_markup)
return ACTION
def list_remove_index(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
ind = int(query.data)
query.answer()
old = self.persistence["global"]["lists"][self.current_name]
name = old.pop(ind)
self.persistence["global"]["lists"][self.current_name] = old
keyboard = [[InlineKeyboardButton("Remove another", callback_data="remove"), InlineKeyboardButton("Back to the menu", callback_data="overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("Removed " + name, reply_markup=reply_markup)
return ACTION

15
bot/commands/plaintext.py Normal file
View File

@@ -0,0 +1,15 @@
from .template import *
class Plain(BotFunc):
"""Not a command: just keeps logs and usage_data"""
def __init__(self, prst):
super().__init__(prst)
def create_handler(self):
h = MessageHandler(Filters.text, callback=self.add_to_log)
return h
def add_to_log(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
super().increase_counter("receive_activity")

177
bot/commands/reddit.py Normal file
View File

@@ -0,0 +1,177 @@
from .template import *
CHOOSE_NUM = 1
class Joke(BotFunc):
"""Tells a joke from reddit."""
def __init__(self, api, prst):
super().__init__(prst)
self.available_commands = {}
self.api = api
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('joke', self.entry_point)],
states={
CHOOSE_NUM: [CallbackQueryHandler(self.get_jokes),],
},
fallbacks=[CommandHandler('joke', self.entry_point)],
# conversation_timeout=5,
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [[InlineKeyboardButton(str(i), callback_data=str(i)) for i in range(1,11)]]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("How many jokes?", reply_markup=reply_markup)
return CHOOSE_NUM
def get_jokes(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
number = int(query.data)
query.answer()
jokes = self.api.get_random_rising("jokes", number, "text")
# formating
message = ""
for j in jokes:
message += "<b>" + j["title"] + "</b> \n" + j["content"] + "\n\n"
if message == "":
message += "Could not fetch jokes."
query.edit_message_text(text = message, parse_mode = ParseMode.HTML)
return ConversationHandler.END
CHOOSE_TOPIC = 0
class Meme(BotFunc):
"""Gets the latest memes from reddit"""
def __init__(self, api, prst):
super().__init__(prst)
self.available_commands = {}
self.api = api
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('meme', self.entry_point)],
states={
CHOOSE_TOPIC: [CallbackQueryHandler(self.choose_topic)],
CHOOSE_NUM :[CallbackQueryHandler(self.get_memes)],
},
fallbacks=[CommandHandler('meme', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [
[InlineKeyboardButton("General", callback_data="memes"),],
[InlineKeyboardButton("Dank memes", callback_data="dankmemes"),],
[InlineKeyboardButton("Maths", callback_data="mathmemes"),],
[InlineKeyboardButton("Physics", callback_data="physicsmemes"),],
[InlineKeyboardButton("Biology", callback_data="biologymemes"),],
]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("What kind of memes?", reply_markup=reply_markup)
return CHOOSE_TOPIC
def choose_topic(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
query = update.callback_query
d = query.data
query.answer()
keyboard = [[InlineKeyboardButton(str(i), callback_data=d + "-" + str(i)) for i in range(1,11)]]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text("How many memes?", reply_markup=reply_markup)
return CHOOSE_NUM
def get_memes(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
data = query.data.split("-")
query.answer()
memes = self.api.get_random_rising(data[0], int(data[1]), "photo")
if len(memes) != 0:
for m in memes:
update.effective_chat.send_photo(photo = m["image"],caption = m["caption"])
else:
update.effective_chat.send_message("Sorry, the meme won't yeet.")
return ConversationHandler.END
# class News(BotFunc):
# """Gets the latest news from reddit"""
# def __init__(self, api, prst):
# super().__init__(prst)
# self.available_commands = {}
# self.api = api
# def create_handler(self):
# conv_handler = ConversationHandler(
# entry_points=[CommandHandler('news', self.entry_point)],
# states={
# CHOOSE_TOPIC: [CallbackQueryHandler(self.choose_topic)],
# CHOOSE_NUM :[CallbackQueryHandler(self.get_news)],
# },
# fallbacks=[CommandHandler('news', self.entry_point)],
# )
# return conv_handler
# def entry_point(self, update: Update, context: CallbackContext) -> None:
# super().entry_point()
# keyboard = [
# [InlineKeyboardButton("World", callback_data="worldnews"),],
# [InlineKeyboardButton("Germany", callback_data="germannews"),],
# [InlineKeyboardButton("France", callback_data="francenews"),],
# [InlineKeyboardButton("Europe", callback_data="eunews"),],
# [InlineKeyboardButton("USA", callback_data="usanews"),],
# ]
# reply_markup = InlineKeyboardMarkup(keyboard)
# update.message.reply_text("What kind of news?", reply_markup=reply_markup)
# return CHOOSE_TOPIC
# def choose_topic(self, update: Update, context: CallbackContext) -> None:
# super().entry_point()
# query = update.callback_query
# d = query.data
# query.answer()
# keyboard = [[InlineKeyboardButton(str(i), callback_data=d + "-" + str(i)) for i in range(1,11)]]
# reply_markup = InlineKeyboardMarkup(keyboard)
# query.edit_message_text("How many entries?", reply_markup=reply_markup)
# return CHOOSE_NUM
# def get_news(self, update: Update, context: CallbackContext) -> None:
# query = update.callback_query
# data = query.data.split("-")
# query.answer()
# #try:
# news = self.api.get_top(data[0], data[1], "text")
# # formating
# message = ""
# for j in news:
# message += "<b>" + j["title"] + "</b> \n" + j["content"] + "\n\n"
# if message == "":
# message += "Could not fetch news."
# query.edit_message_text(news, paresemode=ParseMode.HTML)
# return ConversationHandler.END

58
bot/commands/search.py Normal file
View File

@@ -0,0 +1,58 @@
from .template import *
SEARCH, MORE = range(2)
class Search(BotFunc):
"""Browse the web for a topic."""
def __init__(self, api, prst):
super().__init__(prst)
self.available_commands = {}
self.api = api
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('search', self.entry_point)],
states={
SEARCH: [MessageHandler(Filters.text, self.get_results),],
MORE: [CallbackQueryHandler(self.show_more, pattern="^more$"),],
},
fallbacks=[CommandHandler('search', self.entry_point)],
conversation_timeout=20,
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
update.message.reply_text("What are we searching?")
return SEARCH
def get_results(self, update: Update, context: CallbackContext) -> None:
search = update.message.text
results = self.api.get_result(search)
keyboard = [[InlineKeyboardButton("More!", callback_data="more")]]
reply_markup = InlineKeyboardMarkup(keyboard)
# formating
self.results = results
first = results[0]
message = first["text"] + "\n(" + first["url"] + ")\n\n"
update.message.reply_text(text = message, reply_markup=reply_markup)
return MORE
def show_more(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
message = ""
for r in self.results:
message += r["text"] + "\n(" + r["url"] + ")\n\n"
query.edit_message_text(message)
return ConversationHandler.END

102
bot/commands/status.py Normal file
View File

@@ -0,0 +1,102 @@
from .template import *
import datetime
import requests
import socket
import numpy as np
import os
import json
FIRST = 1
class Status(BotFunc):
"""Shows a short status of the program."""
def __init__(self, name, version, prst):
super().__init__(prst)
self.start_time = datetime.datetime.now()
self.name = name
self.version = version
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('status', self.entry_point)],
states={
FIRST: [
CallbackQueryHandler(self.send_log, pattern="^full$"),
]
},
fallbacks=[CommandHandler('status', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
keyboard = [
[
InlineKeyboardButton("And the log?", callback_data="full"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
delta = str(datetime.datetime.now() - self.start_time)
message = "BeebBop, this is " + self.name + " (V." + self.version + ")\n"
try:
ip = requests.get('https://api.ipify.org').text
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('8.8.8.8', 80))
(addr, port) = s.getsockname()
local_ips = addr
except:
ip = "not fetchable"
local_ips = "not fetchable"
message += "Status: Running 🟢\n"
message += "Uptime: `" + delta[:delta.rfind(".")] + "`\n"
message += "Reboots: `" + str(self.persistence["global"]["reboots"]) + "`\n"
message += "IP (public): `" + ip + "`\n"
message += "IP (private): `" + str(local_ips) + "`\n"
u = str(self.get_ngrok_url())
message += "URL: [" + u + "](" + u + ")\n"
tot_r = np.array(self.persistence["bot"]["receive_activity"]["count"]).sum()
message += "Total messages read: `" + str(tot_r) + "`\n"
tot_s = np.array(self.persistence["bot"]["send_activity"]["count"]).sum()
message += "Total messages sent: `" + str(tot_s) + "`\n"
tot_e = np.array(self.persistence["bot"]["execute_activity"]["count"]).sum()
message += "Commands executed `" + str(tot_e) + "`\n"
if update.message:
update.message.reply_text(message, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
else:
update._effective_chat.send_message(message, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
return FIRST
def send_log(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
wanted = query.data.replace("status-","")
query.answer()
with open("persistence/complete.log") as l:
query.message.reply_document(l)
return ConversationHandler.END
def get_ngrok_url(self):
try:
url = "http://localhost:4040/api/tunnels/"
res = requests.get(url)
res_unicode = res.content.decode("utf-8")
res_json = json.loads(res_unicode)
for i in res_json["tunnels"]:
if i['name'] == 'command_line':
return i['public_url']
break
except:
return "Not available"

33
bot/commands/template.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ParseMode
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler, CallbackContext, MessageHandler, Filters
from telegram.ext import (
Updater,
CommandHandler,
CallbackQueryHandler,
ConversationHandler,
CallbackContext,
)
import datetime
class BotFunc():
"""Base class for a specific bot-functionality"""
def __init__(self, prst):
self.logger = logging.getLogger(__name__)
self.persistence = prst
def entry_point(self):
self.increase_counter("execute_activity")
def increase_counter(self, counter_name):
current_hour = int(datetime.datetime.now().timestamp() // 3600)
if len(self.persistence["bot"][counter_name]["hour"]) == 0 or current_hour != self.persistence["bot"][counter_name]["hour"][-1]:
self.persistence["bot"][counter_name]["hour"].append(current_hour)
self.persistence["bot"][counter_name]["count"].append(1)
else:
self.persistence["bot"][counter_name]["count"][-1] += 1

116
bot/commands/weather.py Normal file
View File

@@ -0,0 +1,116 @@
from .template import *
import datetime
FIRST = 1
class Weather(BotFunc):
"""Shows a weatherforecast for a given location"""
def __init__(self, api, prst):
"""initialize api and persistence"""
super().__init__(prst)
self.api = api
self.city = ""
def create_handler(self):
"""returns the handlers with button-logic"""
conv_handler = ConversationHandler(
entry_points=[CommandHandler('weather', self.entry_point)],
states={
FIRST: [
CallbackQueryHandler(self.choose_city, pattern="^city-"),
CallbackQueryHandler(self.choose_time, pattern="^time-"),
]
},
fallbacks=[CommandHandler('weather', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
"""Reacts the call of the command. Prints the first buttons"""
super().entry_point()
keyboard = [
[
InlineKeyboardButton("Zürich", callback_data="city-zurich"),
InlineKeyboardButton("Freiburg", callback_data="city-freiburg"),
InlineKeyboardButton("Mulhouse", callback_data="city-mulhouse"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if update.message:
update.message.reply_text("Which city?", reply_markup=reply_markup)
else:
update.callback_query.edit_message_text("Which city", reply_markup=reply_markup)
return FIRST
def choose_city(self, update: Update, context: CallbackContext) -> None:
"""Prompt same text & keyboard as `start` does but not as new message"""
# Get CallbackQuery from Update
query = update.callback_query
data = query.data
self.city = data.replace("city-","")
query.answer()
keyboard = [
[
InlineKeyboardButton("Now", callback_data="time-now"),
InlineKeyboardButton("Tomorrow", callback_data="time-tomorrow"),
InlineKeyboardButton("7 days", callback_data="time-7"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
query.edit_message_text(
text = "Which time?", reply_markup=reply_markup
)
return FIRST
def choose_time(self, update: Update, context: CallbackContext) -> None:
"""Show new choice of buttons"""
query = update.callback_query
query.answer()
forecast_time = query.data.replace("time-","")
weather = self.get_weather(self.city, forecast_time)
query.edit_message_text(
text = "Weather: \n\n" + weather,
parse_mode = ParseMode.HTML
)
return ConversationHandler.END
def get_weather(self, city, forecast_time) -> None:
"""get the weather that matches the given params"""
locations = {"freiburg": [47.9990, 7.8421], "zurich": [47.3769, 8.5417], "mulhouse": [47.7508, 7.3359]}
city = locations[city]
categories = {"Clouds": "", "Rain": "🌧", "Thunderstorm": "🌩", "Drizzle": ":droplet:", "Snow": "", "Clear": "", "Mist": "🌫", "Smoke": "Smoke", "Haze": "Haze", "Dust": "Dust", "Fog": "Fog", "Sand": "Sand", "Dust": "Dust", "Ash": "Ash", "Squall": "Squall", "Tornado": "Tornado",}
days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
today = datetime.datetime.today().weekday()
weather = self.api.show_weather(city)
message = ""
if forecast_time == "now" or forecast_time == "7":
now = weather.pop(0)
message += "<b>Now:</b> " + categories[now["short"]] + "\n"
message += "🌡" + str(now["temps"][0]) + "°\n\n"
tod = weather.pop(0)
message += "<b>" + "Today" + ":</b> " + categories[tod["short"]] + "\n"
message += "🌡 ❄ " + str(tod["temps"][0]) + "° , 🌡 🔥 " + str(tod["temps"][1]) + "°\n\n"
if forecast_time == "tomorrow" or forecast_time == "7":
if forecast_time == "tomorrow": # previous statement was not executed: tomorrow is at weather[2]
tom = weather.pop(2)
else:
tom = weather.pop(0)
message += "<b>" + "Tomorrow" + ":</b> " + categories[tom["short"]] + "\n"
message += "🌡 ❄ " + str(tom["temps"][0]) + "° , 🌡 🔥 " + str(tom["temps"][1]) + "°\n\n"
if forecast_time == "7":
for i, day in enumerate(weather):
message += "<b>" + days[(today + i + 2) % 7] + ":</b> " + categories[day["short"]] + "\n"
message += "🌡 ❄ " + str(day["temps"][0]) + "° , 🌡 🔥 " + str(day["temps"][1]) + "°\n\n"
return message

86
bot/commands/zvv.py Normal file
View File

@@ -0,0 +1,86 @@
from .template import *
import datetime
import requests
START, DEST = range(2)
class Zvv(BotFunc):
"""Connects to the swiss travel-api to get public transport routes"""
def __init__(self, prst):
super().__init__(prst)
self.start = ""
self.dest = ""
pass
def create_handler(self):
conv_handler = ConversationHandler(
entry_points=[CommandHandler('zvv', self.entry_point)],
states={
START: [MessageHandler(Filters.text, callback=self.get_start)],
DEST: [MessageHandler(Filters.text, callback=self.get_dest)]
},
fallbacks=[CommandHandler('zvv', self.entry_point)],
)
return conv_handler
def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point()
update.message.reply_text("What is the start point?")
return START
def get_start(self, update: Update, context: CallbackContext) -> None:
loc = update.message.text
self.start = loc
update.message.reply_text("Ok. Going from " + loc + ", what is the destination?")
return DEST
def get_dest(self, update: Update, context: CallbackContext) -> None:
loc = update.message.text
self.dest = loc
route = self.get_result()
update.message.reply_text("Her are the routes I've got:\n" + route)
return ConversationHandler.END
def get_result(self):
url = "http://transport.opendata.ch/v1/connections"
start = self.start
dest = self.dest
data = {"from" : start, "to" : dest, "limit" : 2}
try:
routes = requests.get(url, params=data).json()
result = routes["connections"]
text = result[0]["from"]["station"]["name"] + "" + result[0]["to"]["station"]["name"] + "\n\n"
for con in result:
text += "Start: " + datetime.datetime.fromtimestamp(int(con["from"]["departureTimestamp"])).strftime("%d/%m - %H:%M") + "\n"
text += "🏁 " + datetime.datetime.fromtimestamp(int(con["to"]["arrivalTimestamp"])).strftime("%d/%m - %H:%M") + "\n"
text += "" + con["duration"] + "\n"
text += "🗺️ Route:\n"
for step in con["sections"]:
if step["journey"] != None:
text += step["journey"]["passList"][0]["station"]["name"] + " (" + datetime.datetime.fromtimestamp(int(step["journey"]["passList"][0]["departureTimestamp"])).strftime("%H:%M") + ")\n"
text += "➡️ Linie " + self.number_to_emoji(step["journey"]["number"]) + "\n"
text += step["journey"]["passList"][-1]["station"]["name"] + " (" + datetime.datetime.fromtimestamp(int(step["journey"]["passList"][-1]["arrivalTimestamp"])).strftime("%H:%M") +")\n"
else:
text += "Walk."
text += "\n"
return text
except:
return "Invalid api call."
def number_to_emoji(self, number):
out = ""
numbers = ["0","1","2","3","4","5","6","7","8","9"]
for i in str(number):
out += numbers[int(i)]
return str(out)

View File

@@ -1,96 +0,0 @@
import datetime
from bot.api import telegram, google, weather, reddit
import Levenshtein as lev
class BotFramework():
"""Main functionality for a bot """
def __init__(self, name, version, prst):
"""Inits the Bot with a few conf. vars
Args: -> name:str - Name of the bot
-> version:str - Version number
-> prst:shelveObj - persistence
"""
self.version = version
self.name = name
# Persistent variable
self.persistence = prst
# Uptime counter
self.start_time = datetime.datetime.now()
self.telegram = telegram.TelegramIO(self.persistence)
self.weather = weather.WeatherFetch()
def react_chats(self):
"""Checks unanswered messages and answers them"""
num = self.telegram.fetch_updates()
for i in range(num):
self.react_command()
def react_command(self):
"""Reacts if a new command is present
Returns command, params iff the command is a hardware-one (for the clock), else None"""
message = self.telegram.process_message()
if message == None:
return
message = message[1:] #remove first "/"
tmp = message.split(" ")
cmd = tmp[0]
params = tmp[1:]
def call_command(cmd, par):
result = self.commands[cmd](*par)
# *params means the list is unpacked and handed over as separate arguments.
self.telegram.increase_counter("execute_activity")
return result
if self.is_command(cmd): # first word
result = call_command(cmd, params)
elif cmd in self.persistence["bot"]["aliases"]:
dealias = self.persistence["bot"]["aliases"][cmd].split(" ") # as a list
new_cmd = dealias[0]
params = dealias[1:] + params
result = "Substituted <code>" + cmd + "</code> to <code>" + self.persistence["bot"]["aliases"][cmd] + "</code> and got:\n\n"
result += call_command(new_cmd, params)
else:
result = "Command <code>" + tmp[0] + "</code> not found."
self.telegram.send_message(result)
def is_command(self, input):
"""checks if we have a command. Returns true if yes and False if not
Also sends a mesage if close to an existing command
"""
max_match = 0
command_candidate = ""
for command in self.commands.keys():
match = lev.ratio(input.lower(),command)
if match > 0.7 and match > max_match:
max_match = match
command_candidate = command
if max_match == 1:
return True
if max_match != 0:
self.telegram.send_message("Did you mean <code>" + command_candidate + "</code>?")
return False
def write_bot_log(self, function_name, error_message):
""""""
out = datetime.datetime.now().strftime("%d.%m.%y - %H:%M")
out += " @ " + function_name
out += " --> " + error_message
self.persistence["bot"]["log"] += [out]

View File

@@ -1,510 +1,63 @@
import datetime
from bot.api import telegram, google, weather, reddit
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler, CallbackContext
import requests
import socket
import numpy as np
import time
import json
import datetime
import emoji
from . import api, commands
import bot.framework as FW
import logging
logger = logging.getLogger(__name__)
class ChatBot(FW.BotFramework):
""""""
def __init__(self, name, version, prst, hw_commands):
class ChatBot():
"""better framwork - unites all functions"""
def __init__(self, name, version, prst):
"""Inits the Bot with a few conf. vars
Args: -> name:str - Name of the bot
-> version:str - Version number
-> hw_commands - dict with commands executable by the clock module
-> prst:dict - persistence (overloaded dict that writes to json file)
-> logger - logging object to unify log messages
"""
super().__init__(name, version, prst)
# added by the launcher, we have self.modules (dict)
# Available commands. Must be manually updated!
self.commands = dict({
"help" : self.bot_show_help,
"status" : self.bot_print_status,
"log" : self.bot_print_log,
"lorem" : self.bot_print_lorem,
"weather" : self.bot_show_weather,
"google" : self.bot_google_search,
"events" : self.bot_print_events,
"wikipedia" : self.bot_show_wikipedia,
"zvv" : self.bot_zvv,
"cronjob" : self.bot_cronjob,
"joke" : self.bot_tell_joke,
"meme" : self.bot_send_meme,
"news" : self.bot_send_news,
"list" : self.bot_list,
"alias" : self.bot_save_alias,
}, **hw_commands)
# concat bot_commands + hw-commands
############################################################################
"""BOT-Commands: implementation"""
def bot_print_lorem(self, *args):
"""Prints a placeholder text."""
if "full" in args:
message = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. At tellus at urna condimentum mattis pellentesque id nibh. Convallis aenean et tortor at risus viverra adipiscing at in. Aliquet risus feugiat in ante metus dictum. Tincidunt augue interdum velit euismod in pellentesque massa placerat duis. Tincidunt vitae semper quis lectus nulla at. Quam nulla porttitor massa id neque aliquam vestibulum morbi blandit. Phasellus egestas tellus rutrum tellus pellentesque eu tincidunt. Gravida rutrum quisque non tellus orci. Adipiscing at in tellus integer feugiat. Integer quis auctor elit sed vulputate mi sit amet mauris. Risus pretium quam vulputate dignissim suspendisse in est. Cras fermentum odio eu feugiat pretium. Ut etiam sit amet nisl purus in mollis nunc sed. Elementum tempus egestas sed sed risus pretium quam. Massa ultricies mi quis hendrerit dolor magna eget."
else:
message = "Lorem ipsum dolor sit amet, bla bla bla..."
return message
def bot_print_status(self, *args):
"""Prints the bots current status and relevant information"""
delta = str(datetime.datetime.now() - self.start_time)
message = "BeebBop, this is " + self.name + " (V." + self.version + ")\n"
try:
ip = requests.get('https://api.ipify.org').text
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('8.8.8.8', 80))
(addr, port) = s.getsockname()
local_ips = addr
except:
ip = "not fetchable"
local_ips = "not fetchable"
message += "<pre>Status: Running :green_circle:\n"
message += "Uptime: " + delta[:delta.rfind(".")] + "\n"
message += "Reboots: " + str(self.persistence["global"]["reboots"]) + "\n"
message += "IP (public): " + ip + "\n"
message += "IP (private): " + str(local_ips) + "\n"
tot_r = np.array(self.persistence["bot"]["receive_activity"]["count"]).sum()
message += "Total messages read: " + str(tot_r) + "\n"
tot_s = np.array(self.persistence["bot"]["send_activity"]["count"]).sum()
message += "Total messages sent: " + str(tot_s) + "\n"
tot_e = np.array(self.persistence["bot"]["execute_activity"]["count"]).sum()
message += "Commands executed " + str(tot_e) + "</pre>"
return message
if "full" in args:
self.bot_print_log()
def bot_show_weather(self, *args):
"""Shows a weather-forecast for a given location"""
if len(args) != 1:
return "Invalid Syntax, please give one parameter, the location"
locations = {"freiburg": [47.9990, 7.8421], "zurich": [47.3769, 8.5417], "mulhouse": [47.7508, 7.3359]}
loc = args[0]
if loc.lower().replace("ü","u") in locations:
city = locations[loc.lower().replace("ü","u")]
else:
return "Couldn't find city, it might be added later though."
categories = {"Clouds": ":cloud:", "Rain": ":cloud_with_rain:", "Thunderstorm": "thunder_cloud_rain", "Drizzle": ":droplet:", "Snow": ":cloud_snow:", "Clear": ":sun:", "Mist": "Mist", "Smoke": "Smoke", "Haze": "Haze", "Dust": "Dust", "Fog": "Fog", "Sand": "Sand", "Dust": "Dust", "Ash": "Ash", "Squall": "Squall", "Tornado": "Tornado",}
days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
today = datetime.datetime.today().weekday()
weather = self.weather.show_weather(city)
now = weather.pop(0)
message = "<b>Now:</b> " + categories[now["short"]] + "\n"
message += ":thermometer: " + str(now["temps"][0]) + "°\n\n"
for i, day in enumerate(weather):
if i == 0:
message += "<b>" + "Today" + ":</b> " + categories[day["short"]] + "\n"
else:
message += "<b>" + days[(today + i + 1) % 7] + ":</b> " + categories[day["short"]] + "\n"
message += ":thermometer: :fast_down_button: " + str(day["temps"][0]) + "° , :thermometer: :fast_up_button: " + str(day["temps"][1]) + "°\n\n"
return message
def bot_google_search(self, *args):
"""Does a google search and shows relevant links"""
if len(args) < 1:
return "Please tell me what to look for"
send_string = google.query(args)
return send_string
def bot_print_events(self, *args):
"""Shows a list of couple-related events and a countdown"""
events = {
"anniversary :red_heart:" : datetime.date(datetime.datetime.now().year,12,7),
"valentine's day :rose:": datetime.date(datetime.datetime.now().year,2,14),
"Marine's birthday :party_popper:": datetime.date(datetime.datetime.now().year,8,31),
"Remy's birthday :party_popper:": datetime.date(datetime.datetime.now().year,3,25),
"Christmas :wrapped_gift:" : datetime.date(datetime.datetime.now().year,12,24),
}
send_string = "Upcoming events: \n"
for key in events:
delta = events[key] - datetime.date.today()
if delta < datetime.timedelta(0):
delta += datetime.timedelta(days = 365)
send_string += key + ": " + str(delta.days) + " days \n"
return send_string
def bot_show_help(self, *args):
"""Show a help message.
Usage: help {keyword}
Keywords:
* no kw - list of all commands
* full - all commands and their docstring
* command-name - specific command and its docstring
"""
description = False
if len(args) > 0:
if args[0] == "full":
description = True
elif args[0] in self.commands:
send_text = "<b>" + args[0] + "</b>\n"
send_text += "<code>" + self.commands[args[0]].__doc__ + "</code>"
return send_text
send_text = "BeebBop, this is " + self.name + " (V." + self.version + ")\n"
send_text += "Here is what I can do up to now: \n"
entries = sorted(list(self.commands.keys()))
for entry in entries:
send_text += "<b>" + entry + "</b>"
if description:
send_text += " - <code>" + self.commands[entry].__doc__ + "</code>\n\n"
else:
send_text += "\n"
return send_text
def bot_print_log(self, *args):
"""Show an error-log, mostly of bad api-requests.
Usage: log {keyword}
Keywords:
* clear - clears log
* system - shows python output
"""
if "clear" in args:
self.persistence["bot"]["log"] = []
return "Log cleared"
elif "system" in args:
path="persistence/log.txt"
try:
file = open(path,"r")
content = file.read()
file.close()
return content
except:
return "could not read File"
send_text = ""
for event in self.persistence["bot"]["log"]:
send_text += event + "\n"
if send_text == "":
send_text += "No errors up to now"
return send_text
def bot_show_wikipedia(self, *args):
"""Shows the wikipedia entry for a given term
Usage: wikipedia &lt;language&gt; &lt;term&gt;
Keywords:
* language - de, fr, en ...
* term - search term, can consist of multiple words
"""
if len(args) == 0:
return "Please provide the first argument for language (de or fr or en or ...) and then your query"
args = list(args)
if len(args) >= 2:
url = "https://" + args.pop(0) + ".wikipedia.org/wiki/" + args.pop(0)
for word in args:
url += "_" + word
else:
url = "https://en.wikipedia.org/wiki/" + args[0]
print(url)
r = requests.get(url)
if r.status_code == 404:
return "No result found for query (404)"
return url
def bot_zvv(self, *args):
"""Uses the swiss travel api to return the best route between a start- and endpoint.
Usage: zvv &lt;start&gt; 'to' &lt;finish&gt;
Keywords:
* start - start point (can be more than 1 word9
* end - end point
"""
if len(args) < 3:
return "Please specify a start- and endpoint as well as a separator (the 'to')"
url = "http://transport.opendata.ch/v1/connections"
args = list(args)
goal = " ".join(args[:args.index("to")])
dest = " ".join(args[args.index("to")+1:])
data = {"from" : goal, "to" : dest, "limit" : 2}
try:
routes = requests.get(url, params=data).json()
result = routes["connections"]
text = result[0]["from"]["station"]["name"] + " :fast-forward_button: " + result[0]["to"]["station"]["name"] + "\n\n"
for con in result:
text += "Start: " + datetime.datetime.fromtimestamp(int(con["from"]["departureTimestamp"])).strftime("%d/%m - %H:%M") + "\n"
text += ":chequered_flag: " + datetime.datetime.fromtimestamp(int(con["to"]["arrivalTimestamp"])).strftime("%d/%m - %H:%M") + "\n"
text += ":hourglass_not_done: " + con["duration"] + "\n"
text += ":world_map: Route:\n"
for step in con["sections"]:
if step["journey"] != None:
text += step["journey"]["passList"][0]["station"]["name"] + " (" + datetime.datetime.fromtimestamp(int(step["journey"]["passList"][0]["departureTimestamp"])).strftime("%H:%M") + ")\n"
text += ":right_arrow: Linie " + step["journey"]["number"] + "\n"
text += step["journey"]["passList"][-1]["station"]["name"] + " (" + datetime.datetime.fromtimestamp(int(step["journey"]["passList"][-1]["arrivalTimestamp"])).strftime("%H:%M") +")\n"
else:
text += "Walk."
text += "\n"
return text
except:
return "Invalid api call."
def bot_cronjob(self, params):
"""Allows you to add a timed command, in a crontab-like syntax. Not implemented yet.
Example usage: /cronjob add 0 8 * * * weather Zürich
"""
return "I'm not functional yet. But when I am, it is gonna be legendary!"
def match_reddit_params(self, *args):
""" matches a list of two elements to a dict
returns: {"int": number, "str": name}
"""
r = {"int": 1, "str": "default"}
print(args)
if len(args) == 2:
p1, p2 = args[0], args[1]
try:
try:
r1 = int(p1)
r2 = p2
except:
r1 = int(p2)
r2 = p1
r["int"] = r1
r["str"] = r2
except:
self.write_bot_log("match_reddit_params", "could not match given params to known pattern")
elif len(args) == 1:
try:
try:
r["int"] = int(args[0])
except:
r["str"] = args[0]
except:
self.write_bot_log("match_reddit_params", "could not match given params to known pattern")
return r
def bot_tell_joke(self, *args):
"""Tells you the top joke on r/jokes
Usage: joke {number}
Keywords:
* number - number of jokes
"""
params_sorted = self.match_reddit_params(*args)
number = params_sorted["int"]
if len(params_sorted) > 1:
self.telegram.send_message("Ignored other params than number of jokes")
joke = reddit.get_random_rising("jokes", number, "text")
return joke
def bot_send_meme(self, *args):
"""Sends a meme from r/"""
subnames = {
"default" : "memes", # general case
"physics" : "physicsmemes",
"dank" : "dankmemes",
"biology" : "biologymemes",
"math" : "mathmemes"
}
params_sorted = self.match_reddit_params(*args)
number = params_sorted["int"]
if params_sorted["str"] in subnames:
subreddit_name = subnames[params_sorted["str"]]
else:
subreddit_name = subnames["default"]
urls = reddit.get_random_rising(subreddit_name, number, "photo")
for u in urls:
try:
self.telegram.send_photo(u["image"], u["caption"])
except:
self.write_bot_log("bot_send_meme", "could not send image")
return "Meme won't yeet"
return ""
def bot_send_news(self, *args):
"""Sends the first entries for new from r/"""
subnames = {
"default" : "worldnews",
"germany" : "germannews",
"france" : "francenews",
"europe" : "eunews",
"usa" : "usanews"
}
params_sorted = self.match_reddit_params(*args)
number = params_sorted["int"]
if params_sorted["str"] in subnames:
subreddit_name = subnames[params_sorted["str"]]
else:
subreddit_name = subnames["default"]
text = reddit.get_top(subreddit_name, number, "text")
return text
def bot_list(self, *args):
"""Interacts with a list (like a shopping list eg.)
Usage list &lt;name&gt; &lt;action&gt; {object}
Keyword:
* name - name of list
* action - create, delete, all, print, clear, add, remove
* object - might not be needed: index to delete, or item to add
Example usage:
list create shopping : creates list name shopping
list shopping add bread : adds bread to the list
list shopping print
list shopping clear
list all
"""
output = ""
# args = list(args)
if len(args) == 0:
return "Missing parameters"
try:
if args[0] == "all":
try:
return "Existing lists are: " + " ".join(list(self.persistence["global"]["lists"].keys()))
except:
return "No lists created."
if len(args) < 2:
return "Missing parameters"
if args[0] == "create":
lname = " ".join(args[1:])
self.persistence["global"]["lists"][lname] = []
output = "Created list " + lname
elif args[0] == "delete":
lname = " ".join(args[1:])
self.persistence["global"]["lists"].pop(lname, None) # no error if key doesnt exist
output = "Deleted list " + lname
else:
lname = args[0]
act = args[1]
if act == "print":
sl = self.persistence["global"]["lists"][lname]
output += "Content of " + lname + ":\n"
for ind,thing in enumerate(sl):
output += str(ind+1) + ". " + thing + "\n"
elif act == "clear":
self.persistence["global"]["lists"][lname] = []
output = "Cleared list " + lname
elif act == "add":
if len(args) < 3:
return "Missing paramaeter"
add = " ".join(args[2:])
self.persistence["global"]["lists"][lname] += [add]
return "Added " + add + "."
elif act == "remove":
if len(args) < 3:
return "Missing paramaeter"
try:
ind = int(args[2]) - 1
item = self.persistence["global"]["lists"][lname].pop(ind)
return "Removed " + item + " from list " + lname + "."
except:
return "Couldn't remove item."
return "Not working yet"
except:
output = "Could not handle your request. Maybe check the keys?"
return output
def bot_save_alias(self, *args):
"""Save a shortcut for special commands (+params)
Usage: alias &lt;alias-name&gt; {&lt;alias-name&gt; &lt;command&gt;}
Keywords:
* action - all, add, delete or clear (deleta all)
* alias-name - short name
* command - command to be executed, can contain arguments for the command
Example usage:
* alias sa list shopping add
* alias sp list shopping print
Means that '/sa ...' will now be treated as if typed '/list shopping add ...'
"""
# args = list(args)
if len(args) == 0:
return "Missing parameters"
try:
if args[0] == "clear":
self.persistence["bot"]["aliases"] = {}
return "All aliases cleared"
elif args[0] == "all":
try:
output = "Existing aliases are:\n"
for j, k in self.persistence["bot"]["aliases"].items():
output += j + " -&gt; " + k + "\n"
return output
except:
return "No aliases created."
if len(args) < 2:
return "Missing parameters"
if args[0] == "delete":
ak = args[1]
self.persistence["bot"]["aliases"].pop(ak, None) # no error if key doesnt exist
return "Deleted alias " + ak
if len(args) < 3:
return "Missing parameters"
if args[0] == "add":
ak = args[1]
cmd = " ".join(args[2:])
self.persistence["bot"]["aliases"][ak] = cmd
return "Created alias for " + ak
except:
return "Could not handle your request. Maybe check the keys?"
return "Bad input..."
self.persistence = prst
# Import submodules
self.api_weather = api.weather.WeatherFetch(api.keys.weather_api)
self.api_reddit = api.reddit.RedditFetch(api.keys.reddit_api)
self.api_search = api.search.WebSearch()
self.api_art = api.metmuseum.ArtFetch()
# and so on
self.telegram = Updater(api.keys.telegram_api, use_context=True)
self.dispatcher = self.telegram.dispatcher
self.commands = commands
# Mark them as available
self.help_module = self.commands.help.Help(prst)
self.sub_modules = {
"weather": self.commands.weather.Weather(self.api_weather, prst),
"help" : self.help_module,
"status" : self.commands.status.Status(name, version, prst),
"zvv" : self.commands.zvv.Zvv(prst),
"list" : self.commands.lists.Lists(prst),
# "alias" : commands.alias.Alias(self.dispatcher, prst),
"joke" : self.commands.reddit.Joke(self.api_reddit, prst),
"meme" : self.commands.reddit.Meme(self.api_reddit, prst),
# "news" : self.commands.reddit.News(self.api_reddit, prst),
"search" : self.commands.search.Search(self.api_search, prst),
"plaintext" : self.commands.plaintext.Plain(prst) # for handling non-command messages that should simply contribute to statistics
}
# must be a class that has a method create_handler
def add_commands(self):
for k in self.sub_modules:
self.dispatcher.add_handler(self.sub_modules[k].create_handler())
self.help_module.add_commands(self.sub_modules)
def start(self):
self.sub_modules = {**{"clock" : self.commands.clock.Clock(self.persistence, self.modules["clock"], self.api_art)}, **self.sub_modules}
self.add_commands()
self.telegram.start_polling()
# self.telegram.idle()