diff --git a/inky_helper.py b/inky_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..83651ff19a5ba09207f2f51987cdca37483a57f5 --- /dev/null +++ b/inky_helper.py @@ -0,0 +1,188 @@ +from pimoroni_i2c import PimoroniI2C +from pcf85063a import PCF85063A +import math +from machine import Pin, PWM, Timer +import time +import inky_frame +import json +import network +import os + +# Pin setup for VSYS_HOLD needed to sleep and wake. +HOLD_VSYS_EN_PIN = 2 +hold_vsys_en_pin = Pin(HOLD_VSYS_EN_PIN, Pin.OUT) + +# intialise the pcf85063a real time clock chip +I2C_SDA_PIN = 4 +I2C_SCL_PIN = 5 +i2c = PimoroniI2C(I2C_SDA_PIN, I2C_SCL_PIN, 100000) +rtc = PCF85063A(i2c) + +led_warn = Pin(6, Pin.OUT) + +# set up for the network LED +network_led_pwm = PWM(Pin(7)) +network_led_pwm.freq(1000) +network_led_pwm.duty_u16(0) + + +# set the brightness of the network led +def network_led(brightness): + brightness = max(0, min(100, brightness)) # clamp to range + # gamma correct the brightness (gamma 2.8) + value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5) + network_led_pwm.duty_u16(value) + + +network_led_timer = Timer(-1) +network_led_pulse_speed_hz = 1 + + +def network_led_callback(t): + # updates the network led brightness based on a sinusoid seeded by the current time + brightness = (math.sin(time.ticks_ms() * math.pi * 2 / (1000 / network_led_pulse_speed_hz)) * 40) + 60 + value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5) + network_led_pwm.duty_u16(value) + + +# set the network led into pulsing mode +def pulse_network_led(speed_hz=1): + global network_led_timer, network_led_pulse_speed_hz + network_led_pulse_speed_hz = speed_hz + network_led_timer.deinit() + network_led_timer.init(period=50, mode=Timer.PERIODIC, callback=network_led_callback) + + +# turn off the network led and disable any pulsing animation that's running +def stop_network_led(): + global network_led_timer + network_led_timer.deinit() + network_led_pwm.duty_u16(0) + + +def sleep(t): + # Time to have a little nap until the next update + rtc.clear_timer_flag() + rtc.set_timer(t, ttp=rtc.TIMER_TICK_1_OVER_60HZ) + rtc.enable_timer_interrupt(True) + + # Set the HOLD VSYS pin to an input + # this allows the device to go into sleep mode when on battery power. + hold_vsys_en_pin.init(Pin.IN) + + # Regular time.sleep for those powering from USB + time.sleep(60 * t) + + +# Turns off the button LEDs +def clear_button_leds(): + inky_frame.button_a.led_off() + inky_frame.button_b.led_off() + inky_frame.button_c.led_off() + inky_frame.button_d.led_off() + inky_frame.button_e.led_off() + + +def network_connect(SSID, PSK): + # Enable the Wireless + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + + # Number of attempts to make before timeout + max_wait = 10 + + # Sets the Wireless LED pulsing and attempts to connect to your local network. + pulse_network_led() + wlan.config(pm=0xa11140) # Turn WiFi power saving off for some slow APs + wlan.connect(SSID, PSK) + + while max_wait > 0: + if wlan.status() < 0 or wlan.status() >= 3: + break + max_wait -= 1 + print('waiting for connection...') + time.sleep(1) + + stop_network_led() + network_led_pwm.duty_u16(30000) + + # Handle connection error. Switches the Warn LED on. + if wlan.status() != 3: + stop_network_led() + led_warn.on() + +def network_connect_better(SSID, PSK): + # Enable the Wireless + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + + ssids = wlan.scan() + for ssid in ssids: + if ssid[0].decode('UTF-8') == SSID: + # Number of attempts to make before timeout + max_wait = 10 + + # Sets the Wireless LED pulsing and attempts to connect to your local network. + pulse_network_led() + wlan.config(pm=0xa11140) # Turn WiFi power saving off for some slow APs + wlan.connect(SSID, PSK) + + while max_wait > 0: + if wlan.status() < 0 or wlan.status() >= 3: + break + max_wait -= 1 + print('waiting for connection...') + time.sleep(1) + + stop_network_led() + network_led_pwm.duty_u16(30000) + + # Handle connection error. Switches the Warn LED on. + if wlan.status() != 3: + stop_network_led() + led_warn.on() + + return True + raise RuntimeError("Can't find SSID") + + +state = {"run": None} +app = None + + +def file_exists(filename): + try: + return (os.stat(filename)[0] & 0x4000) == 0 + except OSError: + return False + + +def clear_state(): + if file_exists("state.json"): + os.remove("state.json") + + +def save_state(data): + with open("/state.json", "w") as f: + f.write(json.dumps(data)) + f.flush() + + +def load_state(): + global state + data = json.loads(open("/state.json", "r").read()) + if type(data) is dict: + state = data + + +def update_state(running): + global state + state['run'] = running + save_state(state) + + +def launch_app(app_name): + global app + app = __import__(app_name) + print(app) + update_state(app_name) diff --git a/main.py b/main.py index d8e737c08a89c24ea2104039c505b7fe30e0a229..34d08eacc34a075ac4d4c63330aa20c3173d4be0 100644 --- a/main.py +++ b/main.py @@ -1,51 +1,63 @@ import uNextcloud +import sdcard, uos import inky_helper as ih +import inky_frame import gc -import sdcard, uos import random -from picographics import PicoGraphics, DISPLAY_INKY_FRAME_7 as DISPLAY -from jpegdec import JPEG -from secrets import WIFI_SSID, WIFI_PASSWORD, NEXTCLOUD_USERNAME, NEXTCLOUD_PASSWORD -def get_new_image(): - sd_spi = machine.SPI(0, sck=machine.Pin(18, machine.Pin.OUT), mosi=machine.Pin(19, machine.Pin.OUT), - miso=machine.Pin(16, machine.Pin.OUT)) - sd = sdcard.SDCard(sd_spi, machine.Pin(22)) - uos.mount(sd, "/sd") - gc.collect() - - ih.network_connect(WIFI_SSID, WIFI_PASSWORD) +sd_spi = machine.SPI(0, sck=machine.Pin(18, machine.Pin.OUT), mosi=machine.Pin(19, machine.Pin.OUT), miso=machine.Pin(16, machine.Pin.OUT)) +sd = sdcard.SDCard(sd_spi, machine.Pin(22)) +uos.mount(sd, "/sd") +gc.collect() +from secrets import WIFI_SSID, WIFI_PASSWORD, NEXTCLOUD_USERNAME, NEXTCLOUD_PASSWORD +try: + ih.network_connect_better(WIFI_SSID, WIFI_PASSWORD) +except RuntimeError as e: + print(e) + with open('/sd/errors.log', 'a+') as error_log: + error_log.write(f'{e} \n') + #Likely a network oops, try starting again + machine.soft_reset() + +try: nc = uNextcloud.uNextcloud("https://cloud.eyecreate.org") nc.set_auth(NEXTCLOUD_USERNAME, NEXTCLOUD_PASSWORD) response = nc.get_folder_items("eink-frame") - - try: - last = open('/sd/last_wallpaper', 'r') - last_file = last.read() - last.close() - except OSError: - last_file = "" - - if response != None: - for item in response: - if item.url_path == last_file: - response.remove(item) - ran = random.choice(response) - if ran.mimetype == "image/jpeg": - update_last = open('/sd/last_wallpaper', 'w+') +except OSError as e: + print(e) + with open('/sd/errors.log', 'a+') as error_log: + error_log.write(f'{e} \n') + #Likely a network oops, try starting again + machine.soft_reset() + +try: + last = open('/sd/last_wallpaper', 'r') + last_file = last.read() + last.close() +except OSError: + last_file = "" + +if response != None: + for item in response: + if item.url_path == last_file: + response.remove(item) + ran = random.choice(response) + if ran.mimetype == "image/jpeg": + with open('/sd/last_wallpaper', 'w+') as update_last: update_last.write(ran.url_path) - update_last.close() - print(ran.url_path) - gc.collect() - nc.download_file_to_path(ran.url_path, "/sd/current_image.jpg") - graphics = PicoGraphics(DISPLAY) - j = JPEG(graphics) - - j.open_file("/sd/current_image.jpg") - j.decode() - graphics.update() - #sleep? - - + print(ran.url_path) + gc.collect() + nc.download_file_to_path(ran.url_path, "/sd/current_image.jpg") + gc.collect() + from picographics import PicoGraphics, DISPLAY_INKY_FRAME_7 as DISPLAY + from jpegdec import JPEG + graphics = PicoGraphics(DISPLAY) + j = JPEG(graphics) + + j.open_file("/sd/current_image.jpg") + j.decode() + graphics.update() + +inky_frame.sleep_for(180) diff --git a/uNextcloud.py b/uNextcloud.py index 7bd213a87c31500669c127dfda1eeb741ca83d34..1b52b67d1f6cef3a582c90a779b66cb62c6e8596 100644 --- a/uNextcloud.py +++ b/uNextcloud.py @@ -1,7 +1,6 @@ -import ElementTree import urequests -from urequests import auth -import gc +import gc, sys +import ElementTree class uNextcloud: @@ -28,7 +27,7 @@ class uNextcloud: self.password = password def get_folder_items(self, folder_path=""): - response = urequests.request("PROPFIND", self.url+"/remote.php/dav/files/"+self.username+"/"+folder_path, auth=urequests.auth.HTTPBasicAuth(self.username, self.password)) + response = urequests.request("PROPFIND", self.url+"/remote.php/dav/files/"+self.username+"/"+folder_path, auth=(self.username, self.password)) if 200 <= response.status_code < 300: file_list = [] try: @@ -53,7 +52,7 @@ class uNextcloud: return None def download_file_to_path(self, url_path, destination_path): - response = urequests.request("GET", self.url+url_path, auth=urequests.auth.HTTPBasicAuth(self.username, self.password)) + response = urequests.request("GET", self.url+url_path, auth=(self.username, self.password)) if 200 <= response.status_code < 300: dest = open(destination_path, 'wb') data = bytearray(1024) @@ -65,7 +64,6 @@ class uNextcloud: break len += 1024 dest.write(data) - print(gc.mem_free()) dest.close() response.close() gc.collect() diff --git a/urequests/__init__.py b/urequests/__init__.py deleted file mode 100644 index 237c4c70c3f186e2caf8d10692b2608ba4bf520c..0000000000000000000000000000000000000000 --- a/urequests/__init__.py +++ /dev/null @@ -1,169 +0,0 @@ -# (c) 2016-2021 Paul Sokolovsky, MIT license, https://github.com/pfalcon/pycopy-lib -import usocket - -class Request: - pass - - -class Response: - - def __init__(self, f): - self.raw = f - self.encoding = "utf-8" - self._cached = None - - def close(self): - if self.raw: - self.raw.close() - self.raw = None - self._cached = None - - @property - def content(self): - if self._cached is None: - try: - self._cached = self.raw.read() - finally: - self.raw.close() - self.raw = None - return self._cached - - @property - def text(self): - return str(self.content, self.encoding) - - def json(self): - import ujson - return ujson.loads(self.content) - - -def request(method, url, data=None, json=None, headers={}, auth=None, stream=None, parse_headers=True): - redir_cnt = 1 - if json is not None: - assert data is None - import ujson - data = ujson.dumps(json) - - while True: - try: - proto, dummy, host, path = url.split("/", 3) - except ValueError: - proto, dummy, host = url.split("/", 2) - path = "" - if proto == "http:": - port = 80 - elif proto == "https:": - import ussl - port = 443 - else: - raise ValueError("Unsupported protocol: " + proto) - - if ":" in host: - host, port = host.split(":", 1) - port = int(port) - - if auth is not None: - req = Request() - req.method = method - req.url = url - if not headers: - # Fresh local dict, not a copy of anything. - headers = {} - req.headers = headers - req = auth(req) - headers = req.headers - - ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM) - ai = ai[0] - - resp_d = None - if parse_headers is not False: - resp_d = {} - - s = usocket.socket(ai[0], ai[1], ai[2]) - try: - s.connect(ai[-1]) - if proto == "https:": - s = ussl.wrap_socket(s, server_hostname=host) - s.write(b"%s /%s HTTP/1.0\r\n" % (method, path)) - if not "Host" in headers: - s.write(b"Host: %s\r\n" % host) - # Iterate over keys to avoid tuple alloc - for k in headers: - s.write(k) - s.write(b": ") - s.write(headers[k]) - s.write(b"\r\n") - if json is not None: - s.write(b"Content-Type: application/json\r\n") - if data: - s.write(b"Content-Length: %d\r\n" % len(data)) - s.write(b"Connection: close\r\n\r\n") - if data: - s.write(data) - - l = s.readline() - #print(l) - l = l.split(None, 2) - status = int(l[1]) - reason = "" - if len(l) > 2: - reason = l[2].rstrip() - while True: - l = s.readline() - if not l or l == b"\r\n": - break - #print(l) - - if l.startswith(b"Transfer-Encoding:"): - if b"chunked" in l: - raise ValueError("Unsupported " + l.decode()) - elif l.startswith(b"Location:") and 300 <= status <= 399: - if not redir_cnt: - raise ValueError("Too many redirects") - redir_cnt -= 1 - url = l[9:].decode().strip() - #print("redir to:", url) - status = 300 - break - - if parse_headers is False: - pass - elif parse_headers is True: - l = l.decode() - k, v = l.split(":", 1) - resp_d[k] = v.strip() - else: - parse_headers(l, resp_d) - except OSError: - s.close() - raise - - if status != 300: - break - - resp = Response(s) - resp.status_code = status - resp.reason = reason - if resp_d is not None: - resp.headers = resp_d - return resp - - -def head(url, **kw): - return request("HEAD", url, **kw) - -def get(url, **kw): - return request("GET", url, **kw) - -def post(url, **kw): - return request("POST", url, **kw) - -def put(url, **kw): - return request("PUT", url, **kw) - -def patch(url, **kw): - return request("PATCH", url, **kw) - -def delete(url, **kw): - return request("DELETE", url, **kw) diff --git a/urequests/auth.py b/urequests/auth.py deleted file mode 100644 index 4cbb29d0da5339f14b0dff24c0bd0f323b813c76..0000000000000000000000000000000000000000 --- a/urequests/auth.py +++ /dev/null @@ -1,11 +0,0 @@ -# (c) 2021 Paul Sokolovsky, MIT license, https://github.com/pfalcon/pycopy-lib -import uwwwauth - - -class HTTPBasicAuth: - def __init__(self, user, passwd): - self.auth = uwwwauth.basic_resp(user, passwd) - - def __call__(self, r): - r.headers["Authorization"] = self.auth - return r \ No newline at end of file diff --git a/uwwwauth/__init__.py b/uwwwauth/__init__.py deleted file mode 100644 index c88685ca93a33f6836429e049f6547ad73405bd2..0000000000000000000000000000000000000000 --- a/uwwwauth/__init__.py +++ /dev/null @@ -1,108 +0,0 @@ -# RFC2617, WWW-Authenticate: Basic/Digest module -# (c) 2018 Paul Sokolovsky, MIT license -import uhashlib -import ubinascii - - -# Private functions - do not use, will change - -def md5_concat(arg1, arg2, arg3): - h = uhashlib.md5(arg1) - h.update(b":") - h.update(arg2) - if arg3 is not None: - h.update(b":") - h.update(arg3) - return ubinascii.hexlify(h.digest()).decode() - - -def make_digest_ha1(a1, method, uri, nonce): - a2 = md5_concat(method, uri, None) - digest = md5_concat(a1, nonce, a2) - return digest - - -def make_digest(realm, username, passwd, method, uri, nonce): - a1 = md5_concat(username, realm, passwd) - return make_digest_ha1(a1, method, uri, nonce) - - -def parse_auth_req(line): - typ, line = line.split(None, 1) - d = {"type": typ} - for kv in line.split(","): - k, v = kv.split("=", 1) - assert v[0] == '"' and v[-1] == '"' - d[k.strip()] = v[1:-1] - return d - - -def format_resp(resp_d): - fields = [] - for k, v in resp_d.items(): - if k in ("type", "passwd"): - continue - fields.append('%s="%s"' % (k, v)) - resp_auth = ", ".join(fields) - - resp_auth = "Digest " + resp_auth - return resp_auth - - -def _digest_resp(auth_d, username, passwd, method, URL): - #print(auth_d) - - resp_d = {} - resp_d["username"] = username - resp_d["uri"] = URL - resp_d["realm"] = auth_d["realm"] - resp_d["nonce"] = auth_d["nonce"] - - digest = make_digest(auth_d["realm"], username, passwd, method, URL, auth_d["nonce"]) - resp_d["response"] = digest - #print(resp_d) - - return format_resp(resp_d) - - -# Helper functions - may change - -def basic_resp(username, passwd): - return "Basic " + ubinascii.b2a_base64("%s:%s" % (username, passwd))[:-1].decode() - - -def auth_resp(auth_line, username, passwd, method=None, URL=None): - auth_d = parse_auth_req(auth_line) - if auth_d["type"] == "Basic": - return basic_resp(username, passwd) - elif auth_d["type"] == "Digest": - assert method and URL - return _digest_resp(auth_d, username, passwd, method, URL) - else: - raise ValueError(auth_d["type"]) - - -# Public interface - -class WWWAuth: - - def __init__(self, username, passwd): - self.username = username - self.passwd = passwd - self.cached_auth_line = None - - def resp(self, auth_line, method, URL): - if auth_line.startswith("Basic"): - return basic_resp(self.username, self.passwd) - elif auth_line.startswith("Digest"): - auth_d = parse_auth_req(auth_line) - if auth_line != self.cached_auth_line: - self.ha1 = md5_concat(self.username, auth_d["realm"], self.passwd) - self.cached_auth_line = auth_line - digest = make_digest_ha1(self.ha1, method, URL, auth_d["nonce"]) - auth_d["username"] = self.username - auth_d["uri"] = URL - auth_d["response"] = digest - return format_resp(auth_d) - else: - raise ValueError("Unsupported auth: " + auth_line)