wip: separate clock and rest into separate apps.
This commit is contained in:
92
clock/c_back.py
Normal file
92
clock/c_back.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import numpy as np
|
||||
import datetime
|
||||
|
||||
from . import helpers
|
||||
|
||||
class ClockBackend:
|
||||
"""Heavy lifting of matrix operations"""
|
||||
def __init__(self):
|
||||
self.MOP = helpers.computations.MatrixOperations()
|
||||
|
||||
self.weather = {"weather":"", "high":"", "low":""}
|
||||
self.weather_raw = {}
|
||||
self.weather_face_swap = False
|
||||
|
||||
self.brightness = 1
|
||||
self.brightness_overwrite = {"value" : 1, "duration" : 0}
|
||||
|
||||
|
||||
def start(self):
|
||||
self.out = self.modules["broadcast"]
|
||||
helpers.timer.RepeatedTimer(15, self.clock_loop)
|
||||
|
||||
|
||||
|
||||
def clock_loop(self):
|
||||
print("looping")
|
||||
|
||||
t = int(datetime.datetime.now().strftime("%H%M"))
|
||||
|
||||
if t % 5 == 0:
|
||||
# switch secondary face every 5 minutes
|
||||
weather = self.modules["bot"].api_weather.show_weather([47.3769, 8.5417]) # zürich
|
||||
|
||||
if weather != self.weather_raw and len(weather) != 0:
|
||||
td = weather[1]
|
||||
low = td["temps"][0]
|
||||
high = td["temps"][1]
|
||||
self.weather["weather"] = td["short"]
|
||||
self.weather["high"] = high
|
||||
self.weather["low"] = low
|
||||
elif len(weather) == 0:
|
||||
self.weather["weather"] = "error"
|
||||
self.weather["high"] = "error"
|
||||
self.weather["low"] = "error"
|
||||
# if weather == self.weather.raw do nothing
|
||||
self.weather_face_swap = not self.weather_face_swap
|
||||
|
||||
self.send_face()
|
||||
|
||||
|
||||
|
||||
|
||||
def send_face(self):
|
||||
"""Set the clock face (time + weather) by getting updated info - gets called every minute"""
|
||||
matrices = self.MOP.clock_face(self.weather)
|
||||
if self.weather_face_swap:
|
||||
matrices = [matrices[0], matrices[2], matrices[1]]
|
||||
|
||||
# apply brightness
|
||||
b = self.get_brightness()
|
||||
matrices = [(b * m).tolist() for m in matrices]
|
||||
self.out.queue.append({"matrices" : matrices})
|
||||
|
||||
|
||||
def get_brightness(self):
|
||||
"""Checks, what brightness rules to apply"""
|
||||
|
||||
is_WE = datetime.datetime.now().weekday() > 4
|
||||
now = int(datetime.datetime.now().strftime("%H%M"))
|
||||
if (is_WE and (now > 1000 and now < 2200)) or ((not is_WE) and (now > 800 and now < 2130)):
|
||||
brightness = 0.8
|
||||
else:
|
||||
brightness = 0.01
|
||||
|
||||
return brightness
|
||||
|
||||
|
||||
# def text_scroll(self, text, color=[[200,200,200]]):
|
||||
# pixels = self.MOP.text_converter(text, 12, color)
|
||||
# sleep_time = 1 / self.tspeed
|
||||
# width = self.shape[1]
|
||||
# frames = pixels.shape[1] - width
|
||||
# if frames <= 0:
|
||||
# frames = 1
|
||||
|
||||
# for i in range(frames):
|
||||
# visible = pixels[:,i:width+i]
|
||||
# self.IO.put(visible*self.brightness)
|
||||
# time.sleep(sleep_time)
|
||||
# time.sleep(10 * sleep_time)
|
||||
|
||||
|
@@ -8,9 +8,8 @@ from . import hardware, helpers
|
||||
class SensorReadout:
|
||||
"""Overview class for (actual and potential) sensor sources"""
|
||||
|
||||
def __init__(self, prst=object):
|
||||
def __init__(self):
|
||||
""""""
|
||||
self.persistence = prst
|
||||
self.sensor_modules = { # we already call them, they are objects and not classes anymore
|
||||
"temperature" : hardware.sensors.TemperatureModule(),
|
||||
"humidity" : hardware.sensors.HumidityModule(),
|
61
clock/c_out.py
Normal file
61
clock/c_out.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import datetime
|
||||
import time
|
||||
from threading import Thread
|
||||
import numpy as np
|
||||
|
||||
from . import hardware, helpers
|
||||
|
||||
|
||||
class ClockFace:
|
||||
"""Actual functions one might need for a clock"""
|
||||
|
||||
def __init__(self):
|
||||
""""""
|
||||
# added by the launcher, we have self.modules (dict)
|
||||
|
||||
|
||||
|
||||
self.IO = hardware.led.get_handler()
|
||||
self.shape = self.IO.shape # (16,32) for now
|
||||
# TODO handle differently!
|
||||
self.MOP = helpers.computations.MatrixOperations()
|
||||
|
||||
|
||||
|
||||
def start(self):
|
||||
helpers.timer.RepeatedTimer(60, self.clock_loop)
|
||||
# schedule for in 60 seconds
|
||||
self.clock_loop()
|
||||
# run once now
|
||||
# TODO start as a thread
|
||||
|
||||
|
||||
# TODO Turn off when button pressed?
|
||||
|
||||
def clock_loop(self):
|
||||
t_start = datetime.datetime.now()
|
||||
|
||||
t_minutes = int(datetime.datetime.now().strftime("%H%M"))
|
||||
|
||||
has_queue, data = self.modules["receive"].fetch_data()
|
||||
|
||||
if data == {}:
|
||||
matrices = self.MOP.get_fallback()
|
||||
else:
|
||||
matrices = [np.asarray(d).astype(int) for d in data["matrices"]]
|
||||
|
||||
self.IO.put(matrices)
|
||||
|
||||
if has_queue:
|
||||
tnext = 1
|
||||
else:
|
||||
tnext = 30
|
||||
|
||||
|
||||
t_end = datetime.datetime.now()
|
||||
delta_planned = datetime.timedelta(seconds = tnext)
|
||||
delta = delta_planned - (t_end - t_start)
|
||||
|
||||
time.sleep(max(delta.total_seconds(), 0))
|
||||
self.clock_loop()
|
||||
|
138
clock/cout.py
138
clock/cout.py
@@ -1,138 +0,0 @@
|
||||
import datetime
|
||||
import time
|
||||
from threading import Thread
|
||||
import numpy
|
||||
|
||||
from . import hardware, helpers
|
||||
|
||||
|
||||
class ClockFace:
|
||||
"""Actual functions one might need for a clock"""
|
||||
|
||||
def __init__(self, text_speed=18, prst=object):
|
||||
""""""
|
||||
# added by the launcher, we have self.modules (dict)
|
||||
|
||||
# hard coded, but can be changed to taste
|
||||
self.tspeed = text_speed
|
||||
self.primary = [200, 200, 200]
|
||||
self.secondary = [10, 200, 10]
|
||||
self.error = [200, 10, 10]
|
||||
|
||||
self.persistence = prst
|
||||
self.IO = hardware.led.get_handler()
|
||||
self.shape = self.IO.shape # (16,32) for now
|
||||
self.MOP = helpers.helper.MatrixOperations(self.shape, default_colors={"primary": self.primary, "secondary": self.secondary, "error": self.error})
|
||||
|
||||
self.output_thread = ""
|
||||
# Action the thread is currently performing
|
||||
self.output_queue = []
|
||||
# Threads to execute next
|
||||
|
||||
self.weather = {"weather":"", "high":"", "low":"", "show":"temps"}
|
||||
self.weather_raw = {}
|
||||
|
||||
self.brightness = 1
|
||||
self.brightness_overwrite = {"value" : 1, "duration" : 0}
|
||||
|
||||
|
||||
def start(self):
|
||||
self.clock_loop()
|
||||
while datetime.datetime.now().strftime("%H%M%S")[-2:] != "00":
|
||||
pass
|
||||
helpers.timer.RepeatedTimer(60, self.clock_loop)
|
||||
self.clock_loop()
|
||||
|
||||
|
||||
def clock_loop(self):
|
||||
t = int(datetime.datetime.now().strftime("%H%M"))
|
||||
|
||||
if t % 5 == 0:
|
||||
# switch secondary face every 5 minutes
|
||||
weather = self.modules["bot"].api_weather.show_weather([47.3769, 8.5417]) # zürich
|
||||
|
||||
if weather != self.weather_raw and len(weather) != 0:
|
||||
td = weather[1]
|
||||
low = td["temps"][0]
|
||||
high = td["temps"][1]
|
||||
self.weather["weather"] = td["short"]
|
||||
self.weather["high"] = high
|
||||
self.weather["low"] = low
|
||||
elif len(weather) == 0:
|
||||
self.weather["weather"] = "error"
|
||||
self.weather["high"] = "error"
|
||||
self.weather["low"] = "error"
|
||||
# if weather == self.weather.raw do nothing
|
||||
|
||||
if self.weather["show"] == "weather":
|
||||
next = "temps"
|
||||
else:
|
||||
next = "weather"
|
||||
self.weather["show"] = next
|
||||
|
||||
self.run(self.set_face,())
|
||||
|
||||
|
||||
def run(self, command, kw=()):
|
||||
"""Checks for running threads and executes the ones in queue"""
|
||||
def enhanced_run(command, kw):
|
||||
""""""
|
||||
self.output_thread = "Running " + str(command)
|
||||
command(*kw)
|
||||
self.set_brightness()
|
||||
self.output_thread = ""
|
||||
if len(self.output_queue) != 0:
|
||||
n = self.output_queue.pop(0)
|
||||
enhanced_run(n[0],n[1])
|
||||
else:
|
||||
self.set_face()
|
||||
|
||||
if len(self.output_thread) == 0:
|
||||
t = Thread(target=enhanced_run, args=(command, kw))
|
||||
t.start()
|
||||
else:
|
||||
self.output_queue.append([command,kw])
|
||||
|
||||
|
||||
############################################################################
|
||||
### basic clock commands
|
||||
def set_face(self):
|
||||
"""Set the clock face (time + weather) by getting updated info - gets called every minute"""
|
||||
face = self.MOP.clock_face(self.weather)
|
||||
self.IO.put(face * self.brightness)
|
||||
|
||||
|
||||
def set_brightness(self, value=-1, overwrite=[]):
|
||||
"""Checks, what brightness rules to apply"""
|
||||
|
||||
if value != -1:
|
||||
self.brightness = value
|
||||
return
|
||||
|
||||
if len(overwrite) != 0:
|
||||
self.brightness_overwrite = overwrite
|
||||
|
||||
is_WE = datetime.datetime.now().weekday() > 4
|
||||
now = int(datetime.datetime.now().strftime("%H%M"))
|
||||
if (is_WE and (now > 1000 and now < 2200)) or ((not is_WE) and (now > 800 and now < 2130)):
|
||||
brightness = 0.8
|
||||
else:
|
||||
brightness = 0.01
|
||||
|
||||
self.brightness = brightness
|
||||
|
||||
|
||||
def text_scroll(self, text, color=[[200,200,200]]):
|
||||
pixels = self.MOP.text_converter(text, 12, color)
|
||||
sleep_time = 1 / self.tspeed
|
||||
width = self.shape[1]
|
||||
frames = pixels.shape[1] - width
|
||||
if frames <= 0:
|
||||
frames = 1
|
||||
|
||||
for i in range(frames):
|
||||
visible = pixels[:,i:width+i]
|
||||
self.IO.put(visible*self.brightness)
|
||||
time.sleep(sleep_time)
|
||||
time.sleep(10 * sleep_time)
|
||||
|
@@ -3,7 +3,7 @@ import colorsys
|
||||
import pygame.gfxdraw
|
||||
import time
|
||||
import pygame
|
||||
import numpy
|
||||
import numpy as np
|
||||
|
||||
class ClockOut:
|
||||
"""Creates a drawable window in case the real hardware is not accessible. For development"""
|
||||
@@ -11,7 +11,7 @@ class ClockOut:
|
||||
self.pixel_size = 20
|
||||
|
||||
self.shape = shape
|
||||
self.pixels = numpy.zeros((*shape,3), dtype=int)
|
||||
self.pixels = np.zeros((*shape,3), dtype=int)
|
||||
self.WIDTH = shape[1]
|
||||
self.HEIGHT = shape[0]
|
||||
self.window_width = self.WIDTH * self.pixel_size
|
||||
@@ -22,13 +22,17 @@ class ClockOut:
|
||||
self.screen = pygame.display.set_mode([self.window_width, self.window_height])
|
||||
|
||||
|
||||
def put(self, matrix):
|
||||
def put(self, matrices):
|
||||
self.screen.fill((0, 0, 0))
|
||||
for event in pygame.event.get(): # User did something
|
||||
if event.type == pygame.QUIT:
|
||||
print("Exiting...")
|
||||
pygame.quit()
|
||||
sys.exit()
|
||||
|
||||
if self.shape == (16, 32):
|
||||
matrix = np.concatenate((matrices[0], matrices[1]), axis=1)
|
||||
|
||||
self.pixels = matrix
|
||||
self.draw_pixels()
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import colorsys
|
||||
import time
|
||||
import numpy
|
||||
import numpy as np
|
||||
try:
|
||||
import RPi.GPIO as GPIO
|
||||
SETUP_FAIL = False
|
||||
@@ -68,9 +68,10 @@ class ClockOut:
|
||||
GPIO.output(self.PIN_CLK, GPIO.LOW)
|
||||
|
||||
|
||||
def put(self, matrix):
|
||||
def put(self, matrices):
|
||||
"""Sets a height x width matrix directly"""
|
||||
self.reset_clock()
|
||||
matrix = np.concatenate((matrices[0], matrices[1]), axis=0) # or 1??
|
||||
self.show(matrix)
|
||||
|
||||
|
||||
|
@@ -1 +1 @@
|
||||
from . import helper, timer
|
||||
from . import computations, timer
|
@@ -17,19 +17,20 @@ days = np.append(np.zeros((15,16)), np.array([0,1,0,1,0,1,0,1,0,1,0,1,1,0,1,1]))
|
||||
|
||||
class MatrixOperations():
|
||||
"""Helper functions to generate frequently-used images"""
|
||||
def __init__(self, shape, default_colors):
|
||||
def __init__(self, shape=[16,16]):
|
||||
self.shape = shape
|
||||
# shape is going to be (16,32) for the moment
|
||||
self.primary = default_colors["primary"]
|
||||
self.secondary = default_colors["secondary"]
|
||||
self.error = default_colors["error"]
|
||||
self.primary = [200, 200, 200]
|
||||
self.secondary = [10, 200, 10]
|
||||
self.error = [200, 10, 10]
|
||||
|
||||
|
||||
def time_converter(self, top="", bottom=""):
|
||||
"""Converts 4-digit time to a pixel-matrix
|
||||
returns: np.array wich fills one half of self.shape (horizontally)"""
|
||||
nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
pixels = np.zeros(nshape,dtype=np.uint8)
|
||||
"""Converts 4-digit time to a 16x16 pixel-matrix
|
||||
returns: np.array"""
|
||||
# nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
nshape = (16, 16)
|
||||
pixels = np.zeros(nshape, dtype=np.uint8)
|
||||
|
||||
if bottom == "" or top == "":
|
||||
top = datetime.datetime.now().strftime("%H")
|
||||
@@ -62,7 +63,8 @@ class MatrixOperations():
|
||||
|
||||
|
||||
def date_converter(self):
|
||||
nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
# nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
nshape = (16, 16)
|
||||
today = datetime.datetime.today()
|
||||
weekday = datetime.datetime.weekday(today)
|
||||
# size of the reduced array according to weekday
|
||||
@@ -77,7 +79,8 @@ class MatrixOperations():
|
||||
|
||||
def weather_converter(self, name):
|
||||
"""Fills one half of the screen with weather info."""
|
||||
nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
# nshape = (self.shape[0], int(self.shape[1]/2))
|
||||
nshape = (16, 16)
|
||||
result = np.zeros(nshape)
|
||||
cwd = __file__.replace("\\","/") # for windows
|
||||
cwd = cwd.rsplit("/", 1)[0] # the current working directory (where this file is)
|
||||
@@ -90,6 +93,8 @@ class MatrixOperations():
|
||||
|
||||
icon_loc = ["sun","moon","sun and clouds", "moon and clouds", "cloud","fog and clouds","2 clouds", "3 clouds", "rain and cloud", "rain and clouds", "rain and cloud and sun", "rain and cloud and moon", "thunder and cloud", "thunder and cloud and moon", "snow and cloud", "snow and cloud and moon", "fog","fog night"]
|
||||
#ordered 1 2 \n 3 4 \ 5 5 ...
|
||||
if name == "":
|
||||
name = "error"
|
||||
name = weather_categories[name]
|
||||
try:
|
||||
iy, ix = int(icon_loc.index(name)/2), icon_loc.index(name)%2
|
||||
@@ -137,21 +142,16 @@ class MatrixOperations():
|
||||
hour = self.time_converter()
|
||||
day = self.date_converter()
|
||||
face1 = hour + day
|
||||
# time + date:
|
||||
face1_3d = self.matrix_add_depth(face1)
|
||||
|
||||
if weather["show"] == "weather":
|
||||
face2_3d = self.weather_converter(weather["weather"])
|
||||
else:
|
||||
face2 = self.time_converter(top=str(weather["low"]), bottom=str(weather["high"]))
|
||||
face2 = np.concatenate((face2[:8,...],2*face2[8:,...]))
|
||||
face2_3d = self.matrix_add_depth(face2,[[0, 102, 255],[255, 102, 0]])
|
||||
|
||||
face = np.zeros((max(face1_3d.shape[0],face2_3d.shape[0]),face1_3d.shape[1]+face2_3d.shape[1],3))
|
||||
|
||||
face[:face1_3d.shape[0],:face1_3d.shape[1],...] = face1_3d
|
||||
face[:face2_3d.shape[0],face1_3d.shape[1]:,...] = face2_3d
|
||||
# weather icons
|
||||
face2_3d = self.weather_converter(weather["weather"])
|
||||
# weather temps
|
||||
face3 = self.time_converter(top=str(weather["low"]), bottom=str(weather["high"]))
|
||||
face3 = np.concatenate((face3[:8,...],2*face3[8:,...]))
|
||||
face3_3d = self.matrix_add_depth(face3,[[0, 102, 255],[255, 102, 0]])
|
||||
|
||||
return face
|
||||
return [face1_3d, face2_3d, face3_3d]
|
||||
|
||||
|
||||
def text_converter(self, text, height, color):
|
||||
@@ -166,3 +166,13 @@ class MatrixOperations():
|
||||
pixels = np.array(img, dtype=np.uint8)
|
||||
pixels3d = self.matrix_add_depth(pixels, color)
|
||||
return pixels3d
|
||||
|
||||
|
||||
def get_fallback(self):
|
||||
hour = self.time_converter()
|
||||
day = self.date_converter()
|
||||
face1 = hour + day
|
||||
face1_3d = self.matrix_add_depth(face1)
|
||||
|
||||
face2_3d = face3_3d = np.zeros((16,16,3))
|
||||
return [face1_3d, face2_3d, face3_3d]
|
@@ -26,4 +26,5 @@ class RepeatedTimer(object):
|
||||
|
||||
def stop(self):
|
||||
self._timer.cancel()
|
||||
self.is_running = False
|
||||
self.is_running = False
|
||||
|
Reference in New Issue
Block a user