Skip to content

Instantly share code, notes, and snippets.

@felddy
Created August 25, 2025 17:47
Show Gist options
  • Select an option

  • Save felddy/56025f1315b8027f0ee49a25795843ab to your computer and use it in GitHub Desktop.

Select an option

Save felddy/56025f1315b8027f0ee49a25795843ab to your computer and use it in GitHub Desktop.
Mail files directly to an SMTP server
#!/usr/bin/env python3
"""
mailfiles.py — Proof-of-concept CLI to email file(s) via SMTP.
Standard library only. Loads attachments into memory.
Defaults:
- If --server is omitted, MX of first --to recipient is used on port 25.
- Opportunistic STARTTLS: try if supported; use --require-starttls to enforce,
or --no-starttls to disable. Use --ssl for implicit TLS (e.g., 465).
Env support:
SMTP_USERNAME, SMTP_PASSWORD can supply credentials.
Examples:
./mailfiles.py --to recipient@example.com -- ./report.pdf syslog.txt
SMTP_USERNAME=alice SMTP_PASSWORD=app-pass \
./mailfiles.py --to recipient@example.com --server mail.example.com:465 --ssl a.txt b.jpg
"""
from __future__ import annotations
from email.message import EmailMessage
from email.utils import parseaddr
from pathlib import Path
from smtplib import SMTP, SMTP_SSL, SMTPException, SMTPNotSupportedError
import argparse
import getpass
import logging
import mimetypes
import os
import socket
import ssl
import subprocess
import sys
import time
def parse_server(spec: str) -> tuple[str, int]:
    """Split a ``host[:port]`` server spec into ``(host, port)``.

    Accepted forms:
      - ``host``            -> port defaults to 25
      - ``host:port``
      - ``[v6-literal]`` or ``[v6-literal]:port`` (brackets are stripped)
      - a bare IPv6 literal such as ``2001:db8::1`` (more than one colon, no
        brackets) is treated as a host with the default port, because the
        last colon cannot be told apart from an address separator.

    Args:
        spec: The value given to ``--server``.

    Returns:
        A ``(host, port)`` tuple.

    Raises:
        SystemExit: If the host is empty, the brackets are malformed, or the
            port is not an integer in 1..65535.
    """
    default_port = 25
    spec = spec.strip()
    port_str: str | None = None

    if spec.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ":port".
        host, closing, rest = spec[1:].partition("]")
        if not closing or (rest and not rest.startswith(":")):
            raise SystemExit(f"Invalid --server value: {spec}")
        if rest:
            port_str = rest[1:]
    elif spec.count(":") == 1:
        host, port_str = spec.split(":")
    else:
        # No colon (plain host) or several colons (unbracketed IPv6 literal).
        host = spec

    if not host:
        raise SystemExit(f"Invalid --server value (empty host): {spec!r}")
    if port_str is None:
        return host, default_port

    try:
        port = int(port_str)
    except ValueError:
        raise SystemExit(f"Invalid port in --server: {port_str}") from None
    if not 0 < port < 65536:
        raise SystemExit(f"Invalid port in --server: {port_str}")
    return host, port
def _parse_mx_text(text: str) -> tuple[str, int] | None:
    """Pick the most-preferred MX record out of ``dig``/``nslookup`` output.

    Recognised line formats (best effort):
      - ``dig +short mx``:      ``10 mx1.example.com.``
      - Unix-style nslookup:    ``... mail exchanger = 10 mx1.example.com.``
      - Windows-style nslookup: ``... MX preference = 10, mail exchanger = mx1.example.com``

    Lines that match none of these, and records whose host is empty after
    stripping the trailing dot (e.g. ``0 .``), are ignored.

    Args:
        text: Raw stdout of the lookup command.

    Returns:
        ``(host, preference)`` for the record with the lowest preference
        value (first one wins on ties), or ``None`` if no record was found.
    """

    def from_dig(parts: list[str]) -> tuple[int, str] | None:
        # "10 mx1.example.com."
        if len(parts) < 2 or not parts[0].isdigit():
            return None
        try:
            return int(parts[0]), parts[1]
        except ValueError:
            # str.isdigit() accepts some characters int() rejects.
            return None

    def from_nslookup(parts: list[str]) -> tuple[int, str] | None:
        try:
            value_at = parts.index("exchanger") + 2
            if parts[value_at - 1] != "=":
                return None
            tail = parts[value_at:]
            if len(tail) >= 2:
                # Unix style: preference and host both follow "exchanger =".
                return int(tail[0]), tail[1]
            # Windows style: only the host follows; the preference sits
            # earlier on the line as "preference = 10,".
            pref_at = parts.index("preference")
            if parts[pref_at + 1] != "=":
                return None
            return int(parts[pref_at + 2].rstrip(",")), tail[0]
        except (ValueError, IndexError):
            return None

    candidates: list[tuple[int, str]] = []
    for line in text.splitlines():
        parts = line.split()
        if not parts:
            continue
        for parse in (from_dig, from_nslookup):
            record = parse(parts)
            if record is None:
                continue
            host = record[1].rstrip(".")
            if host:
                candidates.append((record[0], host))
                break

    if not candidates:
        return None
    pref, host = min(candidates, key=lambda c: c[0])
    return host, pref
def resolve_mx_host(domain: str, timeout: int = 5) -> str | None:
    """Resolve the most-preferred (lowest preference value) MX host for *domain*.

    Resolution is best effort and tries, in order: the optional ``dnspython``
    package, the ``dig`` command, then the ``nslookup`` command. A failure in
    one method is logged at DEBUG level on the "mailfiles" logger and the
    next method is tried; nothing is raised.

    Args:
        domain: Domain part of a recipient address.
        timeout: Per-method time limit in seconds (``lifetime`` for
            dnspython, subprocess timeout for dig/nslookup).

    Returns:
        The MX host name without a trailing dot, or ``None`` if every method
        failed or the domain is unusable.
    """
    log = logging.getLogger("mailfiles")

    # dig and nslookup would read a leading "-", "+" or "@" as an option or
    # server selector rather than a name to look up, so refuse such input.
    if not domain or domain[0] in "-+@":
        log.debug("Refusing MX lookup for unusable domain %r", domain)
        return None

    # Try dnspython if installed.
    try:
        import dns.resolver  # type: ignore

        answers = dns.resolver.resolve(domain, "MX", lifetime=timeout)
        best = min(
            ((r.preference, str(r.exchange).rstrip(".")) for r in answers),
            key=lambda rec: rec[0],
            default=None,
        )
        if best and best[1]:
            return best[1]
    except ImportError:
        log.debug("dnspython not available; falling back to dig/nslookup")
    except Exception as exc:  # deliberate best effort: fall through to the CLIs
        log.debug("dnspython MX lookup for %s failed: %s", domain, exc)

    # Fall back to command-line resolvers.
    commands = (
        ["dig", "+short", "mx", domain],
        ["nslookup", "-type=mx", domain],
    )
    for cmd in commands:
        try:
            cp = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=False,
            )
            parsed = _parse_mx_text(cp.stdout) if cp.returncode == 0 else None
        except FileNotFoundError:
            log.debug("%s is not installed", cmd[0])
            continue
        except Exception as exc:  # deliberate best effort: try the next tool
            log.debug("%s MX lookup for %s failed: %s", cmd[0], domain, exc)
            continue
        if parsed:
            return parsed[0]
        log.debug(
            "%s returned no usable MX for %s (exit status %s)",
            cmd[0],
            domain,
            cp.returncode,
        )
    return None
def guess_from_addr() -> str:
    """Build a default From address of the form ``<login user>@<local FQDN>``.

    The host part falls back to ``localhost`` when ``socket.getfqdn()``
    returns an empty string.
    """
    user = getpass.getuser()
    fqdn = socket.getfqdn()
    if not fqdn:
        fqdn = "localhost"
    return user + "@" + fqdn
def build_message(
    from_addr: str,
    to_addrs: list[str],
    cc_addrs: list[str],
    subject: str,
    body: str,
    file_paths: list[Path],
) -> EmailMessage:
    """Assemble a plain-text email with each file attached.

    Every attachment is read fully into memory. Bcc recipients are
    intentionally not part of the message; the caller passes them only as
    envelope recipients.

    Args:
        from_addr: Value of the From header.
        to_addrs: Addresses joined into the To header.
        cc_addrs: Addresses joined into the Cc header; header omitted if empty.
        subject: Value of the Subject header.
        body: Plain-text body.
        file_paths: Files to attach, in order.

    Returns:
        The populated ``EmailMessage``.

    Raises:
        SystemExit: If any path does not exist or is not a regular file.
    """
    msg = EmailMessage()
    msg["From"] = from_addr
    msg["To"] = ", ".join(to_addrs)
    if cc_addrs:
        msg["Cc"] = ", ".join(cc_addrs)
    msg["Subject"] = subject
    msg.set_content(body)

    for path in file_paths:
        if not path.is_file():
            raise SystemExit(f"Not a file or does not exist: {path}")
        data = path.read_bytes()
        ctype, encoding = mimetypes.guess_type(path.name)
        # When guess_type reports an encoding (e.g. "gzip" for .tar.gz), the
        # guessed type describes the *decoded* content, not the bytes we are
        # attaching, so fall back to a generic binary type.
        if ctype is None or encoding is not None:
            ctype = "application/octet-stream"
        maintype, subtype = ctype.split("/", 1)
        msg.add_attachment(
            data, maintype=maintype, subtype=subtype, filename=path.name
        )
    return msg
def configure_logging(args) -> logging.Logger:
    """Initialise root logging from the parsed CLI flags and return the app logger.

    Level precedence: an explicit ``--log-level`` wins, then ``--debug``
    (DEBUG); otherwise INFO is used, which is also what ``--verbose`` selects.

    Side effect: when ``--debug`` is set and ``--smtp-debug`` is still 0,
    ``args.smtp_debug`` is raised to 1 in place.

    Args:
        args: Parsed argparse namespace providing ``log_level``, ``debug``,
            ``verbose`` and ``smtp_debug``.

    Returns:
        The logger named "mailfiles".
    """
    if args.log_level:
        chosen = getattr(logging, args.log_level)
    elif args.debug:
        chosen = logging.DEBUG
    else:
        # --verbose and the no-flag default both resolve to INFO.
        chosen = logging.INFO

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=chosen,
    )
    app_logger = logging.getLogger("mailfiles")

    # Only nudge smtplib's debug output when --debug is on and the user did
    # not pick an smtplib debug level themselves.
    if not args.debug or args.smtp_debug != 0:
        return app_logger
    args.smtp_debug = 1
    app_logger.debug("--debug enabled: setting --smtp-debug=1")
    return app_logger
def main() -> int:
    """Parse the command line, build the message and deliver it over SMTP.

    The SMTP target is the MX host of the first ``--to`` recipient (port 25)
    when ``--mx`` is given or ``--server`` is omitted; otherwise the explicit
    ``--server`` value is used. STARTTLS is attempted opportunistically unless
    ``--no-starttls`` or ``--ssl`` is given.

    Returns:
        Process exit status:
          - 0: the message was accepted for all recipients;
          - 1: SMTP/network error, or STARTTLS was required but unavailable;
          - 2: the message was sent but some recipients were refused;
          - 130: the interactive password prompt was interrupted.

    Raises:
        SystemExit: Via argparse for invalid or conflicting options, a failed
            MX lookup, or (from the helpers) an invalid ``--server`` value or
            a missing attachment.
    """
    ap = argparse.ArgumentParser(
        description="Send one or more files as email attachments via SMTP."
    )
    ap.add_argument(
        "--server",
        required=False,
        default=None,
        help="SMTP server as host[:port]. If omitted, the MX of the first --to recipient is used on port 25.",
    )
    ap.add_argument(
        "--mx",
        action="store_true",
        help=(
            "Resolve MX for the first --to recipient and use it as the SMTP server (port 25). "
            "Overrides --server if both are given."
        ),
    )
    ap.add_argument(
        "--ssl",
        action="store_true",
        help="Use implicit SSL (e.g., port 465).",
    )
    ap.add_argument(
        "--require-starttls",
        action="store_true",
        help="Require STARTTLS. Fail if the server does not support it.",
    )
    ap.add_argument(
        "--no-starttls",
        action="store_true",
        help="Do not attempt STARTTLS (default is opportunistic STARTTLS when supported).",
    )
    ap.add_argument(
        "--timeout",
        type=int,
        default=20,
        help="SMTP socket timeout in seconds. Default: 20",
    )
    ap.add_argument(
        "--from-addr",
        dest="from_addr",
        default=None,
        help="From address. Default: <user>@<fqdn>",
    )
    ap.add_argument(
        "--to",
        action="append",
        required=True,
        help="Recipient (repeatable).",
    )
    ap.add_argument(
        "--cc",
        action="append",
        default=[],
        help="Optional Cc recipients (repeatable).",
    )
    ap.add_argument(
        "--bcc",
        action="append",
        default=[],
        help="Optional Bcc recipients (repeatable; not shown in headers).",
    )
    ap.add_argument(
        "--subject",
        default=None,
        help="Subject line. Default: 'Files: <names>'",
    )
    ap.add_argument(
        "--body",
        default=None,
        help="Plain-text body. Default: 'Attached N file(s).'",
    )
    ap.add_argument(
        "--username",
        default=os.environ.get("SMTP_USERNAME"),
        help="SMTP username. Default: env SMTP_USERNAME.",
    )
    ap.add_argument(
        "--password",
        default=os.environ.get("SMTP_PASSWORD"),
        help="SMTP password. Default: env SMTP_PASSWORD or prompted if --username set.",
    )
    ap.add_argument(
        "--smtp-debug",
        type=int,
        default=0,
        help="Set smtplib debug level (0,1,2). Default: 0",
    )
    # Logging controls
    ap.add_argument(
        "--log-level",
        choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        help="Logging level (default INFO; --verbose sets INFO; --debug sets DEBUG).",
    )
    ap.add_argument(
        "--verbose",
        action="store_true",
        help="Enable INFO logging to stderr.",
    )
    ap.add_argument(
        "--debug",
        action="store_true",
        help="Enable DEBUG logging; if --smtp-debug isn't set, it will be set to 1.",
    )
    ap.add_argument(
        "files",
        nargs="+",
        metavar="FILE",
        help="Files to attach. Use -- to separate options from files if needed.",
    )
    args = ap.parse_args()

    # With --no-starttls the STARTTLS step is skipped entirely, so a
    # simultaneous --require-starttls would be silently ignored and the
    # message sent in plaintext. Reject the contradictory combination.
    # (--ssl together with --require-starttls is allowed: the connection is
    # already encrypted by implicit TLS.)
    if args.require_starttls and args.no_starttls:
        ap.error("--require-starttls and --no-starttls are mutually exclusive.")

    logger = configure_logging(args)

    # Decide SMTP target: --mx wins; otherwise MX if --server omitted; else explicit --server.
    if args.mx or not args.server:
        _, addr = parseaddr(args.to[0])
        if "@" not in addr:
            ap.error(f"Cannot infer domain from recipient: {args.to[0]!r}")
        domain = addr.rsplit("@", 1)[1]
        logger.info("Resolving MX for %s", domain)
        t0 = time.monotonic()
        mx_host = resolve_mx_host(domain, timeout=args.timeout)
        dt = (time.monotonic() - t0) * 1000
        if not mx_host:
            ap.error(
                f"MX lookup failed for {domain}. Install 'dnspython' or ensure 'dig' or 'nslookup' is available, "
                "or provide --server explicitly."
            )
        logger.info("Resolved MX %s in %.1f ms", mx_host, dt)
        host, port = mx_host, 25
    else:
        host, port = parse_server(args.server)
        logger.info("Using explicit SMTP server %s:%d", host, port)

    from_addr = args.from_addr or guess_from_addr()
    files = [Path(f) for f in args.files]
    subj = args.subject or ("Files: " + ", ".join(p.name for p in files))
    body = args.body or f"Attached {len(files)} file(s)."
    logger.info("From: %s", from_addr)
    logger.info("To: %s", ", ".join(args.to))
    if args.cc:
        logger.info("Cc: %s", ", ".join(args.cc))
    if args.bcc:
        logger.info("Bcc recipients: %d", len(args.bcc))
    logger.info("Attachments: %s", ", ".join(p.name for p in files))
    msg = build_message(
        from_addr=from_addr,
        to_addrs=args.to,
        cc_addrs=args.cc,
        subject=subj,
        body=body,
        file_paths=files,
    )
    # Envelope recipients: de-duplicated while keeping command-line order.
    all_rcpts = list(dict.fromkeys([*args.to, *args.cc, *args.bcc]))

    if args.username and args.password is None:
        # Prompt interactively if user supplied but pass not provided.
        try:
            args.password = getpass.getpass("SMTP password: ")
        except KeyboardInterrupt:
            return 130

    try:
        logger.info("Connecting to %s:%d (ssl=%s)", host, port, bool(args.ssl))
        if args.ssl:
            # Pass an explicit default context so implicit TLS uses the same
            # certificate and hostname verification as the STARTTLS path.
            conn = SMTP_SSL(
                host=host,
                port=port,
                timeout=args.timeout,
                context=ssl.create_default_context(),
            )
        else:
            conn = SMTP(host=host, port=port, timeout=args.timeout)
        with conn as s:
            s.set_debuglevel(args.smtp_debug)
            s.ehlo()
            # Tracks whether the session is encrypted, for the login warning.
            encrypted = bool(args.ssl)

            # TLS behavior
            attempt_starttls = (not args.no_starttls) and (not args.ssl)
            require_starttls = args.require_starttls
            if attempt_starttls:
                if s.has_extn("starttls"):
                    tls_ctx = ssl.create_default_context()
                    try:
                        logger.info("STARTTLS advertised; negotiating TLS...")
                        s.starttls(context=tls_ctx)
                        s.ehlo()  # refresh capabilities after TLS
                        encrypted = True
                        logger.info("STARTTLS established")
                    except SMTPException as e:
                        # SMTPNotSupportedError is a subclass of SMTPException.
                        logger.error("STARTTLS negotiation failed: %s", e)
                        if require_starttls:
                            logger.error("Aborting due to --require-starttls")
                            return 1
                else:
                    if require_starttls:
                        logger.error("Server does not advertise STARTTLS; aborting.")
                        return 1
                    logger.info("Server does not advertise STARTTLS; continuing without TLS")

            if args.username:
                if not encrypted:
                    logger.warning(
                        "Authenticating over an unencrypted connection; credentials may be exposed"
                    )
                logger.info("Authenticating as %s", args.username)
                s.login(args.username, args.password or "")

            logger.info("Sending message to %d recipient(s)", len(all_rcpts))
            refused = s.send_message(msg, from_addr=from_addr, to_addrs=all_rcpts)
            if refused:
                # refused is a dict of {rcpt: (code, resp)}
                logger.warning("Some recipients were refused: %s", refused)
                # Non-zero exit to signal partial failure
                return 2
            logger.info("Message accepted for all recipients")
    except SMTPException as e:
        logger.exception("SMTP error")
        print(f"SMTP error: {e}", file=sys.stderr)
        return 1
    except OSError as e:
        logger.exception("Network/OS error")
        print(f"Network or OS error: {e}", file=sys.stderr)
        return 1
    return 0
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment