seitime-frappe/py/webnotes/utils/email_lib/send.py
Rushabh Mehta f253fa8e0b version 2
2011-09-07 16:44:57 +05:30

293 lines
7.8 KiB
Python

"""
Sends email via outgoing server specified in "Control Panel"
Allows easy adding of Attachments of "File" objects
"""
import webnotes
import webnotes.defs
from webnotes import msgprint
import email
class EMail:
"""
Wrapper on the email module. Email object represents emails to be sent to the client.
Also provides a clean way to add binary `FileData` attachments
Also sets all messages as multipart/alternative for cleaner reading in text-only clients
"""
def __init__(self, sender='', recipients=[], subject='', from_defs=0, alternative=0, reply_to=None):
from email.mime.multipart import MIMEMultipart
if type(recipients)==str:
recipients = recipients.replace(';', ',')
recipients = recipients.split(',')
self.from_defs = from_defs
self.sender = sender
self.reply_to = reply_to or sender
self.recipients = recipients
self.subject = subject
self.msg_root = MIMEMultipart('mixed')
self.msg_multipart = MIMEMultipart('alternative')
self.msg_root.attach(self.msg_multipart)
self.cc = []
def set_text(self, message):
"""
Attach message in the text portion of multipart/alternative
"""
from email.mime.text import MIMEText
part = MIMEText(message, 'plain')
self.msg_multipart.attach(part)
def set_html(self, message):
"""
Attach message in the html portion of multipart/alternative
"""
from email.mime.text import MIMEText
part = MIMEText(message, 'html')
self.msg_multipart.attach(part)
def set_message(self, message, mime_type='text/html', as_attachment=0, filename='attachment.html'):
"""
Append the message with MIME content to the root node (as attachment)
"""
from email.mime.text import MIMEText
maintype, subtype = mime_type.split('/')
part = MIMEText(message, _subtype = subtype)
if as_attachment:
part.add_header('Content-Disposition', 'attachment', filename=filename)
self.msg_root.attach(part)
def attach_file(self, n):
"""
attach a file from the `FileData` table
"""
from webnotes.utils.file_manager import get_file
res = get_file(n)
if not res:
return
self.add_attachment(res[0], res[1])
def add_attachment(self, fname, fcontent, content_type=None):
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
import mimetypes
if not content_type:
content_type, encoding = mimetypes.guess_type(fname)
if content_type is None:
# No guess could be made, or the file is encoded (compressed), so
# use a generic bag-of-bits type.
content_type = 'application/octet-stream'
maintype, subtype = content_type.split('/', 1)
if maintype == 'text':
# Note: we should handle calculating the charset
part = MIMEText(fcontent, _subtype=subtype)
elif maintype == 'image':
part = MIMEImage(fcontent, _subtype=subtype)
elif maintype == 'audio':
part = MIMEAudio(fcontent, _subtype=subtype)
else:
part = MIMEBase(maintype, subtype)
part.set_payload(fcontent)
# Encode the payload using Base64
from email import encoders
encoders.encode_base64(part)
# Set the filename parameter
if fname:
part.add_header('Content-Disposition', 'attachment', filename=fname)
self.msg_root.attach(part)
def validate(self):
"""
validate the email ids
"""
if not self.sender:
self.sender = webnotes.conn.get_value('Control Panel',None,'auto_email_id')
from webnotes.utils import validate_email_add
# validate ids
if self.sender and (not validate_email_add(self.sender)):
webnotes.msgprint("%s is not a valid email id" % self.sender, raise_exception = 1)
if self.reply_to and (not validate_email_add(self.reply_to)):
webnotes.msgprint("%s is not a valid email id" % self.reply_to, raise_exception = 1)
for e in self.recipients:
if not validate_email_add(e):
webnotes.msgprint("%s is not a valid email id" % e, raise_exception = 1)
def setup(self):
"""
setup the SMTP (outgoing) server from `Control Panel` or defs.py
"""
if self.from_defs:
self.server = getattr(webnotes.defs,'mail_server','')
self.login = getattr(webnotes.defs,'mail_login','')
self.port = getattr(webnotes.defs,'mail_port',None)
self.password = getattr(webnotes.defs,'mail_password','')
self.use_ssl = getattr(webnotes.defs,'use_ssl',0)
else:
import webnotes.model.doc
from webnotes.utils import cint
# get defaults from control panel
cp = webnotes.model.doc.Document('Control Panel','Control Panel')
self.server = cp.outgoing_mail_server or getattr(webnotes.defs,'mail_server','')
self.login = cp.mail_login or getattr(webnotes.defs,'mail_login','')
self.port = cp.mail_port or getattr(webnotes.defs,'mail_port',None)
self.password = cp.mail_password or getattr(webnotes.defs,'mail_password','')
self.use_ssl = cint(cp.use_ssl)
def make_msg(self):
self.msg_root['Subject'] = self.subject
self.msg_root['From'] = self.sender
self.msg_root['To'] = ', '.join([r.strip() for r in self.recipients])
if self.reply_to and self.reply_to != self.sender:
self.msg_root['Reply-To'] = self.reply_to
if self.cc:
self.msg_root['CC'] = ', '.join([r.strip() for r in self.cc])
def add_to_queue(self):
# write to a file called "email_queue" or as specified in email
q = EmailQueue()
q.push({
'server': self.server,
'port': self.port,
'use_ssl': self.use_ssl,
'login': self.login,
'password': self.password,
'sender': self.sender,
'recipients': self.recipients,
'msg': self.msg_root.as_string()
})
q.close()
def send(self, send_now = 0):
"""
send the message
"""
from webnotes.utils import cint
self.setup()
self.validate()
self.make_msg()
if (not send_now) and getattr(webnotes.defs, 'batch_emails', 0):
self.add_to_queue()
return
import smtplib
sess = smtplib.SMTP(self.server, self.port or None)
if self.use_ssl:
sess.ehlo()
sess.starttls()
sess.ehlo()
ret = sess.login(self.login, self.password)
# check if logged correctly
if ret[0]!=235:
msgprint(ret[1])
raise Exception
sess.sendmail(self.sender, self.recipients, self.msg_root.as_string())
try:
sess.quit()
except:
pass
# ===========================================
# Email Queue
# Maintains a list of emails in a file
# Flushes them when called from cron
# Defs settings:
# email_queue: (filename) [default: email_queue.py]
#
# From the scheduler, call: flush(qty)
# ===========================================
class EmailQueue:
def __init__(self):
self.server = self.login = self.sess = None
self.filename = getattr(webnotes.defs, 'email_queue', 'email_queue.py')
try:
f = open(self.filename, 'r')
self.queue = eval(f.read() or '[]')
f.close()
except IOError, e:
if e.args[0]==2:
self.queue = []
else:
raise e
def push(self, email):
self.queue.append(email)
def close(self):
f = open(self.filename, 'w')
f.write(str(self.queue))
f.close()
def get_smtp_session(self, e):
if self.server==e['server'] and self.login==e['login'] and self.sess:
return self.sess
webnotes.msgprint('getting server')
import smtplib
sess = smtplib.SMTP(e['server'], e['port'] or None)
if self.use_ssl:
sess.ehlo()
sess.starttls()
sess.ehlo()
ret = sess.login(e['login'], e['password'])
# check if logged correctly
if ret[0]!=235:
webnotes.msgprint(ret[1])
raise Exception
self.sess = sess
self.server, self.login = e['server'], e['login']
return sess
def flush(self, qty = 100):
f = open(self.filename, 'r')
self.queue = eval(f.read() or '[]')
if len(self.queue) < 100:
qty = len(self.queue)
for i in range(qty):
e = self.queue[i]
sess = self.get_smtp_session(e)
sess.sendmail(e['sender'], e['recipients'], e['msg'])
self.queue = self.queue[:(len(self.queue) - qty)]
self.close()