177 lines
7.1 KiB
Python
177 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
from os import path
|
|
from smtplib import SMTP
|
|
from socket import error, create_connection, close
|
|
from time import sleep
|
|
|
|
|
|
class Mail:
    """
    Sending email from Python

    The MIME message is assembled once, in the constructor, and is then
    delivered to every destination address separately by :meth:`send`.

    All textual constructor arguments except ``mail_file`` are passed through
    :meth:`_getText`, so each of them can be given either literally or as the
    path of a file whose content is used instead.
    """

    def __init__(self, smtp_user: str, smtp_pass: str, mail_dest: str,
                 smtp_host: str = 'smtp.gmail.com', smtp_port: str = '587', smtp_stls: bool = True,
                 mail_from: str = None, mail_subj: str = 'no subject',
                 mail_text: str = 'no text', mail_type: str = 'plain',
                 mail_file: str = None) -> None:
        """
        Object constructor

        :param smtp_user: smtp valid user
        :param smtp_pass: smtp valid password
        :param mail_dest: destination addresses, comma separated
        :param smtp_host: smtp hostname or ip address
        :param smtp_port: smtp port number
        :param smtp_stls: smtp required tls
        :param mail_from: mail from alias (falls back to ``smtp_user`` when empty)
        :param mail_subj: mail subject
        :param mail_text: mail body text
        :param mail_type: mail body type: plain, html
        :param mail_file: mail attachment files, comma separated paths
        """
        self._smtpUser = self._getText(smtp_user)
        self._smtpPass = self._getText(smtp_pass)
        self._mailList = self._splitList(self._getText(mail_dest))
        self._smtpHost = self._getText(smtp_host)
        self._smtpPort = self._getText(smtp_port)
        self._smtpStls = smtp_stls
        self._mailSubj = self._getText(mail_subj)
        self._mailText = self._getText(mail_text)
        self._mailType = self._getText(mail_type)
        self._mailFrom = self._getText(mail_from) if mail_from else self._smtpUser
        # The attachment list is taken literally (it is not resolved via _getText).
        self._fileList = self._splitList(mail_file) if mail_file else None
        self._session = None

        self._mailData = MIMEMultipart()
        self._mailData['From'] = self._mailFrom
        self._mailData['Subject'] = self._mailSubj
        self._mailData.attach(MIMEText(self._mailText, self._mailType))

        for file_name in self._fileList or ():
            # Paths that are missing (or that are directories) are skipped silently.
            if not path.exists(file_name) or path.isdir(file_name):
                continue
            # Read inside a context manager so the handle is always released.
            with open(file_name, 'rb') as attachment:
                payload = attachment.read()
            content = MIMEBase('application', 'octet-stream')
            content.set_payload(payload)
            encoders.encode_base64(content)
            # Let the email package build (and quote) the filename parameter.
            content.add_header('Content-Disposition', 'attachment',
                               filename=path.basename(file_name))
            self._mailData.attach(content)

    def send(self) -> str:
        """
        Sending the generated email to the destination list

        One message is sent per destination; the ``To`` header is rewritten
        before each delivery. Failures do not raise: they are appended to the
        report instead, after whatever deliveries already succeeded.

        :return: delivery report string
        """
        report = ''
        try:
            # The context manager closes the session even when the TLS
            # handshake, the login or a delivery fails half-way.
            with SMTP(self._smtpHost, int(self._smtpPort)) as session:
                self._session = session
                if self._smtpStls:
                    session.starttls()
                session.login(self._smtpUser, self._smtpPass)
                for mailTo in self._mailList:
                    del self._mailData['To']
                    self._mailData['To'] = mailTo
                    session.sendmail(self._mailFrom, mailTo, self._mailData.as_string())
                    report += f'"{self._mailData["Subject"]}" send to {self._mailData["To"]}\n'
        except error as err:
            report += "Socket Error: %s" % err
        except Exception as err:
            # Keep the "always hand back a report" contract for ordinary
            # errors (e.g. a non-numeric port), but say what went wrong.
            # KeyboardInterrupt / SystemExit are deliberately not caught.
            report += "Error: %s" % err
        return report

    @staticmethod
    def _splitList(text: str) -> list:
        """
        Split a comma separated string into its non-empty items

        All whitespace is removed from the input before it is split.

        :param text: comma separated values
        :return: list of non-empty items
        """
        compact = ''.join(text.split())
        return [item for item in compact.split(',') if item]

    @classmethod
    def _getText(cls, string: str) -> str:
        """
        If text is a file - read text from this file

        NOTE(review): this is applied to the credentials as well, so a value
        that happens to match an existing path is replaced by that file's
        content.

        :param string: string of text or file path
        :return: string of input text or file content
        """
        # A directory exists as a path but cannot be read as text; treat it as
        # literal text instead of failing.
        if path.exists(string) and not path.isdir(string):
            with open(string) as source:
                return source.read()
        return string
|
|
|
|
|
|
def str2bool(value: str) -> bool:
    """
    Interpret a textual flag as a boolean

    The comparison is case-insensitive and the value is converted with
    ``str()`` first, so non-string inputs are accepted as well.

    :param value: text such as "true"/"false", "yes"/"no" or "1"/"0"
    :return: True for "true", "yes" or "1"; False for anything else
    """
    normalized = str(value).lower()
    return any(normalized == accepted for accepted in ("true", "yes", "1"))
|
|
|
|
|
|
def isConnected(sock_host: str, sock_port: str) -> bool:
    """
    Checking the connection with the host.

    A TCP connection is opened to the given endpoint and closed again
    straight away; nothing is sent over it.

    :param sock_host: hostname or ip address
    :param sock_port: port number
    :return: bool True when the connection could be opened, False on OSError
    """
    try:
        # The context manager guarantees the probe socket is closed.
        with create_connection((sock_host, int(sock_port))):
            return True
    except OSError:
        return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line entry point: wait for the SMTP server to become reachable,
    # then build the message, send it and print the delivery report.
    from argparse import ArgumentParser

    parser = ArgumentParser(
        prog='Mail',
        description='Sending email from Python',
        epilog='Dependencies: Python 3 (tested version 3.9.5)'
    )
    parser.add_argument('-u', '--user', type=str, required=True,
                        help='smtp valid user')
    parser.add_argument('-p', '--pass', type=str, required=True,
                        help='smtp valid password')
    parser.add_argument('-d', '--dest', type=str, required=True,
                        help='destination addresses')
    parser.add_argument('--smtp', type=str, default='smtp.gmail.com', required=False,
                        help='smtp hostname or ip address (default: smtp.gmail.com)')
    parser.add_argument('--port', type=str, default='587', required=False,
                        help='smtp port number (default: 587)')
    parser.add_argument('--stls', type=str2bool, default=True, required=False,
                        help='smtp required tls (default: True)')
    parser.add_argument('--from', type=str, default=None, required=False,
                        help='mail from alias (default: <user@smtp>)')
    parser.add_argument('--subj', type=str, default='no subject', required=False,
                        help='mail subject (default: "no subject")')
    parser.add_argument('--text', type=str, default='no text', required=False,
                        help='mail body text (default: "no text")')
    parser.add_argument('--type', type=str, default='plain', required=False,
                        help='mail body type: plain, html (default: plain)')
    parser.add_argument('--file', type=str, default=None, required=False,
                        help='mail attachment files (default: None)')
    # Parsed as int so a malformed value is rejected with a usage error.
    parser.add_argument('--time', type=int, default=3, required=False,
                        help='minutes of attempts to send (default: 3)')
    options = vars(parser.parse_args())

    retry_delay = 60  # seconds between two connection probes
    retries_left = options['time']  # one retry per minute of allowed waiting

    # Probe first, sleep only when another probe will follow: the server is
    # always tried at least once and no time is wasted after the last failure.
    while not isConnected(sock_host=options['smtp'], sock_port=options['port']):
        if retries_left <= 0:
            # SystemExit with a message prints it to stderr and exits with 1.
            raise SystemExit(
                f"Mail: {options['smtp']}:{options['port']} is unreachable, nothing was sent"
            )
        retries_left -= 1
        sleep(retry_delay)

    print(Mail(smtp_user=options['user'], smtp_pass=options['pass'], mail_dest=options['dest'],
               smtp_host=options['smtp'], smtp_port=options['port'], smtp_stls=options['stls'],
               mail_from=options['from'], mail_subj=options['subj'], mail_text=options['text'],
               mail_type=options['type'], mail_file=options['file']
               ).send()
          )
|