from datetime import datetime
from contextlib import contextmanager
import logging
import zlib
import math

import pandas as pd
import numpy as np

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    subset,
    tzaware_serie
)


L = logging.getLogger('tshistory.tsio')


def tojson(ts):
    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso',
                          double_precision=-int(math.log10(TimeSerie._precision)))

    # multi index case
    return ts.to_frame().reset_index().to_json(date_format='iso')


def num2float(pdobj):
    # get a Series or a Dataframe column
    if str(pdobj.dtype).startswith('int'):
        return pdobj.astype('float64')
    return pdobj


def fromjson(jsonb, tsname):
    return _fromjson(jsonb, tsname).fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    if jsonb == '{}':
        return pd.Series(name=tsname)

    result = pd.read_json(jsonb, typ='series', dtype=False)
    result.name = tsname
    if isinstance(result.index, pd.DatetimeIndex):
        result = num2float(result)
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
    return num2float(result.iloc[:, 0])  # get a Series object
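
# Round-trip sketch (illustrative, not executed here): `tojson`/`fromjson`
# serialize through JSON with iso dates; integer values come back coerced
# to float64 via `num2float`:
#   ts = pd.Series([1, 2], index=pd.date_range('2017-1-1', periods=2))
#   fromjson(tojson(ts), 'ts')  # -> float64 serie equal to ts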


class TimeSerie(object):
    _csid = None
    _snapshot_interval = 10
    _precision = 1e-14
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This makes it possible to group changes to several series,
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        yield
        del self._csid
        del self._author
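
    # Usage sketch (hypothetical `tsh`/`cn`/`serie1`/`serie2` names):
    #   with tsh.newchangeset(cn, 'babar'):
    #       tsh.insert(cn, serie1, 'serie1')
    #       tsh.insert(cn, serie2, 'serie2')
    # Both inserts then share a single changeset id; the author is
    # provided once by the context manager.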

    def insert(self, cn, newts, name, author=None, _insertion_date=None,
               extra_scalars={}):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author {} will not be used when in a changeset'.format(author))
            author = None
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) or
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name, newts)
            csid = self._csid or self._newchangeset(cn, author, _insertion_date)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author, _insertion_date)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

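        # keep a full snapshot only every `_snapshot_interval` revisions:
        # purge the previous tip's snapshot otherwise (the fresh tip row
        # always carries one)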
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
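
    # Versioning model sketch (hypothetical `ts_a`/`ts_b` for one serie):
    #   tsh.insert(cn, ts_a, 'prices', 'babar')  # first insert -> snapshot
    #   tsh.insert(cn, ts_b, 'prices', 'babar')  # next inserts -> diffs
    # Returns the whole serie on first insertion, the computed diff
    # afterwards, and None when there is nothing new.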

    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter,
                                            from_value_date=from_value_date,
                                            to_value_date=to_value_date)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current
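
    # Read-side sketch: `tsh.get(cn, 'prices')` yields the latest state;
    # a `revision_date` replays the serie as it was known at that time:
    #   tsh.get(cn, 'prices', revision_date=datetime(2017, 1, 1))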

    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
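        """Return a serie showing all its successive versions in the
        given insertion date range, with the insertion date injected as
        an extra index level.

        With diffmode=True, the successive diffs are returned in place
        of the reconstructed states.
        """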
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing happened, but then you get nothing
            return

        if diffmode:
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    sql = select([table.c.snapshot]).where(table.c.csid == csid)
                    diff = cn.execute(sql).scalar()
                serie = subset(self._deserialize(diff, name), from_value_date, to_value_date)
                serie = self._ensure_tz_consistency(cn, serie)
                inject_in_index(serie, revdate)
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        csid, revdate, diff_ = diffs[0]
        snapshot = self._build_snapshot_upto(cn, table, [
            lambda cset, _: cset.c.id <= csid
        ], from_value_date, to_value_date)

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for csid_, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)

            serie = self._apply_diff(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.csid]).where(
            table.c.csid == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = cn.execute(
                select([cset.c.metadata]).where(cset.c.id == log['rev'])
            ).scalar() or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.csid == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.csid == csid))
        # rebuild the top-level snapshot
        cstip = self._latest_csid_for(cn, seriename)
        if cn.execute(select([table.c.snapshot]).where(table.c.csid == cstip)).scalar() is None:
            snap = self._build_snapshot_upto(
                cn, table,
                qfilter=(lambda cset, _t: cset.c.id < csid,)
            )
            sql = table.update().where(
                table.c.csid == cstip
            ).values(
                snapshot=self._serialize(snap)
            )
            cn.execute(sql)

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.csid
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log
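
    # Shape of one log entry (illustrative values):
    #   {'rev': 42, 'author': 'babar', 'date': datetime(2017, 1, 1),
    #    'meta': {}, 'names': ['serie1', 'serie2']}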

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)
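
    # Note: stored payloads are zlib-compressed utf-8 JSON strings; float
    # fidelity across round-trips is bounded by `_precision` (see `tojson`).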

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            if isinstance(ts.index, pd.MultiIndex):
                for i in range(len(ts.index.levels)):
                    ts.index = ts.index.set_levels(
                        ts.index.levels[i].tz_localize('UTC'),
                        level=i)
                return ts
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

    def _make_ts_table(self, cn, name, ts):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        table = self._get_ts_table(cn, name)
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        " get the *local* id "
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        pass

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate(self, cn, name, ts):
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, name)
        tstype = ts.dtype
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
        inames = [name for name in ts.index.names if name]
        if inames != meta['index_names']:
            raise Exception('Incompatible multi indexes: {} vs {}'.format(
                meta['index_names'], inames)
            )

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        self._validate(cn, table.name, newts)
        snapshot = self._build_snapshot_upto(cn, table)
        assert snapshot is not None
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot',
                       from_value_date=None, to_value_date=None):
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None
        ).select_from(table.join(cset))

        if qfilter:
            sql = sql.where(table.c.csid <= cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
            snapdata = subset(self._deserialize(snapdata, table.name),
                              from_value_date, to_value_date)
            snapdata = self._ensure_tz_consistency(cn, snapdata)
        except TypeError:
            return None, None
        return snapid, snapdata

    def _build_snapshot_upto(self, cn, table, qfilter=(),
                             from_value_date=None, to_value_date=None):
        snapid, snapshot = self._find_snapshot(cn, table, qfilter,
                                               from_value_date=from_value_date,
                                               to_value_date=to_value_date)
        if snapid is None:
            return None

        cset = self.schema.changeset
        # beware the potential cartesian product
        # between table & cset if there is no qfilter
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = self._deserialize(alldiffs.loc[0, 'diff'], table.name)
        ts = self._ensure_tz_consistency(cn, ts)
        for row in alldiffs.loc[1:].itertuples():
            diff = subset(self._deserialize(row.diff, table.name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)
            ts = self._apply_diff(ts, diff)
        ts = self._apply_diff(snapshot, ts)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts

    # diff handling

    def _diff(self, cn, csetid, name):
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        ts = self._deserialize(cn.execute(sql).scalar(), name)
        return self._ensure_tz_consistency(cn, ts)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).

        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])
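
    # Worked example (illustrative): with fromts = {t1: 1.0, t2: 2.0}
    # and tots = {t2: 2.5, t3: 3.0}, the diff is {t2: 2.5, t3: 3.0}:
    # t2 changed beyond `_precision`, t3 is new; t1, absent from tots,
    # is left untouched.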

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts
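
    # Worked example (illustrative): applying diff {t2: 2.5, t3: 3.0}
    # to base {t1: 1.0, t2: 2.0} yields {t1: 1.0, t2: 2.5, t3: 3.0}:
    # base values survive unless overridden, new points are added.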