Commit ec7ca2a8 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

insert: exhibit a deadlock whwhen running in parallel, at creation time

OperationalError('(psycopg2.extensions.TransactionRollbackError) deadlock detected
DETAIL:  Process 5252 waits for ShareRowExclusiveLock on relation 34066 of database 12407; blocked by process 5246.
Process 5246 waits for ShareRowExclusiveLock on relation 34066 of database 12407; blocked by process 5252.
parent 0a81ff18ba1e
......@@ -8,7 +8,8 @@ import numpy as np
import pandas as pd
from tshistory.snapshot import Snapshot
from tshistory.util import rename_series
from tshistory.util import rename_series, threadpool
from tshistory.tsio import TimeSerie
from tshistory.testutil import (
assert_df,
assert_hist,
......@@ -1242,3 +1243,29 @@ def test_index_order(engine, tsh):
with pytest.raises(AssertionError):
tsh.insert(engine, ts.sort_index(ascending=False),
'test_order', 'babar')
def test_parallel(engine, tsh):
ts = genserie(datetime(2010, 1, 1), 'D', 10)
pool = threadpool(4)
args = [
(ts, 'a', 'aurelien'),
(ts, 'b', 'arnaud'),
(ts, 'c', 'alain'),
(ts, 'd', 'andre')
]
errors = []
def insert(ts, name, author):
tsh = TimeSerie()
with engine.begin() as cn:
try:
tsh.insert(cn, ts, name, author)
except Exception as e:
errors.append(e)
pool(insert, args)
# we got a deadlock
assert len(errors)
import math
import zlib
import hashlib
import logging
import threading
import numpy as np
import pandas as pd
......@@ -155,3 +157,38 @@ def delete_series(engine, series, namespace='tsh'):
continue
print('delete', name)
tsh.delete(cn, name)
def threadpool(maxthreads):
L = logging.getLogger('parallel')
def run(func, argslist):
count = 0
threads = []
L.info('// run %s %s', func.__name__, len(argslist))
# initial threads
for count, args in enumerate(argslist, start=1):
th = threading.Thread(target=func, args=args)
threads.append(th)
L.info('// start thread %s', th.name)
th.daemon = True
th.start()
if count == maxthreads:
break
while threads:
for th in threads[:]:
th.join(1. / maxthreads)
if not th.is_alive():
threads.remove(th)
L.info('// thread %s exited, %s remaining', th.name, len(threads))
if count < len(argslist):
newth = threading.Thread(target=func, args=argslist[count])
threads.append(newth)
L.info('// thread %s started', newth.name)
newth.daemon = True
newth.start()
count += 1
return run
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment