Beem: Race condition in blockchain.blocks() when using threads

Project Information

The Blockchain class provides access to the Steem blockchain and reads data from it. blockchain.blocks() streams a range of blocks from an API node. It has an option to use multi-threading, which speeds up fetching the blocks from API nodes considerably.

Expected behavior

blockchain.blocks(start=start, stop=stop, threading=True) should return all requested blocks.

Actual behavior

blockchain.blocks(start=start, stop=stop, threading=True) may raise a RuntimeError due to a race condition when a dictionary is changed while being iterated on. The streaming of the blocks stops at non-deterministic block numbers.

Traceback (most recent call last):
  File "block_race_condition.py", line 9, in <module>
    for block in b.blocks(start=10000, stop=20000, threading=True, thread_num=8):
  File "/usr/local/lib/python3.6/site-packages/beem/blockchain.py", line 241, in blocks
    results = [r.result() for r in as_completed(futures)]
  File "/usr/local/lib/python3.6/site-packages/beem/blockchain.py", line 241, in <listcomp>
    results = [r.result() for r in as_completed(futures)]
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.6/site-packages/beem/block.py", line 68, in __init__
    steem_instance=steem_instance
  File "/usr/local/lib/python3.6/site-packages/beem/blockchainobject.py", line 138, in __init__
    self.cache()
  File "/usr/local/lib/python3.6/site-packages/beem/blockchainobject.py", line 163, in cache
    BlockchainObject._cache[self.get(self.id_item)] = self
  File "/usr/local/lib/python3.6/site-packages/beem/blockchainobject.py", line 33, in __setitem__
    self.clear_expired_items()
  File "/usr/local/lib/python3.6/site-packages/beem/blockchainobject.py", line 48, in clear_expired_items
    for key in self.keys():
RuntimeError: dictionary changed size during iteration

The corresponding code part is in https://github.com/holgern/beem/blob/bd8cf746f7a3287654c9084da6e641bb4838e7ca/beem/blockchainobject.py#L47

Screenshot caption: self.keys() changes while being iterated on.
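
The error message itself can be reproduced without beem or threads. The following is a minimal sketch in plain Python, not taken from the beem code base: the insert inside the loop stands in for a second thread calling __setitem__ while the first thread is still iterating. The function name iterate_while_mutating is made up for this illustration.

```python
def iterate_while_mutating():
    """Insert into a dict while iterating over its keys and report the error."""
    cache = {1: "a", 2: "b", 3: "c"}
    try:
        for key in cache.keys():
            # Stands in for another thread adding a cache entry at this moment.
            cache[key + 100] = "new"
    except RuntimeError as exc:
        return str(exc)
    return None


error_message = iterate_while_mutating()
print(error_message)  # dictionary changed size during iteration
```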

How to reproduce

#!/usr/bin/python

from beem.blockchain import Blockchain
from beem import Steem
import sys

b = Blockchain()

for block in b.blocks(start=10000, stop=20000, threading=True, thread_num=8):
    sys.stdout.write("%s (%s)\r" % (block['id'], block['timestamp']))

This code sometimes results in the RuntimeError above. I've been using the same structure in a long-running loop and hit the RuntimeError by chance every now and then. With the example code above, the block number at which the error occurs changes, and it may take dozens of runs until it actually fails. However, the problem can be artificially amplified by adding a time.sleep(1) to the corresponding for key in self.keys(): loop:

diff --git a/beem/blockchainobject.py b/beem/blockchainobject.py
index 03d22de..09d56ca 100644
--- a/beem/blockchainobject.py
+++ b/beem/blockchainobject.py
@@ -9,6 +9,7 @@ from beemgraphenebase.py23 import bytes_types, integer_types, string_types, text
 from beem.instance import shared_steem_instance
 from datetime import datetime, timedelta
 import json
+import time


 @python_2_unicode_compatible
@@ -46,6 +47,7 @@ class ObjectCache(dict):
         keys = []
         for key in self.keys():
             keys.append(key)
+            time.sleep(1)
         for key in keys:
             value = dict.__getitem__(self, key)
             if datetime.utcnow() >= value["expires"]:

With this change, the library spends a significant amount of time in the iteration loop over self.keys(). Functionally nothing changes, apart from the code now being terribly slow. With the sleep in place, the exception above can be triggered with the given code sample within seconds.
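
The same amplification idea can be sketched outside of beem with two plain threads sharing one dictionary. This is only an illustration under assumptions: the names shared, slow_reader and writer are hypothetical, and the sleep plays the role of the time.sleep(1) added in the diff above by keeping the reader inside its loop long enough for the writer to change the dictionary.

```python
import threading
import time

shared = {i: i for i in range(5)}
errors = []
reader_started = threading.Event()


def slow_reader():
    """Iterate over the shared dict slowly to widen the race window."""
    try:
        for key in shared.keys():
            reader_started.set()
            time.sleep(0.2)
    except RuntimeError as exc:
        errors.append(str(exc))


def writer():
    """Add an entry once the reader is inside its loop."""
    reader_started.wait()
    shared["new"] = 1


threads = [threading.Thread(target=slow_reader), threading.Thread(target=writer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(errors)
```

Without the sleep, the reader would usually finish its loop before the writer gets a chance to run, which matches the observation that the unpatched library fails only occasionally.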

The exception is not raised when multi-threading is disabled via threading=False. To my understanding, multiple threads are accessing the same ObjectCache() instance. Disabling the cache for testing purposes, by changing the default of the use_cache flag in the BlockchainObject constructor to False, did not work and led to a different error:

Traceback (most recent call last):
  File "block_race_condition.py", line 10, in <module>
    sys.stdout.write("%s (%s)\r" % (block['id'], block['timestamp']))
  File "/usr/local/lib/python3.6/site-packages/beem/blockchainobject.py", line 173, in __getitem__
    self.refresh()
  File "/usr/local/lib/python3.6/site-packages/beem/block.py", line 76, in refresh
    self.identifier = int(self.identifier)
TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'

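Iterating over a copy of the keys, e.g. via for key in list(self.keys()): (on Python 3, self.keys()[:] is not possible because the keys view cannot be sliced), will only shift the problem, because two instances may still try to access/delete the same cache entries independently.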
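
One possible direction is to serialize all access to the cache with a lock. The following is a rough sketch under assumptions, not beem's actual code or fix: the class name LockedObjectCache, its expiration_seconds parameter and the entry layout are hypothetical, and only the overridden methods are guarded (other dict methods such as get() or pop() would need the same treatment).

```python
import threading
import time


class LockedObjectCache(dict):
    """Hypothetical expiring cache whose reads and writes share one lock."""

    def __init__(self, expiration_seconds=60):
        super().__init__()
        self._expiration = expiration_seconds
        # Re-entrant, because __setitem__ calls clear_expired_items().
        self._lock = threading.RLock()

    def __setitem__(self, key, value):
        with self._lock:
            entry = {"expires": time.monotonic() + self._expiration, "data": value}
            dict.__setitem__(self, key, entry)
            self.clear_expired_items()

    def __getitem__(self, key):
        with self._lock:
            return dict.__getitem__(self, key)["data"]

    def clear_expired_items(self):
        with self._lock:
            now = time.monotonic()
            # Collect first, delete afterwards; no other thread can interfere
            # because every writer has to hold the same lock.
            expired = [k for k, v in dict.items(self) if now >= v["expires"]]
            for k in expired:
                dict.__delitem__(self, k)


failures = []


def fill(cache, start, count):
    """Insert count entries, recording any RuntimeError instead of crashing."""
    try:
        for i in range(start, start + count):
            cache[i] = "block %d" % i
    except RuntimeError as exc:
        failures.append(str(exc))


cache = LockedObjectCache(expiration_seconds=60)
workers = [threading.Thread(target=fill, args=(cache, n * 200, 200)) for n in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(len(cache), failures)
```

The lock trades some parallelism for correctness: threads still fetch blocks concurrently, but they take turns when touching the shared cache.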

Environment

# beempy --version
beempy, version 0.19.32
# python --version
Python 3.6.5

GitHub Account

https://github.com/crokkon
A GitHub issue has been created:
https://github.com/holgern/beem/issues/16
