Source code for pwnypack.marshal

"""
This module contains functions to load and unserialize data (including .pyc
files) serialized using the :mod:`marshal` module on most version of python.
"""

import datetime
from enum import IntEnum

import six

from pwnypack.bytecode import CodeObject
from pwnypack.py_internals import PY_INTERNALS, get_py_internals
from pwnypack.packing import U16, u16, U32, u32, u64, unpack
from pwnypack.target import Target


__all__ = ['marshal_load', 'marshal_loads', 'pyc_load', 'pyc_loads']


MAGIC_MAP = dict(
    (internals['magic'], internals)
    for version, internals in six.iteritems(PY_INTERNALS)
    if version is not None
)
MARSHAL_TARGET = Target(Target.Arch.unknown, Target.Bits.bits_32, Target.Endian.little)
NULL = object()
PyLong_MARSHAL_SHIFT = 15
FLAG_REF = 0x80


class ObjectType(IntEnum):
    """
    Enumeration used internally to describe / parse a marshal object type.
    """

    null = ord('0')
    none = ord('N')
    false = ord('F')
    true = ord('T')
    stopiter = ord('S')
    ellipsis = ord('.')
    int = ord('i')
    int64 = ord('I')
    float = ord('f')
    binary_float = ord('g')
    complex = ord('x')
    binary_complex = ord('y')
    long = ord('l')
    string = ord('s')
    stringref = ord('R')
    interned = ord('t')
    ref = ord('r')
    tuple = ord('(')
    list = ord('[')
    dict = ord('{')
    code = ord('c')
    unicode = ord('u')
    unknown = ord('?')
    set = ord('<')
    frozenset = ord('>')
    ascii = ord('a')
    ascii_interned = ord('A')
    small_tuple = ord(')')
    short_ascii = ord('z')
    short_ascii_interned = ord('Z')


[docs]def marshal_load(fp, origin=None): """ Unserialize data serialized with :func:`marshal.dump`. This function works across python versions. Marshalled code objects are returned as instances of :class:`~pwnypack.bytecode.CodeObject`. Arguments: fp(file): A file or file-like object that contains the serialized data. origin(dict): The opcode specification of the python version that generated the data. If you provide ``None``, the specs for the currently running python version will be used. Returns: The unserialized data. """ origin = get_py_internals(origin) version = origin['version'] refs = [] def ref(o, flags): if flags & FLAG_REF: refs.append(o) return o def read_byte(): return six.byte2int(fp.read(1)) def read_short(): return u16(fp.read(2), target=MARSHAL_TARGET) def read_long(): return u32(fp.read(4), target=MARSHAL_TARGET) def read_int64(): return u64(fp.read(8), target=MARSHAL_TARGET) def read_float_binary(): return unpack('d', fp.read(8), target=MARSHAL_TARGET)[0] def read_bytes(): return fp.read(read_long()) def read_bytes_short(): return fp.read(read_byte()) def read_float_text(): return float(read_bytes_short()) def read_object(): c = six.byte2int(fp.read(1)) flags = c & FLAG_REF c = ObjectType(c & ~FLAG_REF) if c is ObjectType.null: return NULL elif c is ObjectType.none: return None elif c is ObjectType.stopiter: return StopIteration elif c is ObjectType.ellipsis: return Ellipsis elif c is ObjectType.false: return False elif c is ObjectType.true: return True elif c is ObjectType.int: return ref(read_long(), flags) elif c is ObjectType.int64: return ref(read_int64(), flags) elif c is ObjectType.long: n = read_long() r = sum( read_short() << (i * PyLong_MARSHAL_SHIFT) for i in range(abs(n)) ) return ref(-r if n < 0 else r, flags) elif c is ObjectType.float: return ref(read_float_text(), flags) elif c is ObjectType.binary_float: return ref(read_float_binary(), flags) elif c is ObjectType.complex: real = read_float_text() imag = read_float_text() return ref(complex(real, imag), flags) elif c is ObjectType.binary_complex: real = read_float_binary() imag = read_float_binary() return ref(complex(real, imag), flags) elif c is ObjectType.string: return ref(read_bytes(), flags) elif c is ObjectType.unicode: return ref(read_bytes().decode('utf-8'), flags) elif c is ObjectType.interned: if version < 30: return ref(read_bytes(), FLAG_REF) else: return ref(read_bytes().decode('utf-8'), flags) elif c is ObjectType.ascii: return ref(read_bytes().decode('ascii'), flags) elif c is ObjectType.ascii_interned: return ref(read_bytes().decode('ascii'), flags) elif c is ObjectType.short_ascii: return ref(read_bytes_short().decode('ascii'), flags) elif c is ObjectType.short_ascii_interned: return ref(read_bytes_short().decode('ascii'), flags) elif c in (ObjectType.tuple, ObjectType.small_tuple, ObjectType.frozenset): ref_index = len(refs) ref(NULL, flags) r_type = frozenset if c is ObjectType.frozenset else tuple n = read_byte() if c is ObjectType.small_tuple else read_long() r = r_type(read_object() for _ in range(n)) if flags & FLAG_REF: refs[ref_index] = r return r elif c is ObjectType.list: r = ref([], flags) for _ in range(read_long()): r.append(read_object()) return r elif c is ObjectType.set: r = ref(set(), flags) for _ in range(read_long()): r.add(read_object()) return r elif c is ObjectType.dict: r = ref({}, flags) while True: k = read_object() if k is NULL: break r[k] = read_object() return r elif c in (ObjectType.stringref, ObjectType.ref): return refs[read_long()] elif c is ObjectType.code: ref_index = len(refs) ref(NULL, flags) co_argcount = read_long() if version < 30: co_kwonlyargcount = 0 else: co_kwonlyargcount = read_long() co_nlocals = read_long() co_stacksize = read_long() co_flags = read_long() co_code = read_object() co_consts = read_object() co_names = read_object() co_varnames = read_object() co_freevars = read_object() co_cellvars = read_object() co_filename = read_object() co_name = read_object() co_firstlineno = read_long() co_lnotab = read_object() r = CodeObject( co_argcount, co_kwonlyargcount, co_nlocals, co_stacksize, co_flags, co_code, co_consts, co_names, co_varnames, co_filename, co_name, co_firstlineno, co_lnotab, co_freevars, co_cellvars, origin, ) if flags & FLAG_REF: refs[ref_index] = r return r else: raise ValueError('Unexpected object type %s.' % c) return read_object()
[docs]def marshal_loads(data, origin=None): """ Load data serialized with :func:`marshal.dump` from a bytestring. Arguments: data(bytes): The marshalled data. origin(dict): The opcode specification of the python version that generated the data. If you provide ``None``, the specs for the currently running python version will be used. Returns: The unserialized data. """ return marshal_load(six.BytesIO(data), origin)
class PycFile(object): """ This class describes a parsed .pyc file and is returned by :func:`pyc_load` and :func:`pyc_loads`. """ def __init__(self, magic, origin, timestamp, file_size, code): self.magic = magic #: The magic number of the python version that created the file. self.origin = origin #: The internals of the accompanying python version. self.timestamp = timestamp #: The timestamp of the original source file. self.file_size = file_size #: The original source file's since (or None if version < 3.3). self.code = code #: The :class:`CodeObject` instance that represents the contents of the .pyc file.
[docs]def pyc_load(fp): """ Load a .pyc file from a file-like object. Arguments: fp(file): The file-like object to read. Returns: PycFile: The parsed representation of the .pyc file. """ magic_1 = U16(fp.read(2), target=MARSHAL_TARGET) magic_2 = U16(fp.read(2), target=MARSHAL_TARGET) internals = MAGIC_MAP.get(magic_1) if internals is None: raise ValueError('Invalid or unknown magic (%d).' % magic_1) if magic_2 != 2573: raise ValueError('Invalid secondary magic (%d).' % magic_2) timestamp = datetime.datetime.fromtimestamp(U32(fp.read(4), target=MARSHAL_TARGET)) if internals['version'] >= 33: file_size = U32(fp.read(4)) else: file_size = None code_object = marshal_load(fp, internals) return PycFile(magic_1, internals, timestamp, file_size, code_object)
[docs]def pyc_loads(data): """ Load a .pyc file from a bytestring. Arguments: data(bytes): The content of the .pyc file. Returns: PycFile: The parsed representation of the .pyc file. """ return pyc_load(six.BytesIO(data))