Last active
March 13, 2023 20:53
-
-
Save nnewman/22ebad2588504a973d17d4a8f8607e19 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import json | |
| import logging | |
| import logging.handlers | |
| import os | |
| import time | |
| from contextlib import asynccontextmanager | |
| from signal import pause | |
| from typing import AsyncContextManager, Tuple | |
| from gpiozero import Button, event, LED, PolledInternalDevice | |
| from pysnoo import ActivityState, SnooAuthSession, Snoo, SnooPubNub, SessionLevel | |
| WORKDIR = os.path.dirname(__file__) | |
| CREDENTIALS_FILE = os.path.join(WORKDIR, '.snoo_credentials.json') | |
| TOKEN_FILE = os.path.join(WORKDIR, '.snoo_token.txt') | |
| HUNDRED_KB_IN_BYTES = 1024 * 100 | |
| FIVE_MINUTES_IN_SECONDS = 300 | |
| UP_BUTTON = Button(2) | |
| DOWN_BUTTON = Button(3) | |
| LOCK_BUTTON = Button(4) | |
| TOGGLE_BUTTON = Button(17) | |
| LOCK_LED = LED(27) | |
| TOKEN_UPDATE_MUTEX = asyncio.Lock() | |
| LOCK_STATE_ON = False | |
| with open(CREDENTIALS_FILE, 'r') as fp: | |
| credential_data = json.load(fp) | |
| USERNAME, PASSWORD = credential_data['username'], credential_data['password'] | |
| logger = logging.getLogger(__name__) | |
| log_path = os.path.join(WORKDIR, 'logs', 'snoo_listener.log') | |
| handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=HUNDRED_KB_IN_BYTES) | |
| formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s") | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.INFO) | |
| class PeriodicEvent(PolledInternalDevice): | |
| """Fake device to fire an event periodically""" | |
| def __init__(self, period, event_delay=1.0, pin_factory=None): | |
| self._period = period | |
| super(PeriodicEvent, self).__init__(event_delay=event_delay, pin_factory=pin_factory) | |
| self._fire_events(self.pin_factory.ticks(), self.is_active) | |
| @property | |
| def value(self): | |
| return int(time.time()) % self._period == 0 | |
| when_activated = event() | |
| when_deactivated = event() | |
| FIVE_MINUTE_PERIOD = PeriodicEvent(FIVE_MINUTES_IN_SECONDS) | |
| def _token_updater(token): | |
| with open(TOKEN_FILE, 'w') as outfile: | |
| json.dump(token, outfile) | |
| def _set_lock_led(state: bool): | |
| if state: | |
| LOCK_LED.on() | |
| else: | |
| LOCK_LED.off() | |
| async def _get_history(pubnub: SnooPubNub) -> ActivityState: | |
| history = await pubnub.history() | |
| return history[0] | |
| @asynccontextmanager | |
| async def _get_pubnub() -> AsyncContextManager[Tuple[SnooPubNub, ActivityState]]: | |
| with open(TOKEN_FILE) as fp: | |
| token = json.load(fp) | |
| async with SnooAuthSession(token, _token_updater) as auth: | |
| if not auth.authorized: | |
| # Init Auth | |
| with TOKEN_UPDATE_MUTEX: | |
| new_token = await auth.fetch_token(USERNAME, PASSWORD) | |
| _token_updater(new_token) | |
| # Snoo API Interface | |
| snoo = Snoo(auth) | |
| devices = await snoo.get_devices() | |
| if not devices: | |
| # No devices | |
| print('There is no Snoo connected to that account!') | |
| return | |
| # Snoo PubNub Interface | |
| pubnub = SnooPubNub( | |
| auth.access_token, | |
| devices[0].serial_number, | |
| f'pn-pysnoo-{devices[0].serial_number}' | |
| ) | |
| last_activity_state = await _get_history(pubnub) | |
| try: | |
| yield pubnub, last_activity_state | |
| finally: | |
| await pubnub.stop() | |
| def _get_lock_status(last_activity_state: ActivityState) -> bool: | |
| return last_activity_state.state_machine.hold | |
| async def toggle(): | |
| async with _get_pubnub() as (pubnub, last_activity_state): | |
| if last_activity_state.state_machine.state == SessionLevel.ONLINE: | |
| await pubnub.publish_start() | |
| logger.info('Started') | |
| else: | |
| await pubnub.publish_goto_state(SessionLevel.ONLINE) | |
| logger.info('Stopped') | |
| _set_lock_led(_get_lock_status(last_activity_state)) | |
| async def up(): | |
| async with _get_pubnub() as (pubnub, last_activity_state): | |
| up_transition = last_activity_state.state_machine.up_transition | |
| if up_transition.is_active_level(): | |
| # Toggle | |
| await pubnub.publish_goto_state(up_transition) | |
| logger.info('Level decreased') | |
| else: | |
| logger.warning('Tried to increase level. No valid up-transition available!') | |
| _set_lock_led(_get_lock_status(last_activity_state)) | |
| async def down(): | |
| async with _get_pubnub() as (pubnub, last_activity_state): | |
| down_transition = last_activity_state.state_machine.down_transition | |
| if down_transition.is_active_level(): | |
| # Toggle | |
| await pubnub.publish_goto_state(down_transition) | |
| logger.info('Level decreased') | |
| else: | |
| logger.warning('Tried to decrease level. No valid down-transition available!') | |
| _set_lock_led(_get_lock_status(last_activity_state)) | |
| async def lock(): | |
| async with _get_pubnub() as (pubnub, last_activity_state): | |
| current_state = last_activity_state.state_machine.state | |
| current_hold = _get_lock_status(last_activity_state) | |
| new_hold = not current_hold | |
| if current_state.is_active_level(): | |
| # Toggle | |
| await pubnub.publish_goto_state(current_state, new_hold) | |
| logger.info('Level lock toggled') | |
| _set_lock_led(new_hold) | |
| else: | |
| logger.warning('Cannot toggle hold when Snoo is not running!') | |
| async def get_set_lock_status(): | |
| async with _get_pubnub() as (_, last_activity_state): | |
| _set_lock_led(_get_lock_status(last_activity_state)) | |
| UP_BUTTON.when_pressed = lambda: asyncio.run(up()) | |
| DOWN_BUTTON.when_pressed = lambda: asyncio.run(down()) | |
| LOCK_BUTTON.when_pressed = lambda: asyncio.run(lock()) | |
| TOGGLE_BUTTON.when_pressed = lambda: asyncio.run(toggle()) | |
| FIVE_MINUTE_PERIOD.when_activated = lambda: asyncio.run(get_set_lock_status()) | |
| pause() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment