Commit 51f74e22 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

insertion_date: make it official and check it is always increasing

parent da5ad6f2a35e
...@@ -32,10 +32,10 @@ def test_log(engine, cli, tsh): ...@@ -32,10 +32,10 @@ def test_log(engine, cli, tsh):
def test_history(engine, cli, tsh): def test_history(engine, cli, tsh):
serie = genserie(datetime(2020, 1, 1), 'D', 3) serie = genserie(datetime(2020, 1, 1), 'D', 3)
tsh.insert(engine, serie, 'some_history', 'Babar', tsh.insert(engine, serie, 'some_history', 'Babar',
_insertion_date=utcdt(2019, 1, 1)) insertion_date=utcdt(2019, 1, 1))
serie = genserie(datetime(2020, 1, 2), 'D', 3) serie = genserie(datetime(2020, 1, 2), 'D', 3)
tsh.insert(engine, serie, 'some_history', 'Babar', tsh.insert(engine, serie, 'some_history', 'Babar',
_insertion_date=utcdt(2019, 1, 2)) insertion_date=utcdt(2019, 1, 2))
r = cli('history', engine.url, r = cli('history', engine.url,
'some_history', 'some_history',
......
...@@ -223,7 +223,7 @@ def test_append(engine, tsh): ...@@ -223,7 +223,7 @@ def test_append(engine, tsh):
freq='D', periods=10)): freq='D', periods=10)):
ts = genserie(dt, 'D', 1, [x], name='daily') ts = genserie(dt, 'D', 1, [x], name='daily')
tsh.insert(engine, ts, 'append', 'aurelien.campeas@pythonian.fr', tsh.insert(engine, ts, 'append', 'aurelien.campeas@pythonian.fr',
_insertion_date=dt) insertion_date=dt)
sql = 'select id, parent, chunk from "{}.snapshot".append order by id'.format( sql = 'select id, parent, chunk from "{}.snapshot".append order by id'.format(
tsh.namespace tsh.namespace
......
...@@ -2,6 +2,7 @@ from datetime import datetime, timedelta ...@@ -2,6 +2,7 @@ from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
import pytz import pytz
from dateutil import parser from dateutil import parser
import pytest import pytest
import numpy as np import numpy as np
...@@ -69,7 +70,7 @@ Freq: H ...@@ -69,7 +70,7 @@ Freq: H
""", ts) """, ts)
tsh.insert(engine, ts, 'tztest', 'Babar', tsh.insert(engine, ts, 'tztest', 'Babar',
_insertion_date=utcdt(2018, 1, 1)) insertion_date=utcdt(2018, 1, 1))
back = tsh.get(engine, 'tztest') back = tsh.get(engine, 'tztest')
# though un localized we understand it's been normalized to utc # though un localized we understand it's been normalized to utc
...@@ -91,7 +92,7 @@ Freq: H ...@@ -91,7 +92,7 @@ Freq: H
'H', 4, tz='UTC') 'H', 4, tz='UTC')
ts.index = ts.index.tz_convert('Europe/Paris') ts.index = ts.index.tz_convert('Europe/Paris')
tsh.insert(engine, ts, 'tztest', 'Celeste', tsh.insert(engine, ts, 'tztest', 'Celeste',
_insertion_date=utcdt(2018, 1, 3)) insertion_date=utcdt(2018, 1, 3))
ts = tsh.get(engine, 'tztest') ts = tsh.get(engine, 'tztest')
assert_df(""" assert_df("""
...@@ -382,7 +383,7 @@ def test_changeset_metadata(engine, tsh): ...@@ -382,7 +383,7 @@ def test_changeset_metadata(engine, tsh):
serie = genserie(datetime(2010, 1, 1), 'D', 1, initval=[1]) serie = genserie(datetime(2010, 1, 1), 'D', 1, initval=[1])
tsh.insert(engine, serie, 'ts-cs-metadata', 'babar', tsh.insert(engine, serie, 'ts-cs-metadata', 'babar',
{'foo': 'A', 'bar': 42}, {'foo': 'A', 'bar': 42},
_insertion_date=utcdt(2019, 1, 1) insertion_date=utcdt(2019, 1, 1)
) )
log = tsh.log(engine, 'ts-cs-metadata') log = tsh.log(engine, 'ts-cs-metadata')
...@@ -400,31 +401,31 @@ def test_revision_date(engine, tsh): ...@@ -400,31 +401,31 @@ def test_revision_date(engine, tsh):
for i in range(1, 5): for i in range(1, 5):
with engine.begin() as cn: with engine.begin() as cn:
tsh.insert(cn, genserie(datetime(2017, 1, i), 'D', 3, [i]), 'revdate', tsh.insert(cn, genserie(datetime(2017, 1, i), 'D', 3, [i]), 'revdate',
'test', _insertion_date=utcdt(2016, 1, i)) 'test', insertion_date=utcdt(2016, 1, i))
# end of prologue, now some real meat # end of prologue, now some real meat
idate0 = pd.Timestamp('2015-1-1 00:00:00', tz='UTC') idate0 = pd.Timestamp('2015-1-1 00:00:00', tz='UTC')
ts = genserie(datetime(2010, 1, 4), 'D', 4, [0], name='truc') ts = genserie(datetime(2010, 1, 4), 'D', 4, [0], name='truc')
tsh.insert(engine, ts, 'ts_through_time', tsh.insert(engine, ts, 'ts_through_time',
'test', _insertion_date=idate0) 'test', insertion_date=idate0)
assert idate0 == tsh.latest_insertion_date(engine, 'ts_through_time') assert idate0 == tsh.latest_insertion_date(engine, 'ts_through_time')
idate1 = pd.Timestamp('2015-1-1 15:45:23', tz='UTC') idate1 = pd.Timestamp('2015-1-1 15:45:23', tz='UTC')
ts = genserie(datetime(2010, 1, 4), 'D', 4, [1], name='truc') ts = genserie(datetime(2010, 1, 4), 'D', 4, [1], name='truc')
tsh.insert(engine, ts, 'ts_through_time', tsh.insert(engine, ts, 'ts_through_time',
'test', _insertion_date=idate1) 'test', insertion_date=idate1)
assert idate1 == tsh.latest_insertion_date(engine, 'ts_through_time') assert idate1 == tsh.latest_insertion_date(engine, 'ts_through_time')
idate2 = pd.Timestamp('2015-1-2 15:43:23', tz='UTC') idate2 = pd.Timestamp('2015-1-2 15:43:23', tz='UTC')
ts = genserie(datetime(2010, 1, 4), 'D', 4, [2], name='truc') ts = genserie(datetime(2010, 1, 4), 'D', 4, [2], name='truc')
tsh.insert(engine, ts, 'ts_through_time', tsh.insert(engine, ts, 'ts_through_time',
'test', _insertion_date=idate2) 'test', insertion_date=idate2)
assert idate2 == tsh.latest_insertion_date(engine, 'ts_through_time') assert idate2 == tsh.latest_insertion_date(engine, 'ts_through_time')
idate3 = pd.Timestamp('2015-1-3', tz='UTC') idate3 = pd.Timestamp('2015-1-3', tz='UTC')
ts = genserie(datetime(2010, 1, 4), 'D', 4, [3], name='truc') ts = genserie(datetime(2010, 1, 4), 'D', 4, [3], name='truc')
tsh.insert(engine, ts, 'ts_through_time', tsh.insert(engine, ts, 'ts_through_time',
'test', _insertion_date=idate3) 'test', insertion_date=idate3)
assert idate3 == tsh.latest_insertion_date(engine, 'ts_through_time') assert idate3 == tsh.latest_insertion_date(engine, 'ts_through_time')
ts = tsh.get(engine, 'ts_through_time') ts = tsh.get(engine, 'ts_through_time')
...@@ -653,7 +654,7 @@ def test_deletion_over_horizon(engine, tsh): ...@@ -653,7 +654,7 @@ def test_deletion_over_horizon(engine, tsh):
name = 'delete_over_hz' name = 'delete_over_hz'
tsh.insert(engine, ts, name, 'Babar', tsh.insert(engine, ts, name, 'Babar',
_insertion_date=idate) insertion_date=idate)
ts = pd.Series( ts = pd.Series(
[np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan],
...@@ -661,7 +662,7 @@ def test_deletion_over_horizon(engine, tsh): ...@@ -661,7 +662,7 @@ def test_deletion_over_horizon(engine, tsh):
) )
tsh.insert(engine, ts, name, 'Celeste', tsh.insert(engine, ts, name, 'Celeste',
_insertion_date=idate.replace(day=2)) insertion_date=idate.replace(day=2))
ival = tsh.interval(engine, name) ival = tsh.interval(engine, name)
assert ival.left == datetime(2018, 1, 1) assert ival.left == datetime(2018, 1, 1)
assert ival.right == datetime(2018, 1, 2) assert ival.right == datetime(2018, 1, 2)
...@@ -671,7 +672,7 @@ def test_deletion_over_horizon(engine, tsh): ...@@ -671,7 +672,7 @@ def test_deletion_over_horizon(engine, tsh):
index=pd.date_range(datetime(2017, 12, 30), freq='D', periods=3) index=pd.date_range(datetime(2017, 12, 30), freq='D', periods=3)
) )
tsh.insert(engine, ts, name, 'Arthur', tsh.insert(engine, ts, name, 'Arthur',
_insertion_date=idate.replace(day=3)) insertion_date=idate.replace(day=3))
ival = tsh.interval(engine, name) ival = tsh.interval(engine, name)
assert ival.left == datetime(2018, 1, 2) assert ival.left == datetime(2018, 1, 2)
assert ival.right == datetime(2018, 1, 2) assert ival.right == datetime(2018, 1, 2)
...@@ -682,7 +683,7 @@ def test_history(engine, tsh): ...@@ -682,7 +683,7 @@ def test_history(engine, tsh):
with engine.begin() as cn: with engine.begin() as cn:
tsh.insert(cn, genserie(datetime(2017, 1, 1), 'D', numserie), 'smallserie', tsh.insert(cn, genserie(datetime(2017, 1, 1), 'D', numserie), 'smallserie',
'aurelien.campeas@pythonian.fr', 'aurelien.campeas@pythonian.fr',
_insertion_date=utcdt(2017, 2, numserie)) insertion_date=utcdt(2017, 2, numserie))
ts = tsh.get(engine, 'smallserie') ts = tsh.get(engine, 'smallserie')
assert_df(""" assert_df("""
...@@ -733,7 +734,7 @@ insertion_date value_date ...@@ -733,7 +734,7 @@ insertion_date value_date
with engine.begin() as cn: with engine.begin() as cn:
idate = idate.replace(tzinfo=pytz.timezone('UTC')) idate = idate.replace(tzinfo=pytz.timezone('UTC'))
tsh.insert(cn, histts[idate], 'smallserie2', tsh.insert(cn, histts[idate], 'smallserie2',
'aurelien.campeas@pythonian.f', _insertion_date=idate) 'aurelien.campeas@pythonian.f', insertion_date=idate)
# this is perfectly round-tripable # this is perfectly round-tripable
assert (tsh.get(engine, 'smallserie2') == ts).all() assert (tsh.get(engine, 'smallserie2') == ts).all()
...@@ -847,7 +848,7 @@ def test_delta_na(engine, tsh): ...@@ -847,7 +848,7 @@ def test_delta_na(engine, tsh):
for idx, idate in enumerate(ldates): for idx, idate in enumerate(ldates):
ts = pd.Series([idx] * 3, index = ldates) ts = pd.Series([idx] * 3, index = ldates)
tsh.insert(engine, ts, 'without_na', 'arnaud', tsh.insert(engine, ts, 'without_na', 'arnaud',
_insertion_date=idate) insertion_date=idate)
assert_df(""" assert_df("""
2015-01-20 00:00:00+00:00 2.0 2015-01-20 00:00:00+00:00 2.0
...@@ -887,7 +888,7 @@ insertion_date value_date ...@@ -887,7 +888,7 @@ insertion_date value_date
if idx == 2: if idx == 2:
serie[-1] = np.nan serie[-1] = np.nan
tsh.insert(engine, serie, 'with_na', 'arnaud', tsh.insert(engine, serie, 'with_na', 'arnaud',
_insertion_date=idate) insertion_date=idate)
# the value at 2015-01-22 is hidden by the inserted nan # the value at 2015-01-22 is hidden by the inserted nan
assert_df(""" assert_df("""
...@@ -922,7 +923,8 @@ def test_nr_gethistory(engine, tsh): ...@@ -922,7 +923,8 @@ def test_nr_gethistory(engine, tsh):
index=pd.date_range(start=datetime(2016, 12, 29), index=pd.date_range(start=datetime(2016, 12, 29),
end=datetime(2017, 1, 1), end=datetime(2017, 1, 1),
freq='D')) freq='D'))
tsh.insert(engine, s0, 'foo', 'zogzog') tsh.insert(engine, s0, 'foo', 'zogzog',
insertion_date=utcdt(2015, 12, 31))
s1 = pd.Series([1, 0, 0, 1], s1 = pd.Series([1, 0, 0, 1],
index=pd.date_range(start=datetime(2017, 1, 1), index=pd.date_range(start=datetime(2017, 1, 1),
...@@ -933,7 +935,7 @@ def test_nr_gethistory(engine, tsh): ...@@ -933,7 +935,7 @@ def test_nr_gethistory(engine, tsh):
with engine.begin() as cn: with engine.begin() as cn:
tsh.insert(cn, s1 * i, 'foo', tsh.insert(cn, s1 * i, 'foo',
'aurelien.campeas@pythonian.f', 'aurelien.campeas@pythonian.f',
_insertion_date=idate + timedelta(days=i)) insertion_date=idate + timedelta(days=i))
df = tsh.history(engine, 'foo', df = tsh.history(engine, 'foo',
datetime(2016, 1, 3), datetime(2016, 1, 3),
...@@ -1080,7 +1082,7 @@ def test_strip(engine, tsh): ...@@ -1080,7 +1082,7 @@ def test_strip(engine, tsh):
for i in range(1, 5): for i in range(1, 5):
pubdate = utcdt(2017, 1, i) pubdate = utcdt(2017, 1, i)
ts = genserie(datetime(2017, 1, 10), 'H', 1 + i) ts = genserie(datetime(2017, 1, 10), 'H', 1 + i)
tsh.insert(engine, ts, 'xserie', 'babar', _insertion_date=pubdate) tsh.insert(engine, ts, 'xserie', 'babar', insertion_date=pubdate)
# also insert something completely unrelated # also insert something completely unrelated
tsh.insert(engine, genserie(datetime(2018, 1, 1), 'D', 1 + i), tsh.insert(engine, genserie(datetime(2018, 1, 1), 'D', 1 + i),
'yserie', 'celeste') 'yserie', 'celeste')
...@@ -1185,7 +1187,7 @@ def test_staircase(engine, tsh): ...@@ -1185,7 +1187,7 @@ def test_staircase(engine, tsh):
freq='H'): freq='H'):
ts = genserie(start=idate, freq='H', repeat=7) ts = genserie(start=idate, freq='H', repeat=7)
tsh.insert(engine, ts, 'republication', 'test', tsh.insert(engine, ts, 'republication', 'test',
_insertion_date=idate) insertion_date=idate)
hist = tsh.history(engine, 'republication') hist = tsh.history(engine, 'republication')
assert_hist(""" assert_hist("""
...@@ -1255,7 +1257,7 @@ def test_staircase_2_tzaware(engine, tsh): ...@@ -1255,7 +1257,7 @@ def test_staircase_2_tzaware(engine, tsh):
end=utcdt(2015, 1, 4), end=utcdt(2015, 1, 4),
freq='D')): freq='D')):
ts = genserie(start=idate, freq='H', repeat=7) ts = genserie(start=idate, freq='H', repeat=7)
tsh.insert(engine, ts, 'repu2', 'test', _insertion_date=idate) tsh.insert(engine, ts, 'repu2', 'test', insertion_date=idate)
deltas = tsh.staircase(engine, 'repu2', delta=timedelta(hours=3)) deltas = tsh.staircase(engine, 'repu2', delta=timedelta(hours=3))
assert_df(""" assert_df("""
...@@ -1317,7 +1319,7 @@ def test_staircase_2_tznaive(engine, tsh): ...@@ -1317,7 +1319,7 @@ def test_staircase_2_tznaive(engine, tsh):
end=utcdt(2015, 1, 4), end=utcdt(2015, 1, 4),
freq='D')): freq='D')):
ts = genserie(start=idate.replace(tzinfo=None), freq='H', repeat=7) ts = genserie(start=idate.replace(tzinfo=None), freq='H', repeat=7)
tsh.insert(engine, ts, 'repu-tz-naive', 'test', _insertion_date=idate) tsh.insert(engine, ts, 'repu-tz-naive', 'test', insertion_date=idate)
deltas = tsh.staircase(engine, 'repu-tz-naive', delta=timedelta(hours=3)) deltas = tsh.staircase(engine, 'repu-tz-naive', delta=timedelta(hours=3))
assert_df(""" assert_df("""
...@@ -1553,7 +1555,7 @@ def test_insert_errors(engine, tsh): ...@@ -1553,7 +1555,7 @@ def test_insert_errors(engine, tsh):
tsh.insert(engine, ts, 'error', 42) tsh.insert(engine, ts, 'error', 42)
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
tsh.insert(engine, ts, 'error', 'Babar', _insertion_date='2010-1-1') tsh.insert(engine, ts, 'error', 'Babar', insertion_date='2010-1-1')
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
tsh.insert(engine, ts, 'error', 'Babar', metadata=42) tsh.insert(engine, ts, 'error', 'Babar', metadata=42)
......
...@@ -47,7 +47,7 @@ class timeseries(SeriesServices): ...@@ -47,7 +47,7 @@ class timeseries(SeriesServices):
@tx @tx
def insert(self, cn, newts, name, author, def insert(self, cn, newts, name, author,
metadata=None, metadata=None,
_insertion_date=None): insertion_date=None):
"""Create a new revision of a given time series """Create a new revision of a given time series
newts: pandas.Series with date index newts: pandas.Series with date index
...@@ -59,7 +59,7 @@ class timeseries(SeriesServices): ...@@ -59,7 +59,7 @@ class timeseries(SeriesServices):
return return
newts = self._guard_insert( newts = self._guard_insert(
newts, name, author, metadata, newts, name, author, metadata,
_insertion_date insertion_date
) )
assert ('<M8[ns]' == newts.index.dtype or assert ('<M8[ns]' == newts.index.dtype or
...@@ -72,10 +72,10 @@ class timeseries(SeriesServices): ...@@ -72,10 +72,10 @@ class timeseries(SeriesServices):
if tablename is None: if tablename is None:
seriesmeta = self._series_initial_meta(cn, name, newts) seriesmeta = self._series_initial_meta(cn, name, newts)
return self._create(cn, newts, name, author, seriesmeta, return self._create(cn, newts, name, author, seriesmeta,
metadata, _insertion_date) metadata, insertion_date)
return self._update(cn, tablename, newts, name, author, return self._update(cn, tablename, newts, name, author,
metadata, _insertion_date) metadata, insertion_date)
def list_series(self, cn): def list_series(self, cn):
"""Return the mapping of all series to their type""" """Return the mapping of all series to their type"""
...@@ -273,9 +273,11 @@ class timeseries(SeriesServices): ...@@ -273,9 +273,11 @@ class timeseries(SeriesServices):
q = select('max(insertion_date)').table( q = select('max(insertion_date)').table(
f'"{self.namespace}.revision"."{tablename}"' f'"{self.namespace}.revision"."{tablename}"'
) )
return pd.Timestamp( idate = pd.Timestamp(
q.do(cn).scalar() q.do(cn).scalar()
).astimezone('UTC') )
if not pd.isnull(idate):
return idate.astimezone('UTC')
@tx @tx
def insertion_dates(self, cn, name, def insertion_dates(self, cn, name,
...@@ -469,12 +471,6 @@ class timeseries(SeriesServices): ...@@ -469,12 +471,6 @@ class timeseries(SeriesServices):
cn.execute( cn.execute(
f'select pg_advisory_xact_lock({self.create_lock_id})' f'select pg_advisory_xact_lock({self.create_lock_id})'
) )
if insertion_date is not None:
assert insertion_date.tzinfo is not None
idate = pd.Timestamp(insertion_date)
else:
idate = pd.Timestamp(datetime.utcnow(), tz='UTC')
if metadata: if metadata:
metadata = json.dumps(metadata) metadata = json.dumps(metadata)
...@@ -541,6 +537,9 @@ class timeseries(SeriesServices): ...@@ -541,6 +537,9 @@ class timeseries(SeriesServices):
idate = pd.Timestamp(insertion_date) idate = pd.Timestamp(insertion_date)
else: else:
idate = pd.Timestamp(datetime.utcnow(), tz='UTC') idate = pd.Timestamp(datetime.utcnow(), tz='UTC')
latest_idate = self.latest_insertion_date(cn, name)
if latest_idate:
assert idate > latest_idate
if metadata: if metadata:
metadata = json.dumps(metadata) metadata = json.dumps(metadata)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment