cli.py 8.97 KB
Newer Older
Aurélien Campéas's avatar
Aurélien Campéas committed
1
import os, signal
2
from pkg_resources import iter_entry_points
Aurélien Campéas's avatar
Aurélien Campéas committed
3
4
from time import time
import random
Aurélien Campéas's avatar
Aurélien Campéas committed
5
from pathlib import Path
6
7

from dateutil import parser
8
from json import dumps
9

10
import click
11
from sqlalchemy import create_engine
12
from dateutil.parser import parse as temporal
13
import pandas as pd
14

15
from tshistory.tsio import timeseries
16
17
18
from tshistory.util import (
    delete_series,
    find_dburi,
19
    fromjson
20
21
)

22
import tshistory.schema
23

24

25
26
27
# Override points
# * for the log command

28
REVFMT = """
29
30
31
32
33
revision: {rev}
author:   {author}
date:     {date}
""".strip()

Aurélien Campéas's avatar
Aurélien Campéas committed
34

35
def format_rev(rev):
36
    fmt = REVFMT + '\n'
37
38
39
40
    if rev.get('diff'):
        fmt += 'series: {names}\n\n'
        lines = []
        for ts in rev['diff'].values():
41
            lines.append(ts.to_string())
42
43
44
45
46
47
48
        fmt += '\n'.join(lines)
    else:
        fmt += 'series:   {names}'

    return fmt.format(**rev)


49
@click.group()
50
def tsh():
51
    pass
52
53


54
55
56
57
@tsh.command()
@click.argument('db-uri')
@click.argument('seriename')
@click.option('--json', is_flag=True, default=False)
58
59
@click.option('--namespace', default='tsh')
def get(db_uri, seriename, json, namespace='tsh'):
60
    """show a serie in its current state """
61
    engine = create_engine(find_dburi(db_uri))
62
    tsh = timeseries(namespace)
63

64
    ts = tsh.get(engine, seriename)
65
66
67
68
69
70
71
    if json:
        print(ts.to_json())
    else:
        with pd.option_context('display.max_rows', None, 'display.max_columns', 3):
            print(ts)


72
73
74
75
@tsh.command()
@click.argument('db-uri')
@click.argument('seriename')
@click.option('--json', is_flag=True, default=False)
76
77
@click.option('--from-insertion-date', type=temporal)
@click.option('--to-insertion-date', type=temporal)
78
79
80
@click.option('--from-value-date', type=temporal)
@click.option('--to-value-date', type=temporal)
@click.option('--diff/--no-diff', is_flag=True, default=True)
81
@click.option('--namespace', default='tsh')
82
83
def history(db_uri, seriename,
            from_insertion_date, to_insertion_date,
84
            from_value_date, to_value_date,
85
86
            diff, json,
            namespace='tsh'):
87
    """show a serie full history """
88
    engine = create_engine(find_dburi(db_uri))
89

90
    tsh = timeseries(namespace)
91
    with engine.begin() as cn:
92
        hist = tsh.history(
93
94
95
96
97
            cn, seriename,
            from_insertion_date, to_insertion_date,
            from_value_date, to_value_date,
            diffmode=diff
        )
98
    if json:
99
100
101
102
103
104
105
106
        out = {
            str(idate): {
                str(vdate): val
                for vdate, val in ts.to_dict().items()
            }
            for idate, ts in hist.items()
        }
        print(dumps(out))
107
    else:
108
109
        for idate in hist:
            print(hist[idate])
110
111


112
@tsh.command()
113
114
@click.argument('db-uri')
@click.option('--limit', '-l', default=None)
115
@click.option('--serie', '-s', multiple=True)
116
117
@click.option('--from-rev')
@click.option('--to-rev')
118
119
@click.option('--from-insertion-date', type=temporal)
@click.option('--to-insertion-date', type=temporal)
120
@click.option('--namespace', default='tsh')
121
def log(db_uri, limit, serie, from_rev, to_rev,
122
123
        from_insertion_date, to_insertion_date,
        namespace='tsh'):
124
    """show revision history of entire repository or series"""
125
    engine = create_engine(find_dburi(db_uri))
126
    tsh = timeseries(namespace)
127

128
    for rev in tsh.log(engine, limit=limit, names=serie,
129
130
                       fromrev=from_rev, torev=to_rev,
                       fromdate=from_insertion_date, todate=to_insertion_date):
131
132
133
134
135
        rev['names'] = ','.join(rev['names'])
        print(format_rev(rev))
        print()


136
137
138
139
140
141
INFOFMT = """
changeset count: {changeset count}
series count:    {series count}
series names:    {serie names}
""".strip()

Aurélien Campéas's avatar
Aurélien Campéas committed
142

143
144
@tsh.command()
@click.argument('db-uri')
145
146
@click.option('--namespace', default='tsh')
def info(db_uri, namespace='tsh'):
147
    """show global statistics of the repository"""
148
    engine = create_engine(find_dburi(db_uri))
149

150
    info = timeseries(namespace).info(engine)
151
152
153
154
    info['serie names'] = ', '.join(info['serie names'])
    print(INFOFMT.format(**info))


155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# series maintenance

@tsh.command()
@click.argument('db-uri')
@click.argument('mapfile', type=click.Path(exists=True))
@click.option('--namespace', default='tsh')
def rename(db_uri, mapfile, namespace='tsh'):
    """rename series by providing a map file (csv format)

    map file header must be `old,new`
    """
    seriesmap = {
        p.old: p.new
        for p in pd.read_csv(mapfile).itertuples()
    }
170
    engine = create_engine(find_dburi(db_uri))
171
    tsh = timeseries(namespace)
172
173
174
175
    for old, new in seriesmap.items():
        with engine.begin() as cn:
            print('rename', old, '->', new)
            tsh.rename(cn, old, new)
176
177


178
179
@tsh.command()
@click.argument('db-uri')
180
181
@click.option('--series')
@click.option('--deletefile', type=click.Path(exists=True))
182
@click.option('--namespace', default='tsh')
183
def delete(db_uri, series=None, deletefile=None, namespace='tsh'):
184
185
186
187
    """delete series by providing a one-column file (csv format)

    file header must be `name`
    """
188
189
190
191
192
193
194
195
196
197
198
199
    if not (series or deletefile):
        print('You must provide a series name _or_ a csv file path')
        return

    if deletefile:
        series = [
            p.name
            for p in pd.read_csv(deletefile).itertuples()
        ]
    else:
        series = [series]

200
    engine = create_engine(find_dburi(db_uri))
201
202
203
    delete_series(engine, series, namespace)


204
205
# db maintenance

206
@tsh.command(name='init-db')
207
@click.argument('db-uri')
208
@click.option('--reset', is_flag=True, default=False)
209
210
@click.option('--namespace', default='tsh')
def init_db(db_uri, reset=False, namespace='tsh'):
211
    """initialize an new db."""
212
    engine = create_engine(find_dburi(db_uri))
213
214
215
    schem = tshistory.schema.tsschema(namespace)
    schem.define()

216
    if reset:
217
218
        assert schem.exists(engine)
        schem.destroy(engine)
219

220
    schem.create(engine)
221

Aurélien Campéas's avatar
Aurélien Campéas committed
222
223
224

@tsh.command(name='check')
@click.argument('db-uri')
225
@click.option('--series', default=None, help='series name to check')
Aurélien Campéas's avatar
Aurélien Campéas committed
226
@click.option('--namespace', default='tsh')
227
def check(db_uri, series=None, namespace='tsh'):
Aurélien Campéas's avatar
Aurélien Campéas committed
228
    "coherence checks of the db"
229
    e = create_engine(find_dburi(db_uri))
230
231
232
233
234
    if series is None:
        sql = 'select seriename from "{}".registry order by seriename'.format(namespace)
        series = [row.seriename for row in e.execute(sql)]
    else:
        series = [series]
Aurélien Campéas's avatar
Aurélien Campéas committed
235

236
    tsh = timeseries(namespace)
Aurélien Campéas's avatar
Aurélien Campéas committed
237
238
    for idx, s in enumerate(series):
        t0 = time()
239
        with e.begin() as cn:
240
            hist = tsh.history(cn, s)
Aurélien Campéas's avatar
Aurélien Campéas committed
241
        start, end = None, None
242
        mon = True
Aurélien Campéas's avatar
Aurélien Campéas committed
243
244
245
246
247
        for ts in hist.values():
            cmin = ts.index.min()
            cmax = ts.index.max()
            start = min(start or cmin, cmin)
            end = max(end or cmax, cmax)
248
            mon = ts.index.is_monotonic_increasing
Aurélien Campéas's avatar
Aurélien Campéas committed
249
        ival = tsh.interval(e, s)
250
251
252
253
        if ival.left != start:
            print('  start:', s, f'{ival.left} != {start}')
        if ival.right != end:
            print('  end:', s, f'{ival.right} != {end}')
254
255
256
257
        monmsg = '' if mon else 'non-monotonic'
        print(idx, s, 'inserts={}, read-time={} {}'.format(
            len(hist), time() - t0, monmsg)
        )
Aurélien Campéas's avatar
Aurélien Campéas committed
258
259


260
261
262
263
@tsh.command(name='shell')
@click.argument('db-uri')
@click.option('--namespace', default='tsh')
def shell(db_uri, namespace='tsh'):
264
    e = create_engine(find_dburi(db_uri))
265

266
    tsh = timeseries(namespace)
267
268
269
    import pdb; pdb.set_trace()


270
271
272
273
274
@tsh.command(name='migrate-0.6-to-0.7')
@click.argument('db-uri')
@click.option('--namespace', default='tsh')
def migrate_dot_6_to_dot_7(db_uri, namespace='tsh'):
    e = create_engine(find_dburi(db_uri))
275
    tsh = timeseries(namespace)
276
277
    with e.begin() as cn:
        # drop not null
278
        print(f'{namespace}: alter changeset_series table')
279
280
281
282
283
284
285
286
287
288
        cn.execute(f'alter table "{namespace}".changeset_series '
                   'alter column serie drop not null')
        # alter foreign key on delete: delete -> set null
        cn.execute(f'alter table "{namespace}".changeset_series '
                   'drop constraint "changeset_series_serie_fkey"')
        cn.execute(f'alter table "{namespace}".changeset_series '
                   'add constraint "changeset_series_serie_fkey" '
                   f'foreign key (serie) references "{namespace}".registry (id) '
                   'on delete set null')

289
290
291
292
293
294
295
296
297
298
    print('reclaim unreachable chunks left behind by strip')
    from tshistory.snapshot import Snapshot
    for name in tsh.list_series(e):
        snap = Snapshot(e, tsh, name)
        garb = snap.garbage()
        if garb:
            print('************************', name, 'garbage =', len(garb))
            if reclaim:
                snap.reclaim()

299

300
301
302
303
for ep in iter_entry_points('tshistory.subcommands'):
    tsh.add_command(ep.load())


304
if __name__ == '__main__':
305
    tsh()