Skip to content

Instantly share code, notes, and snippets.

@biplob004
Created January 2, 2025 10:29
Show Gist options
  • Select an option

  • Save biplob004/ab322b3b5c630d2e7e7958e2590f29c0 to your computer and use it in GitHub Desktop.

Select an option

Save biplob004/ab322b3b5c630d2e7e7958e2590f29c0 to your computer and use it in GitHub Desktop.
Gmail Autoreply using OPENAI ChatGPT Model
# ----------------------------------------------------------
# You need to have a file named ".env" and environment variables should be defined there before running this code,
# Alternatively you can edit this code and hadcode your api key and sender email id and password
# Important Note: you need to create app password for your gmail and use it here instead of gmail login password.
# ----------------------------------------------------------
import imaplib
import email
import smtplib
from email.mime.text import MIMEText
from email.utils import parsedate_to_datetime
import os
import time
from dotenv import load_dotenv
import re
load_dotenv()
from openai import OpenAI
openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
sender_email = os.getenv('GMAIL_ID')
email_app_passwd = os.getenv('GMAIL_APP_PASSWORD')
def send_email(subject, body, recipient):
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = recipient
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server:
smtp_server.login(sender_email, email_app_passwd)
smtp_server.sendmail(sender_email, recipient, msg.as_string())
def reply_email(messages):
system_prompt = '''Acts as you are me, reply to message behalf of me,
Output message should be only in plain text only.
example output:
Hello, I am fine, tell me about you.'''
completion = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "system", "content": system_prompt}] + messages
)
return completion.choices[0].message.content
def extract_msg_info(msg):
splited_text = msg.split('wrote:')
message = splited_text[-1].strip()
if len(splited_text) != 2:
return None, None, message
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
datetime_pattern = r'\w{3},\s(?:\w+\s\d{1,2},\s\d{4}|\d{1,2}\s\w+\s\d{4})\sat\s\d{1,2}:\d{2}\s[AP]M'
emails = re.findall(email_pattern, splited_text[0])
datetimes = re.findall(datetime_pattern, splited_text[0])
email = emails[0] if emails else None
date_str = datetimes[0] if datetimes else None
return email, date_str, message
def fetch_new_emails(mail):
"""Fetch and print new (unseen) emails."""
mail.select('inbox') # Select the inbox folder
result, data = mail.search(None, 'UNSEEN') # Search for unseen emails
email_ids = data[0].split()
for e_id in email_ids:
# Fetch each email by ID
result, msg_data = mail.fetch(e_id, '(RFC822)')
raw_email = msg_data[0][1]
# Parse the raw email into a message object
msg = email.message_from_bytes(raw_email)
# Print basic email details
subject_str = msg['subject']
from_email = msg['from']
# print("To:", msg['to'])
# Parse the received date and time
received_date = None
for header in msg.items():
if header[0].lower() == 'received':
date_string = header[1].split('\n')[-1].strip()
received_date = parsedate_to_datetime(date_string)
# print('Received:', received_date)
break
# Extract and print the email body
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
break
else:
body = msg.get_payload(decode=True).decode()
# Remove >>> characters at the start of lines
clean_text = re.sub(r'^(\s*)>+[\s]*', r'\1', body, flags=re.MULTILINE)
message_history = []
for msg in clean_text.split('\nOn '):
if not msg.strip():
continue
email_str, date_str, message = extract_msg_info(msg)
msg_role = "assistant" if email_str == sender_email else "user"
message_history.append({"role":msg_role, "content": f'Subject: {subject_str} \nMessage:{message}'})
reply_email_str = reply_email(message_history[::-1])
send_email(subject_str, reply_email_str, from_email)
print('msg sent:', reply_email_str )
def main():
# Connect to the Gmail IMAP server
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(sender_email, email_app_passwd) # Log in to your account
try:
print("Listening for new emails...")
while True:
fetch_new_emails(mail) # Check for new emails
time.sleep(10) # Wait for 10 seconds before checking again
except KeyboardInterrupt:
print("Stopped by user.")
finally:
mail.logout() # Logout from the server
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment