
554 lines
17 KiB
Raw Permalink Normal View History

2014-07-17 22:19:51 +00:00
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
# psr/
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# software : Kirmah <>
# version : 2.18
# date : 2014
# licence : GPLv3.0 <>
# author : a-Sansara <[a-sansara]at[clochardprod]dot[net]>
# copyright : <>
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# This file is part of Kirmah.
# Kirmah is free software (free as in speech) : you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# Kirmah is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
# You should have received a copy of the GNU General Public License
# along with Kirmah. If not, see <>.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ module imap ~~
from imaplib import Commands, IMAP4_SSL, Time2Internaldate
from binascii import b2a_base64, a2b_base64
from codecs import register, StreamReader, StreamWriter
from email import message_from_bytes
from email.header import decode_header
from email.message import Message
from re import search as research, split as resplit
2014-07-16 23:55:55 +00:00
from multiprocessing import Process
from psr.sys import Io, Sys, Const
from psr.log import Log
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ ImapUtf7 decoding/encoding ~~
def _seq_encode(seq,l):
if len(seq) > 0 :
l.append('&%s-' % str(b2a_base64(bytes(''.join(seq),'utf-16be')),'utf-8').rstrip('\n=').replace('/', ','))
elif l:
def _seq_decode(seq,l):
d = ''.join(seq[1:])
pad = 4-(len(d)%4)
l.append(str(a2b_base64(bytes(d.replace(',', '/')+pad*'=','utf-16be')),'utf-16be'))
def encode(s):
l, e, = [], []
for c in s :
if ord(c) in range(0x20,0x7e):
if e : _seq_encode(e,l)
e = []
if c == '&' : l.append('-')
else :
if e : _seq_encode(e,l)
return ''.join(l)
def decode(s):
l, d = [], []
for c in s:
if c == '&' and not d : d.append('&')
elif c == '-' and d:
if len(d) == 1: l.append('&')
else : _seq_decode(d,l)
d = []
elif d: d.append(c)
else: l.append(c)
if d: _seq_decode(d,l)
return ''.join(l)
def _encoder(s):
e = bytes(encode(s),'utf-8')
return e, len(e)
def _decoder(s):
d = decode(str(s,'utf-8'))
return d, len(d)
def _codec_imap4utf7(name):
if name == 'imap4-utf-7':
return (_encoder, _decoder, Imap4Utf7StreamReader, Imap4Utf7StreamWriter)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ StreamReader & StreamWriter ~~
class Imap4Utf7StreamReader(StreamReader):
def decode(self, s, errors='strict'): return _decoder(s)
class Imap4Utf7StreamWriter(StreamWriter):
def decode(self, s, errors='strict'): return _encoder(s)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ registering codec ~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ Imap utilities ~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class ImapConfig ~~
class ImapConfig:
2014-07-16 23:55:55 +00:00
def __init__(self, host, user, pwd, port='993'):
"""""" = host
self.user = user
self.pwd = pwd
2014-07-16 23:55:55 +00:00
self.port = str(port)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class ImapClient ~~
class ImapClient(IMAP4_SSL):
Commands['XLIST'] = ('AUTH', 'SELECTED')
def xlist(self, directory='""', pattern='*'):
"""(X)List mailbox names in directory matching pattern. Using Google's XLIST extension
(status, [data]) = <instance>.xlist(directory='""', pattern='*')
'data' is list of XLIST responses.
thks to barduck :
try :
name = 'XLIST'
status, data = self._simple_command(name, directory, pattern)
return self._untagged_response(status, data, name)
except :
return 'NO', ''
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class ImapHelper ~~
class ImapHelper:
K_HEAD, K_DATA = 0, 1
OK = 'OK'
KO = 'NO'
ENCODING = 'utf-8'
REG_SATUS = r'^"?(.*)"? \(([^\(]*)\)'
NO_SELECT = '\\Noselect'
CHILDREN = '\\HasChildren'
NO_CHILDREN = '\\HasNoChildren'
INBOX = '\\Inbox'
DRAFTS = '\\Drafts'
TRASH = '\\Trash'
SENT = '\\Sent'
DELETED = '\\Deleted'
2014-07-16 23:55:55 +00:00
def __init__(self, conf, box='INBOX', noBoxCreat=False):
2014-07-16 23:55:55 +00:00
if != None and research('', is not None :
self.DRAFTS = self.DRAFTS[:-1]
2014-07-16 23:55:55 +00:00
self.conf = conf
self.rootBox = box
self.BOXS = {}
self.cnx = None
self.cnxusr = None
self.noBoxCreat = noBoxCreat
self.switchAccount(self.conf, self.rootBox, True)
def reconnect(self):
Sys.pwlog([(' Reconnecting... ', Const.CLZ_7, True)])
self.switchAccount(self.conf, self.rootBox, True)
def switchAccount(self, conf, box='INBOX', force=False):
2014-07-16 23:55:55 +00:00
if force or self.cnx is None or self.cnxusr is not conf.user :
2014-07-16 23:55:55 +00:00
try :
Sys.pwlog([(' Attempt to login... ' , Const.CLZ_7),
('(' , Const.CLZ_0),
(conf.user , Const.CLZ_2),
('@' , Const.CLZ_0),
( , Const.CLZ_3),
(':' , Const.CLZ_0),
(conf.port , Const.CLZ_4),
(')' , Const.CLZ_0, True)])
self.cnx = ImapClient(,conf.port)
except Exception as e :
raise BadHostException()
2014-07-16 23:55:55 +00:00
try :
status, resp = self.cnx.login(conf.user,conf.pwd)
2014-07-16 23:55:55 +00:00
except Exception as e :
status = self.KO
finally :
if status == self.KO :
self.cnxusr = None
raise BadLoginException(' Cannot login with '+conf.user+':'+conf.pwd)
else :
2014-07-16 23:55:55 +00:00
Sys.pwlog([(' Connected ', Const.CLZ_2, True),
(Const.LINE_SEP_CHAR*Const.LINE_SEP_LEN , Const.CLZ_0, True)])
self.cnxusr = conf.user
try :
status, resp =
2014-07-16 23:55:55 +00:00
if status == self.KO and not self.noBoxCreat:
status, resp =
except Exception as e :
def createBox(self, box):
status, resp = self.cnx.create(encode(box))
return status==self.OK
def deleteBox(self, box):
status, resp = self.cnx.delete(encode(box))
return status==self.OK
def initBoxNames(self):
status, resp = self.cnx.xlist()
if status == self.OK :
bdef, bname, c = None, None, None
for c in resp :
bdef, bname = c[1:-1].split(b') "/" "')
if bdef == Io.bytes(self.NO_SELECT+' '+self.CHILDREN) :
self.BOXS['/'] = Io.str(bname)
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.INBOX) :
self.BOXS[self.INBOX] = self.INBOX[1:].upper()
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.DRAFTS) :
self.BOXS[self.DRAFTS] = Io.str(bname)
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.TRASH) :
self.BOXS[self.TRASH] = Io.str(bname)
elif bdef == Io.bytes(self.NO_CHILDREN+' '+self.SENT) :
self.BOXS[self.SENT] = Io.str(bname)
else :
self.BOXS = { '/' : '/', self.INBOX : self.INBOX[1:].upper(), self.DRAFTS : self.DRAFTS[1:], self.TRASH : self.TRASH[1:], self.SENT : self.SENT[1:] }
def listBox(self, box='INBOX', pattern='*'):
status, resp = self.cnx.list(box,pattern)
l = []
for r in resp :
if r is not None :
name = Io.str(r).split(' "/" ')
l.append((name[0][1:-1].split(' '),name[1][1:-1]))
return l
def status(self, box='INBOX'):
status, resp = self.cnx.status(box, '(MESSAGES RECENT UIDNEXT UIDVALIDITY UNSEEN)')
if status == self.OK :
data = research(self.REG_SATUS, Io.str(resp[self.K_HEAD]))
l = resplit(' ',
dic = {'BOX' : box}
for i in range(len(l)):
if i%2 == 0 : dic[l[i]] = int(l[i+1])
else : dic = {}
return dic
def countSeen(self, box='INBOX'):
s = self.status(box)
return s['MESSAGES']-s['UNSEEN']
def countUnseen(self, box='INBOX'):
return self.status(box)['UNSEEN']
def countMsg(self, box='INBOX'):
return self.status(box)['MESSAGES']
def _ids(self, box='INBOX', search='ALL', charset=None, byUid=False):
status, resp =
if status == self.KO :
status, resp =, '(%s)' % search)
return resplit(' ',Io.str(resp[self.K_HEAD]))
def idsUnseen(self, box='INBOX', charset=None):
return self._ids(box,'UNSEEN', charset)
def idsMsg(self, box='INBOX', charset=None):
return self._ids(box,'ALL', charset)
def idsSeen(self, box='INBOX', charset=None):
return self._ids(box,'NOT UNSEEN', charset)
def search(self, query, byUid=False):
if byUid :
status, resp = self.cnx.uid('search', None, query)
else :
status, resp =, query)
ids = [m for m in resp[0].split()]
return ids
def searchBySubject(self, subject, byUid=False):
return'(SUBJECT "%s")' % subject, byUid)
def getUid(self, mid):
value = ''
status, resp = self.cnx.fetch(mid, '(UID)')
if status==self.OK :
# '$mid (UID $uid)'
value = resp[0][len(str(mid))+6:-1]
return value
def fetch(self, mid, query, byUid=False):
if not byUid :
status, resp = self.cnx.fetch(mid, query)
status, resp = self.cnx.uid('fetch', mid, query)
return status, resp
def headerField(self, mid, field, byUid=False):
value = ''
field = field.upper()
query = '(UID BODY[HEADER' + ('])' if field=='*' or field=='ALL' else '.FIELDS (%s)])' % field)
status, resp = self.fetch(mid, query, byUid)
if status==self.OK and resp[0]!=None:
value = Io.str(resp[0][1][len(field)+2:-4])
return value
def getSubject(self, mid, byUid=False):
status, resp = self.fetch(mid, '(UID BODY[HEADER.FIELDS (SUBJECT)])', byUid)
subject = decode_header(str(resp[self.K_HEAD][1][9:-4], 'utf-8'))[0]
s = subject[0]
if subject[1] :
s = str(s,subject[1])
return s
def _getIdsList(ids):
idslist = None
if isinstance(ids,list):
if len(ids) > 0 and ids[0]!='' and ids[0]!=None:
li = len(ids)-1
if int(ids[0])+li == int(ids[li]):
idslist = Io.str(ids[0]+b':'+ids[li]) if isinstance(ids[0],bytes) else str(ids[0])+':'+str(ids[li])
else :
idslist = Io.str(b','.join(ids)) if isinstance(ids[0],bytes) else ','.join(ids)
elif isinstance(ids, int) and ids > 0:
idslist = Io.str(ids)
return idslist
def delete(self, ids, byUid=False, expunge=True):
status, delids = None, ImapHelper._getIdsList(ids)
#~ print(delids)
if delids is not None :
if byUid:
status, resp = self.cnx.uid( 'store', delids, self.FLAGS, self.DELETED )
else :
status, resp =, self.FLAGS, self.DELETED)
if expunge :
return status == self.OK
def clearTrash(self):
ids ='ALL',True)
if len(ids) > 0 and ids[0]!='' and ids[0]!=None:
delids = ImapHelper._getIdsList(ids)
status, resp = self.cnx.uid('store', delids, self.FLAGS, self.DELETED )
2014-07-16 23:55:55 +00:00
Sys.pwlog([(' Deleting msg ', Const.CLZ_0),
(delids , Const.CLZ_1),
(' '+status , Const.CLZ_7, True)])
def getEmail(self, mid, byUid=False):
status, resp = self.fetch(mid,'(UID RFC822)', byUid)
if status == self.OK and resp[0]!=None:
msg = message_from_bytes(resp[0][1])
else :
msg = None
return msg
def getAttachment(self, msg, toDir='./', byUid=False):
2014-07-16 23:55:55 +00:00
#, toDir, byUid, False)
# p = Process(, args=(msg, toDir, byUid))
# p.start()
# p.join()
if not isinstance(msg, Message) :
msg = self.getEmail(msg, byUid)
for part in msg.walk():
filename = part.get_filename()
if part.get_content_maintype() == 'multipart' or not filename : continue
with Io.wfile(Sys.join(toDir, filename)) as fo :
2014-07-16 23:55:55 +00:00
def download(self, msg, toDir, byUid=False, reconError=True):
if not isinstance(msg, Message) :
msg = self.getEmail(msg, byUid)
for part in msg.walk():
filename = part.get_filename()
if part.get_content_maintype() == 'multipart' or not filename : continue
with Io.wfile(Sys.join(toDir, filename)) as fo :
except Exception as e :
self.reconnect(), toDir, byUid, False)
def send(self, msg, box='INBOX'):
mid = None
date = Time2Internaldate(Sys.time())
status, resp = self.cnx.append(box, '\Draft', date, bytes(msg,'utf-8'))
if status==self.OK:
mid = str(resp[0],'utf-8')[11:-11].split(' ')
return mid
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class BadLoginException ~~
class BadLoginException(BaseException):
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class BadLoginException ~~
class BadHostException(BaseException):