[infinispan-dev] Rethinking asynchronism in Infinispan

Manik Surtani manik at jboss.org
Wed Jan 13 12:56:15 EST 2010


On 13 Jan 2010, at 17:13, Bela Ban wrote:

> 
> 
> Manik Surtani wrote:
>> So I've been spending some time thinking about how we deal with async 
>> tasks in Infinispan, both from an API perspective as well as an 
>> implementation detail, and wanted to throw a few ideas out there.
>> 
>> First, lets understand the 4 most expensive things that happen in 
>> Infinispan, either simply expensive or things that could block under 
>> high contention (in descending order): RPC calls, marshalling, 
>> CacheStore and locking
> 
> 
> In my experience, RPC calls (at least async ones) should be much less 
> costly than CacheStore and *un*-marshalling (marshalling *is* usually 
> fast). Re CacheStore: writing / reading to a disk is slower than even a 
> network round trip.

Well, it depends on the case: network saturation, overall load, and which subsystem handles concurrency better.  Either way, these are expensive operations that can and should be parallelized.

>> We deal with asynchronism in a somewhat haphazard way at the moment, 
>> each of these functions receiving a somewhat different treatment:
>> 
>> 1) RPC: Using JGroups' ResponseMode of waiting for none.
>> 2) Marshalling: using an async repl executor to take this offline
> 
> The problem here is that you're pushing the problem of marshalling 
> further down the line. *Eventually* data has to be marshalled, and 
> somebody *has* to block ! IIRC, you used a bounded queue to place 
> marshalling tasks onto, so for load peaks this was fine, but for 
> constant high load, someone will always block on the (full) queue.

That's where it happens right now: just before the RPC.  Perhaps there is some sense in recognising that more than one subsystem may need a marshalled representation of an entry (e.g., the RpcManager to push it across the wire, as well as a CacheStore to persist it to disk or the network again), so marshalling could happen once, prior to either of these calls.  It is also worth noting - as you mention later - that UNmarshalling is often far slower than marshalling, so any sync network calls should not wait for entries to be unmarshalled on the remote end.  We provide for this to some degree with the lazyDeserialization config element, which makes use of MarshalledValues.  It could just be better integrated with the rest of what we are doing re: async.
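For context, switching that on is (IIRC) just a cache-level config element in the 4.x schema; something along these lines, shown purely for illustration:

```xml
<infinispan>
   <default>
      <!-- wrap values in MarshalledValues; deserialize lazily on access -->
      <lazyDeserialization enabled="true"/>
   </default>
</infinispan>
```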

> 
>> 3) Use an AsyncStore wrapper which places tasks in an executor
> 
> Similar issue to above: at some point the thread pool might be full. 
> Then you need to start discarding tasks, or block.

Yes.  And this is where the discussion of Executors comes in.  We currently allow folks to provide separate Executors for each of these subsystems where needed, and the defaults use JDK fixed thread pool executors.  Perhaps a more sensible default is a common JDK cached thread pool executor for *all* of them to share?
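For instance, the shared default could be as simple as the following sketch (class and variable names here are illustrative, not actual Infinispan classes):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: one JDK cached thread pool shared by all async
// subsystems (async transport, async store, listeners) instead of a
// separate fixed pool per subsystem.
public class SharedExecutorSketch {

    static final ExecutorService SHARED = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // Each subsystem submits its potentially-blocking work to the same
        // pool; a cached pool grows to absorb spikes and shrinks when idle.
        Future<String> rpc   = SHARED.submit(() -> "rpc-done");
        Future<String> store = SHARED.submit(() -> "store-done");

        System.out.println(rpc.get() + " " + store.get());
        SHARED.shutdown();
    }
}
```

The trade-off versus per-subsystem fixed pools is that a cached pool is unbounded, so under constant high load it degrades differently (more threads) rather than blocking on a full queue.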

> Both (2) and (3) handle temporary spikes well though...

Yup.

> 
>> 4) Nothing
>> 
>> and to add to it,
>> 
>> 5) READs are never asynchronous. E.g., no such thing as an async GET - 
>> even if it entails RPC or a CacheStore lookup (which may be a remote 
>> call like S3!)
>> 
>> The impact of this approach is that end users never really get to 
>> benefit from the general asynchronism in place
> 
> What is this general asynchronism ? From my view of Infinispan, I don't 
> see a bias towards asynchronous execution, but I see an API which 
> supports both async and sync execution.

The APIs support both, but in reality the async API (e.g., putAsync) is not truly non-blocking.  The call may block acquiring a lock, and may suffer writing to a CacheStore.  It is (currently) only the RPC that is taken offline.  Here is where I think we can do better.

> 
>> and still needs to configure stuff in several different places. And 
>> internally, it makes dealing with internal APIs hard.
>> 
>> Externally, this is quite well encapsulated in the Cache interface, by 
>> offering async methods such as putAsync(), etc. so there would be 
>> little to change here. They return Futures, parameterized to the 
>> actual method return type, e.g., putAsync returns Future<V> in a 
>> parameterized Cache<K, V>. (More precisely, they return a 
>> NotifyingFuture, a sub-interface of Future that allows attaching 
>> listeners, but that's a detail.)
>> 
>> So I think we should start with that. The user receives a Future. This 
>> Future is an aggregate Future, which should aggregate and block based 
>> on several sub-Futures, one for each of the tasks (1 ~ 4) outlined 
>> above. Now what is the impact of this? Designing such a Future is easy 
>> enough, but how would this change internal components?
> 
> Are you suggesting the external API remains the same, but internally 
> futures are used ? Or do you suggest to make use of futures mandatory ?

Externally, the async API should take more tasks offline.  We don't do enough in parallel IMO.

Internally, all internal APIs should be async.  And all should return futures.  E.g., the RpcManager should expose:

	Future<List<Response>> RpcManager.invokeRemotely(.... );

CacheStores should expose:

	Future<Void> CacheStore.store(CacheEntry e);
	Future<CacheEntry> CacheStore.load(Object key);
	... etc ...

This way, the InvocationInterceptor can decide whether the original call was a sync API call (in which case it would do a Future.get() on all subsystem futures) or an async API call (in which case it would create an aggregating Future wrapping all the subsystem Futures and return that).
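As a rough sketch of what such an aggregating Future could look like (names are hypothetical, not existing Infinispan API; the real thing would be a NotifyingFuture):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of an aggregating Future: it is done only when
// every subsystem Future (RPC, marshalling, store, ...) is done.
public class AggregateFutureSketch {

    static class AggregateFuture implements Future<Void> {
        private final List<Future<?>> parts;

        AggregateFuture(List<Future<?>> parts) { this.parts = parts; }

        public Void get() throws InterruptedException, ExecutionException {
            for (Future<?> f : parts) f.get();   // block on each sub-task
            return null;
        }

        public Void get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            for (Future<?> f : parts)
                f.get(Math.max(0, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
            return null;
        }

        public boolean cancel(boolean mayInterrupt) {
            boolean all = true;
            for (Future<?> f : parts) all &= f.cancel(mayInterrupt);
            return all;
        }

        public boolean isCancelled() {
            for (Future<?> f : parts) if (!f.isCancelled()) return false;
            return !parts.isEmpty();
        }

        public boolean isDone() {
            for (Future<?> f : parts) if (!f.isDone()) return false;
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        Future<?> rpc   = pool.submit(() -> {});   // pretend RPC task
        Future<?> store = pool.submit(() -> {});   // pretend CacheStore task

        // A sync API call would simply get() the aggregate; an async call
        // would hand the aggregate straight back to the user.
        new AggregateFuture(Arrays.<Future<?>>asList(rpc, store)).get();
        System.out.println("all subsystems done");
        pool.shutdown();
    }
}
```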


> Can you show a pseudo code sample ?
> 
> 
>> 1) RPC. Async RPC is, IMO, broken at the moment. It is unsafe in that 
>> it offers no guarantees that the calls are received by the recipient
> 
> No, JGroups guarantees message delivery !

How does the caller know this though?  castMessage() with GroupRequest.GET_NONE doesn't provide for a feedback loop.

> Besides that, async APIs *are* 
> by definition fire-and-forget (JMS topics), so IMO this is not broken !
> 
> Or do you have something akin to persistent JMS messages in mind ?

No, async (IMO) != fire-and-forget.  It's just that the calls happen offline, potentially in parallel, and can be checked on at a later point in time.  Think NIO.

> 
> 
>> and you have no way of knowing.
> 
> Yes, but that's the name of the game with *async* RPCs ! If you want to 
> know, use sync RPCs...

Aha, but I can't do multiple sync RPCs (even to different recipients) in the same thread of execution, in parallel.  I would have to do these sequentially then.

> 
>> So RPC should always be synchronous, but wrapped in a Future so that 
>> it is taken offline and the Future can be checked for success
> 
> Who would check on the future, e.g. pseudo code like this:
> 
> Future<String> future=cache.putWithFuture("name", "value");
> String prev_val=future.get();
> 
> doesn't help, as there is no work done before the future is checked.

The benefit is doing stuff like this.  Executed on Node1:

// I need guarantees for these calls.  I cannot handle the uncertainty of a put not succeeding somewhere.  But I need them to run in parallel, for performance.

	Future f1 = cache.putAsync(k1, v1);  // k1 is mapped to Node2, Node3
	Future f2 = cache.putAsync(k2, v2);  // k2 is mapped to Node4, Node5
	Future f3 = cache.putAsync(k3, v3);  // k3 is mapped to Node6, Node7

	f1.get();
	f2.get();
	f3.get();

The above should be quicker than doing:

	cache.put(k1, v1);
	cache.put(k2, v2);
	cache.put(k3, v3);

since all of the RPC, marshalling, cache store writes, and locking can happen in parallel in the former case, better utilizing multicore CPUs and network bandwidth.

> Sync RPCs are a magnitude slower than async ones, so unless you call 
> 1000 sync RPCs, get 1000 futures and then check on the futures, I don't 
> see the benefits of this. And code like this will be harder to write.

Harder to write, as in implementing it in Infinispan, or for end-users using the API?

> 
>> 2) Marshalling happens offline at the moment and a Future is returned 
>> as it stands, but this could probably be combined with RPC to a single 
>> Future since this step is logically before RPC, and RPC relies on 
>> marshalling to complete
> 
> Interesting, can you elaborate more ?

So right now we have 4 RPC modes:

public enum ResponseMode {
   SYNCHRONOUS,
   ASYNCHRONOUS,
   ASYNCHRONOUS_WITH_SYNC_MARSHALLING,
   WAIT_FOR_VALID_RESPONSE;
}

These roughly translate to:

1.  SYNCHRONOUS -> GroupRequest.GET_ALL
2.  ASYNCHRONOUS -> GroupRequest.GET_NONE
3.  ASYNCHRONOUS_WITH_SYNC_MARSHALLING -> GroupRequest.GET_NONE
4.  WAIT_FOR_VALID_RESPONSE -> GroupRequest.GET_FIRST

1 is pretty straightforward.  With 2, the marshalling of the ReplicableCommand and the castMessage() call are handed off to an Executor.  With 3, the marshalling and the castMessage() happen in the caller's thread; the marshalling takes time, but the castMessage() doesn't block since we use GroupRequest.GET_NONE.  4 is straightforward too: it is a sync call with an RspFilter which returns the moment we get the first valid response.

What I am proposing is that we change to just 1 response mode.  We *always* use GroupRequest.GET_ALL - unless an RspFilter is provided, in which case it is GET_FIRST.  Both the marshalling and the castMessage() are always handed off to an Executor, so the call never blocks and always returns a Future.  (Or is there a better way to do this, to harness inbuilt asynchronicity in JGroups, as Krzysztof suggested?)
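To make the proposal concrete, here is a rough sketch.  Everything in it is a stand-in: marshall() fakes the ReplicableCommand marshaller and castMessageGetAll() fakes a synchronous JGroups GET_ALL cast - it is the shape of the call flow that matters, not the names.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Rough sketch of the proposed single-mode RpcManager: marshalling and
// the sync cast both always run inside an executor, so the caller never
// blocks and always receives a Future.
public class RpcManagerSketch {

    static final ExecutorService transportExecutor = Executors.newCachedThreadPool();

    static byte[] marshall(String command) {
        return command.getBytes();               // pretend command marshalling
    }

    static List<String> castMessageGetAll(byte[] payload) {
        return List.of("node2:ok", "node3:ok");  // pretend sync RPC responses
    }

    // The caller can poll the Future for success - or drop it, giving
    // "unreliable async" semantics.
    static Future<List<String>> invokeRemotely(String command) {
        return transportExecutor.submit(() -> {
            byte[] payload = marshall(command);   // off the caller's thread
            return castMessageGetAll(payload);    // sync cast, inside the executor
        });
    }

    public static void main(String[] args) throws Exception {
        Future<List<String>> f = invokeRemotely("put(k1, v1)");
        System.out.println(f.get());             // check success only if we care
        transportExecutor.shutdown();
    }
}
```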

This way, propagating all the way back up to the Infinispan public async API, users get a non-blocking call, with the guarantees of being able to poll the future to ensure the RPC actually happens successfully.  If they care.  Or they could throw away the Future, which would result in an "unreliable async" mode.

Cheers
Manik

--
Manik Surtani
manik at jboss.org
Lead, Infinispan
Lead, JBoss Cache
http://www.infinispan.org
http://www.jbosscache.org







