Readded clock commands

This commit is contained in:
Remy Moll 2021-02-18 10:28:23 +01:00
parent a50c857204
commit d9ee163ad6
7 changed files with 250 additions and 236 deletions

View File

@ -1,23 +1,165 @@
from .template import * from .template import *
import time
import numpy
CHOOSE, ARGS = range(2) CHOOSE, ADDARG = range(2)
MESSAGE, WAKE, ALARM, IMAGE, ART = range(3,8)
class Clock(BotFunc): class Clock(BotFunc):
"""pass on commands to clock-module""" """pass on commands to clock-module"""
def __init__(self, prst, hw_commands): def __init__(self, prst, clock_module):
super().__init__(prst) super().__init__(prst)
self.hw_commands = hw_commands self.clock = clock_module
def create_handler(self): def create_handler(self):
handler = ConversationHandler( handler = ConversationHandler(
entry_points=[CommandHandler("clock", self.entry_point)], entry_points=[CommandHandler("clock", self.entry_point)],
states={ states={
CHOOSE : [], CHOOSE : [
ARGS : [] CallbackQueryHandler(self.wake_light, pattern="^wake$"),
CallbackQueryHandler(self.alarm_blink, pattern="^alarm$"),
CallbackQueryHandler(self.show_message, pattern="^message$"),
CallbackQueryHandler(self.show_image, pattern="^image$"),
CallbackQueryHandler(self.art_gallery, pattern="^gallery$"),
],
ADDARG : [MessageHandler(Filters.text, callback=self.get_arg1)],
MESSAGE: [MessageHandler(Filters.text, callback=self.exec_show_message)],
WAKE : [MessageHandler(Filters.text, callback=self.exec_wake_light)],
ALARM : [MessageHandler(Filters.text, callback=self.exec_alarm_blink)],
IMAGE : [MessageHandler(Filters.photo, callback=self.exec_show_image)],
ART : [MessageHandler(Filters.text, callback=self.exec_art_gallery)],
}, },
fallbacks=[CommandHandler('clock', self.entry_point)], fallbacks=[CommandHandler('clock', self.entry_point)],
) )
return handler return handler
def entry_point(self): def entry_point(self, update: Update, context: CallbackContext) -> None:
super().entry_point() super().entry_point()
keyboard = [
[InlineKeyboardButton("Make a wake-light", callback_data="wake")],
[InlineKeyboardButton("Blink as alarm", callback_data="alarm")],
[InlineKeyboardButton("Show a message", callback_data="message")],
[InlineKeyboardButton("Show an image", callback_data="image")],
[InlineKeyboardButton("Art gallery!", callback_data="gallery")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("What exactly do you want?", reply_markup=reply_markup)
return CHOOSE
def wake_light(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should the color cycle last? (In seconds)")
return WAKE
def alarm_blink(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should it blink? (In seconds)")
self.next_state = {"ALARM" : "What frequency (Hertz)"}
return ADDARG
def show_message(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. What message will I show?")
return MESSAGE
def show_image(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("How long (in minutes) should the image be displayed?")
self.next_state = {"IMAGE" : "Please send me the photo to display."}
return ADDARG
def art_gallery(self, update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
query.edit_message_text("Ok. How long should we display art? (in hours")
return ART
def get_arg1(self, update: Update, context: CallbackContext) -> None:
a = update.message.text
self.additional_argument = a
update.message.reply_text("Furthermore: "+ list(self.next_state.values())[0])
return list(self.next_state.keys())[0]
###### actually running clock actions
def exec_wake_light(self, update: Update, context: CallbackContext) -> None:
duration = update.message.text
def output(duration):
self.clock.set_brightness(value=0.1)
start_color = numpy.array([153, 0, 51])
end_color = numpy.array([255, 255, 0])
empty = numpy.zeros((16,32))
ones = empty
ones[ones == 0] = 1
gradient = end_color - start_color
# 20 steps should be fine => sleep_time = duration / 20
for i in range(20):
ct = i/20 * gradient
col = [int(x) for x in ct+start_color]
self.clock.IO.set_matrix(ones,colors=[col])
time.sleep(int(duration) / 20)
self.clock.run(output,(duration,))
return ConversationHandler.END
def exec_alarm_blink(self, update: Update, context: CallbackContext) -> None:
duration = self.additional_argument
frequency = update.message.text
def output(duration, frequency):
self.set_brightness(value=1)
duration = int(duration)
frequency = int(frequency)
n = duration * frequency / 2
empty = numpy.zeros((16,32))
red = empty.copy()
red[red == 0] = 3
for i in range(int(n)):
self.IO.set_matrix(red)
time.sleep(1/frequency)
self.IO.set_matrix(empty)
time.sleep(1/frequency)
if not(duration == 0 or frequency == 0):
update.message.reply_text("Now blinking")
self.clock.run(output,(duration, frequency))
print("DOOONE")
return ConversationHandler.END
def exec_show_image(self, update: Update, context: CallbackContext) -> None:
duration = self.additional_argument
img = update.message.photo
def output(image, duration):
self.clock.IO.set_matrix_rgb([100,0,0])
self.clock.run(output,("image", duration))
return ConversationHandler.END
def exec_show_message(self, update: Update, context: CallbackContext) -> None:
message_str = update.message.text
update.message.reply_text("Now showing: " + message_str)
self.clock.run(self.clock.IO.text_scroll,(message_str, self.clock.tspeed, [200,200,200]))
return ConversationHandler.END
def exec_art_gallery(self, update: Update, context: CallbackContext) -> None:
update.message.reply_text("Puuh, thats tough, I'm not ready for that.")
return ConversationHandler.END

View File

@ -61,7 +61,7 @@ class Status(BotFunc):
message += "IP (public): `" + ip + "`\n" message += "IP (public): `" + ip + "`\n"
message += "IP (private): `" + str(local_ips) + "`\n" message += "IP (private): `" + str(local_ips) + "`\n"
u = str(self.get_ngrok_url()) u = str(self.get_ngrok_url())
message += "URL: [" + u + "](" + u + "]\n" message += "URL: [" + u + "](" + u + ")\n"
tot_r = np.array(self.persistence["bot"]["receive_activity"]["count"]).sum() tot_r = np.array(self.persistence["bot"]["receive_activity"]["count"]).sum()
message += "Total messages read: `" + str(tot_r) + "`\n" message += "Total messages read: `" + str(tot_r) + "`\n"

View File

@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
class ChatBot(): class ChatBot():
"""better framwork - unites all functions""" """better framwork - unites all functions"""
def __init__(self, name, version, hw_commands, prst): def __init__(self, name, version, prst):
"""Inits the Bot with a few conf. vars """Inits the Bot with a few conf. vars
Args: -> name:str - Name of the bot Args: -> name:str - Name of the bot
-> version:str - Version number -> version:str - Version number
@ -16,7 +16,9 @@ class ChatBot():
-> prst:dict - persistence (overloaded dict that writes to json file) -> prst:dict - persistence (overloaded dict that writes to json file)
-> logger - logging object to unify log messages -> logger - logging object to unify log messages
""" """
# added by the launcher, we have self.modules (dict)
self.persistence = prst
# Import submodules # Import submodules
self.api_weather = api.weather.WeatherFetch(api.keys.weather_api) self.api_weather = api.weather.WeatherFetch(api.keys.weather_api)
# self.reddit_api = api.reddit.RedditFetch() # self.reddit_api = api.reddit.RedditFetch()
@ -24,18 +26,19 @@ class ChatBot():
self.telegram = Updater(api.keys.telegram_api, use_context=True) self.telegram = Updater(api.keys.telegram_api, use_context=True)
self.dispatcher = self.telegram.dispatcher self.dispatcher = self.telegram.dispatcher
self.commands = commands
# Mark them as available # Mark them as available
self.help_module = commands.help.Help(prst) self.help_module = self.commands.help.Help(prst)
self.sub_modules = { self.sub_modules = {
"weather": commands.weather.Weather(self.api_weather, prst), "weather": self.commands.weather.Weather(self.api_weather, prst),
"help" : self.help_module, "help" : self.help_module,
"status" : commands.status.Status(name, version, prst), "status" : self.commands.status.Status(name, version, prst),
"zvv" : commands.zvv.Zvv(prst), "zvv" : self.commands.zvv.Zvv(prst),
"list" : commands.lists.Lists(prst), "list" : self.commands.lists.Lists(prst),
#"alias" : commands.alias.Alias(self.dispatcher, prst), #"alias" : commands.alias.Alias(self.dispatcher, prst),
"clock" : commands.clock.Clock(prst, hw_commands), "plaintext" : self.commands.plaintext.Plain(prst) # for handling non-command messages that should simply contribute to statistics
"plaintext" : commands.plaintext.Plain(prst) # for handling non-command messages that should simply contribute to statistics
} }
# "events" : self.bot_print_events, # "events" : self.bot_print_events,
@ -46,9 +49,6 @@ class ChatBot():
# "news" : self.bot_send_news, # "news" : self.bot_send_news,
# } # }
# must be a class that has a method create_handler # must be a class that has a method create_handler
self.add_commands()
def add_commands(self): def add_commands(self):
for k in self.sub_modules: for k in self.sub_modules:
@ -56,6 +56,8 @@ class ChatBot():
self.help_module.add_commands(self.sub_modules) self.help_module.add_commands(self.sub_modules)
def START(self): def start(self):
self.sub_modules = {**{"clock" : self.commands.clock.Clock(self.persistence, self.modules["clock"])}, **self.sub_modules}
self.add_commands()
self.telegram.start_polling() self.telegram.start_polling()
# self.telegram.idle() # self.telegram.idle()

View File

@ -1,19 +1,19 @@
import datetime import datetime
import time import time
import json import json
from threading import Thread from threading import Thread, Timer
import numpy import numpy
from clock.api import led from clock.api import led
################################################################################
#start of actual programm.
class ClockFace(object): class ClockFace(object):
"""Actual functions one might need for a clock""" """Actual functions one might need for a clock"""
def __init__(self, text_speed=18, prst=""): def __init__(self, text_speed=18, prst=""):
"""""" """"""
# added by the launcher, we have self.modules (dict)
self.persistence = prst self.persistence = prst
self.IO = led.OutputHandler(32,16) self.IO = led.OutputHandler(32,16)
self.tspeed = text_speed self.tspeed = text_speed
@ -23,16 +23,47 @@ class ClockFace(object):
self.output_queue = [] self.output_queue = []
# Threads to execute next # Threads to execute next
self.commands = { self.weather = {"weather":"", "high":"", "low":"", "show":"temps"}
"blink" : self.alarm_blink, self.weather_raw = {}
"wakeup" : self.wake_light, # different?
"showmessage" : self.show_message,
}
self.weather = ""
self.brightness_overwrite = {"value" : 1, "duration" : 0} self.brightness_overwrite = {"value" : 1, "duration" : 0}
def start(self):
while datetime.datetime.now().strftime("%H%M%S")[-2:] != "00":
pass
RepeatedTimer(60, self.clock_loop)
def clock_loop(self):
t = int(datetime.datetime.now().strftime("%H%M"))
if t % 5 == 0:
# switch secondary face every 5 minutes
weather = self.modules["bot"].api_weather.show_weather([47.3769, 8.5417]) # zürich
if weather != self.weather_raw and len(weather) != 0:
td = weather[1]
low = td["temps"][0]
high = td["temps"][1]
self.weather["weather"] = td["short"]
self.weather["high"] = high
self.weather["low"] = low
elif len(weather) == 0:
self.weather["weather"] = "error"
self.weather["high"] = "error"
self.weather["low"] = "error"
# if weather == self.weather.raw do nothing
if self.weather["show"] == "weather":
next = "temps"
else:
next = "weather"
self.weather["show"] = next
self.set_face()
def run(self, command, kw=()): def run(self, command, kw=()):
"""Checks for running threads and executes the ones in queue""" """Checks for running threads and executes the ones in queue"""
def enhanced_run(command, kw): def enhanced_run(command, kw):
@ -56,15 +87,9 @@ class ClockFace(object):
############################################################################ ############################################################################
### basic clock commands ### basic clock commands
def set_face(self, weather): def set_face(self):
"""""" """"""
self.weather = weather self.run(self.IO.clock_face,(self.weather,))
self.run(self.IO.clock_face,(weather,))
def text_scroll(self, text, color=""):
""""""
self.run(self.IO.text_scroll,(text, self.tspeed, color))
def set_brightness(self, overwrite=[],value=-1): def set_brightness(self, overwrite=[],value=-1):
@ -88,63 +113,34 @@ class ClockFace(object):
self.IO.output.set_brightness(brightness) self.IO.output.set_brightness(brightness)
############################################################################
### Higher level commands, accessible from the chat-bot
def external_action(self, command, params):
""""""
self.commands[command](*params)
def wake_light(self, duration=600):
"""Simulates a sunris, takes one optional parameter: the duration"""
def output(duration):
self.set_brightness(value=0.1)
start_color = numpy.array([153, 0, 51])
end_color = numpy.array([255, 255, 0])
empty = numpy.zeros((16,32))
ones = empty
ones[ones == 0] = 1
gradient = end_color - start_color
# 20 steps should be fine => sleep_time = duration / 20
for i in range(20):
ct = i/20 * gradient
col = [int(x) for x in ct+start_color]
self.IO.set_matrix(ones,colors=[col])
time.sleep(int(duration) / 20)
self.run(output,(duration,)) #######################################################
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def alarm_blink(self, duration=0, frequency=0): def start(self):
"""Blinks the whole screen (red-black). Duration in seconds, frequency in Hertz""" if not self.is_running:
def output(duration, frequency): self.next_call += self.interval
self.set_brightness(value=1) self._timer = Timer(self.next_call - time.time(), self._run)
duration = int(duration) self._timer.start()
frequency = int(frequency) self.is_running = True
n = duration * frequency / 2
empty = numpy.zeros((16,32))
red = empty.copy()
red[red == 0] = 3
for i in range(int(n)):
self.IO.set_matrix(red)
time.sleep(1/frequency)
self.IO.set_matrix(empty)
time.sleep(1/frequency)
if not(duration == 0 or frequency == 0):
self.run(output,(duration, frequency))
def stop(self):
def image_show(self, image, duration): self._timer.cancel()
"""Shows a 16x32 image for duration seconds""" self.is_running = False
def output(image, duration):
self.IO.set_matrix_rgb(red)
self.run(output,(image, duration))
def show_message(self, *args):
"""Runs a text message over the screen. Obviously needs the text"""
# keep in mind, in this case args is a tuple of all words
message_str = " ".join(args)
print("SHOWING (CLOCK): " + message_str)
self.text_scroll(message_str)

View File

@ -18,6 +18,8 @@ import requests
class DashBoard(): class DashBoard():
"""""" """"""
# added by the launcher, we have self.modules (dict)
def __init__(self, host_ip, prst): def __init__(self, host_ip, prst):
## pre-sets ## pre-sets
@ -51,7 +53,7 @@ class DashBoard():
return kids return kids
def START(self): def start(self):
self.app.run_server(host=self.host_ip, port=80)#, debug=True) self.app.run_server(host=self.host_ip, port=80)#, debug=True)
@ -173,7 +175,8 @@ class DashBoard():
body = [] body = []
try: try:
content = self.bot.api_weather.show_weather([47.3769, 8.5417]) # still zürich bot = self.modules["bot"]
content = bot.api_weather.show_weather([47.3769, 8.5417]) # still zürich
wt = content.pop(0) wt = content.pop(0)
body.append(weather_item("Jetzt", wt["short"], wt["temps"])) body.append(weather_item("Jetzt", wt["short"], wt["temps"]))

View File

@ -1,10 +1,8 @@
# functionality # functionality
import bot.main
import bot2.main import bot2.main
import clock.main import clock.main
import dashboard.main import dashboard.main
# wrapper
import wrapper
import persistence.main import persistence.main
# various # various
@ -12,6 +10,7 @@ import logging
from threading import Thread from threading import Thread
import os import os
if os.name == "nt": if os.name == "nt":
logging.basicConfig( logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
@ -38,28 +37,18 @@ class Launcher():
self.persistence["global"]["reboots"] += 1 self.persistence["global"]["reboots"] += 1
self.clock_module = clock.main.ClockFace(prst=self.persistence) self.clock_module = clock.main.ClockFace(prst=self.persistence)
self.bot_module = bot2.main.ChatBot(name="Norbit", version="3.0a", prst=self.persistence, hw_commands=self.clock_module.commands) self.bot_module = bot2.main.ChatBot(name="Norbit", version="3.0a", prst=self.persistence)
self.dashboard_module = dashboard.main.DashBoard(host_ip="0.0.0.0", prst=self.persistence) self.dashboard_module = dashboard.main.DashBoard(host_ip="0.0.0.0", prst=self.persistence)
self.threads = [] self.modules = {
#self.threads.append(Thread(target=self.chatbot)) "bot" : self.bot_module,
"dashboard" : self.dashboard_module,
self.threads.append(Thread(target=self.clock)) "clock" : self.clock_module,
self.threads.append(Thread(target=self.dashboard)) }
for i in self.threads:
i.start()
self.chatbot()
def clock(self): for module in self.modules.values():
self.clock = wrapper.ClockWrapper(self.clock_module, self.bot_module) module.modules = self.modules
module.start()
def chatbot(self):
self.bot = wrapper.BotWrapper(self.bot_module, self.clock_module)
def dashboard(self):
self.dashboard = wrapper.DashBoardWrapper(self.dashboard_module, self.bot_module)
def init_persistence(self): def init_persistence(self):

View File

@ -1,118 +0,0 @@
import time
import datetime
import logging
import threading
logger = logging.getLogger(__name__)
class Wrapper():
"""Wrapper skeleton for the modules (bot, clock, dashboard ... maybe more to come?)"""
def __init__(self, own_module, *other_modules):
self.own = own_module
self.others = other_modules
logger.debug("Starting " + self.own.__class__.__name__ + " through wrapper.")
def external_action(self, func, *args, **kwargs):
"""do a special action initiated by other modules"""
logger.info("External request to " + self.own.__class__.__name__ + ".")
class ClockWrapper(Wrapper):
"""Wrapper for the CLOCK-functionality"""
def __init__(self, own_module, *other_modules):
""""""
super().__init__(own_module, *other_modules)
self.weather = {"weather":"", "high":"", "low":"", "show":"temps"}
self.weather_raw = {}
self.START()
def START(self): # I prefer the name tick_tack
"""Runs the showing of the clock-face periodically: update every minute"""
def perform_loop():
t = int(datetime.datetime.now().strftime("%H%M"))
if t % 5 == 0:
# switch secondary face every 5 minutes
weather = self.others[0].api_weather.show_weather([47.3769, 8.5417]) # zürich
if weather != self.weather_raw and len(weather) != 0:
td = weather[1]
low = td["temps"][0]
high = td["temps"][1]
self.weather["weather"] = td["short"]
self.weather["high"] = high
self.weather["low"] = low
elif len(weather) == 0:
self.weather["weather"] = "error"
self.weather["high"] = "error"
self.weather["low"] = "error"
# if weather == self.weather.raw do nothing
if self.weather["show"] == "weather":
next = "temps"
else:
next = "weather"
self.weather["show"] = next
self.own.set_face(self.weather)
perform_loop()
while datetime.datetime.now().strftime("%H%M%S")[-2:] != "00":
pass
RepeatedTimer(60, perform_loop)
class BotWrapper(Wrapper):
"""Wrapper for the BOT-functionality"""
def __init__(self, own_module, *other_modules):
""""""
super().__init__(own_module, *other_modules)
self.own.START()
class DashBoardWrapper(Wrapper):
def __init__(self, own_module, *other_modules):
"""Wrapper for the dashboard functionality"""
super().__init__(own_module, other_modules)
# self.mainloop(1 * 3600) # 1 hour refresh-cycle
# cannot get called through mainloop, will use the included callback-functionality of Dash
self.own.bot = other_modules[0]
self.own.START()
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False