Commit 7c0ffe9e authored by Aurélien Campéas

snapshot/storage: low-level optimisation

JSON serialization is replaced with a lower-level scheme,
affecting both string and numeric series.

The purpose is to cut the cost of deserialization, which is
currently quite high.

For numerical values, we serialize the underlying C array
(while recording the in-memory layout/dtype).
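
A rough sketch of the idea (illustrative only; the actual code lives
in Snapshot._serialize below): a float64 series' values round-trip
through raw bytes as long as the dtype string is kept alongside.

import numpy as np

values = np.array([1.5, 2.5, 3.5])        # the series' underlying C array
payload = values.tobytes()                # raw memory copy, no json parsing
dtype = values.dtype.str                  # '<f8' on little-endian machines

restored = np.frombuffer(payload, dtype)  # zero-parse reconstruction
assert (restored == values).all()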

The performance improvement in the reading phase is quite worthwhile:

Before:

TSH GET 0.005136966705322266
TSH HIST 0.5647647380828857
DELTA all value dates 2.0582079887390137
DELTA 1 day  0.20743083953857422

       class                      test      time
0  TimeSerie            bigdata_insert  1.332391
1  TimeSerie       bigdata_history_all  1.718589
2  TimeSerie    bigdata_history_chunks  1.613754
3  TimeSerie          manydiffs_insert  0.940170
4  TimeSerie     manydiffs_history_all  0.996268
5  TimeSerie  manydiffs_history_chunks  2.115351

After:

TSH GET 0.004252910614013672
TSH HIST 0.11956286430358887
DELTA all value dates 1.7346818447113037
DELTA 1 day  0.16817998886108398

       class                      test      time
0  TimeSerie            bigdata_insert  1.297348
1  TimeSerie       bigdata_history_all  0.173700
2  TimeSerie    bigdata_history_chunks  0.181005
3  TimeSerie          manydiffs_insert  0.846298
4  TimeSerie     manydiffs_history_all  0.084483
5  TimeSerie  manydiffs_history_chunks  0.216825


A few notes:

* serialization of strings is a bit tricky since we need to encode
  None/nan values and use a separator when concatenating them (hence
  ASCII control characters 0 and 3 are forbidden from ever appearing
  in the strings); a minimal sketch of the scheme follows these notes

* we have to wrap the `index` low-level byte string in a Python
  `array` to work around an obscure pandas bug in the `index.isin`
  computation (isin attempts a mutation!)
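
A minimal sketch of the string scheme, assuming hypothetical helper
names encode/decode (the real implementation, in the snapshot code
below, also prepends the packed index and compresses the result):

import pandas as pd

def encode(values):
    # NUL (0) separates items; ETX (3) stands for None/nan
    return b'\0'.join(
        b'\3' if pd.isnull(v) else v.encode('utf-8')
        for v in values
    )

def decode(payload):
    return [None if v == b'\3' else v.decode('utf-8')
            for v in payload.split(b'\0')]

assert decode(encode(['a', None, 'b'])) == ['a', None, 'b']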


Thanks to Alain Leufroy for the proposal!

Resolves #49.
parent e4a524270210
@@ -287,10 +287,12 @@ def test_serie_metadata(engine, tsh):
     initialmeta = tsh.metadata(engine, 'ts-metadata')

     assert initialmeta == {
-        'tzaware': False,
+        'index_dtype': '<M8[ns]',
+        'index_names': [],
         'index_type': 'datetime64[ns]',
-        'value_type': 'float64',
-        'index_names': []
+        'tzaware': False,
+        'value_dtype': '<f8',
+        'value_type': 'float64'
     }

     tsh.update_metadata(engine, 'ts-metadata',
@@ -303,12 +305,15 @@ def test_serie_metadata(engine, tsh):
     tsh.update_metadata(engine, 'ts-metadata', {'tzaware': True}, internal=True)

     assert tsh.metadata(engine, 'ts-metadata') == {
-        'tzaware': True,
-        'index_type': 'datetime64[ns]',
-        'value_type': 'float64',
+        'index_dtype': '<M8[ns]',
         'index_names': [],
-        'topic': 'banana spot price'
+        'index_type': 'datetime64[ns]',
+        'topic': 'banana spot price',
+        'tzaware': True,
+        'value_dtype': '<f8',
+        'value_type': 'float64'
     }

     # unbreak the serie for the second test pass :o
     tsh.update_metadata(engine, 'ts-metadata', initialmeta, internal=True)
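
Note: the new index_dtype/value_dtype entries are numpy dtype strings;
on a little-endian platform, for instance:

import numpy as np

assert np.dtype('datetime64[ns]').str == '<M8[ns]'  # index_dtype
assert np.dtype('float64').str == '<f8'             # value_dtype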
@@ -547,7 +552,6 @@ def test_point_deletion(engine, tsh):
     assert len(tsh.get(engine, 'ts_null')) == 0

-    # exhibit issue with nans handling
     ts_repushed = genserie(datetime(2010, 1, 1), 'D', 11)
     ts_repushed[0:3] = np.nan
@@ -606,7 +610,8 @@ Freq: D
     ts_begin = genserie(datetime(2010, 1, 1), 'D', 4, ['text'])
     tsh.insert(engine, ts_begin, 'ts_full_del_str', 'test')

-    ts_begin.iloc[:] = np.nan
+    ts_begin = pd.Series([np.nan] * 4, name='ts_full_del_str',
+                         index=ts_begin.index)
     tsh.insert(engine, ts_begin, 'ts_full_del_str', 'test')

     ts_end = genserie(datetime(2010, 1, 1), 'D', 4, ['text'])
@@ -924,7 +929,7 @@ def test_precision(engine, tsh):
     tsh.insert(engine, ts, 'precision', 'test')
     ts_round = tsh.get(engine, 'precision')
-    assert 0.12345678912346 == ts_round.iloc[0]
+    assert 0.12345678912345678 == ts_round.iloc[0]

     diff = tsh.insert(engine, ts_round, 'precision', 'test')
     assert diff is None  # the roundtripped series does not produce a diff when reinserted
......
-import pandas as pd
 import zlib
+from array import array
+import struct
+
+import pandas as pd
+import numpy as np

 from sqlalchemy import Table, Column, Integer, ForeignKey, Index
 from sqlalchemy.sql.elements import NONE_NAME
@@ -7,10 +11,8 @@ from sqlalchemy.sql.expression import select, asc, desc
 from sqlalchemy.dialects.postgresql import BYTEA, TIMESTAMP

 from tshistory.util import (
-    fromjson,
     subset,
-    SeriesServices,
-    tojson
+    SeriesServices
 )

 TABLES = {}
@@ -55,13 +57,29 @@ class Snapshot(SeriesServices):
     # optimized/asymmetric de/serialisation

+    @property
+    def isstr(self):
+        return self.tsh.metadata(self.cn, self.seriename)['value_type'] == 'object'
+
     def _serialize(self, ts):
         if ts is None:
             return None
-        return zlib.compress(tojson(ts, self._precision).encode('utf-8')[1:-1])
-
-    def _deserialize(self, bytestring):
-        return zlib.decompress(bytestring)
+        # use `view` as a workaround for "cannot include dtype 'M' in a buffer"
+        indexes = ts.index.view(np.uint8).data.tobytes()
+        indexes_size = struct.pack('!L', len(indexes))
+
+        if self.isstr:
+            # strings separated by 0, with nones/nans represented as 3 (ETX)
+            END, ETX = b'\0'.decode(), b'\3'.decode()
+            # first, a safety belt
+            for s in ts.values:
+                if not pd.isnull(s):
+                    assert END not in s and ETX not in s
+            values = b'\0'.join(b'\3' if pd.isnull(v) else v.encode('utf-8')
+                                for v in ts.values)
+        else:
+            values = ts.values.data.tobytes()
+        return zlib.compress(indexes_size + indexes + values)

     def _ensure_tz_consistency(self, ts):
         """Return timeserie with tz aware index or not depending on metadata
@@ -73,12 +91,40 @@ class Snapshot(SeriesServices):
             return ts.tz_localize('UTC')
         return ts

+    def _decodechunk(self, bytestring):
+        bytestring = zlib.decompress(bytestring)
+        [indexes_size] = struct.unpack('!L', bytestring[:4])
+        values_offset = indexes_size + 4
+        return bytestring[4:values_offset], bytestring[values_offset:]
+
     def _chunks_to_ts(self, chunks):
-        body = b'{' + b','.join(self._deserialize(chunk) for chunk in chunks) + b'}'
-        return self._ensure_tz_consistency(
-            fromjson(body.decode('utf-8'), self.seriename)
+        chunks = (self._decodechunk(chunk) for chunk in chunks)
+        indexchunks, valueschunks = list(zip(*chunks))
+        metadata = self.tsh.metadata(self.cn, self.seriename)
+
+        # array is a workaround for an obscure bug with pandas.isin
+        index = np.frombuffer(
+            array('d', b''.join(indexchunks)),
+            metadata['index_dtype']
         )
+
+        if self.isstr:
+            values = [v.decode('utf-8') if v != b'\3' else None
+                      for bvalues in valueschunks
+                      for v in bvalues.split(b'\0')]
+        else:
+            values = np.frombuffer(
+                b''.join(valueschunks),
+                metadata['value_dtype']
+            )
+
+        assert len(values) == len(index)
+        serie = pd.Series(values, index=index)
+        assert serie.index.is_monotonic_increasing
+        serie.name = self.seriename
+        return self._ensure_tz_consistency(serie)

     # /serialisation

     def buckets(self, ts):
......
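
Note: a round-trip sketch of the chunk layout that _serialize and
_decodechunk above agree on (4-byte big-endian index size, then raw
index bytes, then raw value bytes, all zlib-compressed); the sample
arrays are illustrative stand-ins for the real index/values:

import struct
import zlib

import numpy as np

index = np.arange(3, dtype='int64')    # stand-in for the datetime64 index
values = np.array([1.0, 2.0, 3.0])

ibytes = index.tobytes()
chunk = zlib.compress(struct.pack('!L', len(ibytes)) + ibytes + values.tobytes())

# decoding, the way _decodechunk does
raw = zlib.decompress(chunk)
[isize] = struct.unpack('!L', raw[:4])
assert np.frombuffer(raw[4:4 + isize], 'int64').tolist() == [0, 1, 2]
assert np.frombuffer(raw[4 + isize:], 'float64').tolist() == [1.0, 2.0, 3.0]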
@@ -523,8 +523,10 @@ class TimeSerie(SeriesServices):
                 'tzaware': tzaware_serie(ts),
                 'index_type': index.dtype.name,
                 'index_names': inames,
+                'index_dtype': index.dtype.str,
+                'value_dtype': ts.dtypes.str,
                 'value_type': ts.dtypes.name
-            },
+            }
         )
         regid = cn.execute(sql).inserted_primary_key[0]
         self.registry_map[seriename] = regid
......
@@ -136,14 +136,6 @@ class SeriesServices(object):
             return hashlib.sha1(seriename.encode('utf-8')).hexdigest()
         return seriename

-    def _serialize(self, ts):
-        if ts is None:
-            return None
-        return zlib.compress(tojson(ts, self._precision).encode('utf-8'))
-
-    def _deserialize(self, ts, name):
-        return fromjson(zlib.decompress(ts).decode('utf-8'), name)
-

 def rename_series(engine, serie_map, namespace='tsh'):
     from tshistory.schema import tsschema
......