Created
October 15, 2025 14:40
-
-
Save lawrence910426/3fc0576899d70655fe2111b6a16c6127 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import logging | |
| import math | |
| import os | |
| import contextlib | |
| import concurrent | |
| import threading | |
| import re | |
| from types import SimpleNamespace | |
| from decimal import * | |
| from notify import Notify | |
| from decimal import Decimal | |
| import bisect | |
| from fubon_neo.sdk import FubonSDK, Order | |
| from fubon_neo.constant import TimeInForce, OrderType, PriceType, MarketType, BSAction | |
| class StockHelper: | |
| def __init__(self): | |
| # Generate stock_ticks when the class is initialized | |
| self.stock_ticks = self.gen_list(0.01, 50, 0.01) + \ | |
| self.gen_list(50, 1000, 0.05) | |
| @staticmethod | |
| def gen_list(start, end, step): | |
| exponent = 0 | |
| while step <= 1: | |
| step = step * 10 | |
| exponent += 1 | |
| shift = 10 ** exponent | |
| start, end, step = int(start * shift), int(end * shift), int(step) | |
| return [Decimal(i) / Decimal(shift) for i in range(start, end, step)] | |
| def find_greatest_not_greater(self, value): | |
| """ | |
| Find the greatest tick that is not greater than the given value. | |
| Returns None if no such tick exists. | |
| """ | |
| value = Decimal(value) | |
| index = bisect.bisect_right(self.stock_ticks, value) - 1 | |
| if index >= 0: | |
| return self.stock_ticks[index] | |
| else: | |
| return None | |
| def find_smallest_not_smaller(self, value): | |
| """ | |
| Find the smallest tick that is not smaller than the given value. | |
| Returns None if no such tick exists. | |
| """ | |
| value = Decimal(value) | |
| index = bisect.bisect_left(self.stock_ticks, value) | |
| if index < len(self.stock_ticks): | |
| return self.stock_ticks[index] | |
| else: | |
| return None | |
| helper = StockHelper() | |
| class Automata: | |
| def __init__(self, sdk, account, parameters): | |
| self.sdk = sdk | |
| self.account = account | |
| # Extract from parameters | |
| self.parameters = parameters | |
| self.stock_id = self.parameters['stock_id'] | |
| # Automata states | |
| self.state = "INIT" | |
| self.previous_state = "INIT" | |
| # NAV data | |
| self.TX_price = self.parameters['prev_close'] | |
| self.last_show_nav = time.time() | |
| # ETF data | |
| self.bid_price = None | |
| self.ask_price = None | |
| self.bid_volume = None | |
| self.ask_volume = None | |
| self.market_open = False | |
| self.bid_orders, self.ask_orders = {}, {} | |
| # Lock is for dealing with the concurrency issues about the following variables. | |
| self.lock = threading.Lock() | |
| self.net_position = self.parameters["remain_position"] | |
| self.bid_exit_orders, self.ask_exit_orders = {}, {} | |
| # Speed limiter | |
| self.operations_per_second = 20 | |
| self.operations = [] | |
| def place_order(self, order): | |
| now = time.time() | |
| self.operations = [t for t in self.operations if now - t < 1.0] | |
| if len(self.operations) >= self.operations_per_second: | |
| return False, None | |
| self.operations.append(now) | |
| place_result = self.sdk.stock.place_order(self.account, order) | |
| if not place_result.is_success: | |
| logging.error(order) | |
| raise Exception(place_result.message) | |
| return True, place_result.data | |
| def cancel_order(self, trade): | |
| now = time.time() | |
| self.operations = [t for t in self.operations if now - t < 1.0] | |
| if len(self.operations) >= self.operations_per_second: | |
| return False | |
| self.operations.append(now) | |
| place_result = self.sdk.stock.cancel_order(self.account, trade) | |
| if not place_result.is_success: | |
| raise Exception(place_result) | |
| return True | |
| def update_exit_orders(self, exit_orders, optimal_price, order_action): | |
| # Never deletes orders at optimal price. So if you manually placed exit orders, | |
| # you might end up with excess exit orders. | |
| for price in list(exit_orders.keys()): | |
| if price != optimal_price: | |
| for order in list(exit_orders[price]): | |
| try: | |
| if self.cancel_order(order): | |
| exit_orders[price].remove(order) | |
| logging.info(f"[{self.stock_id}] Removed exit order on {price}") | |
| except Exception as e: | |
| logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?") | |
| if len(exit_orders[price]) == 0: | |
| del exit_orders[price] | |
| exit_orders_list = exit_orders.setdefault(optimal_price, []) | |
| required_orders = abs(max(-30, min(self.net_position, 30))) - len(exit_orders_list) | |
| order = Order( | |
| buy_sell = order_action, | |
| symbol = self.stock_id, | |
| price = str(optimal_price), | |
| quantity = 1000, | |
| market_type = MarketType.Common, | |
| price_type = PriceType.Limit, | |
| time_in_force = TimeInForce.ROD, | |
| order_type = OrderType.Stock if order_action == BSAction.Buy else OrderType.DayTrade, | |
| ) | |
| for _ in range(required_orders): | |
| success, trade = self.place_order(order) | |
| if success: | |
| exit_orders_list.append(trade) | |
| logging.info(f"[{self.stock_id}] Placed exit order on {optimal_price}") | |
| def nav(self): | |
| pnl_per_lot = (float(self.TX_price) - float(self.parameters['prev_close'])) * 50 | |
| pnl = self.parameters['delta_in_mx'] * pnl_per_lot | |
| nav_total = self.parameters['prev_nav'] + pnl | |
| nav_per_share = nav_total / self.parameters['shares_outstanding'] | |
| return nav_per_share | |
| def loop(self): | |
| # Show NAV | |
| now = time.time() | |
| if self.last_show_nav + 15 < now: | |
| logging.info(f"TX price = {self.TX_price}") | |
| logging.info(f"[{self.stock_id}] Market price = {(self.bid_price, self.bid_volume, self.ask_price, self.ask_volume)}") | |
| logging.info(f"[{self.stock_id}] NAV = {self.nav()}.") | |
| self.last_show_nav = now | |
| # Loop automata | |
| if self.state == "INIT": | |
| self.state = "WAIT_OPEN" | |
| elif self.state == "WAIT_OPEN": | |
| if self.market_open: | |
| self.state = "OPEN" | |
| else: | |
| # keep ordered lists for layer indexing, plus sets for membership tests | |
| required_bid_prices, required_ask_prices = [], [] | |
| if self.bid_price is not None and self.ask_price is not None: | |
| # Determine the optimal bid and the 5 ticks immediately below it | |
| optimal_bid = min( | |
| self.bid_price, | |
| helper.find_greatest_not_greater(self.nav() * (1 - self.parameters['maker_threshold'])) | |
| ) | |
| optimal_bid_idx = helper.stock_ticks.index(optimal_bid) | |
| start_bid_idx = max(0, optimal_bid_idx - 4) | |
| # bids: layer 0 = best/closest (optimal), then one tick lower, etc. | |
| # slice is ascending; reverse so index 0 is optimal | |
| required_bid_prices = list(helper.stock_ticks[start_bid_idx : optimal_bid_idx + 1])[::-1] | |
| # Determine the optimal ask and the 5 ticks immediately above it | |
| optimal_ask = max( | |
| self.ask_price, | |
| helper.find_smallest_not_smaller(self.nav() * (1 + self.parameters['maker_threshold'])) | |
| ) | |
| optimal_ask_idx = helper.stock_ticks.index(optimal_ask) | |
| end_ask_idx = min(len(helper.stock_ticks), optimal_ask_idx + 5) | |
| # asks: slice is ascending; keep as-is so index 0 is optimal | |
| required_ask_prices = list(helper.stock_ticks[optimal_ask_idx : end_ask_idx]) | |
| def reshape_side(side_orders, required_side_prices, side): | |
| # Cancel orders | |
| for p in list(side_orders.keys()): | |
| require_cancel = False | |
| if p not in set(required_side_prices): | |
| require_cancel = True | |
| else: | |
| layer_idx = required_side_prices.index(p) # 0 = optimal, 1 = one tick below, ... | |
| order_per_layer = self.parameters['order_per_layer'][side][layer_idx] | |
| lots = self.parameters['lots_per_order'][side][layer_idx] | |
| if len(side_orders[p]) != order_per_layer: | |
| require_cancel = True | |
| if len(side_orders[p]) > 0 and side_orders[p][0].quantity / 1000 != lots: | |
| require_cancel = True | |
| if require_cancel: | |
| for t in list(side_orders[p]): | |
| if self.cancel_order(t): | |
| side_orders[p].remove(t) | |
| logging.info(f"[{self.stock_id}] Removed {side} on {p}") | |
| if len(side_orders[p]) == 0: | |
| del side_orders[p] | |
| # Place orders | |
| for p in required_side_prices: | |
| if p not in side_orders: | |
| side_orders[p] = [] | |
| layer_idx = required_side_prices.index(p) # 0 = optimal, 1 = one tick below, ... | |
| order_per_layer = self.parameters['order_per_layer'][side][layer_idx] | |
| lots = self.parameters['lots_per_order'][side][layer_idx] | |
| order = Order( | |
| buy_sell = BSAction.Buy if side == 'bid' else BSAction.Sell, | |
| symbol = self.stock_id, | |
| price = str(p), | |
| quantity = lots * 1000, | |
| market_type = MarketType.Common, | |
| price_type = PriceType.Limit, | |
| time_in_force = TimeInForce.ROD, | |
| order_type = OrderType.Stock if side == 'bid' else OrderType.DayTrade, | |
| ) | |
| while len(side_orders[p]) < order_per_layer: | |
| success, trade = self.place_order(order) | |
| if success: | |
| side_orders[p].append(trade) | |
| logging.info(f"[{self.stock_id}] Placed {side} on {p} with size = {lots}") | |
| reshape_side(self.bid_orders, required_bid_prices, 'bid') | |
| reshape_side(self.ask_orders, required_ask_prices, 'ask') | |
| elif self.state == "OPEN": | |
| logging.info(f"[{self.stock_id}] Bid = {self.bid_orders}. Ask = {self.ask_orders}") | |
| self.state = "WAIT_EXIT" | |
| elif self.state == "WAIT_EXIT": | |
| # Cancel if the price is in [nav * (1 - maker_threshold), nav * (1 + maker_threshold)] | |
| lower_bound = self.nav() * (1 - self.parameters['maker_threshold']) | |
| upper_bound = self.nav() * (1 + self.parameters['maker_threshold']) | |
| for price in list(self.bid_orders): | |
| if lower_bound <= price <= upper_bound: | |
| for t in list(self.bid_orders[price]): | |
| try: | |
| if self.cancel_order(t): | |
| self.bid_orders[price].remove(t) | |
| except Exception as e: | |
| logging.info(f"[{self.stock_id}] Cannot cancel bid {t} due to {e}. Already dealt?") | |
| if len(self.bid_orders[price]) == 0: | |
| del self.bid_orders[price] | |
| for price in list(self.ask_orders): | |
| if lower_bound <= price <= upper_bound: | |
| for t in list(self.ask_orders[price]): | |
| try: | |
| if self.cancel_order(t): | |
| self.ask_orders[price].remove(t) | |
| except Exception as e: | |
| logging.info(f"[{self.stock_id}] Cannot cancel ask {t} due to {e}. Already dealt?") | |
| if len(self.ask_orders[price]) == 0: | |
| del self.ask_orders[price] | |
| with self.lock: | |
| # Keep the exit trades on top of the book | |
| if self.net_position > 0: | |
| long_exit_price = Decimal(str(max( | |
| self.ask_price, | |
| helper.find_smallest_not_smaller(self.nav()) | |
| ))) | |
| self.update_exit_orders(self.ask_exit_orders, long_exit_price, BSAction.Sell) | |
| if self.net_position < 0: | |
| short_exit_price = Decimal(str(min( | |
| self.bid_price, | |
| helper.find_greatest_not_greater(self.nav()) | |
| ))) | |
| self.update_exit_orders(self.bid_exit_orders, short_exit_price, BSAction.Buy) | |
| # Cancel the orders that are not necessary. | |
| # Cacenl both side if net_postiion = 0. | |
| # Get rid of bids if holding long position | |
| if self.net_position >= 0: | |
| for price in list(self.bid_exit_orders.keys()): | |
| for order in list(self.bid_exit_orders[price]): | |
| try: | |
| if self.cancel_order(order): | |
| self.bid_exit_orders[price].remove(order) | |
| except Exception as e: | |
| logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?") | |
| if len(self.bid_exit_orders[price]) == 0: | |
| del self.bid_exit_orders[price] | |
| # Get rid of asks if holding short position | |
| if self.net_position <= 0: | |
| for price in list(self.ask_exit_orders.keys()): | |
| for order in list(self.ask_exit_orders[price]): | |
| try: | |
| if self.cancel_order(order): | |
| self.ask_exit_orders[price].remove(order) | |
| except Exception as e: | |
| logging.info(f"[{self.stock_id}] Cannot cancel exit order {order} due to {e}. Already dealt?") | |
| if len(self.ask_exit_orders[price]) == 0: | |
| del self.ask_exit_orders[price] | |
| elif self.state == "IDLE": | |
| pass | |
| if self.previous_state != self.state: | |
| logging.info(f"State of {self.stock_id} changed from {self.previous_state} to {self.state}") | |
| self.previous_state = self.state | |
| def handle_futopt_message(self, quote): | |
| self.TX_price = quote["data"]["trades"][0]["price"] | |
| def handle_quote_message(self, quote): | |
| if quote["channel"] == "trades": | |
| if ('isOpen' in quote['data']) and (quote['data']['isOpen']): | |
| self.market_open = True | |
| # In case that open signal is not received | |
| if ('isContinuous' in quote['data']) and (quote['data']['isContinuous']): | |
| self.market_open = True | |
| if quote["channel"] == "books": | |
| if len(quote["data"]["bids"]) == 0 or len(quote["data"]["asks"]) == 0: | |
| return | |
| self.bid_price = Decimal(str(quote["data"]["bids"][0]["price"])) | |
| self.ask_price = Decimal(str(quote["data"]["asks"][0]["price"])) | |
| self.bid_volume = int(quote["data"]["bids"][0]["size"]) | |
| self.ask_volume = int(quote["data"]["asks"][0]["size"]) | |
| def handle_on_filled(self, content): | |
| logging.info(f"{self.stock_id} has dealt message = {content}") | |
| def _remove_from_books(books): | |
| # books: Dict[price -> List[order/trade]] | |
| for price, trades in list(books.items()): | |
| for idx, t in enumerate(list(trades)): | |
| if getattr(t, "order_no", None) == content.order_no: | |
| del trades[idx] | |
| if not trades: | |
| del books[price] | |
| return price | |
| return None | |
| with self.lock: | |
| # Try exit orders first, then regular maker books | |
| removed_from = ( | |
| _remove_from_books(self.bid_exit_orders) | |
| or _remove_from_books(self.ask_exit_orders) | |
| or _remove_from_books(self.bid_orders) | |
| or _remove_from_books(self.ask_orders) | |
| ) | |
| if removed_from is not None: | |
| logging.info(f"[{self.stock_id}] Removed filled order {content.order_no} at {removed_from}") | |
| # Update net position (1 lot = 1000 shares) | |
| lots = int(content.filled_qty // 1000) | |
| if content.buy_sell == BSAction.Sell: | |
| self.net_position -= lots | |
| elif content.buy_sell == BSAction.Buy: | |
| self.net_position += lots | |
| logging.info(f"Net position = {self.net_position}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment