
Perform batch RPC requests to ethereum node #832

Closed
Uxio0 opened this issue May 9, 2018 · 52 comments · Fixed by #3370

Comments

@Uxio0
Contributor

Uxio0 commented May 9, 2018

  • Version: 4.2.0
  • Python: 3.6
  • OS: osx/linux/win

What was wrong?

There's currently no support for batch requests when using HTTPProvider. Even with the IPC/WS connectors, I think a speed-up would be possible using batch requests.

How can it be fixed?

Implementing batch requests
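
For illustration, here is a minimal sketch of what a JSON-RPC 2.0 batch request looks like at the HTTP level (the endpoint URL and the use of the requests library are assumptions for the example, not an existing web3.py API):

import requests

# several JSON-RPC calls packed into a single HTTP POST, per the JSON-RPC 2.0 spec
batch = [
    {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1},
    {"jsonrpc": "2.0", "method": "eth_gasPrice", "params": [], "id": 2},
]
response = requests.post(
    "http://localhost:8545",  # assumed local node endpoint
    json=batch,
    headers={"Content-Type": "application/json"},
)
# the node replies with a list of response objects, matched to requests by "id"
results = {item["id"]: item.get("result") for item in response.json()}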

(Several early comments from @carver, @pipermerriam, and @Uxio0 have been minimized.)

@carver
Collaborator

carver commented May 9, 2018

Ah yes, thanks. I updated the issue to indicate that HTTP is the primary target. 👍

I will be happy if we find that it boosts speed for everything else, too!

@pipermerriam
Member

@carver having now thought about this for a few minutes, I think this is going to be easy if we build it on top of #657, since everything under web3.async.* will be coroutine-based, and thus batching them to run concurrently (in the asyncio sense of concurrency) should be really easy to expose.

Here's an imagined API:

w3 = Web3(...)

batch = w3.async.create_batch()
batch.eth.getBalance(addr_a)
batch.eth.getBalance(addr_b)
batch.eth.getBalance(addr_c)
bal_a, bal_b, bal_c = await batch.execute()
@dangell7

dangell7 commented May 9, 2018

Interesting idea. What happens when one tries to send multiple transactions? Because in my system, I have to wait until the first transaction clears before sending the next, probably because getTransactionCount doesn't take pending transactions into consideration. I didn't really dive into the problem, but wanted to bring it up.
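
(For what it's worth, a common workaround for the nonce problem is to ask the node for the pending-state transaction count. A one-line sketch, assuming the v4-era camelCase API and an address variable defined elsewhere:)

# count pending transactions too, so several transactions can be
# signed and sent without waiting for each one to clear
nonce = web3.eth.getTransactionCount(address, 'pending')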

@pipermerriam
Member

@dangell7 good point. We may choose to start with a minimal initial implementation which only allows batching read-only requests, i.e. ones that don't modify state.
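
(One way to scope such a minimal version, sketched with a hypothetical whitelist; the method names are standard Ethereum JSON-RPC endpoints:)

READ_ONLY_METHODS = {'eth_getBalance', 'eth_call', 'eth_getTransactionReceipt', 'eth_getBlockByNumber'}

def validate_batchable(method):
    # refuse to batch anything that could modify state, e.g. eth_sendRawTransaction
    if method not in READ_ONLY_METHODS:
        raise ValueError(f'{method} cannot be batched')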

@boneyard93501
Contributor

@pipermerriam anybody working on this? it's not all that far off the poc i did a while ago. might be able to resurrect that without too many changes as a wip starting point.

@carver
Collaborator

carver commented May 21, 2018

@boneyard93501 not that I've seen

@boneyard93501
Contributor

@carver cool. if you don't mind, i'll take a stab at it starting wednesday.

@pipermerriam
Member

@boneyard93501 are you looking at starting into the web3.async API? If not, can you provide a proposal for what you're looking to build? Basic API and a basic description of how it'll work.

@boneyard93501
Contributor

@pipermerriam @carver
if a solution is supposed to be provider-agnostic, i think a thread-based pub-sub solution is probably the most straightforward (and could be wrapped in an event loop, if needed). quick and dirty outline, untested, unoptimized/un-toolzed:

import time
import datetime
import queue
import threading


WORKER_TIMEOUT = 5.0


def batch_request_handler(rpc_request_queue, provider, response_processing_fn):
    ''' pull jobs off the queue and hand each (job, result, error) tuple to the callback '''
    last_job = datetime.datetime.utcnow()
    while True:
        try:
            if (datetime.datetime.utcnow() - last_job).total_seconds() > WORKER_TIMEOUT:
                response_processing_fn(('', '', 'TimeoutError'))
                break
            try:
                job = rpc_request_queue.get(timeout=WORKER_TIMEOUT)
            except queue.Empty:
                continue
            if job is None:
                rpc_request_queue.task_done()
                break
            last_job = datetime.datetime.utcnow()
            try:
                result = provider._make_request(job)
                response_processing_fn((job, result, ''))
            except Exception as exc:
                response_processing_fn((job, '', exc))
            finally:
                rpc_request_queue.task_done()
        except (KeyboardInterrupt, SystemExit):
            break
        time.sleep(0.1)


def batch_request(provider, payload, response_processing_fn, max_request_threads=10, max_request_q=200):
    '''
        could change provider to provider_type and then create the provider in the worker to
        avoid any possible thread-safety issues now or down the road.
        should also take a threading.Event (adjusting the worker's while condition accordingly)
        so the user can kill it all at will
    '''
    rpc_request_queue = queue.Queue(maxsize=max_request_q)
    request_args = (rpc_request_queue, provider, response_processing_fn)
    req_threads = []

    for i in range(max_request_threads):
        t = threading.Thread(target=batch_request_handler, args=request_args)
        t.daemon = True  # not strictly needed since we join, but it helps avoid zombies
        t.name = f'batch request thread {i}'
        req_threads.append(t)

    for t in req_threads:
        t.start()

    for job in payload:
        rpc_request_queue.put(job)

    # one sentinel per worker so each thread exits cleanly instead of waiting out the timeout
    for _ in req_threads:
        rpc_request_queue.put(None)

    for t in req_threads:
        t.join()

 

# user side
class SuperSimpleClientBatchProcessingClass:
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.jobs_processed = 0

    def job_update(self, data):
        ''' only requirement from the web3.py side is to be able to accept a (named) tuple '''
        self.jobs_processed += 1
        if data[1]:
            pass  # do something with successes
        else:
            pass  # do something with failures, including TimeoutError

    @property
    def done(self):
        return self.jobs_processed == self.batch_size

    @property
    def progress(self):
        return {'batch_size': self.batch_size, 'processed_jobs': self.jobs_processed}
@pipermerriam
Member

@boneyard93501 echoing what @voith said in another issue.

  1. I think asyncio is going to be our go-to for concurrency (and consequently, I'm hesitant to add any thread-based concurrency).
  2. I don't have a specific concern here, but I know historically that users who've tried to use web3.py in conjunction with threads have had issues, so a thread-based solution seems likely to have unexpected problems.
@boneyard93501
Contributor

k. converting the above to an asyncio Queue is fairly trivial, although you'll want to make threading allowances (concurrent.futures, ThreadPoolExecutor). for event-loop registration purposes, i was trying to find the (repo for the) async API but came up short. where can i find it? thx.

now, wrapping providers is nice and may have some benefits, but the real benefit of batch processing comes from muxing over websocket(s). how do you feel about being opinionated and a) providing a ws-based batch-processing solution only (once a suitable ws implementation is in place) and b) providing example(s) in the documentation on how to wrap providers right now?

@pipermerriam
Member

i was trying to find the (repo for the) async API but came up short. where can i find it? thx.

Is this what you are looking for?

https://docs.python.org/3.5/library/asyncio.html

how do you feel about being opinionated and a) providing a ws-based batch-processing solution only (once a suitable ws implementation is in place) and b) providing example(s) in the documentation on how to wrap providers right now?

Not sure I fully understand the question. Generally, we are going to be cautious about releasing an API that we aren't sure about, because of our commitment to a graceful deprecation cycle for breaking API changes.

I think the code and approach you are taking will be useful, but I'm not confident that it's something we can release prior to having some baseline web3.async API to build on. Basically, we've removed all async from web3 to make room for a sensible async approach. We think our plans for a web3.async namespace are that approach; however, we need the basics in place before we can start adding features like async batched requests. It's hard for me to evaluate the API you're proposing without already having some basis for what it's being built upon... make sense?

@boneyard93501
Contributor

@pipermerriam actually, i was looking for the web3.async API, which i gather doesn't exist just yet.

other than that, i was proposing (or attempting to) that we not implement any batch functionality until we know what the websocket revision looks like, and instead consider addressing batching with user-side implementation examples in the documentation.

@jakublipinski

@pipermerriam @boneyard93501 I compared the execution time between requesting the transaction receipts for a particular block synchronously and asynchronously. I used the asyncio and aiohttp modules for async HTTP requests, basing my code on the excellent article by @pawelmhm.
The results are very promising. When the program is run locally on a machine with a running parity node I get:

sync: 1.843s
async: 0.264s

When it's run over the Internet the results are:

sync: 8.701s
async: 0.757s

Overall I get a 7x-11x execution-time improvement when the requests are batched and sent asynchronously. My code is below:

import timeit
import asyncio

from aiohttp import ClientSession

from web3.providers.base import JSONBaseProvider
from web3.providers import HTTPProvider
from web3 import Web3

# synchronously request receipts for given transactions
def sync_receipts(web3, transactions):
    for tran in transactions:
        web3.eth.getTransactionReceipt(tran)

# asynchronous JSON RPC API request
async def async_make_request(session, url, method, params):
    base_provider = JSONBaseProvider()
    request_data = base_provider.encode_rpc_request(method, params)
    async with session.post(url, data=request_data,
                        headers={'Content-Type': 'application/json'}) as response:
        content = await response.read()
    response = base_provider.decode_rpc_response(content)
    return response

async def run(node_address, transactions):
    tasks = []

    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with ClientSession() as session:
        for tran in transactions:
            task = asyncio.ensure_future(async_make_request(session, node_address,
                                                            'eth_getTransactionReceipt',[tran.hex()]))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)

if __name__ == "__main__":
    eth_node_address = "http://localhost:8545"
    web3 = Web3(HTTPProvider(eth_node_address))

    block = web3.eth.getBlock(web3.eth.blockNumber)
    transactions = block['transactions']

    start_time = timeit.default_timer()
    sync_receipts(web3, transactions)
    print('sync: {:.3f}s'.format(timeit.default_timer() - start_time))

    start_time = timeit.default_timer()
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(eth_node_address, transactions))
    loop.run_until_complete(future)
    print('async: {:.3f}s'.format(timeit.default_timer() - start_time))
@pipermerriam
Member

Yes, these look like very nice performance boosts. I'd be excited to see someone start working on the web3.async API so that users can start taking advantage of the performance gains.

@medvedev1088
Contributor

medvedev1088 commented Aug 19, 2018

@jakublipinski Have you tried batch JSON RPC requests https://www.jsonrpc.org/specification#batch? I found that it significantly boosts performance.

@voith
Contributor

voith commented Aug 20, 2018

Looks like I had a totally incorrect understanding of this feature. I was thinking of batching in terms of concurrency, but after reading the JSON-RPC documentation, batching seems to mean packing several request objects into a single request.

rpc call Batch:

--> [
        {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
        {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
        {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"},
        {"foo": "boo"},
        {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
        {"jsonrpc": "2.0", "method": "get_data", "id": "9"} 
    ]
<-- [
        {"jsonrpc": "2.0", "result": 7, "id": "1"},
        {"jsonrpc": "2.0", "result": 19, "id": "2"},
        {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null},
        {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"},
        {"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"}
    ]

Thanks @medvedev1088

@Uxio0
Contributor Author

Uxio0 commented Aug 20, 2018

@medvedev1088 @voith nice! That's what I meant from the beginning. I got really good performance doing that.

@medvedev1088
Contributor

@Uxio0 @voith Any ideas on how it can be added to web3.py? I'm afraid it will require substantial refactoring.

@pipermerriam
Member

Just noting that this fits nicely with the new Method class that is used to implement the various APIs. I think we now have a much firmer foundation to support this functionality if anyone wants to try to take a stab at it.

@LefterisJP

Hey guys any news on this one? Any plans to implement it?

Kind of sucks to have to use the multicall contract to do batch queries when the node itself allows batch queries.

The js guys already have this functionality for years now and are gonna be making fun of us pythonistas 🐍

@Pet3ris

Pet3ris commented Apr 9, 2021

A couple of thoughts:

  • First, I really like the proposal and have an immediate need for this feature
  • I think @pipermerriam's proposal makes a lot of sense
  • I wonder what the implications are for exception handling: how easy will it be to disambiguate which request was problematic?
  • Finally, I'm relying on make_request for request forwarding so it would be nice if that didn't break during the refactor :)
@kclowes
Collaborator

kclowes commented Apr 9, 2021

@Pet3ris or anyone else on here - if you need this and want to get it rolling, I can definitely help out! I want to finish up a few other things (primarily getting async off the ground) before I put this on my immediate queue, but I can definitely provide feedback and review for anyone wanting to do the legwork now.

I wonder what the implications are for exception handling: how easy will it be to disambiguate which request was problematic?

I think this will be a challenge, but I like @Uxio0's implementation where you pass in the function name, and a param to either raise the exception or not. I think that provides a good start and we can iterate from there.

Finally, I'm relying on make_request for request forwarding so it would be nice if that didn't break during the refactor :)

Yeah, let's not break make_request! I think it would be better to add a new method like make_batch_request or something.

@carver
Collaborator

carver commented Apr 9, 2021

I think this will be a challenge, but I like @Uxio0's implementation where you pass in the function name, and a param to either raise the exception or not. I think that provides a good start and we can iterate from there.

Yeah, that plus Piper's seems like a reasonable combo to me.

It seems okay to just allow any exceptions to bubble up normally, though maybe we could wrap it in an exception that identifies the index of the call that raised it? Someday there will be something like Exception Groups (PEP 654), but until then we can do the dumb simple thing.
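
(A minimal sketch of that index-wrapping idea, with hypothetical names:)

class BatchRequestError(Exception):
    ''' wraps an exception raised by one call in a batch, recording which call failed '''
    def __init__(self, index, original):
        super().__init__(f'batch call #{index} failed: {original!r}')
        self.index = index
        self.original = original

def execute_batch(calls):
    results = []
    for index, call in enumerate(calls):
        try:
            results.append(call())
        except Exception as exc:
            raise BatchRequestError(index, exc) from exc
    return results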

@apljungquist

If it's of any use, I'd be happy to clean up this stop-gap solution that I hacked together. It does not expose any async interfaces, but it seems to enable the websocket provider to be used with a thread-pool executor, thus allowing multiple RPCs to be in flight without the overhead of making individual HTTP requests.

import os
import asyncio
import concurrent.futures
import contextlib
import json
import threading
import time
from typing import Any, Optional, Union

import web3
from eth_typing import URI
from web3.providers.websocket import (
    DEFAULT_WEBSOCKET_TIMEOUT,
    PersistentWebSocket,
    WebsocketProvider,
)
from web3.types import RPCResponse, RPCEndpoint


class Multiplexer:
    @classmethod
    async def new(cls, conn: PersistentWebSocket):
        return cls(await conn.__aenter__(), asyncio.Queue())

    def __init__(self, conn, queue):
        self._conn = conn
        self._queue = queue
        self._events = {}
        self._responses = {}

    async def send(self):
        while True:
            data = await self._queue.get()
            await self._conn.send(data)

    async def recv(self):
        while True:
            data = await self._conn.recv()
            id = json.loads(data)["id"]  # TODO: error handling?
            self._responses[id] = data
            self._events.pop(id).set()

    async def run(self):
        await asyncio.gather(
            self.send(),
            self.recv(),
        )

    def __call__(self, data):
        event = threading.Event()
        id = json.loads(data)["id"]
        self._events[id] = event
        self._queue.put_nowait(data)
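        # caveat: asyncio.Queue is not thread-safe, and __call__ runs on worker
        # threads; handing this off via loop.call_soon_threadsafe(...) would be
        # safer (possibly related to the hang reported below with larger n)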
        event.wait()
        return self._responses.pop(id)


class MultiplexingWebsocketProvider(WebsocketProvider):
    def __init__(
        self,
        endpoint_uri: Optional[Union[URI, str]] = None,
        websocket_kwargs: Optional[Any] = None,
        websocket_timeout: int = DEFAULT_WEBSOCKET_TIMEOUT,
    ) -> None:
        super().__init__(endpoint_uri, websocket_kwargs, websocket_timeout)
        self._multiplexer = None
        self._multiplexer_fut = None

    def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
        request_data = self.encode_rpc_request(method, params)
        if self._multiplexer is None:
            assert self._multiplexer_fut is None
            self._multiplexer = asyncio.run_coroutine_threadsafe(
                Multiplexer.new(self.conn),
                self._loop,
            ).result()
            self._multiplexer_fut = asyncio.run_coroutine_threadsafe(
                self._multiplexer.run(),
                self._loop,
            )  # TODO: stop properly
        return json.loads(self._multiplexer(request_data))


@contextlib.contextmanager
def timer():
    t0 = time.perf_counter()
    yield
    t1 = time.perf_counter()
    print(t1 - t0)


def stress(n):
    w3 = web3.Web3(MultiplexingWebsocketProvider(os.environ["ETH_NODE_WSS"]))

    # some dummy load that uses the RPC interfaces
    def call(_):
        nonlocal w3
        return w3.eth.block_number

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        list(executor.map(call, range(n)))


if __name__ == "__main__":
    with timer():
        stress(200)
@rockkoca

rockkoca commented Sep 5, 2021

If it's of any use I'd be happy to clean up this stop-gap solution that I hacked together. […]

It's great! But if you increase n to 400+ in the stress function, it might hang forever.

@Uxio0
Contributor Author

Uxio0 commented Oct 4, 2021

As @rockkoca says, when increasing the number of requests to the node, it can hang forever, or even hang the node itself.

I've found that batch requests are not handled very well by the nodes, and there are even some node services that don't enable batch requests. For our use case I decided to implement multicall support, and it's really fast compared to doing calls using batch requests (orders of magnitude faster).

For reference, here's the implementation: https://github.com/gnosis/gnosis-py/blob/master/gnosis/eth/multicall.py and here's the guide on how to use it
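
(For readers unfamiliar with the pattern, a rough sketch of a multicall-style aggregate read; the ABI fragment is the widely used aggregate((address,bytes)[]) signature, while MULTICALL_ADDRESS, w3, and the erc20 contract are assumed to be defined elsewhere. This is illustrative, not code from gnosis-py:)

# a multicall contract executes many read-only calls inside a single eth_call
MULTICALL_ABI = [{
    "name": "aggregate", "type": "function", "stateMutability": "view",
    "inputs": [{"name": "calls", "type": "tuple[]",
                "components": [{"name": "target", "type": "address"},
                               {"name": "callData", "type": "bytes"}]}],
    "outputs": [{"name": "blockNumber", "type": "uint256"},
                {"name": "returnData", "type": "bytes[]"}],
}]

multicall = w3.eth.contract(address=MULTICALL_ADDRESS, abi=MULTICALL_ABI)
calls = [
    (erc20.address, erc20.encodeABI(fn_name='name')),
    (erc20.address, erc20.encodeABI(fn_name='symbol')),
]
block_number, return_data = multicall.functions.aggregate(calls).call()
# each return_data entry still needs ABI-decoding against the target function's output types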

@Cyboria

Cyboria commented Oct 26, 2021

@Uxio0 Great piece of code.
Is there an option to use the multicall function on an async connection (using AsyncHTTPProvider)?

For example:
async_connection = Web3(AsyncHTTPProvider('rpc_url'), modules={'eth': (AsyncEth,), 'net': (AsyncNet,)}, middlewares=[])

And then have something like:
name, symbol = await ethereum_client.batch_call([
    erc721_contract.functions.name(),
    erc721_contract.functions.symbol(),
])

@Uxio0
Contributor Author

Uxio0 commented Oct 26, 2021

@Uxio0 Great piece of code. Is there an option to use the multicall function on an async connection (using AsyncHTTPProvider)?

Hi, thanks. No, there's not (yet). I'm using async http via gevent patching, but not using the native async provider.

@wuya666

wuya666 commented Apr 4, 2022

Just my two cents: I think the web3.js implementation of batch requests, which uses the ETH node's native JSON-RPC batch request capability, has some distinct advantages:

  1. Compared to utilizing client-side concurrency with asyncio or something, the node's native batch request is vastly superior in terms of performance. The time to execute a couple thousand requests is basically the same as executing a single request. The performance gain from client-side concurrency is nowhere near that.
  2. Compared to the multicall contract method, it's not limited by gas restrictions. Even reading a contract costs some "virtual" gas, and depending on the complexity of the contract logic, batching a couple hundred calls may exceed the limit. I think the JSON-RPC batch request doesn't have this kind of limit.
  3. It's also more versatile than the multicall contract method: you can batch practically any JSON-RPC request, like getting transaction receipts, checking ETH balances at different block heights, getting block info, etc., which is impossible to accomplish with the multicall contract method.

I think a batch request functionality similar to the web3.js implementation would be great for web3.py. I use it regularly with frontend DApps and Node.js backends to vastly improve performance when reading data from contracts.

@sandybradley

https://github.com/Narasimha1997/aio-eth

Nice lib that implements batch and concurrent requests.

@Fooyao

Fooyao commented May 25, 2023

Five years on and still no progress on this issue.

@shaojiankui

need this feature

@Uxio0
Contributor Author

Uxio0 commented Sep 22, 2023

If you need this feature right now, safe-eth-py supports batching and multicall: https://safe-eth-py.readthedocs.io/en/latest/quickstart.html#gnosis-eth

@dreadedhamish

If you need this feature right now, safe-eth-py supports batching and multicall: https://safe-eth-py.readthedocs.io/en/latest/quickstart.html#gnosis-eth

I can see how 'safe-eth-py' can batch contract calls, but can it batch web3 calls like get_block?

@Uxio0
Contributor Author

Uxio0 commented Oct 14, 2023

I can see how 'safe-eth-py' can batch contract calls, but can it batch web3 calls like get_block?

There isn't an API to batch different types of requests, but there is a get_blocks method allowing you to get multiple blocks in the same request.

@acemasterjb

Hello,

Would anyone have a solution for performing batch operations where the block_identifier is parameterized?

E.g., for

# Get the results of past calls to Chainlink EACAggregatorProxy.latestAnswer
latest_answer = chainlink_price_feed_contract.functions.latestAnswer().call(
    block_identifier=block_identifier
)

I would like to give a different block_identifier for a given range of block heights/numbers. Is this possible with batch_requests?

I guess I'm a bit confused about what qualifies as Method[Callable[..., Any]] which is accepted by batch_request().add.
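
(For context, since the issue is marked as fixed by #3370: a rough sketch of the batch API that landed there, as I understand it; exact signatures may differ in the released version:)

with w3.batch_requests() as batch:
    batch.add(w3.eth.get_block(18_000_000))
    batch.add(w3.eth.get_block(18_000_001))
    block_a, block_b = batch.execute()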
