From d9225fb9158e16f35457ac8e39d6d4283571aa56 Mon Sep 17 00:00:00 2001
From: jreback
Date: Fri, 3 May 2013 15:24:01 -0400
Subject: [PATCH] ENH: support for msgpack serialization/deserialization

DOC: install.rst mention
DOC: added license from msgpack_numpy
PERF: changed Timestamp and DatetimeIndex serialization for speedups
add vb_suite benchmarks
ENH: added to_msgpack method in generic.py, and default import into pandas
TST: all packers to always be imported, fail on usage with no msgpack installed
DOC: added mentions in release notes, v0.11.1, basics
ENH: provide automatic list if multiple args passed to to_msgpack
DOC: changed docs to 0.12
ENH: iterator support for stream unpacking

Conflicts:
	RELEASE.rst

ENH: added support for Panel,SparseSeries,SparseDataFrame,SparsePanel,IntIndex,BlockIndex
ENH: handle np.datetime64,np.timedelta64,date,timedelta types
TST: added compression (zlib/blosc) via big hack
DOC: moved back to 0.11.1 docs
BLD: integrated with built-in msgpack
DOC: io.rst fixes
PERF: update vb_suite for packers
TST: fix for test_list_float_complex test?
PERF: prototype for packing faster
PERF: was still using tolist on indices
DOC: v0.13.0.txt and release notes
DOC: release notes
PERF: revamped packers vbench to use packers,csv,pickle,hdf_store,hdf_table
TST: better test comparisons for numpy types
BLD: py3k compat
---
 LICENSES/MSGPACK_NUMPY_LICENSE  |  33 ++
 doc/source/io.rst               |  68 +++++
 doc/source/release.rst          |  24 +-
 doc/source/v0.13.0.txt          |  29 ++
 pandas/core/generic.py          |   4 +
 pandas/io/api.py                |   1 +
 pandas/io/packers.py            | 522 ++++++++++++++++++++++++++++++++
 pandas/io/tests/test_packers.py | 372 +++++++++++++++++++++++
 pandas/msgpack.pyx              |  59 ++++
 vb_suite/packers.py             |  94 ++++++
 vb_suite/suite.py               |   1 +
 11 files changed, 1196 insertions(+), 11 deletions(-)
 create mode 100644 LICENSES/MSGPACK_NUMPY_LICENSE
 create mode 100644 pandas/io/packers.py
 create mode 100644 pandas/io/tests/test_packers.py
 create mode 100644 vb_suite/packers.py

diff --git a/LICENSES/MSGPACK_NUMPY_LICENSE b/LICENSES/MSGPACK_NUMPY_LICENSE
new file mode 100644
index 0000000000000..e570011efac73
--- /dev/null
+++ b/LICENSES/MSGPACK_NUMPY_LICENSE
@@ -0,0 +1,33 @@
+.. -*- rst -*-
+
+License
+=======
+
+Copyright (c) 2013, Lev Givon.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+* Redistributions of source code must retain the above copyright
+  notice, this list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above
+  copyright notice, this list of conditions and the following
+  disclaimer in the documentation and/or other materials provided
+  with the distribution.
+* Neither the name of Lev Givon nor the names of any
+  contributors may be used to endorse or promote products derived
+  from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/doc/source/io.rst b/doc/source/io.rst
index 5e04fcff61539..67ed21112ef0c 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -36,6 +36,7 @@ object.
   * ``read_hdf``
   * ``read_sql``
   * ``read_json``
+  * ``read_msgpack``
   * ``read_html``
   * ``read_stata``
   * ``read_clipboard``
@@ -48,6 +49,7 @@ The corresponding ``writer`` functions are object methods that are accessed like
   * ``to_hdf``
   * ``to_sql``
   * ``to_json``
+  * ``to_msgpack``
   * ``to_html``
   * ``to_stata``
   * ``to_clipboard``
@@ -1732,6 +1734,72 @@ module is installed you can use it as a xlsx writer engine as follows:

 .. _io.hdf5:

+Serialization
+-------------
+
+msgpack
+~~~~~~~
+
+.. _io.msgpack:
+
+.. versionadded:: 0.13.0
+
+Starting in 0.13.0, pandas supports the ``msgpack`` format for
+object serialization. This is a lightweight portable binary format, similar
+to binary JSON, that is highly space-efficient and provides good performance
+both for writing (serialization) and reading (deserialization).
+
+.. warning::
+
+   This is a very new feature of pandas. We intend to provide certain
+   optimizations in the io of the ``msgpack`` data. We do not intend the
+   format to change (and will provide backward compatibility if it does).
+
+.. ipython:: python
+
+   df = DataFrame(np.random.rand(5,2),columns=list('AB'))
+   df.to_msgpack('foo.msg')
+   pd.read_msgpack('foo.msg')
+   s = Series(np.random.rand(5),index=date_range('20130101',periods=5))
+
+You can pass a list of objects and you will receive them back on deserialization.
+
+.. ipython:: python
+
+   pd.to_msgpack('foo.msg', df, 'foo', np.array([1,2,3]), s)
+   pd.read_msgpack('foo.msg')
+
+You can pass ``iterator=True`` to iterate over the unpacked results.
+
+.. ipython:: python
+
+   for o in pd.read_msgpack('foo.msg',iterator=True):
+       print o
+
+You can pass ``append=True`` to the writer to append to an existing pack.
+
+.. ipython:: python
+
+   df.to_msgpack('foo.msg',append=True)
+   pd.read_msgpack('foo.msg')
+
+Unlike other io methods, ``to_msgpack`` is available both as an object method,
+``df.to_msgpack()``, and as a top-level function, ``pd.to_msgpack(...)``, where you
+can pack arbitrary collections of Python lists, dicts, and scalars, intermixed
+with pandas objects.
+
+.. ipython:: python
+
+   pd.to_msgpack('foo2.msg', { 'dict' : [ { 'df' : df }, { 'string' : 'foo' }, { 'scalar' : 1. }, { 's' : s } ] })
+   pd.read_msgpack('foo2.msg')
+
+.. ipython:: python
+   :suppress:
+   :okexcept:
+
+   os.remove('foo.msg')
+   os.remove('foo2.msg')
+
 HDF5 (PyTables)
 ---------------
diff --git a/doc/source/release.rst b/doc/source/release.rst
index 65e6ca0e1d95c..be62ef7d31a0b 100644
--- a/doc/source/release.rst
+++ b/doc/source/release.rst
@@ -64,17 +64,19 @@ New features
 Experimental Features
 ~~~~~~~~~~~~~~~~~~~~~

-- The new :func:`~pandas.eval` function implements expression evaluation using
-  ``numexpr`` behind the scenes. This results in large speedups for complicated
-  expressions involving large DataFrames/Series.
-- :class:`~pandas.DataFrame` has a new :meth:`~pandas.DataFrame.eval` that
-  evaluates an expression in the context of the ``DataFrame``.
-- A :meth:`~pandas.DataFrame.query` method has been added that allows
-  you to select elements of a ``DataFrame`` using a natural query syntax nearly
-  identical to Python syntax.
-- ``pd.eval`` and friends now evaluate operations involving ``datetime64``
-  objects in Python space because ``numexpr`` cannot handle ``NaT`` values
-  (:issue:`4897`).
+  - The new :func:`~pandas.eval` function implements expression evaluation using
+    ``numexpr`` behind the scenes. This results in large speedups for complicated
+    expressions involving large DataFrames/Series.
+  - :class:`~pandas.DataFrame` has a new :meth:`~pandas.DataFrame.eval` that
+    evaluates an expression in the context of the ``DataFrame``.
+  - A :meth:`~pandas.DataFrame.query` method has been added that allows
+    you to select elements of a ``DataFrame`` using a natural query syntax nearly
+    identical to Python syntax.
+  - ``pd.eval`` and friends now evaluate operations involving ``datetime64``
+    objects in Python space because ``numexpr`` cannot handle ``NaT`` values
+    (:issue:`4897`).
+  - Add msgpack support via ``pd.read_msgpack()`` and ``pd.to_msgpack()/df.to_msgpack()`` for serialization
+    of arbitrary pandas (and Python) objects in a lightweight portable binary format (:issue:`686`)

 Improvements to existing features
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/source/v0.13.0.txt b/doc/source/v0.13.0.txt
index 5ff7038d02e45..9ee1ce12e0e3e 100644
--- a/doc/source/v0.13.0.txt
+++ b/doc/source/v0.13.0.txt
@@ -686,6 +686,35 @@ to unify methods and behaviors. Series formerly subclassed directly from
    s.a = 5
    s

+IO Enhancements
+~~~~~~~~~~~~~~~
+
+- ``pd.read_msgpack()`` and ``pd.to_msgpack()`` are now supported for serialization
+  of arbitrary pandas (and Python) objects in a lightweight portable binary format. :ref:`See the docs <io.msgpack>`
+
+  .. ipython:: python
+
+     df = DataFrame(np.random.rand(5,2),columns=list('AB'))
+     df.to_msgpack('foo.msg')
+     pd.read_msgpack('foo.msg')
+
+     s = Series(np.random.rand(5),index=date_range('20130101',periods=5))
+     pd.to_msgpack('foo.msg', df, s)
+     pd.read_msgpack('foo.msg')
+
+  You can pass ``iterator=True`` to iterate over the unpacked results.
+
+  .. ipython:: python
+
+     for o in pd.read_msgpack('foo.msg',iterator=True):
+         print o
+
+  .. ipython:: python
+     :suppress:
+     :okexcept:
+
+     os.remove('foo.msg')
+
 Bug Fixes
 ~~~~~~~~~
diff --git a/pandas/core/generic.py b/pandas/core/generic.py
index 835b66512a89e..e96d159c26af4 100644
--- a/pandas/core/generic.py
+++ b/pandas/core/generic.py
@@ -805,6 +805,10 @@ def to_hdf(self, path_or_buf, key, **kwargs):
         from pandas.io import pytables
         return pytables.to_hdf(path_or_buf, key, self, **kwargs)

+    def to_msgpack(self, path_or_buf, **kwargs):
+        from pandas.io import packers
+        return packers.to_msgpack(path_or_buf, self, **kwargs)
+
     def to_pickle(self, path):
         """
         Pickle (serialize) object to input file path
diff --git a/pandas/io/api.py b/pandas/io/api.py
index 94deb51ab4b18..dc9ea290eb45e 100644
--- a/pandas/io/api.py
+++ b/pandas/io/api.py
@@ -11,3 +11,4 @@
 from pandas.io.sql import read_sql
 from pandas.io.stata import read_stata
 from pandas.io.pickle import read_pickle, to_pickle
+from pandas.io.packers import read_msgpack, to_msgpack
diff --git a/pandas/io/packers.py b/pandas/io/packers.py
new file mode 100644
index 0000000000000..68d1563d31529
--- /dev/null
+++ b/pandas/io/packers.py
@@ -0,0 +1,522 @@
+"""
+Msgpack serializer support for reading and writing pandas data structures
+to disk
+"""
+
+# portions of the msgpack_numpy package, by Lev Givon, were incorporated
+# into this module (and test_packers.py)
+
+"""
+License
+=======
+
+Copyright (c) 2013, Lev Givon.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+* Redistributions of source code must retain the above copyright
+  notice, this list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above
+  copyright notice, this list of conditions and the following
+  disclaimer in the documentation and/or other materials provided
+  with the distribution.
+* Neither the name of Lev Givon nor the names of any
+  contributors may be used to endorse or promote products derived
+  from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+""" + +from datetime import datetime, date, timedelta +import time +import re +import copy +import itertools +import warnings +from dateutil.parser import parse + +import numpy as np +from pandas import ( + Timestamp, Period, Series, TimeSeries, DataFrame, Panel, Panel4D, + Index, MultiIndex, Int64Index, PeriodIndex, DatetimeIndex, NaT +) +from pandas.sparse.api import SparseSeries, SparseDataFrame, SparsePanel +from pandas.sparse.array import BlockIndex, IntIndex +from pandas.tseries.api import PeriodIndex, DatetimeIndex +from pandas.core.index import Int64Index, _ensure_index +import pandas.core.common as com +from pandas.core.generic import NDFrame +from pandas.core.common import needs_i8_conversion +from pandas.core.internals import BlockManager, make_block +import pandas.core.internals as internals + +from pandas.msgpack import Unpacker as _Unpacker, Packer as _Packer +import zlib + +try: + import blosc + _BLOSC = True +except: + _BLOSC = False + +## until we can pass this into our conversion functions, +## this is pretty hacky +compressor = None + +def to_msgpack(path, *args, **kwargs): + """ + msgpack (serialize) object to input file path + + Parameters + ---------- + path : string + File path + args : an object or objects to serialize + + append : boolean whether to append to an existing msgpack + (default is False) + compress : type of compressor (zlib or blosc), default to None (no compression) + """ + global compressor + compressor = kwargs.pop('compress',None) + append = kwargs.pop('append',None) + if append: + f = open(path, 'a+b') + else: + f = open(path, 'wb') + try: + if len(args) == 1: + f.write(pack(args[0],**kwargs)) + else: + for a in args: + f.write(pack(a,**kwargs)) + finally: + f.close() + + +def read_msgpack(path, iterator=False, **kwargs): + """ + Load msgpack pandas object from the specified + file path + + Parameters + ---------- + path : string + File path + iterator : boolean, if True, return an iterator to the unpacker + (default is False) + + Returns + ------- + obj : type of object stored in file + + """ + if iterator: + return Iterator(path) + + with open(path,'rb') as fh: + l = list(unpack(fh)) + if len(l) == 1: + return l[0] + return l + +dtype_dict = { 21 : np.dtype('M8[ns]'), + u'datetime64[ns]' : np.dtype('M8[ns]'), + u'datetime64[us]' : np.dtype('M8[us]'), + 22 : np.dtype('m8[ns]'), + u'timedelta64[ns]' : np.dtype('m8[ns]'), + u'timedelta64[us]' : np.dtype('m8[us]') } + +def dtype_for(t): + if t in dtype_dict: + return dtype_dict[t] + return np.typeDict[t] + +c2f_dict = {'complex': np.float64, + 'complex128': np.float64, + 'complex64': np.float32} + +# numpy 1.6.1 compat +if hasattr(np,'float128'): + c2f_dict['complex256'] = np.float128 + +def c2f(r, i, ctype_name): + """ + Convert strings to complex number instance with specified numpy type. 
+ """ + + ftype = c2f_dict[ctype_name] + return np.typeDict[ctype_name](ftype(r)+1j*ftype(i)) + + +def convert(values): + """ convert the numpy values to a list """ + + dtype = values.dtype + if needs_i8_conversion(dtype): + values = values.view('i8') + v = values.ravel() + + if compressor == 'zlib': + + # return string arrays like they are + if dtype == np.object_: + return v.tolist() + + # convert to a bytes array + v = v.tostring() + return zlib.compress(v) + + elif compressor == 'blosc' and _BLOSC: + + # return string arrays like they are + if dtype == np.object_: + return v.tolist() + + # convert to a bytes array + v = v.tostring() + return blosc.compress(v,typesize=dtype.itemsize) + + # ndarray (on original dtype) + if dtype == 'float64' or dtype == 'int64': + return v + + # as a list + return v.tolist() + +def unconvert(values, dtype, compress=None): + + if dtype == np.object_: + return np.array(values,dtype=object) + + if compress == 'zlib': + + values = zlib.decompress(values) + return np.frombuffer(values,dtype=dtype) + + elif compress == 'blosc': + + if not _BLOSC: + raise Exception("cannot uncompress w/o blosc") + + # decompress + values = blosc.decompress(values) + + return np.frombuffer(values,dtype=dtype) + + # as a list + return np.array(values,dtype=dtype) + +def encode(obj): + """ + Data encoder + """ + + tobj = type(obj) + if isinstance(obj, Index): + if isinstance(obj, PeriodIndex): + return {'typ' : 'period_index', + 'klass' : obj.__class__.__name__, + 'name' : getattr(obj,'name',None), + 'freq' : obj.freqstr, + 'dtype': obj.dtype.num, + 'data': convert(obj.asi8) } + elif isinstance(obj, DatetimeIndex): + return {'typ' : 'datetime_index', + 'klass' : obj.__class__.__name__, + 'name' : getattr(obj,'name',None), + 'dtype': obj.dtype.num, + 'data': convert(obj.asi8), + 'freq' : obj.freqstr, + 'tz' : obj.tz} + elif isinstance(obj, MultiIndex): + return {'typ' : 'multi_index', + 'klass' : obj.__class__.__name__, + 'names' : getattr(obj,'names',None), + 'dtype': obj.dtype.num, + 'data': convert(obj.values) } + else: + return {'typ' : 'index', + 'klass' : obj.__class__.__name__, + 'name' : getattr(obj,'name',None), + 'dtype': obj.dtype.num, + 'data': obj.tolist() } + elif isinstance(obj, Series): + if isinstance(obj, SparseSeries): + d = {'typ' : 'sparse_series', + 'klass' : obj.__class__.__name__, + 'dtype': obj.dtype.num, + 'index' : obj.index, + 'sp_index' : obj.sp_index, + 'sp_values' : convert(obj.sp_values), + 'compress' : compressor} + for f in ['name','fill_value','kind']: + d[f] = getattr(obj,f,None) + return d + else: + return {'typ' : 'series', + 'klass' : obj.__class__.__name__, + 'name' : getattr(obj,'name',None), + 'index' : obj.index, + 'dtype': obj.dtype.num, + 'data': convert(obj.values), + 'compress' : compressor} + elif issubclass(tobj, NDFrame): + if isinstance(obj, SparseDataFrame): + d = {'typ' : 'sparse_dataframe', + 'klass' : obj.__class__.__name__, + 'columns' : obj.columns } + for f in ['default_fill_value','default_kind']: + d[f] = getattr(obj,f,None) + d['data'] = dict([ (name,ss) for name,ss in obj.iterkv() ]) + return d + elif isinstance(obj, SparsePanel): + d = {'typ' : 'sparse_panel', + 'klass' : obj.__class__.__name__, + 'items' : obj.items } + for f in ['default_fill_value','default_kind']: + d[f] = getattr(obj,f,None) + d['data'] = dict([ (name,df) for name,df in obj.iterkv() ]) + return d + else: + + data = obj._data + if not data.is_consolidated(): + data = data.consolidate() + + # the block manager + return {'typ' : 'block_manager', + 
'klass' : obj.__class__.__name__, + 'axes' : data.axes, + 'blocks' : [ { 'items' : b.items, + 'values' : convert(b.values), + 'shape' : b.values.shape, + 'dtype' : b.dtype.num, + 'klass' : b.__class__.__name__, + 'compress' : compressor + } for b in data.blocks ] } + + elif isinstance(obj, (datetime,date,np.datetime64,timedelta,np.timedelta64)): + if isinstance(obj, Timestamp): + tz = obj.tzinfo + if tz is not None: + tz = tz.zone + offset = obj.offset + if offset is not None: + offset = offset.freqstr + return {'typ' : 'timestamp', + 'value': obj.value, + 'offset' : offset, + 'tz' : tz} + elif isinstance(obj, np.timedelta64): + return { 'typ' : 'timedelta64', + 'data' : obj.view('i8') } + elif isinstance(obj, timedelta): + return { 'typ' : 'timedelta', + 'data' : (obj.days,obj.seconds,obj.microseconds) } + elif isinstance(obj, np.datetime64): + return { 'typ' : 'datetime64', + 'data' : str(obj) } + elif isinstance(obj, datetime): + return { 'typ' : 'datetime', + 'data' : obj.isoformat() } + elif isinstance(obj, date): + return { 'typ' : 'date', + 'data' : obj.isoformat() } + raise Exception("cannot encode this datetimelike object: %s" % obj) + elif isinstance(obj, Period): + return {'typ' : 'period', + 'ordinal' : obj.ordinal, + 'freq' : obj.freq } + elif isinstance(obj, BlockIndex): + return { 'typ' : 'block_index', + 'klass' : obj.__class__.__name__, + 'blocs' : obj.blocs, + 'blengths' : obj.blengths, + 'length' : obj.length } + elif isinstance(obj, IntIndex): + return { 'typ' : 'int_index', + 'klass' : obj.__class__.__name__, + 'indices' : obj.indices, + 'length' : obj.length } + elif isinstance(obj, np.ndarray) and obj.dtype not in ['float64','int64']: + return {'typ' : 'ndarray', + 'shape': obj.shape, + 'ndim': obj.ndim, + 'dtype': obj.dtype.num, + 'data': convert(obj), + 'compress' : compressor } + elif isinstance(obj, np.number): + if np.iscomplexobj(obj): + return {'typ' : 'np_scalar', + 'sub_typ' : 'np_complex', + 'dtype': obj.dtype.name, + 'real': obj.real.__repr__(), + 'imag': obj.imag.__repr__()} + else: + return {'typ' : 'np_scalar', + 'dtype': obj.dtype.name, + 'data': obj.__repr__()} + elif isinstance(obj, complex): + return {'typ' : 'np_complex', + 'real': obj.real.__repr__(), + 'imag': obj.imag.__repr__()} + + return obj + +def decode(obj): + """ + Decoder for deserializing numpy data types. 
+ """ + + typ = obj.get('typ') + if typ is None: + return obj + elif typ == 'timestamp': + return Timestamp(obj['value'],tz=obj['tz'],offset=obj['offset']) + elif typ == 'period': + return Period(ordinal=obj['ordinal'],freq=obj['freq']) + elif typ == 'index': + dtype = dtype_for(obj['dtype']) + data = obj['data'] + return globals()[obj['klass']](data,dtype=dtype,name=obj['name']) + elif typ == 'multi_index': + return globals()[obj['klass']].from_tuples(obj['data'],names=obj['names']) + elif typ == 'period_index': + return globals()[obj['klass']](obj['data'],name=obj['name'],freq=obj['freq']) + elif typ == 'datetime_index': + return globals()[obj['klass']](obj['data'],freq=obj['freq'],tz=obj['tz'],name=obj['name']) + elif typ == 'series': + dtype = dtype_for(obj['dtype']) + index = obj['index'] + return globals()[obj['klass']](unconvert(obj['data'],dtype,obj['compress']),index=index,name=obj['name']) + elif typ == 'block_manager': + axes = obj['axes'] + + def create_block(b): + dtype = dtype_for(b['dtype']) + return make_block(unconvert(b['values'],dtype,b['compress']).reshape(b['shape']),b['items'],axes[0],klass=getattr(internals,b['klass'])) + + blocks = [ create_block(b) for b in obj['blocks'] ] + return globals()[obj['klass']](BlockManager(blocks, axes)) + elif typ == 'datetime': + return parse(obj['data']) + elif typ == 'datetime64': + return np.datetime64(parse(obj['data'])) + elif typ == 'date': + return parse(obj['data']).date() + elif typ == 'timedelta': + return timedelta(*obj['data']) + elif typ == 'timedelta64': + return np.timedelta64(int(obj['data'])) + elif typ == 'sparse_series': + dtype = dtype_for(obj['dtype']) + return globals()[obj['klass']](unconvert(obj['sp_values'],dtype,obj['compress']),sparse_index=obj['sp_index'], + index=obj['index'],fill_value=obj['fill_value'],kind=obj['kind'],name=obj['name']) + elif typ == 'sparse_dataframe': + return globals()[obj['klass']](obj['data'], + columns=obj['columns'],default_fill_value=obj['default_fill_value'],default_kind=obj['default_kind']) + elif typ == 'sparse_panel': + return globals()[obj['klass']](obj['data'], + items=obj['items'],default_fill_value=obj['default_fill_value'],default_kind=obj['default_kind']) + elif typ == 'block_index': + return globals()[obj['klass']](obj['length'],obj['blocs'],obj['blengths']) + elif typ == 'int_index': + return globals()[obj['klass']](obj['length'],obj['indices']) + elif typ == 'ndarray': + return unconvert(obj['data'],np.typeDict[obj['dtype']],obj.get('compress')).reshape(obj['shape']) + elif typ == 'np_scalar': + if obj.get('sub_typ') == 'np_complex': + return c2f(obj['real'], obj['imag'], obj['dtype']) + else: + dtype = dtype_for(obj['dtype']) + try: + return dtype(obj['data']) + except: + return dtype.type(obj['data']) + elif typ == 'np_complex': + return complex(obj['real']+'+'+obj['imag']+'j') + elif isinstance(obj, (dict,list,set)): + return obj + else: + return obj + +def pack(o, default=encode, + encoding='utf-8', unicode_errors='strict', use_single_float=False): + """ + Pack an object and return the packed bytes. 
+ """ + + return Packer(default=default, encoding=encoding, + unicode_errors=unicode_errors, + use_single_float=use_single_float).pack(o) + +def unpack(packed, object_hook=decode, + list_hook=None, use_list=False, encoding='utf-8', + unicode_errors='strict', object_pairs_hook=None): + """ + Unpack a packed object, return an iterator + Note: packed lists will be returned as tuples + """ + + return Unpacker(packed, object_hook=object_hook, + list_hook=list_hook, + use_list=use_list, encoding=encoding, + unicode_errors=unicode_errors, + object_pairs_hook=object_pairs_hook) + +class Packer(_Packer): + def __init__(self, default=encode, + encoding='utf-8', + unicode_errors='strict', + use_single_float=False): + super(Packer, self).__init__(default=default, + encoding=encoding, + unicode_errors=unicode_errors, + use_single_float=use_single_float) + +class Unpacker(_Unpacker): + def __init__(self, file_like=None, read_size=0, use_list=False, + object_hook=decode, + object_pairs_hook=None, list_hook=None, encoding='utf-8', + unicode_errors='strict', max_buffer_size=0): + super(Unpacker, self).__init__(file_like=file_like, + read_size=read_size, + use_list=use_list, + object_hook=object_hook, + object_pairs_hook=object_pairs_hook, + list_hook=list_hook, + encoding=encoding, + unicode_errors=unicode_errors, + max_buffer_size=max_buffer_size) + +class Iterator(object): + """ manage the unpacking iteration, + close the file on completion """ + + def __init__(self, path, **kwargs): + self.path = path + self.kwargs = kwargs + + def __iter__(self): + + try: + fh = open(self.path,'rb') + unpacker = unpack(fh) + for o in unpacker: + yield o + finally: + fh.close() diff --git a/pandas/io/tests/test_packers.py b/pandas/io/tests/test_packers.py new file mode 100644 index 0000000000000..7575c1e319a59 --- /dev/null +++ b/pandas/io/tests/test_packers.py @@ -0,0 +1,372 @@ +import nose +import unittest +import os +import sys +import warnings + +import datetime +import numpy as np + +from pandas import (Series, DataFrame, Panel, MultiIndex, bdate_range, + date_range, period_range, Index, SparseSeries, SparseDataFrame, + SparsePanel) +import pandas.util.testing as tm +from pandas.util.testing import ensure_clean +from pandas.tests.test_series import assert_series_equal +from pandas.tests.test_frame import assert_frame_equal +from pandas.tests.test_panel import assert_panel_equal + +import pandas +from pandas.sparse.tests.test_sparse import assert_sp_series_equal, assert_sp_frame_equal +from pandas import concat, Timestamp, tslib + +from numpy.testing.decorators import slow +nan = np.nan + +from pandas.io.packers import to_msgpack, read_msgpack + +_multiprocess_can_split_ = False + +def check_arbitrary(a, b): + + if isinstance(a,(list,tuple)) and isinstance(b,(list,tuple)): + assert(len(a) == len(b)) + for a_, b_ in zip(a,b): + check_arbitrary(a_,b_) + elif isinstance(a,Panel): + assert_panel_equal(a,b) + elif isinstance(a,DataFrame): + assert_frame_equal(a,b) + elif isinstance(a,Series): + assert_series_equal(a,b) + else: + assert(a == b) + +class Test(unittest.TestCase): + + def setUp(self): + self.path = '__%s__.msg' % tm.rands(10) + + def tearDown(self): + pass + + def encode_decode(self, x, **kwargs): + with ensure_clean(self.path) as p: + to_msgpack(p,x,**kwargs) + return read_msgpack(p,**kwargs) + +class TestNumpy(Test): + + def test_numpy_scalar_float(self): + x = np.float32(np.random.rand()) + x_rec = self.encode_decode(x) + self.assert_(np.allclose(x,x_rec) and type(x) == type(x_rec)) + + def 
test_numpy_scalar_complex(self): + x = np.complex64(np.random.rand()+1j*np.random.rand()) + x_rec = self.encode_decode(x) + self.assert_(np.allclose(x,x_rec) and type(x) == type(x_rec)) + + def test_scalar_float(self): + x = np.random.rand() + x_rec = self.encode_decode(x) + self.assert_(np.allclose(x,x_rec) and type(x) == type(x_rec)) + + def test_scalar_complex(self): + x = np.random.rand()+1j*np.random.rand() + x_rec = self.encode_decode(x) + self.assert_(np.allclose(x,x_rec) and type(x) == type(x_rec)) + + def test_list_numpy_float(self): + raise nose.SkipTest('buggy test') + x = [np.float32(np.random.rand()) for i in range(5)] + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: + x == y, x, x_rec)) and \ + all(map(lambda x,y: type(x) == type(y), x, x_rec))) + + def test_list_numpy_float_complex(self): + if not hasattr(np,'complex128'): + raise nose.SkipTest('numpy cant handle complex128') + + # buggy test + raise nose.SkipTest('buggy test') + x = [np.float32(np.random.rand()) for i in range(5)] + \ + [np.complex128(np.random.rand()+1j*np.random.rand()) for i in range(5)] + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + all(map(lambda x,y: type(x) == type(y), x, x_rec))) + + def test_list_float(self): + x = [np.random.rand() for i in range(5)] + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + all(map(lambda x,y: type(x) == type(y), x, x_rec))) + + def test_list_float_complex(self): + x = [np.random.rand() for i in range(5)] + \ + [(np.random.rand()+1j*np.random.rand()) for i in range(5)] + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + all(map(lambda x,y: type(x) == type(y), x, x_rec))) + + def test_dict_float(self): + x = {'foo': 1.0, 'bar': 2.0} + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x.values(), x_rec.values())) and \ + all(map(lambda x,y: type(x) == type(y), x.values(), x_rec.values()))) + + def test_dict_complex(self): + x = {'foo': 1.0+1.0j, 'bar': 2.0+2.0j} + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x.values(), x_rec.values())) and \ + all(map(lambda x,y: type(x) == type(y), x.values(), x_rec.values()))) + + def test_dict_numpy_float(self): + x = {'foo': np.float32(1.0), 'bar': np.float32(2.0)} + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x.values(), x_rec.values())) and \ + all(map(lambda x,y: type(x) == type(y), x.values(), x_rec.values()))) + + def test_dict_numpy_complex(self): + x = {'foo': np.complex128(1.0+1.0j), 'bar': np.complex128(2.0+2.0j)} + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x.values(), x_rec.values())) and \ + all(map(lambda x,y: type(x) == type(y), x.values(), x_rec.values()))) + + def test_numpy_array_float(self): + x = np.random.rand(5).astype(np.float32) + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + x.dtype == x_rec.dtype) + def test_numpy_array_complex(self): + x = (np.random.rand(5)+1j*np.random.rand(5)).astype(np.complex128) + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + x.dtype == x_rec.dtype) + + def test_list_mixed(self): + x = [1.0, np.float32(3.5), np.complex128(4.25), u'foo'] + x_rec = self.encode_decode(x) + self.assert_(all(map(lambda x,y: x == y, x, x_rec)) and \ + all(map(lambda x,y: type(x) == type(y), x, x_rec))) +class TestBasic(Test): + + def test_timestamp(self): + + for 
i in [ Timestamp('20130101'), Timestamp('20130101',tz='US/Eastern'), + Timestamp('201301010501') ]: + i_rec = self.encode_decode(i) + self.assert_(i == i_rec) + + def test_datetimes(self): + + for i in [ datetime.datetime(2013,1,1), datetime.datetime(2013,1,1,5,1), + datetime.date(2013,1,1), np.datetime64(datetime.datetime(2013,1,5,2,15)) ]: + i_rec = self.encode_decode(i) + self.assert_(i == i_rec) + + def test_timedeltas(self): + + for i in [ datetime.timedelta(days=1), + datetime.timedelta(days=1,seconds=10), + np.timedelta64(1000000) ]: + i_rec = self.encode_decode(i) + self.assert_(i == i_rec) + + +class TestIndex(Test): + + def setUp(self): + super(TestIndex, self).setUp() + + self.d = { + 'string' : tm.makeStringIndex(100), + 'date' : tm.makeDateIndex(100), + 'int' : tm.makeIntIndex(100), + 'float' : tm.makeFloatIndex(100), + 'empty' : Index([]), + 'tuple' : Index(zip(['foo', 'bar', 'baz'], [1, 2, 3])), + 'period' : Index(period_range('2012-1-1', freq='M', periods=3)), + 'date2' : Index(date_range('2013-01-1', periods=10)), + 'bdate' : Index(bdate_range('2013-01-02',periods=10)), + } + + self.mi = { + 'reg' : MultiIndex.from_tuples([('bar', 'one'), ('baz', 'two'), ('foo', 'two'), + ('qux', 'one'), ('qux', 'two')], names=['first','second']), + } + + def test_basic_index(self): + + for s, i in self.d.items(): + i_rec = self.encode_decode(i) + self.assert_(i.equals(i_rec)) + + def test_multi_index(self): + + for s, i in self.mi.items(): + i_rec = self.encode_decode(i) + self.assert_(i.equals(i_rec)) + + def test_unicode(self): + i = tm.makeUnicodeIndex(100) + i_rec = self.encode_decode(i) + self.assert_(i.equals(i_rec)) + +class TestSeries(Test): + + def setUp(self): + super(TestSeries, self).setUp() + + self.d = {} + + + s = tm.makeStringSeries() + s.name = 'string' + self.d['string'] = s + + s = tm.makeObjectSeries() + s.name = 'object' + self.d['object'] = s + + s = Series(tslib.iNaT, dtype='M8[ns]', index=range(5)) + self.d['date'] = s + + data = { + 'A': [0., 1., 2., 3., np.nan], + 'B': [0, 1, 0, 1, 0], + 'C': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'], + 'D': date_range('1/1/2009', periods=5), + 'E' : [0., 1, Timestamp('20100101'),'foo',2.], + } + + self.d['float'] = Series(data['A']) + self.d['int'] = Series(data['B']) + self.d['mixed'] = Series(data['E']) + + def test_basic(self): + + for s, i in self.d.items(): + i_rec = self.encode_decode(i) + assert_series_equal(i,i_rec) + +class TestNDFrame(Test): + + def setUp(self): + super(TestNDFrame, self).setUp() + + data = { + 'A': [0., 1., 2., 3., np.nan], + 'B': [0, 1, 0, 1, 0], + 'C': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'], + 'D': date_range('1/1/2009', periods=5), + 'E' : [0., 1, Timestamp('20100101'),'foo',2.], + } + + self.frame = { 'float' : DataFrame(dict(A = data['A'], B = Series(data['A']) + 1)), + 'int' : DataFrame(dict(A = data['B'], B = Series(data['B']) + 1)), + 'mixed' : DataFrame(dict([ (k,data[k]) for k in ['A','B','C','D']])) } + + self.panel = { 'float' : Panel(dict(ItemA = self.frame['float'], ItemB = self.frame['float']+1)) } + + def test_basic_frame(self): + + for s, i in self.frame.items(): + i_rec = self.encode_decode(i) + assert_frame_equal(i,i_rec) + + def test_basic_panel(self): + + for s, i in self.panel.items(): + i_rec = self.encode_decode(i) + assert_panel_equal(i,i_rec) + + def test_multi(self): + + i_rec = self.encode_decode(self.frame) + for k in self.frame.keys(): + assert_frame_equal(self.frame[k],i_rec[k]) + + l = tuple([ self.frame['float'], self.frame['float'].A, self.frame['float'].B, None ]) + 
l_rec = self.encode_decode(l) + check_arbitrary(l,l_rec) + + # this is an oddity in that packed lists will be returned as tuples + l = [ self.frame['float'], self.frame['float'].A, self.frame['float'].B, None ] + l_rec = self.encode_decode(l) + self.assert_(isinstance(l_rec,tuple)) + check_arbitrary(l,l_rec) + + def test_iterator(self): + + l = [ self.frame['float'], self.frame['float'].A, self.frame['float'].B, None ] + + with ensure_clean(self.path) as path: + to_msgpack(path,*l) + for i, packed in enumerate(read_msgpack(path, iterator=True)): + check_arbitrary(packed,l[i]) + +class TestSparse(Test): + + def _check_roundtrip(self, obj, comparator, **kwargs): + + i_rec = self.encode_decode(obj) + comparator(obj,i_rec,**kwargs) + + def test_sparse_series(self): + + s = tm.makeStringSeries() + s[3:5] = np.nan + ss = s.to_sparse() + self._check_roundtrip(ss, tm.assert_series_equal, + check_series_type=True) + + ss2 = s.to_sparse(kind='integer') + self._check_roundtrip(ss2, tm.assert_series_equal, + check_series_type=True) + + ss3 = s.to_sparse(fill_value=0) + self._check_roundtrip(ss3, tm.assert_series_equal, + check_series_type=True) + + def test_sparse_frame(self): + + s = tm.makeDataFrame() + s.ix[3:5, 1:3] = np.nan + s.ix[8:10, -2] = np.nan + ss = s.to_sparse() + + self._check_roundtrip(ss, tm.assert_frame_equal, + check_frame_type=True) + + ss2 = s.to_sparse(kind='integer') + self._check_roundtrip(ss2, tm.assert_frame_equal, + check_frame_type=True) + + ss3 = s.to_sparse(fill_value=0) + self._check_roundtrip(ss3, tm.assert_frame_equal, + check_frame_type=True) + + def test_sparse_panel(self): + + items = ['x', 'y', 'z'] + p = Panel(dict((i, tm.makeDataFrame().ix[:2, :2]) for i in items)) + sp = p.to_sparse() + + self._check_roundtrip(sp, tm.assert_panel_equal, + check_panel_type=True) + + sp2 = p.to_sparse(kind='integer') + self._check_roundtrip(sp2, tm.assert_panel_equal, + check_panel_type=True) + + sp3 = p.to_sparse(fill_value=0) + self._check_roundtrip(sp3, tm.assert_panel_equal, + check_panel_type=True) + + +if __name__ == '__main__': + import nose + nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'], + exit=False) diff --git a/pandas/msgpack.pyx b/pandas/msgpack.pyx index a04a19d280467..7d492b6ddda45 100644 --- a/pandas/msgpack.pyx +++ b/pandas/msgpack.pyx @@ -1,5 +1,6 @@ # coding: utf-8 #cython: embedsignature=True +#cython: profile=False from cpython cimport * cdef extern from "Python.h": @@ -12,6 +13,9 @@ from libc.stdlib cimport * from libc.string cimport * from libc.limits cimport * +import cython +import numpy as np +from numpy cimport * class UnpackException(Exception): pass @@ -154,6 +158,8 @@ cdef class Packer(object): def __dealloc__(self): free(self.pk.buf); + @cython.boundscheck(False) + @cython.wraparound(False) cdef int _pack(self, object o, int nest_limit=DEFAULT_RECURSE_LIMIT) except -1: cdef long long llval cdef unsigned long long ullval @@ -163,6 +169,13 @@ cdef class Packer(object): cdef char* rawval cdef int ret cdef dict d + cdef object dtype + + cdef int n,i + cdef double f8val + cdef int64_t i8val + cdef ndarray[float64_t,ndim=1] array_double + cdef ndarray[int64_t,ndim=1] array_int if nest_limit < 0: raise PackValueError("recursion limit exceeded.") @@ -227,6 +240,45 @@ cdef class Packer(object): for v in o: ret = self._pack(v, nest_limit-1) if ret != 0: break + + # ndarray support ONLY (and float64/int64) for now + elif isinstance(o, np.ndarray) and not hasattr(o,'values') and (o.dtype == 'float64' or o.dtype == 'int64'): + + ret = 
msgpack_pack_map(&self.pk, 5) + if ret != 0: return -1 + + dtype = o.dtype + self.pack_pair('typ', 'ndarray', nest_limit) + self.pack_pair('shape', o.shape, nest_limit) + self.pack_pair('ndim', o.ndim, nest_limit) + self.pack_pair('dtype', dtype.num, nest_limit) + + ret = self._pack('data', nest_limit-1) + if ret != 0: return ret + + if dtype == 'float64': + array_double = o.ravel() + n = len(array_double) + ret = msgpack_pack_array(&self.pk, n) + if ret != 0: return ret + + for i in range(n): + + f8val = array_double[i] + ret = msgpack_pack_double(&self.pk, f8val) + if ret != 0: break + elif dtype == 'int64': + array_int = o.ravel() + n = len(array_int) + ret = msgpack_pack_array(&self.pk, n) + if ret != 0: return ret + + for i in range(n): + + i8val = array_int[i] + ret = msgpack_pack_long(&self.pk, i8val) + if ret != 0: break + elif self._default: o = self._default(o) ret = self._pack(o, nest_limit-1) @@ -300,6 +352,13 @@ cdef class Packer(object): return PyBytes_FromStringAndSize(self.pk.buf, self.pk.length) + cdef inline pack_pair(self, object k, object v, int nest_limit): + ret = self._pack(k, nest_limit-1) + if ret != 0: raise Exception("cannot pack : %s" % k) + ret = self._pack(v, nest_limit-1) + if ret != 0: raise Exception("cannot pack : %s" % v) + return ret + def pack(object o, object stream, default=None, encoding='utf-8', unicode_errors='strict'): """ pack an object `o` and write it to stream).""" diff --git a/vb_suite/packers.py b/vb_suite/packers.py new file mode 100644 index 0000000000000..9af6a6b1b0c4e --- /dev/null +++ b/vb_suite/packers.py @@ -0,0 +1,94 @@ +from vbench.api import Benchmark +from datetime import datetime + +start_date = datetime(2013, 5, 1) + +common_setup = """from pandas_vb_common import * +import os +import pandas as pd +from pandas.core import common as com + +f = '__test__.msg' +def remove(f): + try: + os.remove(f) + except: + pass + +index = date_range('20000101',periods=50000,freq='H') +df = DataFrame({'float1' : randn(50000), + 'float2' : randn(50000)}, + index=index) +remove(f) +""" + +#---------------------------------------------------------------------- +# msgpack + +setup = common_setup + """ +df.to_msgpack(f) +""" + +packers_read_pack = Benchmark("pd.read_msgpack(f)", setup, start_date=start_date) + +setup = common_setup + """ +""" + +packers_write_pack = Benchmark("df.to_msgpack(f)", setup, cleanup="remove(f)", start_date=start_date) + +#---------------------------------------------------------------------- +# pickle + +setup = common_setup + """ +df.to_pickle(f) +""" + +packers_read_pickle = Benchmark("pd.read_pickle(f)", setup, start_date=start_date) + +setup = common_setup + """ +""" + +packers_write_pickle = Benchmark("df.to_pickle(f)", setup, cleanup="remove(f)", start_date=start_date) + +#---------------------------------------------------------------------- +# csv + +setup = common_setup + """ +df.to_csv(f) +""" + +packers_read_csv = Benchmark("pd.read_csv(f)", setup, start_date=start_date) + +setup = common_setup + """ +""" + +packers_write_csv = Benchmark("df.to_csv(f)", setup, cleanup="remove(f)", start_date=start_date) + +#---------------------------------------------------------------------- +# hdf store + +setup = common_setup + """ +df.to_hdf(f,'df') +""" + +packers_read_hdf_store = Benchmark("pd.read_hdf(f,'df')", setup, start_date=start_date) + +setup = common_setup + """ +""" + +packers_write_hdf_store = Benchmark("df.to_hdf(f,'df')", setup, cleanup="remove(f)", start_date=start_date) + 
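+#----------------------------------------------------------------------
+# msgpack with compression
+
+# a minimal additional benchmark sketch (these benchmark names are new,
+# not part of the original suite): it exercises the experimental
+# ``compress`` keyword of ``to_msgpack``. The compressor used is recorded
+# in the packed metadata, so ``pd.read_msgpack`` needs no extra arguments;
+# 'blosc' could be benchmarked the same way, given the optional blosc
+# package.
+
+setup = common_setup + """
+df.to_msgpack(f,compress='zlib')
+"""
+
+packers_read_pack_zlib = Benchmark("pd.read_msgpack(f)", setup, start_date=start_date)
+
+setup = common_setup + """
+"""
+
+packers_write_pack_zlib = Benchmark("df.to_msgpack(f,compress='zlib')", setup, cleanup="remove(f)", start_date=start_date)
+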
+#---------------------------------------------------------------------- +# hdf table + +setup = common_setup + """ +df.to_hdf(f,'df',table=True) +""" + +packers_read_hdf_table = Benchmark("pd.read_hdf(f,'df')", setup, start_date=start_date) + +setup = common_setup + """ +""" + +packers_write_hdf_table = Benchmark("df.to_hdf(f,'df',table=True)", setup, cleanup="remove(f)", start_date=start_date) + diff --git a/vb_suite/suite.py b/vb_suite/suite.py index 57920fcbf7c19..e5002ef78ab9b 100644 --- a/vb_suite/suite.py +++ b/vb_suite/suite.py @@ -16,6 +16,7 @@ 'join_merge', 'miscellaneous', 'panel_ctor', + 'packers', 'parser', 'plotting', 'reindex',
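
For reference, a minimal round-trip sketch of the experimental ``compress``
keyword introduced by this patch (``'zlib'`` uses the standard library, while
``'blosc'`` requires the optional blosc package; the file name here is just
illustrative). The compressor is stored per block in the packed metadata, so
``read_msgpack`` takes no compression argument:

.. code-block:: python

   import numpy as np
   import pandas as pd

   df = pd.DataFrame(np.random.rand(1000, 2), columns=list('AB'))

   # block values are compressed on write; the codec name travels with
   # the data, so reading back needs no extra arguments
   df.to_msgpack('foo.msg', compress='zlib')
   result = pd.read_msgpack('foo.msg')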