554 lines
17 KiB
Python
Executable File
554 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#-*- coding: utf-8 -*-
|
|
# psr/imap.py
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
#
|
|
# software : Kirmah <http://kirmah.sourceforge.net/>
|
|
# version : 2.18
|
|
# date : 2014
|
|
# licence : GPLv3.0 <http://www.gnu.org/licenses/>
|
|
# author : a-Sansara <[a-sansara]at[clochardprod]dot[net]>
|
|
# copyright : pluie.org <http://www.pluie.org/>
|
|
#
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
#
|
|
# This file is part of Kirmah.
|
|
#
|
|
# Kirmah is free software (free as in speech) : you can redistribute it
|
|
# and/or modify it under the terms of the GNU General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the License,
|
|
# or (at your option) any later version.
|
|
#
|
|
# Kirmah is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
|
# more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Kirmah. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ module imap ~~
|
|
|
|
from imaplib import Commands, IMAP4_SSL, Time2Internaldate
|
|
from binascii import b2a_base64, a2b_base64
|
|
from codecs import register, StreamReader, StreamWriter
|
|
from email import message_from_bytes
|
|
from email.header import decode_header
|
|
from email.message import Message
|
|
from re import search as research, split as resplit
|
|
from multiprocessing import Process
|
|
from psr.sys import Io, Sys, Const
|
|
from psr.log import Log
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ ImapUtf7 decoding/encoding ~~
|
|
|
|
def _seq_encode(seq,l):
|
|
""""""
|
|
if len(seq) > 0 :
|
|
l.append('&%s-' % str(b2a_base64(bytes(''.join(seq),'utf-16be')),'utf-8').rstrip('\n=').replace('/', ','))
|
|
elif l:
|
|
l.append('-')
|
|
|
|
|
|
def _seq_decode(seq,l):
|
|
""""""
|
|
d = ''.join(seq[1:])
|
|
pad = 4-(len(d)%4)
|
|
l.append(str(a2b_base64(bytes(d.replace(',', '/')+pad*'=','utf-16be')),'utf-16be'))
|
|
|
|
|
|
def encode(s):
|
|
""""""
|
|
l, e, = [], []
|
|
for c in s :
|
|
if ord(c) in range(0x20,0x7e):
|
|
if e : _seq_encode(e,l)
|
|
e = []
|
|
l.append(c)
|
|
if c == '&' : l.append('-')
|
|
else :
|
|
e.append(c)
|
|
if e : _seq_encode(e,l)
|
|
return ''.join(l)
|
|
|
|
|
|
def decode(s):
|
|
""""""
|
|
l, d = [], []
|
|
for c in s:
|
|
if c == '&' and not d : d.append('&')
|
|
elif c == '-' and d:
|
|
if len(d) == 1: l.append('&')
|
|
else : _seq_decode(d,l)
|
|
d = []
|
|
elif d: d.append(c)
|
|
else: l.append(c)
|
|
if d: _seq_decode(d,l)
|
|
return ''.join(l)
|
|
|
|
|
|
def _encoder(s):
|
|
""""""
|
|
e = bytes(encode(s),'utf-8')
|
|
return e, len(e)
|
|
|
|
|
|
def _decoder(s):
|
|
""""""
|
|
d = decode(str(s,'utf-8'))
|
|
return d, len(d)
|
|
|
|
|
|
def _codec_imap4utf7(name):
|
|
""""""
|
|
if name == 'imap4-utf-7':
|
|
return (_encoder, _decoder, Imap4Utf7StreamReader, Imap4Utf7StreamWriter)
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ StreamReader & StreamWriter ~~
|
|
|
|
class Imap4Utf7StreamReader(StreamReader):
|
|
def decode(self, s, errors='strict'): return _decoder(s)
|
|
|
|
class Imap4Utf7StreamWriter(StreamWriter):
|
|
def decode(self, s, errors='strict'): return _encoder(s)
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ registering codec ~~
|
|
|
|
register(_codec_imap4utf7)
|
|
|
|
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ Imap utilities ~~
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ class ImapConfig ~~
|
|
|
|
class ImapConfig:
|
|
""""""
|
|
|
|
def __init__(self, host, user, pwd, port='993'):
|
|
""""""
|
|
self.host = host
|
|
self.user = user
|
|
self.pwd = pwd
|
|
self.port = str(port)
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ class ImapClient ~~
|
|
|
|
class ImapClient(IMAP4_SSL):
|
|
""""""
|
|
|
|
Commands['XLIST'] = ('AUTH', 'SELECTED')
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def xlist(self, directory='""', pattern='*'):
|
|
"""(X)List mailbox names in directory matching pattern. Using Google's XLIST extension
|
|
|
|
(status, [data]) = <instance>.xlist(directory='""', pattern='*')
|
|
|
|
'data' is list of XLIST responses.
|
|
|
|
thks to barduck : http://stackoverflow.com/users/602242/barduck
|
|
"""
|
|
try :
|
|
name = 'XLIST'
|
|
status, data = self._simple_command(name, directory, pattern)
|
|
return self._untagged_response(status, data, name)
|
|
except :
|
|
return 'NO', ''
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ class ImapHelper ~~
|
|
|
|
class ImapHelper:
|
|
""""""
|
|
|
|
K_HEAD, K_DATA = 0, 1
|
|
""""""
|
|
OK = 'OK'
|
|
""""""
|
|
KO = 'NO'
|
|
""""""
|
|
ENCODING = 'utf-8'
|
|
""""""
|
|
REG_SATUS = r'^"?(.*)"? \(([^\(]*)\)'
|
|
""""""
|
|
NO_SELECT = '\\Noselect'
|
|
""""""
|
|
CHILDREN = '\\HasChildren'
|
|
""""""
|
|
NO_CHILDREN = '\\HasNoChildren'
|
|
""""""
|
|
INBOX = '\\Inbox'
|
|
""""""
|
|
DRAFTS = '\\Drafts'
|
|
""""""
|
|
TRASH = '\\Trash'
|
|
""""""
|
|
SENT = '\\Sent'
|
|
""""""
|
|
DELETED = '\\Deleted'
|
|
""""""
|
|
FLAGS = '+FLAGS'
|
|
""""""
|
|
|
|
@Log(Const.LOG_BUILD)
|
|
def __init__(self, conf, box='INBOX', noBoxCreat=False):
|
|
""""""
|
|
if conf.host != None and research('yahoo.com', conf.host) is not None :
|
|
self.DRAFTS = self.DRAFTS[:-1]
|
|
self.conf = conf
|
|
self.rootBox = box
|
|
self.BOXS = {}
|
|
self.cnx = None
|
|
self.cnxusr = None
|
|
self.noBoxCreat = noBoxCreat
|
|
self.switchAccount(self.conf, self.rootBox, True)
|
|
|
|
@Log()
|
|
def reconnect(self):
|
|
""""""
|
|
Sys.pwlog([(' Reconnecting... ', Const.CLZ_7, True)])
|
|
self.switchAccount(self.conf, self.rootBox, True)
|
|
|
|
|
|
@Log()
|
|
def switchAccount(self, conf, box='INBOX', force=False):
|
|
""""""
|
|
if force or self.cnx is None or self.cnxusr is not conf.user :
|
|
try :
|
|
Sys.pwlog([(' Attempt to login... ' , Const.CLZ_7),
|
|
('(' , Const.CLZ_0),
|
|
(conf.user , Const.CLZ_2),
|
|
('@' , Const.CLZ_0),
|
|
(conf.host , Const.CLZ_3),
|
|
(':' , Const.CLZ_0),
|
|
(conf.port , Const.CLZ_4),
|
|
(')' , Const.CLZ_0, True)])
|
|
|
|
self.cnx = ImapClient(conf.host,conf.port)
|
|
except Exception as e :
|
|
raise BadHostException()
|
|
|
|
try :
|
|
status, resp = self.cnx.login(conf.user,conf.pwd)
|
|
|
|
except Exception as e :
|
|
status = self.KO
|
|
pass
|
|
finally :
|
|
if status == self.KO :
|
|
self.cnxusr = None
|
|
raise BadLoginException(' Cannot login with '+conf.user+':'+conf.pwd)
|
|
else :
|
|
Sys.pwlog([(' Connected ', Const.CLZ_2, True),
|
|
(Const.LINE_SEP_CHAR*Const.LINE_SEP_LEN , Const.CLZ_0, True)])
|
|
self.cnxusr = conf.user
|
|
try :
|
|
status, resp = self.cnx.select(self.rootBox)
|
|
if status == self.KO and not self.noBoxCreat:
|
|
self.createBox(self.rootBox)
|
|
status, resp = self.cnx.select(self.rootBox)
|
|
self.initBoxNames()
|
|
except Exception as e :
|
|
print(e)
|
|
|
|
|
|
@Log()
|
|
def createBox(self, box):
|
|
""""""
|
|
status, resp = self.cnx.create(encode(box))
|
|
return status==self.OK
|
|
|
|
|
|
@Log()
|
|
def deleteBox(self, box):
|
|
""""""
|
|
status, resp = self.cnx.delete(encode(box))
|
|
return status==self.OK
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def initBoxNames(self):
|
|
""""""
|
|
status, resp = self.cnx.xlist()
|
|
if status == self.OK :
|
|
bdef, bname, c = None, None, None
|
|
for c in resp :
|
|
bdef, bname = c[1:-1].split(b') "/" "')
|
|
if bdef == Io.bytes(self.NO_SELECT+' '+self.CHILDREN) :
|
|
self.BOXS['/'] = Io.str(bname)
|
|
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.INBOX) :
|
|
self.BOXS[self.INBOX] = self.INBOX[1:].upper()
|
|
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.DRAFTS) :
|
|
self.BOXS[self.DRAFTS] = Io.str(bname)
|
|
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.TRASH) :
|
|
self.BOXS[self.TRASH] = Io.str(bname)
|
|
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.SENT) :
|
|
self.BOXS[self.SENT] = Io.str(bname)
|
|
else :
|
|
self.BOXS = { '/' : '/', self.INBOX : self.INBOX[1:].upper(), self.DRAFTS : self.DRAFTS[1:], self.TRASH : self.TRASH[1:], self.SENT : self.SENT[1:] }
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def listBox(self, box='INBOX', pattern='*'):
|
|
""""""
|
|
status, resp = self.cnx.list(box,pattern)
|
|
l = []
|
|
for r in resp :
|
|
if r is not None :
|
|
name = Io.str(r).split(' "/" ')
|
|
l.append((name[0][1:-1].split(' '),name[1][1:-1]))
|
|
return l
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def status(self, box='INBOX'):
|
|
""""""
|
|
status, resp = self.cnx.status(box, '(MESSAGES RECENT UIDNEXT UIDVALIDITY UNSEEN)')
|
|
if status == self.OK :
|
|
data = research(self.REG_SATUS, Io.str(resp[self.K_HEAD]))
|
|
l = resplit(' ',data.group(2))
|
|
dic = {'BOX' : box}
|
|
for i in range(len(l)):
|
|
if i%2 == 0 : dic[l[i]] = int(l[i+1])
|
|
else : dic = {}
|
|
return dic
|
|
|
|
|
|
@Log()
|
|
def countSeen(self, box='INBOX'):
|
|
""""""
|
|
s = self.status(box)
|
|
return s['MESSAGES']-s['UNSEEN']
|
|
|
|
|
|
@Log()
|
|
def countUnseen(self, box='INBOX'):
|
|
""""""
|
|
return self.status(box)['UNSEEN']
|
|
|
|
|
|
@Log()
|
|
def countMsg(self, box='INBOX'):
|
|
""""""
|
|
return self.status(box)['MESSAGES']
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def _ids(self, box='INBOX', search='ALL', charset=None, byUid=False):
|
|
""""""
|
|
status, resp = self.cnx.select(box)
|
|
if status == self.KO :
|
|
self.createBox(box)
|
|
self.cnx.select(box)
|
|
status, resp = self.cnx.search(charset, '(%s)' % search)
|
|
return resplit(' ',Io.str(resp[self.K_HEAD]))
|
|
|
|
|
|
@Log()
|
|
def idsUnseen(self, box='INBOX', charset=None):
|
|
""""""
|
|
return self._ids(box,'UNSEEN', charset)
|
|
|
|
|
|
@Log()
|
|
def idsMsg(self, box='INBOX', charset=None):
|
|
""""""
|
|
return self._ids(box,'ALL', charset)
|
|
|
|
|
|
@Log()
|
|
def idsSeen(self, box='INBOX', charset=None):
|
|
""""""
|
|
return self._ids(box,'NOT UNSEEN', charset)
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def search(self, query, byUid=False):
|
|
""""""
|
|
if byUid :
|
|
status, resp = self.cnx.uid('search', None, query)
|
|
else :
|
|
status, resp = self.cnx.search(None, query)
|
|
ids = [m for m in resp[0].split()]
|
|
return ids
|
|
|
|
|
|
@Log()
|
|
def searchBySubject(self, subject, byUid=False):
|
|
""""""
|
|
return self.search('(SUBJECT "%s")' % subject, byUid)
|
|
|
|
|
|
@Log()
|
|
def getUid(self, mid):
|
|
""""""
|
|
value = ''
|
|
status, resp = self.cnx.fetch(mid, '(UID)')
|
|
if status==self.OK :
|
|
# '$mid (UID $uid)'
|
|
value = resp[0][len(str(mid))+6:-1]
|
|
return value
|
|
|
|
|
|
@Log(Const.LOG_DEBUG)
|
|
def fetch(self, mid, query, byUid=False):
|
|
""""""
|
|
if not byUid :
|
|
status, resp = self.cnx.fetch(mid, query)
|
|
else:
|
|
status, resp = self.cnx.uid('fetch', mid, query)
|
|
return status, resp
|
|
|
|
|
|
@Log()
|
|
def headerField(self, mid, field, byUid=False):
|
|
""""""
|
|
value = ''
|
|
field = field.upper()
|
|
query = '(UID BODY[HEADER' + ('])' if field=='*' or field=='ALL' else '.FIELDS (%s)])' % field)
|
|
status, resp = self.fetch(mid, query, byUid)
|
|
if status==self.OK and resp[0]!=None:
|
|
value = Io.str(resp[0][1][len(field)+2:-4])
|
|
return value
|
|
|
|
|
|
@Log()
|
|
def getSubject(self, mid, byUid=False):
|
|
""""""
|
|
status, resp = self.fetch(mid, '(UID BODY[HEADER.FIELDS (SUBJECT)])', byUid)
|
|
subject = decode_header(str(resp[self.K_HEAD][1][9:-4], 'utf-8'))[0]
|
|
s = subject[0]
|
|
if subject[1] :
|
|
s = str(s,subject[1])
|
|
return s
|
|
|
|
|
|
@staticmethod
|
|
def _getIdsList(ids):
|
|
idslist = None
|
|
if isinstance(ids,list):
|
|
if len(ids) > 0 and ids[0]!='' and ids[0]!=None:
|
|
li = len(ids)-1
|
|
if int(ids[0])+li == int(ids[li]):
|
|
idslist = Io.str(ids[0]+b':'+ids[li]) if isinstance(ids[0],bytes) else str(ids[0])+':'+str(ids[li])
|
|
else :
|
|
idslist = Io.str(b','.join(ids)) if isinstance(ids[0],bytes) else ','.join(ids)
|
|
elif isinstance(ids, int) and ids > 0:
|
|
idslist = Io.str(ids)
|
|
return idslist
|
|
|
|
|
|
@Log()
|
|
def delete(self, ids, byUid=False, expunge=True):
|
|
""""""
|
|
status, delids = None, ImapHelper._getIdsList(ids)
|
|
#~ print(delids)
|
|
if delids is not None :
|
|
if byUid:
|
|
status, resp = self.cnx.uid( 'store', delids, self.FLAGS, self.DELETED )
|
|
else :
|
|
status, resp = self.cnx.store(delids, self.FLAGS, self.DELETED)
|
|
if expunge :
|
|
self.cnx.expunge()
|
|
return status == self.OK
|
|
|
|
|
|
@Log()
|
|
def clearTrash(self):
|
|
""""""
|
|
self.cnx.select(self.BOXS[self.TRASH])
|
|
ids = self.search('ALL',True)
|
|
if len(ids) > 0 and ids[0]!='' and ids[0]!=None:
|
|
delids = ImapHelper._getIdsList(ids)
|
|
status, resp = self.cnx.uid('store', delids, self.FLAGS, self.DELETED )
|
|
|
|
Sys.pwlog([(' Deleting msg ', Const.CLZ_0),
|
|
(delids , Const.CLZ_1),
|
|
(' '+status , Const.CLZ_7, True)])
|
|
self.cnx.expunge()
|
|
self.cnx.select(self.rootBox)
|
|
|
|
|
|
@Log()
|
|
def getEmail(self, mid, byUid=False):
|
|
""""""
|
|
status, resp = self.fetch(mid,'(UID RFC822)', byUid)
|
|
if status == self.OK and resp[0]!=None:
|
|
msg = message_from_bytes(resp[0][1])
|
|
else :
|
|
msg = None
|
|
return msg
|
|
|
|
|
|
@Log(Const.LOG_APP)
|
|
def getAttachment(self, msg, toDir='./', byUid=False):
|
|
""""""
|
|
# self.download(msg, toDir, byUid, False)
|
|
# p = Process(target=self.download, args=(msg, toDir, byUid))
|
|
# p.start()
|
|
# p.join()
|
|
if not isinstance(msg, Message) :
|
|
msg = self.getEmail(msg, byUid)
|
|
for part in msg.walk():
|
|
filename = part.get_filename()
|
|
if part.get_content_maintype() == 'multipart' or not filename : continue
|
|
with Io.wfile(Sys.join(toDir, filename)) as fo :
|
|
fo.write(part.get_payload(decode=True))
|
|
|
|
|
|
@Log(Const.LOG_APP)
|
|
def download(self, msg, toDir, byUid=False, reconError=True):
|
|
""""""
|
|
try:
|
|
if not isinstance(msg, Message) :
|
|
msg = self.getEmail(msg, byUid)
|
|
for part in msg.walk():
|
|
filename = part.get_filename()
|
|
if part.get_content_maintype() == 'multipart' or not filename : continue
|
|
with Io.wfile(Sys.join(toDir, filename)) as fo :
|
|
fo.write(part.get_payload(decode=True))
|
|
except Exception as e :
|
|
print(e)
|
|
self.reconnect()
|
|
self.download(msg, toDir, byUid, False)
|
|
|
|
|
|
@Log()
|
|
def send(self, msg, box='INBOX'):
|
|
""""""
|
|
mid = None
|
|
date = Time2Internaldate(Sys.time())
|
|
status, resp = self.cnx.append(box, '\Draft', date, bytes(msg,'utf-8'))
|
|
if status==self.OK:
|
|
mid = str(resp[0],'utf-8')[11:-11].split(' ')
|
|
return mid
|
|
|
|
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ class BadLoginException ~~
|
|
|
|
class BadLoginException(BaseException):
|
|
pass
|
|
|
|
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
# ~~ class BadLoginException ~~
|
|
|
|
class BadHostException(BaseException):
|
|
pass
|