python-imprastorage/impra/imap.py

437 lines
14 KiB
Python
Raw Normal View History

2012-09-10 22:12:39 +00:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# #
# software : ImpraStorage <http://imprastorage.sourceforge.net/> #
2012-09-29 18:38:04 +00:00
# version : 0.6 #
2012-09-10 22:12:39 +00:00
# date : 2012 #
# licence : GPLv3.0 <http://www.gnu.org/licenses/> #
# author : a-Sansara <http://www.a-sansara.net/> #
# copyright : pluie.org <http://www.pluie.org/> #
# #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#
# This file is part of ImpraStorage.
#
# ImpraStorage is free software (free as in speech) : you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# ImpraStorage is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License
# along with ImpraStorage. If not, see <http://www.gnu.org/licenses/>.
from binascii import b2a_base64, a2b_base64
from codecs import register, StreamReader, StreamWriter
2012-09-10 22:12:39 +00:00
from email import message_from_bytes
from email.header import decode_header
from email.message import Message
from imaplib import IMAP4_SSL, Time2Internaldate
from os.path import join
from re import search, split
from time import time, sleep
from impra.util import __CALLER__, RuTime, bstr, stack, Clz
2012-09-10 22:12:39 +00:00
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ package imap ~~
2012-09-10 22:12:39 +00:00
def _seq_encode(seq,l):
""""""
if len(seq) > 0 :
l.append('&%s-' % str(b2a_base64(bytes(''.join(seq),'utf-16be')),'utf-8').rstrip('\n=').replace('/', ','))
elif l:
l.append('-')
def encode(s):
""""""
l, e, = [], []
for c in s :
if ord(c) in range(0x20,0x7e):
if e : _seq_encode(e,l)
e = []
l.append(c)
if c == '&' : l.append('-')
else :
e.append(c)
if e : _seq_encode(e,l)
return ''.join(l)
def encoder(s):
""""""
e = bytes(encode(s),'utf-8')
return e, len(e)
def _seq_decode(seq,l):
""""""
d = ''.join(seq[1:])
pad = 4-(len(d)%4)
l.append(str(a2b_base64(bytes(d.replace(',', '/')+pad*'=','utf-16be')),'utf-16be'))
def decode(s):
""""""
l, d = [], []
for c in s:
if c == '&' and not d : d.append('&')
elif c == '-' and d:
if len(d) == 1: l.append('&')
else : _seq_decode(d,l)
d = []
elif d: d.append(c)
else: l.append(c)
if d: _seq_decode(d,l)
return ''.join(l)
def decoder(s):
""""""
d = decode(str(s,'utf-8'))
return d, len(d)
def _codec_imap4utf7(name):
""""""
if name == 'imap4-utf-7':
return (encoder, decoder, Imap4Utf7StreamReader, Imap4Utf7StreamWriter)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class Imap4Utf7StreamWriter ~~
class Imap4Utf7StreamReader(StreamReader):
""""""
def decode(self, s, errors='strict'):
""""""
return decoder(s)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class Imap4Utf7StreamWriter ~~
class Imap4Utf7StreamWriter(StreamWriter):
""""""
def decode(self, s, errors='strict'):
""""""
return encoder(s)
register(_codec_imap4utf7)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class ImapConfig ~~
class ImapConfig:
""""""
def __init__(self, host, port, user, pwd):
""""""
self.host = host
self.port = port
self.user = user
self.pwd = pwd
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class ImapHelper ~~
class ImapHelper:
""""""
K_HEAD, K_DATA = 0, 1
""""""
OK = 'OK'
""""""
KO = 'NO'
""""""
ENCODING = 'utf-8'
""""""
REG_SATUS = r'^"(\w*)" \(([^\(]*)\)'
""""""
NO_SELECT = '\\Noselect'
""""""
CHILDREN = '\\HasChildren'
""""""
NO_CHILDREN = '\\HasNoChildren'
""""""
BOX_BIN = '[Gmail]/Corbeille'
""""""
def __init__(self, conf, box='INBOX'):
""""""
rt = RuTime(eval(__CALLER__('conf,"'+box+'"')))
self.srv = IMAP4_SSL(conf.host,conf.port)
self.srv.login(conf.user,conf.pwd)
self.rootBox = box
status, resp = self.srv.select(self.rootBox)
if status == self.KO :
self.createBox(self.rootBox)
self.srv.select(self.rootBox)
rt.stop()
def status(self,box='INBOX'):
""""""
2012-10-01 20:08:37 +00:00
status, resp = self.srv.status(box, '(MESSAGES RECENT UIDNEXT UIDVALIDITY UNSEEN)')
2012-09-10 22:12:39 +00:00
if status == 'OK' :
data = search(self.REG_SATUS,bstr(resp[self.K_HEAD]))
l = split(' ',data.group(2))
dic = {'BOX' : data.group(1)}
for i in range(len(l)):
if i%2 == 0 : dic[l[i]] = int(l[i+1])
else : dic = {}
return dic
def countSeen(self, box='INBOX'):
""""""
2012-09-13 18:23:29 +00:00
s = self.status(box)
2012-09-10 22:12:39 +00:00
return s['MESSAGES']-s['UNSEEN']
def countUnseen(self, box='INBOX'):
""""""
2012-09-13 18:23:29 +00:00
return self.status(box)['UNSEEN']
2012-09-10 22:12:39 +00:00
def countMsg(self, box='INBOX'):
""""""
2012-09-13 18:23:29 +00:00
return self.status(box)['MESSAGES']
2012-09-10 22:12:39 +00:00
2012-09-13 18:23:29 +00:00
def _ids(self, box='INBOX', search='ALL', charset=None, byUid=False):
2012-09-10 22:12:39 +00:00
""""""
2012-09-13 18:23:29 +00:00
status, resp = self.srv.select(box)
if status == self.KO :
self.createBox(box)
self.srv.select(box)
2012-09-10 22:12:39 +00:00
status, resp = self.srv.search(charset, '(%s)' % search)
return split(' ',bstr(resp[self.K_HEAD]))
def idsUnseen(self, box='INBOX', charset=None):
""""""
return self._ids(box,'UNSEEN', charset)
def idsMsg(self, box='INBOX', charset=None):
""""""
return self._ids(box,'ALL', charset)
def idsSeen(self, box='INBOX', charset=None):
""""""
return self._ids(box,'NOT UNSEEN', charset)
def listBox(self, box='INBOX', pattern='*'):
""""""
status, resp = self.srv.list(box,pattern)
l = []
for r in resp :
name = bstr(r).split(' "/" ')
l.append((name[0][1:-1].split(' '),decode(name[1][1:-1])))
return l
def createBox(self, box):
""""""
rt = RuTime(eval(__CALLER__(box)))
status, resp = self.srv.create(encode(box))
rt.stop()
return status==self.OK
def deleteBox(self, box):
""""""
rt = RuTime(eval(__CALLER__(box)))
status, resp = self.srv.delete(encode(box))
rt.stop()
return status==self.OK
2012-09-13 18:23:29 +00:00
def subject(self, mid, byUid=False):
2012-09-10 22:12:39 +00:00
""""""
2012-09-13 18:23:29 +00:00
status, resp = self.fetch(mid, '(UID BODY[HEADER.FIELDS (SUBJECT)])', byUid)
2012-09-10 22:12:39 +00:00
subject = decode_header(str(resp[self.K_HEAD][1][9:-4], 'utf-8'))[0]
s = subject[0]
if subject[1] :
s = str(s,subject[1])
return s
2012-09-13 18:23:29 +00:00
def search(self, query, byUid=False):
if byUid :
status, resp = self.srv.uid('search', None, query)
else :
status, resp = self.srv.search(None, query)
ids = [m for m in resp[0].split()]
return ids
def searchBySubject(self, subject, byUid=False):
return self.search('(SUBJECT "%s")' % subject, byUid)
def getUid(self, mid):
""""""
value = ''
status, resp = self.srv.fetch(mid, '(UID)')
if status==self.OK :
value = resp[0][len(str(mid))+3:-1]
return value
def fetch(self, mid, query, byUid=False):
""""""
if not byUid :
status, resp = self.srv.fetch(mid, query)
else:
status, resp = self.srv.uid('fetch', mid, query)
return status, resp
def headerField(self, field, mid, byUid=False):
""""""
value = ''
status, resp = self.fetch(mid, '(UID BODY[HEADER.FIELDS (%s)])' % field.upper(), byUid)
if status==self.OK and resp[0]!=None:
value = str(resp[0][1][len(field)+2:-4],'utf-8')
return value
def email(self, mid, byUid=False):
2012-09-10 22:12:39 +00:00
""""""
2012-09-13 18:23:29 +00:00
status, resp = self.fetch(mid,'(UID RFC822)', byUid)
if status == self.OK and resp[0]!=None:
2012-09-10 22:12:39 +00:00
msg = message_from_bytes(resp[0][1])
else :
msg = None
return msg
def deleteBin(self):
""""""
2012-09-17 17:13:10 +00:00
from impra.util import DEBUG_NOTICE, DEBUG, DEBUG_LEVEL, DEBUG_INFO
rt = RuTime(eval(__CALLER__()),DEBUG_INFO)
2012-09-10 22:12:39 +00:00
self.srv.select(self.BOX_BIN)
2012-09-13 18:23:29 +00:00
ids = self.search('ALL',True)
if len(ids) > 0 and ids[0]!='' and ids[0]!=None:
print()
2012-09-13 18:23:29 +00:00
#print(str(ids[0],'utf-8').split())
2012-09-17 15:34:08 +00:00
for mid in ids:
2012-09-13 18:23:29 +00:00
#~ uid = bytes(mid)
#~ print(type(mid))
#~ print(mid)
#status, resp = self.srv.store(mid, '+FLAGS', '\\Deleted')
status, resp = self.srv.uid('store', mid, '+FLAGS', '\\Deleted' )
print(' ',end='')
Clz.print(' deleting msg ',Clz.fgN7+Clz.bg1, False)
Clz.print(str(int(mid)) ,Clz.bg1+Clz.fgB3)
2012-09-17 17:13:10 +00:00
if DEBUG and DEBUG_LEVEL <= DEBUG_NOTICE:
2012-09-17 15:34:08 +00:00
print(status)
print(resp)
2012-09-10 22:12:39 +00:00
self.srv.expunge()
print()
2012-09-10 22:12:39 +00:00
self.srv.select(self.rootBox)
rt.stop()
2012-09-24 08:02:58 +00:00
def delete(self, mid, byUid=False, expunge=True):
2012-09-10 22:12:39 +00:00
""""""
rt = RuTime(eval(__CALLER__('%i' % int(mid))))
status = None
if int(mid) > 0 :
2012-09-13 18:23:29 +00:00
if byUid:
status, resp = self.srv.uid( 'store', mid, '+FLAGS', '\\Deleted' )
else :
status, resp = self.srv.store(mid, '+FLAGS', '\\Deleted')
Clz.print(' flag msg ' , Clz.fgn7, False)
2012-09-29 18:38:04 +00:00
Clz.print(str(int(mid)), Clz.fgB1, False)
Clz.print(' as deleted', Clz.fgn7)
2012-09-24 08:02:58 +00:00
if expunge :
Clz.print('\n expunge, waiting server...\n', Clz.fgB1)
self.srv.expunge()
2012-09-29 18:38:04 +00:00
sleep(0.5)
2012-09-10 22:12:39 +00:00
rt.stop()
return status == self.OK
2012-09-29 18:38:04 +00:00
def downloadAttachment(self, msg, toDir='./', byUid=False):
2012-09-10 22:12:39 +00:00
""""""
2012-09-17 15:34:08 +00:00
from impra.util import DEBUG, DEBUG_LEVEL, DEBUG_NOTICE, DEBUG_INFO
rt = RuTime(eval(__CALLER__('%i' % int(msg))),DEBUG_INFO)
2012-09-10 22:12:39 +00:00
if not isinstance(msg, Message) :
2012-09-29 18:38:04 +00:00
msg = self.email(msg,byUid)
2012-09-10 22:12:39 +00:00
for part in msg.walk():
filename = part.get_filename()
2012-09-17 15:34:08 +00:00
if filename != None and DEBUG and DEBUG_LEVEL <= DEBUG_NOTICE : print(filename)
2012-09-10 22:12:39 +00:00
if part.get_content_maintype() == 'multipart' or not filename : continue
fp = open(join(toDir, filename), 'wb')
#print(part.get_payload(decode=True)[::-1])
fp.write(part.get_payload(decode=True))
fp.close()
rt.stop()
def send(self, msg, box='INBOX'):
""""""
from impra.util import DEBUG_INFO, DEBUG_LEVEL, DEBUG, DEBUG_NOTICE
rt = RuTime(eval(__CALLER__()))
2012-09-13 18:23:29 +00:00
mid = None
date = Time2Internaldate(time())
status, resp = self.srv.append(box, '\Draft', date, bytes(msg,'utf-8'))
if status==self.OK:
if DEBUG and DEBUG_LEVEL <= DEBUG_NOTICE:
print(status)
print(resp)
2012-09-13 18:23:29 +00:00
mid = str(resp[0],'utf-8')[11:-11].split(' ')
2012-09-10 22:12:39 +00:00
rt.stop()
2012-09-13 18:23:29 +00:00
return mid
2012-09-10 22:12:39 +00:00
if __name__ == '__main__':
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
iconf = ImapConfig("imap.gmail.com", 993, 'gpslot.001', '__gpslot#22')
ih = ImapHelper(iconf,'__SMILF')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
print('\n--------------------------------------------------------------------')
print('-- STATUS DEFAULT BOX --')
print(str(ih.status()))
print('-- STATUS BOX __SMILF --')
print(str(ih.status('__SMILF')))
print('-- UNSEEN COUNT --')
print(str(ih.countUnseen('__SMILF')))
print('-- SEEN COUNT --')
print(str(ih.countSeen('__SMILF')))
print('-- MESSAGE COUNT --')
print(str(ih.countMsg('__SMILF')))
print('-- UNSEEN IDS --')
print(ih.idsUnseen('__SMILF'))
print('-- MESSAGES IDS --')
print(ih.idsMsg('__SMILF'))
print('-- SEEN IDS --')
lunseen = ih.idsSeen('__SMILF')
print(lunseen)
print('-- LIST BOX --')
lb = ih.listBox('')
print(lb[5][1])
print('-- SUBJECT ID 1 --')
print(ih.subject(lunseen[0]))
print('-- BODY ID 1 --')
#print(ih.body(lunseen[0]))
print('-- EMAIL ID 1 --')
# 'partial', ('1', 'RFC822', 1, 1024)),
#status, resp = ih.srv.fetch(lunseen[0],'(UID RFC822)')
#status, resp = ih.srv.fetch('4','(UID body[header.fields (from to subject date)])')
#status, resp = ih.srv.fetch(lunseen[1],'(UID RFC822.SIZE)')
#status, resp = ih.srv.fetch(lunseen[1],'(UID RFC822.HEADER)')
#status, resp = ih.srv.fetch(lunseen[1],'(UID BODYSTRUCTURE)')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#msg = ih.email(lunseen[0])
#print(type(msg))
#print(msg)
#print('-- ATTACHMENT ID 1 --')
#ih.downloadAttachment(lunseen[0])
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ['MIME-Version', 'Received', 'Date', 'Message-ID', 'Subject', 'From', 'To', 'Content-Type']
print('-- CREATE BOX __SMILF/böx --')
print(ih.createBox("__SMILF/böx"))
print('-- DELETE BOX böx --')
print(ih.deleteBox("böx"))
#~ OK
#~ [b'Success']
#~ True
#~ NO
#~ [b'[ALREADYEXISTS] Duplicate folder name b\xc3\xb6x (Failure)']
#~ True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~