Created
November 19, 2025 03:45
-
-
Save openbrian/cd0b0401dfbeb5ad79fd522cee59b71f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| # Forgejo email interface | |
| # /etc/aliases | |
| # [email protected]: "|/usr/local/sbin/email2ticket.py" | |
| from datetime import datetime | |
| from email.parser import BytesParser | |
| from email.policy import default | |
| from random import choices | |
| from string import ascii_lowercase, digits | |
| from sys import exit, stderr, stdin | |
| from httpx import get, post, put, RequestError | |
| # TODO | |
| # - [ ] Handle human names in sender and recipient. | |
| # - [ ] validate input | |
| # - [ ] limit the input length | |
| # - [ ] create wild card account to catch typos in email addresses | |
| # - [ ] swagger has api to convert to markdown | |
| # - [ ] ignore emails from root | |
| BASE = "https://your-forge.site.example.org/api/v1" | |
| TOKEN = "" | |
| TEAM_ID = 3 | |
| headers = { | |
| "accept": "application/json", | |
| "Content-Type": "application/json", | |
| } | |
| def parse(): | |
| data = stdin.buffer.read() | |
| # Could also read encoded string. | |
| msg = BytesParser(policy=default).parsebytes(data) | |
| return msg | |
| def print_owner(): | |
| import getpass | |
| import sys | |
| import os | |
| import psutil | |
| current_pid = os.getpid() | |
| proc = psutil.Process(current_pid) | |
| owner = proc.username() | |
| username = getpass.getuser() | |
| print(f"owner: {owner}, username: {username}", file=stderr) | |
| def touch_ts(): | |
| path = "/tmp/email2ticket.out" | |
| ts = datetime.now() | |
| with open(path, 'a') as file: | |
| file.write(f"Timestamp: {ts}\n") | |
| def get_org_repo(address): | |
| # TODO: Don't assume there will be an @. | |
| parts = address.split("@") | |
| org, repo = parts[0].split('-') | |
| return org, repo | |
| def message_to_ticket(message): | |
| username = get_or_create_user(message["From"]) | |
| org, repo = get_org_repo(message["To"]) | |
| body = message.get_body(preferencelist=('plain')) | |
| body = body.get_content() if body else 'no body' | |
| return { | |
| "assignee": username, | |
| "body": body, | |
| "org": org, | |
| "repo": repo, | |
| "title": message["Subject"], | |
| } | |
| def post_ticket(assignee, body, org, repo, title): | |
| url = f"{BASE}/repos/{org}/{repo}/issues" | |
| url_with_token = f"{url}?access_token={TOKEN}" | |
| payload = { | |
| "assignees": [ | |
| assignee, | |
| ], | |
| "body": body, | |
| "title": title, | |
| } | |
| response = post(url_with_token, headers=headers, json=payload) | |
| if response.status_code != 201: | |
| print(f"POST requst failed with status code: {response.status_code}", file=stderr) | |
| print(f"Response text: {response.text}", file=stderr) | |
| exit(2) | |
| print("POST request successful", file=stderr) | |
| print(response.json()) # not logged yet by postfix because it's stdout | |
| def lookup_username(email_address): | |
| url = f"{BASE}/admin/emails/search" | |
| url = f"{url}?access_token={TOKEN}" | |
| url = f"{url}&q={email_address}" | |
| response = get(url, headers=headers) | |
| if response.status_code != 200: | |
| print(f"POST requst failed with status code: {response.status_code}", file=stderr) | |
| print(f"Response text: {response.text}", file=stderr) | |
| exit(12) | |
| records = response.json() | |
| recs = list(filter(lambda record: record["email"] == email_address, records)) | |
| # [{ | |
| # "email": "[email protected]", | |
| # "verified": true, # TODO: Take advantage of this. | |
| # "primary": true, | |
| # "user_id": 5400, | |
| # "username": "washington_1776" | |
| # }, {}, ...] | |
| if len(recs) != 1: | |
| print("Did not find exactly 1 record. Found {}".format(len(recs))) | |
| return None | |
| user_rec = recs[0] | |
| return user_rec["username"] | |
| def create_user(email_address): | |
| """ return username or bail out """ | |
| url = f"{BASE}/admin/users?access_token={TOKEN}" | |
| username = ''.join(choices(ascii_lowercase + digits, k=10)) | |
| password = ''.join(choices(ascii_lowercase + digits, k=10)) | |
| payload = { | |
| "email": email_address, | |
| "password": password, | |
| "username": f"email_{username}", | |
| } | |
| response = post(url, headers=headers, json=payload) | |
| if response.status_code != 201: | |
| print(f"POST requst failed with status code: {response.status_code}", file=stderr) | |
| print(f"Response text: {response.text}", file=stderr) | |
| exit(32) | |
| print("Created user.", file=stderr) | |
| record = response.json() | |
| print(record) # not logged yet by postfix because it's stdout | |
| return record["login"] | |
| def add_to_members(username): | |
| """ do it or bail out """ | |
| url = f"{BASE}/teams/{TEAM_ID}/members/{username}?access_token={TOKEN}" | |
| response = put(url, headers=headers) | |
| if response.status_code != 204: | |
| print(f"POST requst failed with status code: {response.status_code}", file=stderr) | |
| print(f"Response text: {response.text}", file=stderr) | |
| exit(52) | |
| print("Added to members.", file=stderr) | |
| return True | |
| def get_or_create_user(email_address): | |
| username = lookup_username(email_address) | |
| if not username: | |
| print("user does not exist", file=stderr) | |
| username = create_user(email_address) | |
| if not add_to_members(username): | |
| print("Could not add to members", file=stderr) | |
| exit(41) | |
| print(f"username <{username}>", file=stderr) | |
| return username | |
| # main | |
| #print_owner() | |
| #touch_ts() | |
| message = parse() | |
| del message["From"] | |
| message["From"] = '[email protected]' # override this for now | |
| #print('message is', file=stderr) | |
| #print(message, file=stderr) | |
| ticket = message_to_ticket(message) | |
| post_ticket(**ticket) | |
| #exit(1) # with this we force postfix to log printed statements |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment