Skip to content

Instantly share code, notes, and snippets.

@nnewman
Last active March 13, 2023 20:53
Show Gist options
  • Select an option

  • Save nnewman/22ebad2588504a973d17d4a8f8607e19 to your computer and use it in GitHub Desktop.

Select an option

Save nnewman/22ebad2588504a973d17d4a8f8607e19 to your computer and use it in GitHub Desktop.
import asyncio
import json
import logging
import logging.handlers
import os
import time
from contextlib import asynccontextmanager
from signal import pause
from typing import AsyncContextManager, Tuple
from gpiozero import Button, event, LED, PolledInternalDevice
from pysnoo import ActivityState, SnooAuthSession, Snoo, SnooPubNub, SessionLevel
WORKDIR = os.path.dirname(__file__)
CREDENTIALS_FILE = os.path.join(WORKDIR, '.snoo_credentials.json')
TOKEN_FILE = os.path.join(WORKDIR, '.snoo_token.txt')
HUNDRED_KB_IN_BYTES = 1024 * 100
FIVE_MINUTES_IN_SECONDS = 300
UP_BUTTON = Button(2)
DOWN_BUTTON = Button(3)
LOCK_BUTTON = Button(4)
TOGGLE_BUTTON = Button(17)
LOCK_LED = LED(27)
TOKEN_UPDATE_MUTEX = asyncio.Lock()
LOCK_STATE_ON = False
with open(CREDENTIALS_FILE, 'r') as fp:
credential_data = json.load(fp)
USERNAME, PASSWORD = credential_data['username'], credential_data['password']
logger = logging.getLogger(__name__)
log_path = os.path.join(WORKDIR, 'logs', 'snoo_listener.log')
handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=HUNDRED_KB_IN_BYTES)
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class PeriodicEvent(PolledInternalDevice):
"""Fake device to fire an event periodically"""
def __init__(self, period, event_delay=1.0, pin_factory=None):
self._period = period
super(PeriodicEvent, self).__init__(event_delay=event_delay, pin_factory=pin_factory)
self._fire_events(self.pin_factory.ticks(), self.is_active)
@property
def value(self):
return int(time.time()) % self._period == 0
when_activated = event()
when_deactivated = event()
FIVE_MINUTE_PERIOD = PeriodicEvent(FIVE_MINUTES_IN_SECONDS)
def _token_updater(token):
with open(TOKEN_FILE, 'w') as outfile:
json.dump(token, outfile)
def _set_lock_led(state: bool):
if state:
LOCK_LED.on()
else:
LOCK_LED.off()
async def _get_history(pubnub: SnooPubNub) -> ActivityState:
history = await pubnub.history()
return history[0]
@asynccontextmanager
async def _get_pubnub() -> AsyncContextManager[Tuple[SnooPubNub, ActivityState]]:
with open(TOKEN_FILE) as fp:
token = json.load(fp)
async with SnooAuthSession(token, _token_updater) as auth:
if not auth.authorized:
# Init Auth
with TOKEN_UPDATE_MUTEX:
new_token = await auth.fetch_token(USERNAME, PASSWORD)
_token_updater(new_token)
# Snoo API Interface
snoo = Snoo(auth)
devices = await snoo.get_devices()
if not devices:
# No devices
print('There is no Snoo connected to that account!')
return
# Snoo PubNub Interface
pubnub = SnooPubNub(
auth.access_token,
devices[0].serial_number,
f'pn-pysnoo-{devices[0].serial_number}'
)
last_activity_state = await _get_history(pubnub)
try:
yield pubnub, last_activity_state
finally:
await pubnub.stop()
def _get_lock_status(last_activity_state: ActivityState) -> bool:
return last_activity_state.state_machine.hold
async def toggle():
async with _get_pubnub() as (pubnub, last_activity_state):
if last_activity_state.state_machine.state == SessionLevel.ONLINE:
await pubnub.publish_start()
logger.info('Started')
else:
await pubnub.publish_goto_state(SessionLevel.ONLINE)
logger.info('Stopped')
_set_lock_led(_get_lock_status(last_activity_state))
async def up():
async with _get_pubnub() as (pubnub, last_activity_state):
up_transition = last_activity_state.state_machine.up_transition
if up_transition.is_active_level():
# Toggle
await pubnub.publish_goto_state(up_transition)
logger.info('Level decreased')
else:
logger.warning('Tried to increase level. No valid up-transition available!')
_set_lock_led(_get_lock_status(last_activity_state))
async def down():
async with _get_pubnub() as (pubnub, last_activity_state):
down_transition = last_activity_state.state_machine.down_transition
if down_transition.is_active_level():
# Toggle
await pubnub.publish_goto_state(down_transition)
logger.info('Level decreased')
else:
logger.warning('Tried to decrease level. No valid down-transition available!')
_set_lock_led(_get_lock_status(last_activity_state))
async def lock():
async with _get_pubnub() as (pubnub, last_activity_state):
current_state = last_activity_state.state_machine.state
current_hold = _get_lock_status(last_activity_state)
new_hold = not current_hold
if current_state.is_active_level():
# Toggle
await pubnub.publish_goto_state(current_state, new_hold)
logger.info('Level lock toggled')
_set_lock_led(new_hold)
else:
logger.warning('Cannot toggle hold when Snoo is not running!')
async def get_set_lock_status():
async with _get_pubnub() as (_, last_activity_state):
_set_lock_led(_get_lock_status(last_activity_state))
UP_BUTTON.when_pressed = lambda: asyncio.run(up())
DOWN_BUTTON.when_pressed = lambda: asyncio.run(down())
LOCK_BUTTON.when_pressed = lambda: asyncio.run(lock())
TOGGLE_BUTTON.when_pressed = lambda: asyncio.run(toggle())
FIVE_MINUTE_PERIOD.when_activated = lambda: asyncio.run(get_set_lock_status())
pause()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment