# !/usr/bin/env python
# -*- coding: utf-8 -*-
# psr/io.py
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#
# software : Kirmah
# version : 2.17
# date : 2013
# licence : GPLv3.0
# author : a-Sansara <[a-sansara]at[clochardprod]dot[net]>
# copyright : pluie.org
#
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#
# This file is part of Kirmah.
#
# Kirmah is free software (free as in speech) : you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# Kirmah is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License
# along with Kirmah. If not, see .
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ module io ~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~ class Io ~~
class Io:
from io import SEEK_CUR
from gzip import compress as gzcompress, decompress as gzdecompress
from bz2 import compress as bzcompress, decompress as bzdecompress
from errno import EEXIST
from os import remove as removeFile, utime, rename
from mmap import mmap, PROT_READ
def __init__(self):
""""""
@staticmethod
def read_in_chunks(f, chsize=1024, utf8=False):
"""Lazy function (generator) to read a file piece by piece in
Utf-8 bytes
Default chunk size: 1k."""
c = 0
while True:
data = f.read(chsize)
if utf8 :
p = f.tell()
t = f.read(1)
f.seek(p)
if t!=b'' and not Io.is_utf8_start_sequence(ord(t)) :
delta = 0
for i in range(3) :
t = f.read(1)
if Io.is_utf8_start_sequence(ord(t)) :
delta += i
break
f.seek(p)
data += f.read(delta)
if not data : break
yield data, c
c += 1
@staticmethod
def read_utf8_chr(f, chsize=0, p=0):
""""""
with Io.mmap(f.fileno(), chsize*p) as mmp:
s, b, c = mmp.size(), b'', 0
while mmp.tell() < s :
b = mmp.read(1)
c = Io.count_utf8_continuation_bytes(b)
if c > 0 : b += mmp.read(c)
yield str(b,'utf-8')
@staticmethod
def is_utf8_continuation_byte(b):
""""""
return b >= 0x80 and b <= 0xbf
@staticmethod
def is_utf8_start_sequence(b):
""""""
return (b >= 0x00 and b <= 0x7f) or (b>= 0xc2 and b <= 0xf4)
@staticmethod
def count_utf8_continuation_bytes(b):
""""""
c = 0
try : d = ord(b)
except : d = int(b)
if d >= 0xc2 :
if d < 0xe0 : c = 1
elif d < 0xf0 : c = 2
else : c = 3
return c
@staticmethod
def get_data(path, binary=False, remove=False):
"""Get file content from `path`
:Returns: `str`
"""
d = ''
with Io.rfile(path, binary) as f :
d = f.read()
if remove :
Io.removeFile(path)
return d
@staticmethod
def get_file_obj(path, binary=False, writing=False, update=False, truncate=False):
""""""
if not writing :
f = open(path, mode='rt' if not binary else 'rb')
else :
if update and not Io.file_exists(path):
if binary :
f = open(path, mode='wb')
else :
f = open(path, mode='wt', encoding='utf-8')
return f
m = ('w' if truncate else 'r')+('+' if update else '')+('b' if binary else 't')
if not binary :
f = open(path, mode=m, encoding='utf-8')
else :
f = open(path, mode=m)
return f
@staticmethod
def wfile(path, binary=True):
""""""
return Io.get_file_obj(path, binary, True, True, True)
@staticmethod
def ufile(path, binary=True):
""""""
return Io.get_file_obj(path, binary, True, True, False)
@staticmethod
def rfile(path, binary=True):
""""""
return Io.get_file_obj(path, binary)
@staticmethod
def set_data(path, content, binary=False):
""""""
with Io.wfile(path, binary) as f:
f.write(content)
@staticmethod
def readmmline(f, pos=0):
""""""
f.flush()
with Io.mmap(f.fileno(), 0, prot=Io.PROT_READ) as mmp:
mmp.seek(pos)
for line in iter(mmp.readline, b''):
pos = mmp.tell()
yield pos, Io.str(line[:-1])
@staticmethod
def copy(fromPath, toPath):
""""""
if not fromPath == toPath :
with Io.rfile(fromPath) as fi :
with Io.wfile(toPath) as fo :
fo.write(fi.read())
else : raise Exception('can\t copy to myself')
@staticmethod
def bytes(sdata, encoding='utf-8'):
""""""
return bytes(sdata,encoding)
@staticmethod
def str(bdata, encoding='utf-8'):
""""""
return str(bdata,encoding) if isinstance(bdata, bytes) else str(bdata)
@staticmethod
def printableBytes(bdata):
""""""
data = ''
if isinstance(bdata,bytes) :
try:
data = Io.str(bdata)
except Exception as e:
hexv = []
for i in bdata[1:] :
hexv.append(hex(i)[2:].rjust(2,'0'))
data = ' '.join(hexv)
pass
else :
data = bdata
return bdata
@staticmethod
def is_binary(filename):
"""Check if given filename is binary."""
done = False
f = Io.get_file_obj(filename, True)
try:
CHUNKSIZE = 1024
while 1:
chunk = f.read(CHUNKSIZE)
if b'\0' in chunk: done = True # found null byte
if done or len(chunk) < CHUNKSIZE: break
finally:
f.close()
return done
@staticmethod
def file_exists(path):
""""""
exist = False
try:
if path is not None :
with open(path) as f: exist = True
except IOError as e: pass
return exist
@staticmethod
def touch(fname, times=None):
""" only existing files """
if Io.file_exists(fname):
Io.utime(fname, times)