Created
February 1, 2026 09:11
-
-
Save kawai-neko-meow/3ffe5e29ddaa6fb48045fbf0a3148523 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| TOKEN = "oauth:<TOKEN>" | |
| import socket | |
| import time | |
| import dataclasses | |
| @dataclasses.dataclass(frozen=True) | |
| class Message: | |
| command: str | |
| user: str | None | |
| host: str | None | |
| args: dict | |
| params: str | |
| tags: dict | |
| class IRC: | |
| ''' | |
| PONG | |
| GLOBALUSERSTATE ROOMSTATE USERSTATE - CAP REQ :twitch.tv/commands | |
| CLEARCHAT CLEARMSG HOSTTARGET | |
| NOTICE RECONNECT USERNOTICE | |
| WHISPER NOTICE PART PING PRIVMSG | |
| ''' | |
| irc = socket.socket() | |
| def __init__(self): | |
| # Define the socket | |
| self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| def _send(self, command, *args): | |
| command = ' '.join((command, *args, '\r\n')).encode('utf-8') | |
| self.irc.send(command) | |
| def connect(self, server, port, user, password): | |
| self.irc.connect((server, port)) | |
| self._send('PASS', password) | |
| self._send('NICK', user) | |
| def join(self, *channels): | |
| self._send('JOIN', ','.join(channels)) | |
| def set_caps(self, *caps): | |
| self._send('CAP', 'REQ', ':' + ' '.join(caps)) | |
| def send_message(self, channel, msg): | |
| self._send('PRIVMSG', channel, ":" + msg) | |
| def leave(self, channel): | |
| self._send('PART', channel) | |
| def get_response(self) -> list[Message]: | |
| data = [] | |
| self.irc.setblocking(False) | |
| try: | |
| while True: | |
| chunk = self.irc.recv(4096) | |
| data.append(chunk) | |
| except BlockingIOError: | |
| pass | |
| finally: | |
| self.irc.setblocking(True) | |
| resp = b''.join(data).decode('utf-8') | |
| messages = [] | |
| for i in resp.split('\r\n'): | |
| if i: | |
| try: | |
| i = _parse_message(i) | |
| except Exception: | |
| print('PARSING', i) | |
| raise | |
| if i is not None: | |
| messages.append(i) | |
| return messages | |
| def _split_message(message: str): | |
| idx = 0 | |
| tags = None | |
| source = None | |
| params = None | |
| if message[idx] == '@': | |
| idx += 1 | |
| end_idx = message.index(' ', idx) | |
| tags = message[idx:end_idx] | |
| idx = end_idx + 1 | |
| if message[idx] == ':': | |
| idx += 1 | |
| end_idx = message.index(' ', idx) | |
| source = message[idx:end_idx] | |
| idx = end_idx + 1 | |
| end_idx = message.find(':', idx) | |
| if end_idx == -1: | |
| end_idx = len(message) | |
| command = message[idx:end_idx].strip() | |
| if end_idx != len(message): | |
| idx = end_idx + 1 | |
| params = message[idx:] | |
| return tags, source, command, params | |
| def _parse_message(message: str): | |
| tags, source, command, params = _split_message(message) | |
| user = host = None | |
| command, args = _parse_command(command) | |
| if command is None: | |
| return None | |
| if tags is not None: | |
| tags = _parse_tags(tags) | |
| if source is not None: | |
| user, host = _parse_source(source) | |
| if params is not None and params.startswith('!'): | |
| _parse_params(params, args) | |
| return Message(command, user, host, args, params, tags) | |
| def _parse_tags(tags: str): | |
| result = {} | |
| for tag in tags.split(';'): | |
| tag = tag.split('=', maxsplit=1) | |
| if len(tag) == 1 or not tag[1]: | |
| value = None | |
| else: | |
| value = tag[1] | |
| result[tag[0]] = value | |
| return result | |
| def _parse_command(command: str): | |
| parsed = None | |
| cmd, *args = command.split(' ') | |
| if cmd in {'JOIN', 'PART', 'NOTICE', 'CLEARCHAT', 'HOSTTARGET', 'PRIVMSG', 'USERSTATE', | |
| 'ROOMSTATE', 'USERNOTICE', 'CLEARMSG'}: | |
| parsed = { | |
| 'channel': args[0], | |
| } | |
| elif cmd in {'001', '002', '003', '004', '375', '372', '376'}: | |
| parsed = { | |
| 'user_name': args[0], | |
| } | |
| elif cmd == 'WHISPER': | |
| parsed = { | |
| 'from_user': args[0], | |
| } | |
| elif cmd in {'PING', 'GLOBALUSERSTATE', 'RECONNECT'}: | |
| parsed = {} | |
| elif cmd == '353': | |
| parsed = { | |
| 'user_name': args[0], | |
| 'type': args[1], | |
| 'channel': args[2], | |
| } | |
| elif cmd == '366': | |
| parsed = { | |
| 'user_name': args[0], | |
| 'channel': args[1], | |
| } | |
| elif cmd == 'CAP': | |
| parsed = { | |
| 'isCapRequestEnabled': args[1] == 'ACK', | |
| } | |
| elif cmd == '421': | |
| print('Unsupported', args) | |
| else: | |
| print('Got unknown', cmd, args) | |
| return cmd, parsed | |
| def _parse_source(source: str): | |
| parts = source.split('!') | |
| if len(parts) == 2: | |
| return parts[0], parts[1] | |
| return None, parts[0] | |
| def _parse_params(params: str, command): | |
| command_parts = params[1:].strip() | |
| params_idx = command_parts.find(' ') | |
| if params_idx == -1: | |
| command['bot_command'] = command_parts | |
| else: | |
| command['bot_command'] = command_parts[:params_idx] | |
| command['bot_command_params'] = command_parts[params_idx:].strip() | |
| def _parse_tags2(msg: Message): | |
| if not msg.tags: | |
| return | |
| for tag_name, tag_value in msg.tags.items(): | |
| if tag_value is None: | |
| continue | |
| if tag_name in {'badges', 'badges-info'}: | |
| tag_value = dict((pair.split('/') for pair in tag_value.split(','))) | |
| elif tag_name == 'emotes': | |
| dictEmotes = {} | |
| for emote in tag_value.split('/'): | |
| parts = emote.split(':') | |
| text_positions = [] | |
| for position in parts[1].split(','): | |
| pos_parts = position.split('-') | |
| text_positions.append((int(pos_parts[0]), int(pos_parts[1]))) | |
| dictEmotes[parts[0]] = text_positions | |
| tag_value = dictEmotes | |
| elif tag_name == 'emote-sets': | |
| tag_value = tag_value.split(',') | |
| msg.tags[tag_name] = tag_value | |
| def main(): | |
| irc = IRC() | |
| irc.connect('irc.chat.twitch.tv', 6667, 'kawai_neko', TOKEN) | |
| irc.set_caps('twitch.tv/commands', 'twitch.tv/membership', 'twitch.tv/tags') | |
| time.sleep(1) | |
| irc.join('#keysie') | |
| while True: | |
| msgs = irc.get_response() | |
| for msg in msgs: | |
| _parse_tags2(msg) | |
| if msg.command in {'001', '002', '003', '004', '375', '372', '376', '366'}: | |
| print('[SERVER]', msg.params) | |
| elif msg.command == '353': | |
| print('[SERVER] Users in channel:', msg.params) | |
| elif msg.command == 'CAP': | |
| print('[SERVER]', 'Caps set' if msg.args['isCapRequestEnabled'] else 'Caps not set') | |
| elif msg.command == 'JOIN': | |
| print(f'[SERVER{msg.args["channel"]}] {msg.user} joined channel!') | |
| elif msg.command == 'PART': | |
| print(f'[SERVER{msg.args["channel"]}] {msg.user} left channel!') | |
| elif msg.command == 'USERSTATE': | |
| print(f'[SERVER{msg.args["channel"]}] Our state in channel:', msg.tags) | |
| elif msg.command == 'ROOMSTATE': | |
| print(f'[SERVER{msg.args["channel"]}] State of channel:', msg.tags) | |
| elif msg.command == 'PRIVMSG': | |
| print(f'[{msg.user}{msg.args["channel"]}] {msg.params}') | |
| print(' ', msg.tags) | |
| else: | |
| print(msg) | |
| time.sleep(1) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment