Created
January 2, 2025 10:29
-
-
Save biplob004/ab322b3b5c630d2e7e7958e2590f29c0 to your computer and use it in GitHub Desktop.
Gmail Autoreply using OPENAI ChatGPT Model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ---------------------------------------------------------- | |
| # You need to have a file named ".env" and environment variables should be defined there before running this code, | |
| # Alternatively you can edit this code and hadcode your api key and sender email id and password | |
| # Important Note: you need to create app password for your gmail and use it here instead of gmail login password. | |
| # ---------------------------------------------------------- | |
| import imaplib | |
| import email | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.utils import parsedate_to_datetime | |
| import os | |
| import time | |
| from dotenv import load_dotenv | |
| import re | |
| load_dotenv() | |
| from openai import OpenAI | |
| openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) | |
| sender_email = os.getenv('GMAIL_ID') | |
| email_app_passwd = os.getenv('GMAIL_APP_PASSWORD') | |
| def send_email(subject, body, recipient): | |
| msg = MIMEText(body) | |
| msg['Subject'] = subject | |
| msg['From'] = sender_email | |
| msg['To'] = recipient | |
| with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server: | |
| smtp_server.login(sender_email, email_app_passwd) | |
| smtp_server.sendmail(sender_email, recipient, msg.as_string()) | |
| def reply_email(messages): | |
| system_prompt = '''Acts as you are me, reply to message behalf of me, | |
| Output message should be only in plain text only. | |
| example output: | |
| Hello, I am fine, tell me about you.''' | |
| completion = openai_client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{"role": "system", "content": system_prompt}] + messages | |
| ) | |
| return completion.choices[0].message.content | |
| def extract_msg_info(msg): | |
| splited_text = msg.split('wrote:') | |
| message = splited_text[-1].strip() | |
| if len(splited_text) != 2: | |
| return None, None, message | |
| email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' | |
| datetime_pattern = r'\w{3},\s(?:\w+\s\d{1,2},\s\d{4}|\d{1,2}\s\w+\s\d{4})\sat\s\d{1,2}:\d{2}\s[AP]M' | |
| emails = re.findall(email_pattern, splited_text[0]) | |
| datetimes = re.findall(datetime_pattern, splited_text[0]) | |
| email = emails[0] if emails else None | |
| date_str = datetimes[0] if datetimes else None | |
| return email, date_str, message | |
| def fetch_new_emails(mail): | |
| """Fetch and print new (unseen) emails.""" | |
| mail.select('inbox') # Select the inbox folder | |
| result, data = mail.search(None, 'UNSEEN') # Search for unseen emails | |
| email_ids = data[0].split() | |
| for e_id in email_ids: | |
| # Fetch each email by ID | |
| result, msg_data = mail.fetch(e_id, '(RFC822)') | |
| raw_email = msg_data[0][1] | |
| # Parse the raw email into a message object | |
| msg = email.message_from_bytes(raw_email) | |
| # Print basic email details | |
| subject_str = msg['subject'] | |
| from_email = msg['from'] | |
| # print("To:", msg['to']) | |
| # Parse the received date and time | |
| received_date = None | |
| for header in msg.items(): | |
| if header[0].lower() == 'received': | |
| date_string = header[1].split('\n')[-1].strip() | |
| received_date = parsedate_to_datetime(date_string) | |
| # print('Received:', received_date) | |
| break | |
| # Extract and print the email body | |
| body = "" | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| if part.get_content_type() == "text/plain": | |
| body = part.get_payload(decode=True).decode() | |
| break | |
| else: | |
| body = msg.get_payload(decode=True).decode() | |
| # Remove >>> characters at the start of lines | |
| clean_text = re.sub(r'^(\s*)>+[\s]*', r'\1', body, flags=re.MULTILINE) | |
| message_history = [] | |
| for msg in clean_text.split('\nOn '): | |
| if not msg.strip(): | |
| continue | |
| email_str, date_str, message = extract_msg_info(msg) | |
| msg_role = "assistant" if email_str == sender_email else "user" | |
| message_history.append({"role":msg_role, "content": f'Subject: {subject_str} \nMessage:{message}'}) | |
| reply_email_str = reply_email(message_history[::-1]) | |
| send_email(subject_str, reply_email_str, from_email) | |
| print('msg sent:', reply_email_str ) | |
| def main(): | |
| # Connect to the Gmail IMAP server | |
| mail = imaplib.IMAP4_SSL('imap.gmail.com') | |
| mail.login(sender_email, email_app_passwd) # Log in to your account | |
| try: | |
| print("Listening for new emails...") | |
| while True: | |
| fetch_new_emails(mail) # Check for new emails | |
| time.sleep(10) # Wait for 10 seconds before checking again | |
| except KeyboardInterrupt: | |
| print("Stopped by user.") | |
| finally: | |
| mail.logout() # Logout from the server | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment