Perform batch RPC requests to ethereum node #832
Ah yes, thanks. I updated the issue to indicate that HTTP is the primary target. 👍 I will be happy if we find that it boosts speed for everything else, too! |
@carver having now thought about this for a few minutes, I think this is going to be easy if we build it on top of #657, since everything under `web3.async` will already be asynchronous. Here's some imagined API:

```python
w3 = Web3(...)
batch = w3.async.create_batch()
batch.eth.getBalance(addr_a)
batch.eth.getBalance(addr_b)
batch.eth.getBalance(addr_c)
bal_a, bal_b, bal_c = await batch.execute()
```
|
Interesting idea. What happens when one tries to send multiple transactions? In my system, I have to wait until the first transaction clears before sending the next, probably because getTransactionCount doesn't take pending transactions into consideration. I didn't really dive into the problem, but wanted to bring it up. |
@dangell7 good point. We may choose to start with a minimal initial implementation which only allows batching read-only requests that don't modify state. |
@pipermerriam anybody working on this? it's not all that far off the poc i did a while ago. might be able to resurrect that without too many changes as a wip starting point. |
@boneyard93501 not that I've seen |
@carver cool. if you don't mind, i'll take a stab at it starting wednesday. |
@boneyard93501 are you looking at starting into the |
@pipermerriam @carver

```python
import time
import datetime
import queue
import threading

WORKER_TIMEOUT = 5.0


def batch_request_handler(rpc_request_queue, provider, response_processing_fn):
    ''' worker loop: pull RPC jobs off the queue and run them against the provider '''
    last_job = datetime.datetime.utcnow()  # initialize so the first timeout check works
    while True:
        try:
            if (datetime.datetime.utcnow() - last_job).total_seconds() > WORKER_TIMEOUT:
                response_processing_fn(('', '', 'TimeoutError'))
                break
            try:
                # don't block forever, so the timeout check above gets a chance to run
                job = rpc_request_queue.get(timeout=WORKER_TIMEOUT)
            except queue.Empty:
                continue
            if job is None:  # sentinel: no more work
                break
            last_job = datetime.datetime.utcnow()
            try:
                result = provider._make_request(job)
                response_processing_fn((job, result, ''))
            except Exception as exc:
                response_processing_fn((job, '', exc))
            finally:
                rpc_request_queue.task_done()
        except (KeyboardInterrupt, SystemExit):
            break
        time.sleep(0.1)


def batch_request(provider, payload, response_processing_fn, max_request_threads=10, max_request_q=200):
    '''
    could change provider to provider_type and then create the provider in the worker
    to avoid any possible thread-safety issues now or down the road.
    also should take a threading.Event, adjust the worker's while condition accordingly,
    so the user can kill it all at will
    '''
    rpc_request_queue = queue.Queue()
    request_args = (rpc_request_queue, provider, response_processing_fn)
    req_threads = []
    for i in range(max_request_threads):
        t = threading.Thread(target=batch_request_handler, args=request_args)
        t.daemon = True  # not strictly needed since we're joining, but it helps avoid zombies
        t.name = f'batch request thread {i}'
        req_threads.append(t)
    [t.start() for t in req_threads]
    for job in payload:
        rpc_request_queue.put(job)
    for _ in req_threads:
        rpc_request_queue.put(None)  # one sentinel per worker so each exits cleanly
    [t.join() for t in req_threads]


# user side
class SuperSimpleClientBatchProcessingClass:
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.jobs_processed = 0

    def job_update(self, data):
        ''' only requirement from the web3.py side is to accept a (named) tuple '''
        self.jobs_processed += 1
        if data[1]:
            pass  # do something with successes
        else:
            pass  # do something with failures, including TimeoutError

    @property
    def done(self):
        return self.jobs_processed == self.batch_size

    @property
    def progress(self):
        return {'batch-size': self.batch_size, 'processed_jobs': self.jobs_processed}
```
|
@boneyard93501 echoing what @voith said in another issue. |
k. converting the above to an asyncio Queue is fairly trivial, although you'll want to make threaded allowances (concurrent.futures, ThreadPoolExecutor). for event loop registration purposes, i was trying to find the (repo for the) async API but came up short. where can i find it? thx. now, wrapping providers is nice and may have some benefits, but the real benefit of batch processing comes from muxing over websocket(s). how do you feel about being opinionated and a) providing a ws-based batch processing solution only (once a suitable ws implementation is in place) and b) providing example(s) in the documentation on how to wrap providers right now? |
Is this what you are looking for? https://docs.python.org/3.5/library/asyncio.html
Not sure I fully understand the question. Generally, we are going to be cautious about releasing an API that we aren't sure about, because of our commitment to a graceful deprecation cycle for breaking API changes. I think the code and approach you are taking will be useful, but I'm not confident that it's something we can release prior to having some baseline |
@pipermerriam actually, i was looking for something else. other than that, i proposed, or attempted to, not implementing any batch functionality until we know what the websocket revision looks like, and instead addressing batching with user-side implementation examples in the documentation. |
@pipermerriam @boneyard93501 I compared the execution time between requesting the transaction receipts for a particular block synchronously and asynchronously. I used
When it's run over the Internet the results are:
Overall I get a 7x-11x execution time improvement if the requests are batched and sent asynchronously. My code below:

```python
import timeit
import asyncio
from aiohttp import ClientSession

from web3.providers.base import JSONBaseProvider
from web3.providers import HTTPProvider
from web3 import Web3


# synchronously request receipts for given transactions
def sync_receipts(web3, transactions):
    for tran in transactions:
        web3.eth.getTransactionReceipt(tran)


# asynchronous JSON RPC API request
async def async_make_request(session, url, method, params):
    base_provider = JSONBaseProvider()
    request_data = base_provider.encode_rpc_request(method, params)
    async with session.post(url, data=request_data,
                            headers={'Content-Type': 'application/json'}) as response:
        content = await response.read()
    return base_provider.decode_rpc_response(content)


async def run(node_address, transactions):
    tasks = []
    # Fetch all responses within one ClientSession,
    # keeping the connection alive for all requests.
    async with ClientSession() as session:
        for tran in transactions:
            task = asyncio.ensure_future(async_make_request(
                session, node_address,
                'eth_getTransactionReceipt', [tran.hex()]))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)


if __name__ == "__main__":
    eth_node_address = "http://localhost:8545"
    web3 = Web3(HTTPProvider(eth_node_address))
    block = web3.eth.getBlock(web3.eth.blockNumber)
    transactions = block['transactions']

    start_time = timeit.default_timer()
    sync_receipts(web3, transactions)
    print('sync: {:.3f}s'.format(timeit.default_timer() - start_time))

    start_time = timeit.default_timer()
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(eth_node_address, transactions))
    loop.run_until_complete(future)
    print('async: {:.3f}s'.format(timeit.default_timer() - start_time))
```
|
Yes, these look like very nice performance boosts. I'd be excited to see someone start working on the |
@jakublipinski Have you tried batch JSON RPC requests https://www.jsonrpc.org/specification#batch? I found that it significantly boosts performance. |
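For anyone skimming: a minimal sketch of what such a spec-compliant batch looks like over plain HTTP. This is not a web3.py API; the node URL and hashes are placeholders, and it uses the `requests` library directly.

```python
# Sketch of a raw JSON-RPC 2.0 batch request sent in a single HTTP round trip.
import requests

def batch_get_receipts(url, tx_hashes):
    # Per the spec, a batch is a JSON array of request objects with unique ids.
    payload = [
        {"jsonrpc": "2.0", "method": "eth_getTransactionReceipt",
         "params": [tx_hash], "id": i}
        for i, tx_hash in enumerate(tx_hashes)
    ]
    resp = requests.post(url, json=payload)
    resp.raise_for_status()
    # The server may answer out of order, so sort the responses back by id.
    return [item.get("result")
            for item in sorted(resp.json(), key=lambda r: r["id"])]

receipts = batch_get_receipts("http://localhost:8545", ["0x..."])  # placeholder hash
```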
Looks like I had a totally incorrect understanding of this feature. I was thinking of batching in terms of concurrency, but after reading the JSON-RPC spec's section on batch calls:
Thanks @medvedev1088 |
@medvedev1088 @voith nice! That's what I meant from the beginning. I got really good performance doing that |
Just noting that this fits nicely with the new |
Hey guys, any news on this one? Any plans to implement it? Kind of sucks to have to use the multicall contract to do batch queries when the node itself allows batch queries. The js guys have had this functionality for years now and are gonna be making fun of us pythonistas 🐍 |
Couple of thoughts: |
@Pet3ris or anyone else on here - if you need this and want to get it rolling, I can definitely help out! I want to finish up a few other things (primarily getting async off the ground) before I put this on my immediate queue, but I can definitely provide feedback and review for anyone wanting to do the legwork now.
I think this will be a challenge, but I like @Uxio0's implementation where you pass in the function name, and a param to either raise the exception or not. I think that provides a good start and we can iterate from there.
Yeah, let's not break |
Yeah, that plus Piper's seems like a reasonable combo to me. It seems okay to just allow any exceptions to bubble up normally, though maybe we could wrap it in an exception that identifies the index of the call that raised it? Someday there will be something like Exception Groups (PEP 654), but until then we can do the dumb simple thing. |
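A rough sketch of that index-identifying wrapper idea; all names here are hypothetical, not an existing web3.py API.

```python
# Hypothetical sketch: re-raise whatever a batched call raises, annotated
# with the call's position in the batch. Names are illustrative only.
class BatchItemError(Exception):
    def __init__(self, index, method, original):
        super().__init__(f"batch item {index} ({method}) failed: {original!r}")
        self.index = index
        self.method = method
        self.original = original

def execute_batch(calls):
    # calls: list of (method_name, zero-argument callable) pairs
    results = []
    for i, (method, fn) in enumerate(calls):
        try:
            results.append(fn())
        except Exception as exc:
            raise BatchItemError(i, method, exc) from exc
    return results
```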
If it's of any use, I'd be happy to clean up this stop-gap solution that I hacked together. It does not expose any async interfaces, but it seems to enable the websocket provider to be used with a thread pool executor, thus allowing multiple RPCs to be in flight without the overhead of making individual HTTP requests.

```python
import os
import asyncio
import concurrent.futures
import contextlib
import json
import threading
import time
from typing import Any, Optional, Union

import web3
from eth_typing import URI
from web3.providers.websocket import (
    DEFAULT_WEBSOCKET_TIMEOUT,
    PersistentWebSocket,
    WebsocketProvider,
)
from web3.types import RPCResponse, RPCEndpoint


class Multiplexer:
    @classmethod
    async def new(cls, conn: PersistentWebSocket):
        return cls(await conn.__aenter__(), asyncio.Queue())

    def __init__(self, conn, queue):
        self._conn = conn
        self._queue = queue
        self._events = {}
        self._responses = {}

    async def send(self):
        while True:
            data = await self._queue.get()
            await self._conn.send(data)

    async def recv(self):
        while True:
            data = await self._conn.recv()
            id = json.loads(data)["id"]  # TODO: error handling?
            self._responses[id] = data
            self._events.pop(id).set()

    async def run(self):
        await asyncio.gather(
            self.send(),
            self.recv(),
        )

    def __call__(self, data):
        event = threading.Event()
        id = json.loads(data)["id"]
        self._events[id] = event
        self._queue.put_nowait(data)
        event.wait()
        return self._responses.pop(id)


class MultiplexingWebsocketProvider(WebsocketProvider):
    def __init__(
        self,
        endpoint_uri: Optional[Union[URI, str]] = None,
        websocket_kwargs: Optional[Any] = None,
        websocket_timeout: int = DEFAULT_WEBSOCKET_TIMEOUT,
    ) -> None:
        super().__init__(endpoint_uri, websocket_kwargs, websocket_timeout)
        self._multiplexer = None
        self._multiplexer_fut = None

    def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
        request_data = self.encode_rpc_request(method, params)
        if self._multiplexer is None:
            assert self._multiplexer_fut is None
            self._multiplexer = asyncio.run_coroutine_threadsafe(
                Multiplexer.new(self.conn),
                self._loop,
            ).result()
            self._multiplexer_fut = asyncio.run_coroutine_threadsafe(
                self._multiplexer.run(),
                self._loop,
            )  # TODO: stop properly
        return json.loads(self._multiplexer(request_data))


@contextlib.contextmanager
def timer():
    t0 = time.perf_counter()
    yield
    t1 = time.perf_counter()
    print(t1 - t0)


def stress(n):
    w3 = web3.Web3(MultiplexingWebsocketProvider(os.environ["ETH_NODE_WSS"]))

    # some dummy load that uses the RPC interfaces
    def call(_):
        nonlocal w3
        return w3.eth.block_number

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        list(executor.map(call, range(n)))


if __name__ == "__main__":
    with timer():
        stress(200)
```
|
It's great! But if you increase n to 400+ in the stress function, it might hang forever. |
As @rockkoca says, when increasing the number of requests to the node it can hang forever, or even hang the node itself. I've found that using the Multicall smart contract works better for batching contract calls. For reference, here's the implementation: https://github.com/gnosis/gnosis-py/blob/master/gnosis/eth/multicall.py and here's the guide on how to use it |
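For readers unfamiliar with the pattern: Multicall is a deployed contract whose `aggregate` function takes a list of (target, calldata) pairs and performs all of them inside a single `eth_call`. A minimal sketch of the idea follows; the address is the commonly cited MakerDAO mainnet deployment and the trimmed ABI is hand-written, so verify both against the linked implementation. It also leans on web3.py's internal `_encode_transaction_data`.

```python
# Sketch of the Multicall pattern: many read-only contract calls collapsed
# into one eth_call. Address and ABI are assumptions to double-check.
from web3 import Web3, HTTPProvider

MULTICALL_ADDRESS = "0xeefBa1e63905eF1D7ACbA5a8513c70307C1cE441"
MULTICALL_ABI = [{
    "name": "aggregate",
    "type": "function",
    "stateMutability": "view",
    "inputs": [{
        "name": "calls",
        "type": "tuple[]",
        "components": [
            {"name": "target", "type": "address"},
            {"name": "callData", "type": "bytes"},
        ],
    }],
    "outputs": [
        {"name": "blockNumber", "type": "uint256"},
        {"name": "returnData", "type": "bytes[]"},
    ],
}]

def multicall(w3, contract_functions):
    # contract_functions: prepared web3 ContractFunction objects, all read-only
    multicall_contract = w3.eth.contract(address=MULTICALL_ADDRESS, abi=MULTICALL_ABI)
    calls = [
        (fn.address, fn._encode_transaction_data())  # uses web3.py internals
        for fn in contract_functions
    ]
    block_number, return_data = multicall_contract.functions.aggregate(calls).call()
    # each return_data entry is still ABI-encoded and needs decoding per call
    return block_number, return_data
```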
@Uxio0 Great piece of code. For example: And then have something like: |
Hi, thanks. No, there's not (yet). I'm using async http via |
Just my two cents: I think the web3.js implementation of batch requests, which uses the ETH node's native JSON-RPC batch request capabilities, has some distinct advantages:
I think batch request functionality similar to the web3.js implementation would be great for web3.py; I use it regularly with frontend DApps and Node.js backends to vastly improve performance when reading data from contracts. |
https://github.com/Narasimha1997/aio-eth Nice lib that implements batch and concurrent requests |
Five years on and still no progress on this issue. |
need this feature |
If you need this feature right now, `safe-eth-py` can batch contract calls.
I can see how 'safe-eth-py' can batch contract calls, but can it batch web3 calls like get_block? |
There's not an API to batch different types of requests, but indeed there's a |
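For anyone wanting to try it, a usage sketch of that `batch_call`. The import path and behavior are from memory of safe-eth-py and should be double-checked against its docs; `erc20_contract` is assumed to be a web3 Contract you've already built.

```python
# Sketch of batching several read-only contract calls through safe-eth-py.
from gnosis.eth import EthereumClient

ethereum_client = EthereumClient("http://localhost:8545")
# erc20_contract: a web3 Contract instance constructed elsewhere
name, symbol, decimals = ethereum_client.batch_call([
    erc20_contract.functions.name(),
    erc20_contract.functions.symbol(),
    erc20_contract.functions.decimals(),
])
# with raise_exception=False, failed calls come back as None instead of raising
```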
Hello, would anyone have a solution for performing batch operations where the `block_identifier` varies per call? E.g., for:

```python
# Get the results of past calls to Chainlink EACAggregatorProxy.latestAnswer
latest_answer = chainlink_price_feed_contract.functions.latestAnswer().call(
    block_identifier=block_identifier
)
```

I would like to give a different `block_identifier` to each call in the batch. I guess I'm a bit confused about what qualifies as |
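Until native batch support exists, one workaround in the spirit of this thread is to encode the same read at several block identifiers into one raw JSON-RPC batch. A sketch under those assumptions: it reuses the hypothetical Chainlink contract object above, relies on web3.py's internal `_encode_transaction_data`, and requires a node that retains historical state.

```python
# Sketch: the same contract read repeated at many historical blocks,
# packed into a single JSON-RPC batch request.
import requests

def latest_answer_at_blocks(url, contract, block_numbers):
    call_data = contract.functions.latestAnswer()._encode_transaction_data()
    payload = [
        {"jsonrpc": "2.0", "method": "eth_call",
         "params": [{"to": contract.address, "data": call_data}, hex(block)],
         "id": i}
        for i, block in enumerate(block_numbers)
    ]
    resp = requests.post(url, json=payload)
    resp.raise_for_status()
    ordered = sorted(resp.json(), key=lambda r: r["id"])
    # latestAnswer returns an int256; this naive decode assumes positive values
    return [int(item["result"], 16) for item in ordered]
```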
What was wrong?
There's currently no support for batch requests when using HTTPProvider. Even when using IPC/WS connectors, I think a speed-up would be possible using batch requests.

How can it be fixed?
Implementing batch requests