Skip to content

Instantly share code, notes, and snippets.

@rohit-32
Created February 2, 2026 09:18
Show Gist options
  • Select an option

  • Save rohit-32/4fe2862fb115432e23902cba686381fc to your computer and use it in GitHub Desktop.

Select an option

Save rohit-32/4fe2862fb115432e23902cba686381fc to your computer and use it in GitHub Desktop.
Smtp Server to Push Emails in NATS
package main
import (
"bytes"
"context"
"crypto/rand"
"database/sql"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"mime"
"mime/multipart"
"net"
"net/mail"
"os"
"strings"
"time"
"github.com/emersion/go-smtp"
"shared/database"
"shared/database/gen"
"shared/nats"
)
type SMTPServer struct {
natsClient *nats.NATSClient
domain string
dbContainer *database.DBContainer
}
func (s *SMTPServer) NewSession(c *smtp.Conn) (smtp.Session, error) {
return &Session{
server: s,
}, nil
}
type Session struct {
server *SMTPServer
from string
to []string
}
func (s *Session) AuthPlain(username, password string) error {
return nil
}
func (s *Session) Mail(from string, opts *smtp.MailOptions) error {
s.from = from
return nil
}
func (s *Session) Rcpt(to string, opts *smtp.RcptOptions) error {
sanitizedAddress := sanitizeEmailAddress(to)
if sanitizedAddress == "" {
return fmt.Errorf("invalid recipient address: %s", to)
}
if err := s.server.validateRecipientDomain(sanitizedAddress); err != nil {
return err
}
s.to = append(s.to, sanitizedAddress)
return nil
}
func (s *Session) Data(r io.Reader) error {
var buf bytes.Buffer
tee := io.TeeReader(r, &buf)
emailMessage, err := s.parseEmailContent(tee)
if err != nil {
emailMessage = s.fallbackParse(string(buf.Bytes()))
}
for _, recipient := range s.to {
event := nats.CreateEmailReceivedEvent(recipient, emailMessage)
if err := s.server.natsClient.PublishEmailReceived(event); err != nil {
log.Printf("[ERROR] Failed to publish email received event: %v", err)
}
}
return nil
}
func (s *Session) Reset() {
s.from = ""
s.to = nil
}
func (s *Session) Logout() error {
return nil
}
func (s *SMTPServer) validateRecipientDomain(address string) error {
atIndex := strings.LastIndex(address, "@")
domainPart := strings.ToLower(strings.TrimSpace(address[atIndex+1:]))
if strings.EqualFold(domainPart, s.domain) {
return nil
}
userID, err := s.lookupActiveEmailOwner(address)
if err != nil {
return err
}
if userID == "" {
return fmt.Errorf("invalid recipient domain: %s", address)
}
allowed, err := s.isDomainConnectedForUser(userID, domainPart)
if err != nil {
return err
}
if !allowed {
return fmt.Errorf("invalid recipient domain: %s", address)
}
return nil
}
func (s *SMTPServer) lookupActiveEmailOwner(address string) (string, error) {
email, err := s.dbContainer.Queries.GetTempEmailByAddress(context.Background(), address)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", nil
}
return "", err
}
if !email.IsActive {
return "", nil
}
return email.UserID, nil
}
func (s *SMTPServer) isDomainConnectedForUser(userID, domain string) (bool, error) {
cd, err := s.dbContainer.Queries.GetConnectedDomainByUserIDAndDomain(context.Background(), gen.GetConnectedDomainByUserIDAndDomainParams{
UserID: userID,
Domain: domain,
})
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
return cd.Status == "verified", nil
}
func sanitizeEmailAddress(address string) string {
trimmed := strings.TrimSpace(address)
if parsed, err := mail.ParseAddress(trimmed); err == nil {
trimmed = parsed.Address
}
return strings.ToLower(strings.TrimSpace(trimmed))
}
func (s *Session) parseEmailContent(r io.Reader) (nats.EmailMessage, error) {
msg, err := mail.ReadMessage(r)
if err != nil {
return nats.EmailMessage{}, err
}
message := nats.EmailMessage{
ID: generateMessageID(),
From: s.from,
To: s.to,
ReceivedAt: time.Now(),
Subject: msg.Header.Get("Subject"),
}
_ = parseBodyAndAttachments(&message, msg.Header, msg.Body)
return message, nil
}
func parseBodyAndAttachments(message *nats.EmailMessage, header mail.Header, body io.Reader) error {
mediaType, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
bodyBytes, _ := io.ReadAll(body)
message.Body = string(bodyBytes)
return nil
}
if strings.HasPrefix(mediaType, "multipart/") {
mr := multipart.NewReader(body, params["boundary"])
for {
p, err := mr.NextPart()
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(p.Header.Get("Content-Type"), "multipart/") {
_ = parseBodyAndAttachments(message, mail.Header(p.Header), p)
continue
}
partMediaType, partParams, _ := mime.ParseMediaType(p.Header.Get("Content-Type"))
contentDisposition := p.Header.Get("Content-Disposition")
if strings.HasPrefix(strings.ToLower(contentDisposition), "attachment") || (partParams != nil && partParams["name"] != "") {
transferEncoding := p.Header.Get("Content-Transfer-Encoding")
bodyBytes, _ := io.ReadAll(p)
var processedBytes []byte
if strings.ToLower(transferEncoding) == "base64" {
cleanBase64 := strings.ReplaceAll(string(bodyBytes), "\r", "")
cleanBase64 = strings.ReplaceAll(cleanBase64, "\n", "")
cleanBase64 = strings.ReplaceAll(cleanBase64, " ", "")
processedBytes, _ = base64.StdEncoding.DecodeString(cleanBase64)
} else {
processedBytes = bodyBytes
}
filename := getAttachmentFilename(p.Header, partParams)
message.Attachments = append(message.Attachments, nats.Attachment{
Filename: filename,
ContentType: partMediaType,
Size: int64(len(processedBytes)),
Data: processedBytes,
})
} else if partMediaType == "text/plain" {
bodyBytes, _ := io.ReadAll(p)
if message.Body == "" {
message.Body = string(bodyBytes)
}
} else if partMediaType == "text/html" {
bodyBytes, _ := io.ReadAll(p)
if message.HTMLBody == "" {
message.HTMLBody = string(bodyBytes)
}
}
}
} else {
bodyBytes, _ := io.ReadAll(body)
if mediaType == "text/html" {
message.HTMLBody = string(bodyBytes)
} else {
message.Body = string(bodyBytes)
}
}
return nil
}
func getAttachmentFilename(headers map[string][]string, params map[string]string) string {
if contentDisposition := headers["Content-Disposition"]; len(contentDisposition) > 0 {
disposition := contentDisposition[0]
if strings.Contains(disposition, "filename=") {
filenamePart := strings.Split(disposition, "filename=")[1]
filenamePart = strings.Trim(filenamePart, ` "`)
if strings.Contains(filenamePart, ";") {
filenamePart = strings.Split(filenamePart, ";")[0]
}
return strings.Trim(filenamePart, `"`)
}
}
if params != nil && params["name"] != "" {
return params["name"]
}
return "attachment.dat"
}
func (s *Session) fallbackParse(rawEmail string) nats.EmailMessage {
lines := strings.Split(rawEmail, "\n")
message := nats.EmailMessage{
ID: generateMessageID(),
From: s.from,
To: s.to,
ReceivedAt: time.Now(),
}
inHeaders := true
bodyLines := []string{}
for _, line := range lines {
line = strings.TrimRight(line, "\r")
if inHeaders {
if line == "" {
inHeaders = false
continue
}
if strings.HasPrefix(strings.ToLower(line), "subject:") {
message.Subject = strings.TrimSpace(line[8:])
}
} else {
bodyLines = append(bodyLines, line)
}
}
message.Body = strings.Join(bodyLines, "\n")
if strings.Contains(message.Body, "<html") || strings.Contains(message.Body, "<HTML") {
message.HTMLBody = message.Body
}
return message
}
func generateMessageID() string {
b := make([]byte, 16)
rand.Read(b)
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}
func main() {
smtpDomain := os.Getenv("SMTP_DOMAIN")
if smtpDomain == "" {
smtpDomain = "tempmail.local"
}
smtpPort := os.Getenv("SMTP_PORT")
if smtpPort == "" {
smtpPort = "25"
}
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://localhost:4222"
}
dbContainer, err := database.NewDBContainer()
if err != nil {
log.Fatalf("[FATAL] Failed to initialize database: %v", err)
}
natsClient, err := nats.NewNATSClient(natsURL)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to NATS: %v", err)
}
defer natsClient.Close()
smtpServer := &SMTPServer{
natsClient: natsClient,
domain: strings.ToLower(smtpDomain),
dbContainer: dbContainer,
}
server := smtp.NewServer(smtpServer)
server.Addr = ":" + smtpPort
server.Domain = smtpDomain
server.ReadTimeout = 10 * time.Second
server.WriteTimeout = 10 * time.Second
server.MaxMessageBytes = 30 * 1024 * 1024
server.MaxRecipients = 50
server.AllowInsecureAuth = true
log.Printf("[INFO] SMTP server starting on port %s", smtpPort)
listener, err := net.Listen("tcp", server.Addr)
if err != nil {
log.Fatalf("[FATAL] Failed to start SMTP server: %v", err)
}
if err := server.Serve(listener); err != nil {
log.Fatalf("[FATAL] SMTP server error: %v", err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment