Skip to content

Instantly share code, notes, and snippets.

@lawrence910426
Created October 15, 2025 14:40
Show Gist options
  • Select an option

  • Save lawrence910426/3fc0576899d70655fe2111b6a16c6127 to your computer and use it in GitHub Desktop.

Select an option

Save lawrence910426/3fc0576899d70655fe2111b6a16c6127 to your computer and use it in GitHub Desktop.
import time
import logging
import math
import os
import contextlib
import concurrent
import threading
import re
from types import SimpleNamespace
from decimal import *
from notify import Notify
from decimal import Decimal
import bisect
from fubon_neo.sdk import FubonSDK, Order
from fubon_neo.constant import TimeInForce, OrderType, PriceType, MarketType, BSAction
class StockHelper:
def __init__(self):
# Generate stock_ticks when the class is initialized
self.stock_ticks = self.gen_list(0.01, 50, 0.01) + \
self.gen_list(50, 1000, 0.05)
@staticmethod
def gen_list(start, end, step):
exponent = 0
while step <= 1:
step = step * 10
exponent += 1
shift = 10 ** exponent
start, end, step = int(start * shift), int(end * shift), int(step)
return [Decimal(i) / Decimal(shift) for i in range(start, end, step)]
def find_greatest_not_greater(self, value):
"""
Find the greatest tick that is not greater than the given value.
Returns None if no such tick exists.
"""
value = Decimal(value)
index = bisect.bisect_right(self.stock_ticks, value) - 1
if index >= 0:
return self.stock_ticks[index]
else:
return None
def find_smallest_not_smaller(self, value):
"""
Find the smallest tick that is not smaller than the given value.
Returns None if no such tick exists.
"""
value = Decimal(value)
index = bisect.bisect_left(self.stock_ticks, value)
if index < len(self.stock_ticks):
return self.stock_ticks[index]
else:
return None
helper = StockHelper()
class Automata:
def __init__(self, sdk, account, parameters):
self.sdk = sdk
self.account = account
# Extract from parameters
self.parameters = parameters
self.stock_id = self.parameters['stock_id']
# Automata states
self.state = "INIT"
self.previous_state = "INIT"
# NAV data
self.TX_price = self.parameters['prev_close']
self.last_show_nav = time.time()
# ETF data
self.bid_price = None
self.ask_price = None
self.bid_volume = None
self.ask_volume = None
self.market_open = False
self.bid_orders, self.ask_orders = {}, {}
# Lock is for dealing with the concurrency issues about the following variables.
self.lock = threading.Lock()
self.net_position = self.parameters["remain_position"]
self.bid_exit_orders, self.ask_exit_orders = {}, {}
# Speed limiter
self.operations_per_second = 20
self.operations = []
def place_order(self, order):
now = time.time()
self.operations = [t for t in self.operations if now - t < 1.0]
if len(self.operations) >= self.operations_per_second:
return False, None
self.operations.append(now)
place_result = self.sdk.stock.place_order(self.account, order)
if not place_result.is_success:
logging.error(order)
raise Exception(place_result.message)
return True, place_result.data
def cancel_order(self, trade):
now = time.time()
self.operations = [t for t in self.operations if now - t < 1.0]
if len(self.operations) >= self.operations_per_second:
return False
self.operations.append(now)
place_result = self.sdk.stock.cancel_order(self.account, trade)
if not place_result.is_success:
raise Exception(place_result)
return True
def update_exit_orders(self, exit_orders, optimal_price, order_action):
# Never deletes orders at optimal price. So if you manually placed exit orders,
# you might end up with excess exit orders.
for price in list(exit_orders.keys()):
if price != optimal_price:
for order in list(exit_orders[price]):
try:
if self.cancel_order(order):
exit_orders[price].remove(order)
logging.info(f"[{self.stock_id}] Removed exit order on {price}")
except Exception as e:
logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?")
if len(exit_orders[price]) == 0:
del exit_orders[price]
exit_orders_list = exit_orders.setdefault(optimal_price, [])
required_orders = abs(max(-30, min(self.net_position, 30))) - len(exit_orders_list)
order = Order(
buy_sell = order_action,
symbol = self.stock_id,
price = str(optimal_price),
quantity = 1000,
market_type = MarketType.Common,
price_type = PriceType.Limit,
time_in_force = TimeInForce.ROD,
order_type = OrderType.Stock if order_action == BSAction.Buy else OrderType.DayTrade,
)
for _ in range(required_orders):
success, trade = self.place_order(order)
if success:
exit_orders_list.append(trade)
logging.info(f"[{self.stock_id}] Placed exit order on {optimal_price}")
def nav(self):
pnl_per_lot = (float(self.TX_price) - float(self.parameters['prev_close'])) * 50
pnl = self.parameters['delta_in_mx'] * pnl_per_lot
nav_total = self.parameters['prev_nav'] + pnl
nav_per_share = nav_total / self.parameters['shares_outstanding']
return nav_per_share
def loop(self):
# Show NAV
now = time.time()
if self.last_show_nav + 15 < now:
logging.info(f"TX price = {self.TX_price}")
logging.info(f"[{self.stock_id}] Market price = {(self.bid_price, self.bid_volume, self.ask_price, self.ask_volume)}")
logging.info(f"[{self.stock_id}] NAV = {self.nav()}.")
self.last_show_nav = now
# Loop automata
if self.state == "INIT":
self.state = "WAIT_OPEN"
elif self.state == "WAIT_OPEN":
if self.market_open:
self.state = "OPEN"
else:
# keep ordered lists for layer indexing, plus sets for membership tests
required_bid_prices, required_ask_prices = [], []
if self.bid_price is not None and self.ask_price is not None:
# Determine the optimal bid and the 5 ticks immediately below it
optimal_bid = min(
self.bid_price,
helper.find_greatest_not_greater(self.nav() * (1 - self.parameters['maker_threshold']))
)
optimal_bid_idx = helper.stock_ticks.index(optimal_bid)
start_bid_idx = max(0, optimal_bid_idx - 4)
# bids: layer 0 = best/closest (optimal), then one tick lower, etc.
# slice is ascending; reverse so index 0 is optimal
required_bid_prices = list(helper.stock_ticks[start_bid_idx : optimal_bid_idx + 1])[::-1]
# Determine the optimal ask and the 5 ticks immediately above it
optimal_ask = max(
self.ask_price,
helper.find_smallest_not_smaller(self.nav() * (1 + self.parameters['maker_threshold']))
)
optimal_ask_idx = helper.stock_ticks.index(optimal_ask)
end_ask_idx = min(len(helper.stock_ticks), optimal_ask_idx + 5)
# asks: slice is ascending; keep as-is so index 0 is optimal
required_ask_prices = list(helper.stock_ticks[optimal_ask_idx : end_ask_idx])
def reshape_side(side_orders, required_side_prices, side):
# Cancel orders
for p in list(side_orders.keys()):
require_cancel = False
if p not in set(required_side_prices):
require_cancel = True
else:
layer_idx = required_side_prices.index(p) # 0 = optimal, 1 = one tick below, ...
order_per_layer = self.parameters['order_per_layer'][side][layer_idx]
lots = self.parameters['lots_per_order'][side][layer_idx]
if len(side_orders[p]) != order_per_layer:
require_cancel = True
if len(side_orders[p]) > 0 and side_orders[p][0].quantity / 1000 != lots:
require_cancel = True
if require_cancel:
for t in list(side_orders[p]):
if self.cancel_order(t):
side_orders[p].remove(t)
logging.info(f"[{self.stock_id}] Removed {side} on {p}")
if len(side_orders[p]) == 0:
del side_orders[p]
# Place orders
for p in required_side_prices:
if p not in side_orders:
side_orders[p] = []
layer_idx = required_side_prices.index(p) # 0 = optimal, 1 = one tick below, ...
order_per_layer = self.parameters['order_per_layer'][side][layer_idx]
lots = self.parameters['lots_per_order'][side][layer_idx]
order = Order(
buy_sell = BSAction.Buy if side == 'bid' else BSAction.Sell,
symbol = self.stock_id,
price = str(p),
quantity = lots * 1000,
market_type = MarketType.Common,
price_type = PriceType.Limit,
time_in_force = TimeInForce.ROD,
order_type = OrderType.Stock if side == 'bid' else OrderType.DayTrade,
)
while len(side_orders[p]) < order_per_layer:
success, trade = self.place_order(order)
if success:
side_orders[p].append(trade)
logging.info(f"[{self.stock_id}] Placed {side} on {p} with size = {lots}")
reshape_side(self.bid_orders, required_bid_prices, 'bid')
reshape_side(self.ask_orders, required_ask_prices, 'ask')
elif self.state == "OPEN":
logging.info(f"[{self.stock_id}] Bid = {self.bid_orders}. Ask = {self.ask_orders}")
self.state = "WAIT_EXIT"
elif self.state == "WAIT_EXIT":
# Cancel if the price is in [nav * (1 - maker_threshold), nav * (1 + maker_threshold)]
lower_bound = self.nav() * (1 - self.parameters['maker_threshold'])
upper_bound = self.nav() * (1 + self.parameters['maker_threshold'])
for price in list(self.bid_orders):
if lower_bound <= price <= upper_bound:
for t in list(self.bid_orders[price]):
try:
if self.cancel_order(t):
self.bid_orders[price].remove(t)
except Exception as e:
logging.info(f"[{self.stock_id}] Cannot cancel bid {t} due to {e}. Already dealt?")
if len(self.bid_orders[price]) == 0:
del self.bid_orders[price]
for price in list(self.ask_orders):
if lower_bound <= price <= upper_bound:
for t in list(self.ask_orders[price]):
try:
if self.cancel_order(t):
self.ask_orders[price].remove(t)
except Exception as e:
logging.info(f"[{self.stock_id}] Cannot cancel ask {t} due to {e}. Already dealt?")
if len(self.ask_orders[price]) == 0:
del self.ask_orders[price]
with self.lock:
# Keep the exit trades on top of the book
if self.net_position > 0:
long_exit_price = Decimal(str(max(
self.ask_price,
helper.find_smallest_not_smaller(self.nav())
)))
self.update_exit_orders(self.ask_exit_orders, long_exit_price, BSAction.Sell)
if self.net_position < 0:
short_exit_price = Decimal(str(min(
self.bid_price,
helper.find_greatest_not_greater(self.nav())
)))
self.update_exit_orders(self.bid_exit_orders, short_exit_price, BSAction.Buy)
# Cancel the orders that are not necessary.
# Cacenl both side if net_postiion = 0.
# Get rid of bids if holding long position
if self.net_position >= 0:
for price in list(self.bid_exit_orders.keys()):
for order in list(self.bid_exit_orders[price]):
try:
if self.cancel_order(order):
self.bid_exit_orders[price].remove(order)
except Exception as e:
logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?")
if len(self.bid_exit_orders[price]) == 0:
del self.bid_exit_orders[price]
# Get rid of asks if holding short position
if self.net_position <= 0:
for price in list(self.ask_exit_orders.keys()):
for order in list(self.ask_exit_orders[price]):
try:
if self.cancel_order(order):
self.ask_exit_orders[price].remove(order)
except Exception as e:
logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?")
if len(self.ask_exit_orders[price]) == 0:
del self.ask_exit_orders[price]
elif self.state == "IDLE":
pass
if self.previous_state != self.state:
logging.info(f"State of {self.stock_id} changed from {self.previous_state} to {self.state}")
self.previous_state = self.state
def handle_futopt_message(self, quote):
self.TX_price = quote["data"]["trades"][0]["price"]
def handle_quote_message(self, quote):
if quote["channel"] == "trades":
if ('isOpen' in quote['data']) and (quote['data']['isOpen']):
self.market_open = True
# In case that open signal is not received
if ('isContinuous' in quote['data']) and (quote['data']['isContinuous']):
self.market_open = True
if quote["channel"] == "books":
if len(quote["data"]["bids"]) == 0 or len(quote["data"]["asks"]) == 0:
return
self.bid_price = Decimal(str(quote["data"]["bids"][0]["price"]))
self.ask_price = Decimal(str(quote["data"]["asks"][0]["price"]))
self.bid_volume = int(quote["data"]["bids"][0]["size"])
self.ask_volume = int(quote["data"]["asks"][0]["size"])
def handle_on_filled(self, content):
logging.info(f"{self.stock_id} has dealt message = {content}")
def _remove_from_books(books):
# books: Dict[price -> List[order/trade]]
for price, trades in list(books.items()):
for idx, t in enumerate(list(trades)):
if getattr(t, "order_no", None) == content.order_no:
del trades[idx]
if not trades:
del books[price]
return price
return None
with self.lock:
# Try exit orders first, then regular maker books
removed_from = (
_remove_from_books(self.bid_exit_orders)
or _remove_from_books(self.ask_exit_orders)
or _remove_from_books(self.bid_orders)
or _remove_from_books(self.ask_orders)
)
if removed_from is not None:
logging.info(f"[{self.stock_id}] Removed filled order {content.order_no} at {removed_from}")
# Update net position (1 lot = 1000 shares)
lots = int(content.filled_qty // 1000)
if content.buy_sell == BSAction.Sell:
self.net_position -= lots
elif content.buy_sell == BSAction.Buy:
self.net_position += lots
logging.info(f"Net position = {self.net_position}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment