Created
August 25, 2025 17:47
-
-
Save felddy/56025f1315b8027f0ee49a25795843ab to your computer and use it in GitHub Desktop.
Mail files directly to an SMTP server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| mailfiles.py — Proof-of-concept CLI to email file(s) via SMTP. | |
| Standard library only. Loads attachments into memory. | |
| Defaults: | |
| - If --server is omitted, MX of first --to recipient is used on port 25. | |
| - Opportunistic STARTTLS: try if supported; use --require-starttls to enforce, | |
| or --no-starttls to disable. Use --ssl for implicit TLS (e.g., 465). | |
| Env support: | |
| SMTP_USERNAME, SMTP_PASSWORD can supply credentials. | |
| Examples: | |
| ./mailfiles.py --to alice@example.com -- ./report.pdf syslog.txt | |
| SMTP_USERNAME=alice SMTP_PASSWORD=app-pass \ | |
| ./mailfiles.py --to bob@example.com --server mail.example.com:465 --ssl a.txt b.jpg | |
| """ | |
| from __future__ import annotations | |
| from email.message import EmailMessage | |
| from email.utils import parseaddr | |
| from pathlib import Path | |
| from smtplib import SMTP, SMTP_SSL, SMTPException, SMTPNotSupportedError | |
| import argparse | |
| import getpass | |
| import logging | |
| import mimetypes | |
| import os | |
| import socket | |
| import ssl | |
| import subprocess | |
| import sys | |
| import time | |
def parse_server(spec: str) -> tuple[str, int]:
    """Split a ``host[:port]`` specification into ``(host, port)``.

    Accepted forms:
      - ``host`` or ``host:port``
      - ``[ipv6]`` or ``[ipv6]:port`` (the brackets are stripped from the host)
      - a bare IPv6 literal such as ``::1`` (more than one colon, no brackets),
        which is taken as a host with no port; a port is only split off when
        the spec contains exactly one colon.

    The port defaults to 25 when omitted.

    Raises:
        SystemExit: if the host is empty, the brackets are malformed, or the
            port is not an integer in the range 1..65535.
    """
    default_port = 25
    spec = spec.strip()
    port_str: str | None = None

    if spec.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ":port".
        closing = spec.find("]")
        if closing == -1:
            raise SystemExit(f"Invalid --server (missing ']'): {spec}")
        host = spec[1:closing]
        remainder = spec[closing + 1:]
        if remainder:
            if not remainder.startswith(":"):
                raise SystemExit(f"Invalid --server (unexpected text after ']'): {spec}")
            port_str = remainder[1:]
    elif spec.count(":") == 1:
        host, port_str = spec.split(":", 1)
    else:
        # No colon at all, or several colons without brackets (bare IPv6).
        host = spec

    if not host:
        raise SystemExit(f"Missing host in --server: {spec!r}")
    if port_str is None:
        return host, default_port

    try:
        port = int(port_str)
    except ValueError:
        raise SystemExit(f"Invalid port in --server: {port_str}") from None
    if not 1 <= port <= 65535:
        raise SystemExit(f"Port out of range in --server: {port}")
    return host, port
def _parse_mx_text(text: str) -> tuple[str, int] | None:
    """Extract the preferred MX host from DNS command-line tool output.

    Best-effort, line-by-line parser that recognises three layouts:
      - dig ``+short``:       ``10 mx1.example.com.``
      - nslookup (Unix):      ``... mail exchanger = 10 mx1.example.com.``
      - nslookup (Windows):   ``... MX preference = 10, mail exchanger = mx1.example.com``

    Lines that match none of these, or whose preference is not an integer,
    are ignored. A host that is empty once trailing dots are stripped (the
    ``.`` target) is skipped.

    Returns:
        ``(host, preference)`` for the record with the lowest preference
        (the first such record on ties), or ``None`` if nothing usable was found.
    """
    candidates: list[tuple[int, str]] = []
    for raw_line in text.splitlines():
        parts = raw_line.split()
        if len(parts) < 2:
            continue

        pref_tok: str | None = None
        host_tok: str | None = None
        if parts[0].isdigit():
            # dig +short: "<pref> <host>"
            pref_tok, host_tok = parts[0], parts[1]
        elif "exchanger" in parts:
            after = parts[parts.index("exchanger") + 1:]
            if after[:1] != ["="]:
                continue
            if "preference" in parts:
                # Windows nslookup: preference and host sit after separate "=" signs.
                pref_part = parts[parts.index("preference") + 1:]
                if pref_part[:1] == ["="] and len(pref_part) >= 2 and len(after) >= 2:
                    pref_tok, host_tok = pref_part[1].rstrip(","), after[1]
            elif len(after) >= 3:
                # Unix nslookup: "mail exchanger = <pref> <host>"
                pref_tok, host_tok = after[1], after[2]

        if pref_tok is None or host_tok is None:
            continue
        try:
            pref = int(pref_tok)
        except ValueError:
            continue
        host = host_tok.rstrip(".")
        if host:
            candidates.append((pref, host))

    if not candidates:
        return None
    # min() keeps the first record among equal preferences.
    best_pref, best_host = min(candidates, key=lambda c: c[0])
    return best_host, best_pref
def _mx_via_command(argv: list[str], timeout: int) -> str | None:
    """Run one external DNS tool and return the preferred MX host it prints, if any."""
    try:
        cp = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # Tool not installed, timed out, produced undecodable output, etc.
        logging.getLogger("mailfiles").debug("%s lookup failed: %s", argv[0], exc)
        return None
    if cp.returncode != 0:
        return None
    parsed = _parse_mx_text(cp.stdout)
    return parsed[0] if parsed else None


def resolve_mx_host(domain: str, timeout: int = 5) -> str | None:
    """Resolve the lowest-preference MX host for ``domain``.

    Strategies are tried in order until one yields a host:
      1. ``dnspython`` (``dns.resolver``), if it can be imported;
      2. ``dig +short mx <domain>``;
      3. ``nslookup -type=mx <domain>``.

    ``timeout`` (seconds) is applied to each strategy separately. Failures
    are never raised; they are logged at DEBUG level on the ``mailfiles``
    logger and the next strategy is tried.

    Returns:
        The MX host name without a trailing dot, or ``None`` if every
        strategy failed or ``domain`` is unusable.
    """
    log = logging.getLogger("mailfiles")

    # An empty domain cannot be resolved, and one starting with "-" would be
    # read as a command-line option by dig/nslookup rather than as a name.
    if not domain or domain.startswith("-"):
        log.debug("Refusing MX lookup for invalid domain %r", domain)
        return None

    # Try dnspython if installed.
    try:
        import dns.resolver  # type: ignore

        answers = dns.resolver.resolve(domain, "MX", lifetime=timeout)
        best = min(
            ((r.preference, str(r.exchange).rstrip(".")) for r in answers),
            key=lambda rec: rec[0],
            default=None,
        )
        if best and best[1]:
            return best[1]
    except Exception as exc:  # optional dependency: ImportError or any resolver error
        log.debug("dnspython MX lookup unavailable or failed: %s", exc)

    # Fall back to whichever command-line resolver is available.
    for argv in (
        ["dig", "+short", "mx", domain],
        ["nslookup", "-type=mx", domain],
    ):
        host = _mx_via_command(argv, timeout)
        if host:
            return host
    return None
def guess_from_addr() -> str:
    """Build a default sender address of the form ``<login>@<fqdn>``.

    The local part is whatever ``getpass.getuser()`` reports; the domain is
    ``socket.getfqdn()``, with ``localhost`` substituted if that is empty.
    """
    local_part = getpass.getuser()
    domain = socket.getfqdn()
    if not domain:
        domain = "localhost"
    return "@".join((local_part, domain))
def build_message(
    from_addr: str,
    to_addrs: list[str],
    cc_addrs: list[str],
    subject: str,
    body: str,
    file_paths: list[Path],
) -> EmailMessage:
    """Assemble a plain-text email with each file attached.

    Args:
        from_addr: Value for the ``From`` header.
        to_addrs: Addresses joined with ", " into the ``To`` header.
        cc_addrs: Addresses joined into ``Cc``; the header is omitted when empty.
        subject: Value for the ``Subject`` header.
        body: Plain-text body.
        file_paths: Files to attach; each is read fully into memory and
            attached under its base name.

    Returns:
        The populated ``EmailMessage``.

    Raises:
        SystemExit: if any path is not an existing regular file.
    """
    msg = EmailMessage()
    msg["From"] = from_addr
    msg["To"] = ", ".join(to_addrs)
    if cc_addrs:
        msg["Cc"] = ", ".join(cc_addrs)
    msg["Subject"] = subject
    msg.set_content(body)

    for path in file_paths:
        if not path.is_file():
            raise SystemExit(f"Not a file or does not exist: {path}")
        ctype, encoding = mimetypes.guess_type(path.name)
        # A reported encoding (e.g. gzip for "notes.txt.gz") means the bytes on
        # disk are NOT of the guessed type, so send them as opaque binary.
        if ctype is None or encoding is not None:
            ctype = "application/octet-stream"
        maintype, subtype = ctype.split("/", 1)
        msg.add_attachment(
            path.read_bytes(),
            maintype=maintype,
            subtype=subtype,
            filename=path.name,
        )
    return msg
def configure_logging(args) -> logging.Logger:
    """Initialise root logging from the parsed CLI flags and return the tool's logger.

    Level selection, in priority order: an explicit ``--log-level`` name wins,
    then ``--debug`` (DEBUG), then ``--verbose`` (INFO); with none of them the
    level is INFO as well, so ``--verbose`` does not currently change anything.

    Side effect: when ``--debug`` is set and ``args.smtp_debug`` is 0, it is
    raised to 1 on ``args``.
    """
    if args.log_level:
        chosen = getattr(logging, args.log_level)
    else:
        # First switch that is on decides; INFO is the fallback.
        ladder = ((args.debug, logging.DEBUG), (args.verbose, logging.INFO))
        chosen = next((lvl for enabled, lvl in ladder if enabled), logging.INFO)

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=chosen,
    )
    log = logging.getLogger("mailfiles")

    # Only nudge smtplib's debug level when --debug is on and none was requested.
    if not args.debug or args.smtp_debug != 0:
        return log
    args.smtp_debug = 1
    log.debug("--debug enabled: setting --smtp-debug=1")
    return log
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser: server/TLS options, addressing, auth, logging, FILEs."""
    ap = argparse.ArgumentParser(
        description="Send one or more files as email attachments via SMTP."
    )
    ap.add_argument(
        "--server",
        required=False,
        default=None,
        help="SMTP server as host[:port]. If omitted, the MX of the first --to recipient is used on port 25.",
    )
    ap.add_argument(
        "--mx",
        action="store_true",
        help=(
            "Resolve MX for the first --to recipient and use it as the SMTP server (port 25). "
            "Overrides --server if both are given."
        ),
    )
    ap.add_argument(
        "--ssl",
        action="store_true",
        help="Use implicit SSL (e.g., port 465).",
    )
    ap.add_argument(
        "--require-starttls",
        action="store_true",
        help="Require STARTTLS. Fail if the server does not support it.",
    )
    ap.add_argument(
        "--no-starttls",
        action="store_true",
        help="Do not attempt STARTTLS (default is opportunistic STARTTLS when supported).",
    )
    ap.add_argument(
        "--timeout",
        type=int,
        default=20,
        help="SMTP socket timeout in seconds. Default: 20",
    )
    ap.add_argument(
        "--from-addr",
        dest="from_addr",
        default=None,
        help="From address. Default: <user>@<fqdn>",
    )
    ap.add_argument(
        "--to",
        action="append",
        required=True,
        help="Recipient (repeatable).",
    )
    ap.add_argument(
        "--cc",
        action="append",
        default=[],
        help="Optional Cc recipients (repeatable).",
    )
    ap.add_argument(
        "--bcc",
        action="append",
        default=[],
        help="Optional Bcc recipients (repeatable; not shown in headers).",
    )
    ap.add_argument(
        "--subject",
        default=None,
        help="Subject line. Default: 'Files: <names>'",
    )
    ap.add_argument(
        "--body",
        default=None,
        help="Plain-text body. Default: 'Attached N file(s).'",
    )
    ap.add_argument(
        "--username",
        default=os.environ.get("SMTP_USERNAME"),
        help="SMTP username. Default: env SMTP_USERNAME.",
    )
    ap.add_argument(
        "--password",
        default=os.environ.get("SMTP_PASSWORD"),
        help="SMTP password. Default: env SMTP_PASSWORD or prompted if --username set.",
    )
    ap.add_argument(
        "--smtp-debug",
        type=int,
        default=0,
        help="Set smtplib debug level (0,1,2). Default: 0",
    )
    # Logging controls
    ap.add_argument(
        "--log-level",
        choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        help="Logging level (default INFO; --verbose sets INFO; --debug sets DEBUG).",
    )
    ap.add_argument(
        "--verbose",
        action="store_true",
        help="Enable INFO logging to stderr.",
    )
    ap.add_argument(
        "--debug",
        action="store_true",
        help="Enable DEBUG logging; if --smtp-debug isn't set, it will be set to 1.",
    )
    ap.add_argument(
        "files",
        nargs="+",
        metavar="FILE",
        help="Files to attach. Use -- to separate options from files if needed.",
    )
    return ap


def main() -> int:
    """Parse the command line, build the message and deliver it over SMTP.

    Returns:
        Process exit status:
          0   - message accepted for every recipient;
          1   - SMTP, TLS, network or OS error (or no password could be read);
          2   - the server refused some of the recipients;
          130 - the interactive password prompt was interrupted.

    Argument errors (including a failed MX lookup) are reported through
    ``ArgumentParser.error``, which exits the process directly.
    """
    ap = _build_arg_parser()
    args = ap.parse_args()
    logger = configure_logging(args)

    # Decide SMTP target: --mx wins; otherwise MX if --server omitted; else explicit --server.
    if args.mx or not args.server:
        if not args.to:
            ap.error("--mx or default-MX requires at least one --to recipient.")
        _, addr = parseaddr(args.to[0])
        if "@" not in addr:
            ap.error(f"Cannot infer domain from recipient: {args.to[0]!r}")
        domain = addr.rsplit("@", 1)[1]
        logger.info("Resolving MX for %s", domain)
        # Monotonic clock: elapsed time must not be skewed by wall-clock adjustments.
        t0 = time.monotonic()
        mx_host = resolve_mx_host(domain, timeout=args.timeout)
        dt = (time.monotonic() - t0) * 1000
        if not mx_host:
            ap.error(
                f"MX lookup failed for {domain}. Install 'dnspython' or ensure 'dig' or 'nslookup' is available, "
                "or provide --server explicitly."
            )
        logger.info("Resolved MX %s in %.1f ms", mx_host, dt)
        host, port = mx_host, 25
    else:
        host, port = parse_server(args.server)
        logger.info("Using explicit SMTP server %s:%d", host, port)

    from_addr = args.from_addr or guess_from_addr()
    files = [Path(f) for f in args.files]
    subj = args.subject or ("Files: " + ", ".join(p.name for p in files))
    body = args.body or f"Attached {len(files)} file(s)."

    logger.info("From: %s", from_addr)
    logger.info("To: %s", ", ".join(args.to))
    if args.cc:
        logger.info("Cc: %s", ", ".join(args.cc))
    if args.bcc:
        logger.info("Bcc recipients: %d", len(args.bcc))
    logger.info("Attachments: %s", ", ".join(p.name for p in files))

    msg = build_message(
        from_addr=from_addr,
        to_addrs=args.to,
        cc_addrs=args.cc,
        subject=subj,
        body=body,
        file_paths=files,
    )
    # Envelope recipients: de-duplicated, keeping command-line order. Bcc
    # addresses appear here only, never in the message headers.
    all_rcpts = list(dict.fromkeys([*args.to, *args.cc, *args.bcc]))

    if args.username and args.password is None:
        # Prompt interactively if user supplied but pass not provided.
        try:
            args.password = getpass.getpass("SMTP password: ")
        except KeyboardInterrupt:
            return 130
        except EOFError:
            print(
                "No password available: set SMTP_PASSWORD or pass --password.",
                file=sys.stderr,
            )
            return 1

    try:
        # One verifying TLS context for both implicit TLS and STARTTLS, so
        # certificate and hostname checks are applied the same way on each path.
        tls_ctx = ssl.create_default_context()
        logger.info("Connecting to %s:%d (ssl=%s)", host, port, bool(args.ssl))
        if args.ssl:
            conn = SMTP_SSL(host=host, port=port, timeout=args.timeout, context=tls_ctx)
        else:
            conn = SMTP(host=host, port=port, timeout=args.timeout)

        with conn as s:
            s.set_debuglevel(args.smtp_debug)
            s.ehlo()

            # TLS behavior
            encrypted = bool(args.ssl)
            attempt_starttls = (not args.no_starttls) and (not args.ssl)
            require_starttls = args.require_starttls
            if attempt_starttls:
                if s.has_extn("starttls"):
                    try:
                        logger.info("STARTTLS advertised; negotiating TLS...")
                        s.starttls(context=tls_ctx)
                        s.ehlo()  # refresh capabilities after TLS
                        encrypted = True
                        logger.info("STARTTLS established")
                    except (SMTPNotSupportedError, SMTPException) as e:
                        logger.error("STARTTLS negotiation failed: %s", e)
                        if require_starttls:
                            logger.error("Aborting due to --require-starttls")
                            return 1
                elif require_starttls:
                    logger.error("Server does not advertise STARTTLS; aborting.")
                    return 1
                else:
                    logger.info("Server does not advertise STARTTLS; continuing without TLS")

            if args.username:
                if not encrypted:
                    logger.warning(
                        "Authenticating over an unencrypted connection; credentials may be exposed"
                    )
                logger.info("Authenticating as %s", args.username)
                s.login(args.username, args.password or "")

            logger.info("Sending message to %d recipient(s)", len(all_rcpts))
            refused = s.send_message(msg, from_addr=from_addr, to_addrs=all_rcpts)
            if refused:
                # refused is a dict of {rcpt: (code, resp)}
                logger.warning("Some recipients were refused: %s", refused)
                # Non-zero exit to signal partial failure
                return 2
            logger.info("Message accepted for all recipients")
    except SMTPException as e:
        logger.exception("SMTP error")
        print(f"SMTP error: {e}", file=sys.stderr)
        return 1
    except OSError as e:
        # Includes ssl.SSLError (e.g. certificate verification failure).
        logger.exception("Network/OS error")
        print(f"Network or OS error: {e}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Run the CLI and hand main()'s integer status to the interpreter as the exit code.
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment