from datetime import datetime
from contextlib import contextmanager
import logging
import zlib
import math

import pandas as pd
from pandas.api.types import is_datetimetz
import numpy as np

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema


L = logging.getLogger('tshistory.tsio')


def tojson(ts):
    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso',
                          double_precision=-int(math.log10(TimeSerie._precision)))

    # multi index case
    return ts.to_frame().reset_index().to_json(date_format='iso')
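
# note: with the default TimeSerie._precision of 1e-14, double_precision
# above resolves to -int(math.log10(1e-14)) == 14, so values are written
# with 14 decimal digits -- consistent with the atol tolerance used by
# TimeSerie._compute_diff to decide whether a value actually changed.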


def num2float(pdobj):
    # get a Series or a DataFrame column
    if str(pdobj.dtype).startswith('int'):
        return pdobj.astype('float64')
    return pdobj


def fromjson(jsonb, tsname):
    return _fromjson(jsonb, tsname).fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    if jsonb == '{}':
        return pd.Series(name=tsname)

    result = pd.read_json(jsonb, typ='series', dtype=False)
    result.name = tsname
    if isinstance(result.index, pd.DatetimeIndex):
        result = num2float(result)
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
    return num2float(result.iloc[:, 0])  # get a Series object


def tzaware_serie(ts):
    if isinstance(ts.index, pd.MultiIndex):
        tzaware = [is_datetimetz(ts.index.get_level_values(idx_name))
                   for idx_name in ts.index.names]
        assert all(tzaware) or not any(tzaware), (
            'all the indexes must be tzaware, or none of them'
        )
        return all(tzaware)
    return is_datetimetz(ts.index)
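
# example sketch (hypothetical data): an index built with
#   pd.DatetimeIndex([datetime(2017, 1, 1)], tz='UTC')
# makes tzaware_serie return True; the same index without tz= returns
# False, and mixing aware and naive levels trips the assert above.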


def subset(ts, fromdate, todate):
    if fromdate is None and todate is None:
        return ts
    return ts.loc[fromdate:todate]


def inject_in_index(serie, revdate):
    if isinstance(serie.index, pd.MultiIndex):
        mindex = [(revdate, *rest) for rest in serie.index]
        serie.index = pd.MultiIndex.from_tuples(mindex, names=[
            'insertion_date', *serie.index.names]
        )
        return
    mindex = [(revdate, valuestamp) for valuestamp in serie.index]
    serie.index = pd.MultiIndex.from_tuples(mindex, names=[
        'insertion_date', 'value_date']
    )
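
# example sketch (hypothetical data): injecting revision date `d` into a
# serie indexed by [t1, t2] yields the MultiIndex
#   [(d, t1), (d, t2)]  with names ('insertion_date', 'value_date'),
# which is how get_history stacks the successive versions of a serie.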


class TimeSerie(object):
    _csid = None
    _snapshot_interval = 10
    _precision = 1e-14
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows grouping changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        is not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        yield
        del self._csid
        del self._author
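
    # usage sketch (hypothetical names; assumes an sqlalchemy connection
    # `cn` on a tshistory-enabled database and two pandas series):
    #
    #   with tsh.newchangeset(cn, author='babar'):
    #       tsh.insert(cn, serie1, 'banana_spot')
    #       tsh.insert(cn, serie2, 'banana_forward')
    #
    # both series then share a single changeset id: one macro-change.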

    def insert(self, cn, newts, name, author=None, _insertion_date=None,
               extra_scalars={}):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the series

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).
        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author {} will not be used when in a changeset'.format(author))
            author = None
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) or
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name, tzaware=tzaware_serie(newts))
            csid = self._csid or self._newchangeset(cn, author, _insertion_date)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author, _insertion_date)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
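
    # note on snapshots: every revision stores its diff, and the newest
    # revision always carries a full snapshot; the previous tip snapshot is
    # purged just above unless its id falls on the _snapshot_interval grid,
    # trading storage for reconstruction work in _build_snapshot_upto.
    #
    # usage sketch (hypothetical names; assumes a connection `cn`):
    #
    #   ts = pd.Series([1., 2.], index=pd.date_range('2017-1-1', periods=2))
    #   tsh.insert(cn, ts, 'my_serie', author='babar')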
    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None):
        """Compute and return the series of a given name

        revision_date: datetime filter to get previous versions of the
        series
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter,
                                            from_value_date=from_value_date,
                                            to_value_date=to_value_date)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current
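
    # usage sketch (hypothetical; assumes the serie inserted above):
    #
    #   tsh.get(cn, 'my_serie')                               # latest state
    #   tsh.get(cn, 'my_serie', revision_date=datetime(2017, 1, 1))
    #
    # the second call replays the serie as it was known at that date.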
    def get_serie_metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing happened, but you get nothing
            return

        if diffmode:
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    sql = select([table.c.snapshot]).where(table.c.csid == csid)
                    diff = cn.execute(sql).scalar()
                serie = subset(self._deserialize(diff, name), from_value_date, to_value_date)
                serie = self._ensure_tz_consistency(cn, serie)
                inject_in_index(serie, revdate)
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        csid, revdate, diff_ = diffs[0]
        snapshot = self._build_snapshot_upto(cn, table, [
            lambda cset, _: cset.c.id <= csid
        ], from_value_date, to_value_date)

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for csid_, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)

            serie = self._apply_diff(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie
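
    # usage sketch (hypothetical): the result is one serie stacked over an
    # (insertion_date, value_date) MultiIndex, one block per revision:
    #
    #   history = tsh.get_history(cn, 'my_serie',
    #                             from_insertion_date=datetime(2017, 1, 1))
    #
    # with diffmode=True each block holds the raw diff of that revision
    # instead of the fully reconstructed serie.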

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.csid]).where(
            table.c.csid == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()
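
    # usage sketch (hypothetical date): find a changeset id relative to a
    # given insertion date
    #
    #   csid = tsh.changeset_at(cn, 'my_serie', datetime(2017, 6, 1),
    #                           mode='before')
    #
    # 'strict' requires an exact insertion_date match, while 'before' and
    # 'after' match changesets on that side of the date.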

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = cn.execute(
                select([cset.c.metadata]).where(cset.c.id == log['rev'])
            ).scalar() or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.csid == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.csid == csid))
        # rebuild the top-level snapshot
        cstip = self._latest_csid_for(cn, seriename)
        if cn.execute(select([table.c.snapshot]).where(table.c.csid == cstip)).scalar() is None:
            snap = self._build_snapshot_upto(
                cn, table,
                qfilter=(lambda cset, _t: cset.c.id < csid,)
            )
            sql = table.update().where(
                table.c.csid == cstip
            ).values(
                snapshot=self._serialize(snap)
            )
            cn.execute(sql)

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.csid
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log
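
    # sketch of one log entry (hypothetical values):
    #
    #   {'rev': 42, 'author': 'babar',
    #    'date': datetime(2017, 6, 1, 12, 0),
    #    'meta': {}, 'names': ['my_serie'],
    #    'diff': {...}}   # the 'diff' key only appears when diff=True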

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)
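
    # roundtrip sketch: a serie goes through
    #   tojson -> utf-8 encode -> zlib.compress     on the way in, and
    #   zlib.decompress -> utf-8 decode -> fromjson  on the way out, so
    # _deserialize(self._serialize(ts), ts.name) reproduces ts up to the
    # _precision used by tojson.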

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.get_serie_metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            if isinstance(ts.index, pd.MultiIndex):
                for i in range(len(ts.index.levels)):
                    ts.index = ts.index.set_levels(
                        ts.index.levels[i].tz_localize('UTC'),
                        level=i)
                return ts
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

    def _make_ts_table(self, cn, name, tzaware=False):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={'tzaware': tzaware},
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        table = self._get_ts_table(cn, name)
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        " get the *local* id "
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        pass

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate_type(self, oldts, newts, name):
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot',
                       from_value_date=None, to_value_date=None):
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None
        ).select_from(table.join(cset))

        if qfilter:
            sql = sql.where(table.c.csid <= cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
            snapdata = subset(self._deserialize(snapdata, table.name),
                              from_value_date, to_value_date)
            snapdata = self._ensure_tz_consistency(cn, snapdata)
        except TypeError:
            return None, None
        return snapid, snapdata

    def _build_snapshot_upto(self, cn, table, qfilter=(),
                             from_value_date=None, to_value_date=None):
        snapid, snapshot = self._find_snapshot(cn, table, qfilter,
                                               from_value_date=from_value_date,
                                               to_value_date=to_value_date)
        if snapid is None:
            return None

        cset = self.schema.changeset
        # beware the potential cartesian product
        # between table & cset if there is no qfilter
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = self._deserialize(alldiffs.loc[0, 'diff'], table.name)
        ts = self._ensure_tz_consistency(cn, ts)
        for row in alldiffs.loc[1:].itertuples():
            diff = subset(self._deserialize(row.diff, table.name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)
            ts = self._apply_diff(ts, diff)
        ts = self._apply_diff(snapshot, ts)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts

    # diff handling

    def _diff(self, cn, csetid, name):
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        ts = self._deserialize(cn.execute(sql).scalar(), name)
        return self._ensure_tz_consistency(cn, ts)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (as in tots - fromts).
        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])
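
    # worked example (hypothetical values), with value dates t1 < t2 < t3:
    #   fromts = {t1: 1.0, t2: 2.0}
    #   tots   = {t1: 1.0, t2: 2.5, t3: 3.0}
    # t1 is equal within _precision and drops out; the diff keeps the
    # changed and the new points: {t2: 2.5, t3: 3.0}.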

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.
        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts
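
    # worked example (hypothetical values), continuing the _compute_diff
    # example above:
    #   base_ts = {t1: 1.0, t2: 2.0}
    #   new_ts  = {t2: 2.5, t3: 3.0}
    # the union of indexes is taken and new_ts wins on overlaps, giving
    #   {t1: 1.0, t2: 2.5, t3: 3.0}
    # so _apply_diff(fromts, _compute_diff(fromts, tots)) rebuilds tots.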