Source code for pagesign

# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 Red Dove Consultants Limited
#
import base64
import functools
import hashlib
import json
import logging
import os
import re
import shutil
# import stat
import subprocess
import sys
import tempfile
import threading

__version__ = '0.1.1.dev0'
__author__ = 'Vinay Sajip'
__date__ = "$05-Dec-2021 12:39:53$"

if sys.version_info[:2] < (3, 6):  # pragma: no cover
    raise ImportError('This module requires Python >= 3.6 to run.')

logger = logging.getLogger(__name__)

__all__ = [
    'Identity',
    'remove_identities',
    'clear_identities',
    'list_identities',
    'encrypt',
    'decrypt',
    'sign',
    'verify',
    'encrypt_and_sign',
    'verify_and_decrypt'
]

if os.name == 'nt':
    PAGESIGN_DIR = os.path.join(os.environ['LOCALAPPDATA'], 'pagesign')
else:
    PAGESIGN_DIR = os.path.expanduser('~/.pagesign')

CREATED_PATTERN = re.compile('# created: (.*)', re.I)
APK_PATTERN = re.compile('# public key: (.*)', re.I)
ASK_PATTERN = re.compile(r'AGE-SECRET-KEY-.*')
MPI_PATTERN = re.compile(r'minisign public key (\S+)')

if not os.path.exists(PAGESIGN_DIR):  # pragma: no cover
    os.makedirs(PAGESIGN_DIR)

if not os.path.isdir(PAGESIGN_DIR):  # pragma: no cover
    raise ValueError('%s exists but is not a directory.' % PAGESIGN_DIR)

os.chmod(PAGESIGN_DIR, 0o700)


def _load_keys():
    result = {}
    p = os.path.join(PAGESIGN_DIR, 'keys')
    if os.path.exists(p):
        with open(p, encoding='utf-8') as f:
            result = json.load(f)
    return result


def _save_keys(keys):
    p = os.path.join(PAGESIGN_DIR, 'keys')
    with open(p, 'w', encoding='utf-8') as f:
        json.dump(keys, f, indent=2, sort_keys=True)
    os.chmod(p, 0o600)


KEYS = _load_keys()

PUBLIC_ATTRS = ('created', 'crypt_public', 'sign_public', 'sign_id')

ATTRS = PUBLIC_ATTRS + ('crypt_secret', 'sign_secret', 'sign_pass')


def clear_identities(keys=KEYS):
    if len(keys):
        keys.clear()
        _save_keys(keys)


def remove_identities(*args):
    changed = False
    for name in args:
        if name in KEYS:
            del KEYS[name]
            changed = True
    if changed:
        _save_keys(KEYS)


def list_identities():
    return KEYS.items()


def _make_password(length):
    return base64.b64encode(os.urandom(length)).decode('ascii')


def _read_out(stream, result, key='stdout'):
    data = b''
    while True:
        c = stream.read1(100)
        if not c:
            break
        data += c
    result[key] = data


# def _read_age_encrypt_err(passphrase, stream, stdin, result):
    # data = b''
    # pwd = (passphrase + os.linesep).encode('ascii')
    # pwd_written = 0
    # sep = os.linesep.encode('ascii')
    # prompt1 = b'Enter passphrase (leave empty to autogenerate a secure one): '
    # prompt2 = prompt1 + sep + b'Confirm passphrase: '
    # prompts = (prompt1, prompt2)
    # while True:
        # c = stream.read1(100)
        # data += c
        # # print('err: %s' % data)
        # if data in prompts:
            # stdin.write(pwd)
            # stdin.flush()
            # pwd_written += 1
            # if pwd_written == 2:
                # stdin.close()
                # break
    # result['stderr'] = data


# def _read_age_decrypt_err(passphrase, stream, stdin, result):
    # data = b''
    # pwd = (passphrase + os.linesep).encode('ascii')
    # while True:
        # c = stream.read1(100)
        # data += c
        # # print('err: %s' % data)
        # if data == b'Enter passphrase: ':
            # stdin.write(pwd)
            # stdin.flush()
            # stdin.close()
            # break
    # result['stderr'] = data


def _run_command(cmd, wd, err_reader=None, decode=True):
    # print('Running: %s' % (cmd if isinstance(cmd, str) else ' '.join(cmd)))
    # if cmd[0] == 'age': import pdb; pdb.set_trace()
    if not isinstance(cmd, list):
        cmd = cmd.split()
    logger.debug('Running: %s' % cmd)
    kwargs = {
        'cwd': wd,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE
    }
    if err_reader:
        kwargs['stdin'] = subprocess.PIPE
    p = subprocess.Popen(cmd, **kwargs)
    if err_reader is None:
        stdout, stderr = p.communicate()
    else:
        data = {}
        rout = threading.Thread(target=_read_out, args=(p.stdout, data))
        rout.daemon = True
        rout.start()

        rerr = threading.Thread(target=err_reader, args=(p.stderr, p.stdin, data))
        rerr.daemon = True
        rerr.start()

        rout.join()
        rerr.join()

        p.wait()

        stdout = data['stdout']
        stderr = data['stderr']

        p.stdout.close()
        p.stderr.close()

    if p.returncode == 0:
        if decode:
            stdout = stdout.decode('utf-8')
            stderr = stderr.decode('utf-8')
        return stdout, stderr
    else:  # pragma: no cover
        # import pdb; pdb.set_trace()
        # if False:
            # print('Command %r failed with return code %d' % (cmd[0], p.returncode))
            # print('stdout was:')
            # if stdout:
                # print(stdout.decode('utf-8'))
            # print('stderr was:')
            # if stderr:
                # print(stderr.decode('utf-8'))
            # print('Raising an exception')
        raise subprocess.CalledProcessError(p.returncode, p.args,
                                            output=stdout, stderr=stderr)


def _get_work_file(**kwargs):
    fd, result = tempfile.mkstemp(**kwargs)
    os.close(fd)
    return result


def _shred(path, delete=True):
    size = os.stat(path).st_size
    passes = 2
    with open(path, 'wb') as f:
        for i in range(passes):
            if i > 0:
                f.seek(0)
            f.write(os.urandom(size))
    if delete:
        os.remove(path)


[docs]class Identity: encoding = 'utf-8'
[docs] def __init__(self, name=None): if name: if name in KEYS: self.__dict__.update(KEYS[name]) else: # pragma: no cover raise ValueError('No such identity: %r' % name) else: # Generate a new identity wd = tempfile.mkdtemp(dir=PAGESIGN_DIR, prefix='work-') try: p = os.path.join(wd, 'age-key') cmd = 'age-keygen -o %s' % p _run_command(cmd, wd) with open(p, encoding=self.encoding) as f: lines = f.read().splitlines() for line in lines: m = CREATED_PATTERN.match(line) if m: self.created = m.groups()[0] continue m = APK_PATTERN.match(line) if m: self.crypt_public = m.groups()[0] continue m = ASK_PATTERN.match(line) if m: self.crypt_secret = line _shred(p, False) # the whole directory will get removed sfn = _get_work_file(prefix='msk-', dir=wd) pfn = _get_work_file(prefix='mpk-', dir=wd) self.sign_pass = _make_password(12) cmd = 'minisign -fG -p %s -s %s' % (pfn, sfn) _run_command(cmd, wd, self._read_minisign_gen_err) with open(pfn, encoding=self.encoding) as f: lines = f.read().splitlines() for line in lines: m = MPI_PATTERN.search(line) if m: self.sign_id = m.groups()[0] else: self.sign_public = line with open(sfn, encoding=self.encoding) as f: self.sign_secret = f.read() _shred(sfn, False) # the whole directory will get removed finally: shutil.rmtree(wd) for attr in ATTRS: assert hasattr(self, attr)
[docs] def save(self, name): d = dict(self.__dict__) # might need to remove some attrs from d here ... KEYS[name] = d _save_keys(KEYS)
def _read_minisign_gen_err(self, stream, stdin, result): data = b'' pwd = (self.sign_pass + os.linesep).encode('ascii') pwd_written = 0 sep = os.linesep.encode('ascii') prompt1 = b'Password: ' prompt2 = prompt1 + sep + b'Password (one more time): ' prompts = (prompt1, prompt2) while True: c = stream.read1(100) data += c # print('err: %s' % data) if data in prompts: stdin.write(pwd) stdin.flush() pwd_written += 1 # print('Wrote pwd') if pwd_written == 2: stdin.close() break result['stderr'] = data def _read_minisign_sign_err(self, stream, stdin, result): data = b'' pwd = (self.sign_pass + os.linesep).encode('ascii') while True: c = stream.read(1) data += c # print('err: %s' % data) if data == b'Password: ': stdin.write(pwd) stdin.close() break result['stderr'] = data
[docs] def export(self): d = dict(self.__dict__) for k in self.__dict__: if '_secret' in k or '_pass' in k: del d[k] return d
[docs] @classmethod def imported(cls, d, name): result = object.__new__(cls) for k in PUBLIC_ATTRS: try: setattr(result, k, d[k]) except KeyError: # pragma: no cover logger.warning('Attribute absent: %s', k) result.save(name) return result
def _get_encryption_command(recipients, armor): if not recipients: # pragma: no cover raise ValueError('At least one recipient needs to be specified.') result = ['age', '-e'] if armor: result.append('-a') if isinstance(recipients, str): recipients = [recipients] if not isinstance(recipients, (list, tuple)): raise ValueError('invalid recipients: %s' % recipients) for r in recipients: if r not in KEYS: # pragma: no cover raise ValueError('No such recipient: %s' % r) info = KEYS[r] result.extend(['-r', info['crypt_public']]) return result
[docs]def encrypt(path, recipients, outpath=None, armor=False): if not os.path.isfile(path): # pragma: no cover raise ValueError('No such file: %s' % path) if outpath is None: outpath = '%s.age' % path else: # pragma: no cover d = os.path.dirname(outpath) if not os.path.exists(d): os.makedirs(d) elif not os.path.isdir(d): # pragma: no cover raise ValueError('Not a directory: %s' % d) # if dir, assume writeable, for now cmd = _get_encryption_command(recipients, armor) cmd.extend(['-o', outpath]) cmd.append(path) _run_command(cmd, os.getcwd()) return outpath
def _data_writer(data, stream, stdin, result): stdin.write(data) stdin.close() _read_out(stream, result, 'stderr')
[docs]def encrypt_mem(data, recipients, armor=False): cmd = _get_encryption_command(recipients, armor) if isinstance(data, str): data = data.encode('utf-8') if not isinstance(data, bytes): # pragma: no cover raise TypeError('invalid data: %s' % data) err_reader = functools.partial(_data_writer, data) stdout, stderr = _run_command(cmd, os.getcwd(), err_reader, False) return stdout
def _get_decryption_command(identities): if not identities: # pragma: no cover raise ValueError('At least one identity needs to be specified.') cmd = ['age', '-d'] if isinstance(identities, str): identities = [identities] if not isinstance(identities, (list, tuple)): # pragma: no cover raise ValueError('invalid identities: %s' % identities) fn = _get_work_file(dir=PAGESIGN_DIR, prefix='ident-') ident_values = [] for ident in identities: if ident not in KEYS: # pragma: no cover raise ValueError('No such identity: %s' % ident) ident_values.append(KEYS[ident]['crypt_secret']) with open(fn, 'w', encoding='utf-8') as f: f.write('\n'.join(ident_values)) cmd.extend(['-i', fn]) return cmd, fn
[docs]def decrypt(path, identities, outpath=None): if not os.path.isfile(path): # pragma: no cover raise ValueError('No such file: %s' % path) if outpath is None: if path.endswith('.age'): outpath = path[:-4] else: outpath = '%s.dec' % path else: d = os.path.dirname(outpath) if not os.path.exists(d): # pragma: no cover os.makedirs(d) elif not os.path.isdir(d): # pragma: no cover raise ValueError('Not a directory: %s' % d) # if dir, assume writeable, for now cmd, fn = _get_decryption_command(identities) # import pdb; pdb.set_trace() try: cmd.extend(['-o', outpath]) cmd.append(path) _run_command(cmd, os.getcwd()) return outpath finally: _shred(fn)
[docs]def decrypt_mem(data, identities): cmd, fn = _get_decryption_command(identities) if isinstance(data, str): # pragma: no cover data = data.encode('utf-8') if not isinstance(data, bytes): # pragma: no cover raise TypeError('invalid data: %s' % data) err_reader = functools.partial(_data_writer, data) try: stdout, stderr = _run_command(cmd, os.getcwd(), err_reader, False) return stdout finally: _shred(fn)
[docs]def sign(path, identity, outpath=None): if not identity: # pragma: no cover raise ValueError('An identity needs to be specified.') if identity not in KEYS: # pragma: no cover raise ValueError('No such identity: %s' % identity) ident = Identity(identity) if not os.path.isfile(path): # pragma: no cover raise ValueError('No such file: %s' % path) if outpath is None: outpath = '%s.sig' % path else: d = os.path.dirname(outpath) if not os.path.exists(d): # pragma: no cover os.makedirs(d) elif not os.path.isdir(d): # pragma: no cover raise ValueError('Not a directory: %s' % d) # if dir, assume writeable, for now fd, fn = tempfile.mkstemp(dir=PAGESIGN_DIR, prefix='seckey-') os.write(fd, (KEYS[identity]['sign_secret'] + os.linesep).encode('ascii')) os.close(fd) try: cmd = ['minisign', '-S', '-x', outpath, '-s', fn, '-m', path] _run_command(cmd, os.getcwd(), ident._read_minisign_sign_err) finally: _shred(fn) return outpath
[docs]def verify(path, identity, sigpath=None): if not identity: # pragma: no cover raise ValueError('An identity needs to be specified.') if identity not in KEYS: # pragma: no cover raise ValueError('No such identity: %s' % identity) ident = Identity(identity) if not os.path.isfile(path): # pragma: no cover raise ValueError('No such file: %s' % path) if sigpath is None: sigpath = '%s.sig' % path if not os.path.isfile(sigpath): # pragma: no cover raise ValueError('No such file: %s' % sigpath) cmd = ['minisign', '-V', '-x', sigpath, '-P', ident.sign_public, '-m', path] # import pdb; pdb.set_trace() _run_command(cmd, os.getcwd())
def _get_b64(path): with open(path, 'rb') as f: return base64.b64encode(f.read()).decode('ascii')
[docs]def encrypt_and_sign(path, recipients, signer, armor=False, outpath=None, sigpath=None): if not recipients or not signer: # pragma: no cover raise ValueError('At least one recipient (and one signer) needs to be specified.') if not os.path.isfile(path): raise ValueError('No such file: %s' % path) naive = False if naive: # pragma: no cover outpath = encrypt(path, recipients, outpath=outpath, armor=armor) sigpath = sign(outpath, signer, outpath=sigpath) return outpath, sigpath else: # Use a sign/encrypt/sign strategy: # 1. Sign the plaintext. # 2. Construct a JSON of the base64-encoded plaintext and signature. # 3. Encrypt that. # 4. Hash all the recipient public keys into a list. # 5. Construct a JSON of the encrypted data and recipient hashes. # 6. Sign that. fn = _get_work_file(dir=PAGESIGN_DIR, prefix='sig-') sigpath = sign(path, signer, fn) inner = { 'plaintext': _get_b64(path), 'signature': _get_b64(sigpath) } os.remove(sigpath) data = json.dumps(inner).encode('ascii') encrypted = encrypt_mem(data, recipients, armor) if not armor: encrypted = base64.b64encode(encrypted) if isinstance(recipients, str): recipients = [recipients] # if we encrypted OK, there can't have been problems with the recipients hashes = [] for r in recipients: info = KEYS[r] pk = info['crypt_public'].encode('ascii') hashes.append(hashlib.sha256(pk).hexdigest()) outer = { 'encrypted': encrypted.decode('ascii'), 'armored': armor, 'recipients': hashes } data = json.dumps(outer).encode('ascii') outpath = _get_work_file(dir=PAGESIGN_DIR, prefix='message-') with open(outpath, 'wb') as f: f.write(data) sigpath = sign(outpath, signer) return outpath, sigpath
[docs]def verify_and_decrypt(path, recipients, signer, outpath=None, sigpath=None): if not signer or not recipients: # pragma: no cover raise ValueError('At least one recipient (and one signer) needs to be specified.') if not os.path.isfile(path): # pragma: no cover raise ValueError('No such file: %s' % path) if sigpath is None: sigpath = path + '.sig' if not os.path.exists(sigpath): # pragma: no cover raise ValueError('no such file: %s' % sigpath) verify(path, signer, sigpath) naive = False if naive: # pragma: no cover return decrypt(path, recipients, outpath) else: with open(path, 'r', encoding='ascii') as f: outer = json.load(f) encrypted = outer['encrypted'].encode('ascii') if not outer['armored']: encrypted = base64.b64decode(encrypted) hashes = set(outer['recipients']) if isinstance(recipients, str): recipients = [recipients] for r in recipients: if r not in KEYS: # pragma: no cover raise ValueError('No such recipient: %s' % r) info = KEYS[r] pk = info['crypt_public'].encode('ascii') h = hashlib.sha256(pk).hexdigest() if h not in hashes: # pragma: no cover raise ValueError('Not a valid recipient: %s' % r) decrypted = decrypt_mem(encrypted, recipients).decode('ascii') inner = json.loads(decrypted) fd, outpath = tempfile.mkstemp(dir=PAGESIGN_DIR, prefix='msg-') os.write(fd, base64.b64decode(inner['plaintext'].encode('ascii'))) os.close(fd) sigpath = outpath + '.sig' with open(sigpath, 'wb') as f: f.write(base64.b64decode(inner['signature'].encode('ascii'))) verify(outpath, signer, sigpath) os.remove(sigpath) return outpath