On 13 Jan 2010, at 17:13, Bela Ban wrote:
Manik Surtani wrote:
> So I've been spending some time thinking about how we deal with async
> tasks in Infinispan, both from an API perspective as well as an
> implementation detail, and wanted to throw a few ideas out there.
>
> First, lets understand the 4 most expensive things that happen in
> Infinispan, either simply expensive or things that could block under
> high contention (in descending order): RPC calls, marshalling,
> CacheStore and locking
In my experience, RPC calls (at least async ones) should be much less
costly than CacheStore and *un*-marshalling (marshalling *is* usually
fast). Re CacheStore: writing / reading to a disk is slower than even a
network round trip.
Well, it depends on the case: network saturation, load, and which subsystem handles
concurrency better. But either way, these are expensive tasks that can and should be parallelized.
> We deal with asynchronism in a somewhat haphazard way at the moment,
> each of these functions receiving a somewhat different treatment:
>
> 1) RPC: Using JGroups' ResponseMode of waiting for none.
> 2) Marshalling: using an async repl executor to take this offline
The problem here is that you're pushing the problem of marshalling
further down the line. *Eventually* data has to be marshalled, and
somebody *has* to block ! IIRC, you used a bounded queue to place
marshalling tasks onto, so for load peaks this was fine, but for
constant high load, someone will always block on the (full) queue.
That's where it happens right now. Just before RPC. Now perhaps there is some sense
in understanding that more than one subsystem may need a marshalled representation of an
entry (e.g., the RpcManager to push across the wire, as well as a CacheStore to persist to
disk or network again), so this could happen prior to either of these calls. And also
useful to note - as you mention later - that UNmarshalling is often far slower than
marshalling, so any sync network calls should not wait for entries to be unmarshalled on
the remote end. We provide for this to some degree with the lazyDeserialization config
element which makes use of MarshalledValues. It could just be better integrated with the
rest of what we are doing re: async.
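To make the "marshal once, share between subsystems" point concrete, here is a minimal sketch. Note this is NOT our actual MarshalledValue class - it is an illustrative stand-in showing how the RpcManager and a CacheStore could share one marshalled form, and how unmarshalling stays lazy:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Illustrative stand-in (not Infinispan's MarshalledValue): serialize a value
// once, then let both the wire (RpcManager) and the CacheStore reuse the bytes.
final class SharedMarshalledValue {
    private final Object instance;
    private byte[] raw; // computed once, then reused by every consumer

    SharedMarshalledValue(Object instance) { this.instance = instance; }

    // Marshal lazily and exactly once.
    synchronized byte[] getBytes() {
        if (raw == null) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(instance);
                }
                raw = bos.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return raw;
    }

    // Lazy deserialization: pay the (often slower) unmarshalling cost only
    // when a caller actually needs the object form.
    static Object unmarshal(byte[] bytes) {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```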
> 3) Use an AsyncStore wrapper which places tasks in an executor
Similar issue to above: at some point the thread pool might be full.
Then you need to start discarding tasks, or block.
Yes. And this is where the discussion of Executors comes in. We currently allow folks to
provide separate Executors for each of these subsystems where needed, and the defaults use
JDK fixed thread pool executors. Perhaps a more sensible default is a common JDK cached
thread pool executor for *all* of them to share?
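Roughly like this - a hypothetical sketch, not our actual config API, just plain JDK executors:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the suggested default: one shared cached thread pool for the
// async transport, async marshalling and the AsyncStore, rather than a fixed
// pool per subsystem. Names here are illustrative, not Infinispan's API.
final class SharedAsyncExecutor {
    // A cached pool grows on demand and reclaims idle threads, so a spike in
    // one subsystem borrows threads rather than queueing behind a fixed cap.
    static final ExecutorService POOL = Executors.newCachedThreadPool();

    static <T> Future<T> submit(Callable<T> task) {
        return POOL.submit(task);
    }

    // Convenience join that folds checked exceptions, for brevity here.
    static <T> T join(Future<T> f) {
        try {
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```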
Both (2) and (3) handle temporary spikes well though...
Yup.
> 4) Nothing
>
> and to add to it,
>
> 5) READs are never asynchronous. E.g., no such thing as an async GET -
> even if it entails RPC or a CacheStore lookup (which may be a remote
> call like S3!)
>
> The impact of this approach is that end users never really get to
> benefit from the general asynchronism in place
What is this general asynchronism ? From my view of Infinispan, I don't
see a bias towards asynchronous execution, but I see an API which
supports both async and sync execution.
The APIs support both, but in reality the async API (e.g., putAsync) is not truly
non-blocking. The call may block acquiring a lock, and may suffer writing to a
CacheStore. It is (currently) only the RPC that is taken offline. Here is where I think
we can do better.
> and still need to configure stuff in several different places. And
> internally, it makes dealing with internal APIs hard.
>
> Externally, this is quite well encapsulated in the Cache interface, by
> offering async methods such as putAsync(), etc. so there would be
> little to change here. They return Futures, parameterized to the
> actual method return type, e.g., putAsync returns Future<V> in a
> parameterized Cache<K, V>. (More precisely, they return a
> NotifyingFuture, a sub-interface of Future that allows attaching
> listeners, but that's a detail.)
>
> So I think we should start with that. The user receives a Future. This
> Future is an aggregate Future, which should aggregate and block based
> on several sub-Futures, one for each of the tasks (1 ~ 4) outlined
> above. Now what is the impact of this? Designing such a Future is easy
> enough, but how would this change internal components?
Are you suggesting the external API remains the same, but internally
futures are used ? Or do you suggest to make use of futures mandatory ?
Externally, the async API should take more tasks offline. We don't do enough in
parallel IMO.
Internally, all internal APIs should be async. And all should return futures. E.g., the
RpcManager should expose:
Future<List<Response>> RpcManager.invokeRemotely(.... );
CacheStores should expose:
Future<Void> CacheStore.store(CacheEntry e);
Future<CacheEntry> CacheStore.load(Object key);
... etc ...
This way, the InvocationInterceptor can decide whether the original call was a sync API
call ( in which case it would do a Future.get() on all subsystem futures ) or an async API
call (in which case it would create an aggregating Future to wrap all subsystem Futures
and return this).
Can you show a pseudo code sample ?
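Something like the below. All names are hypothetical and I'm using plain JDK futures as a stand-in - the point is just that the aggregate completes only when every per-subsystem Future does, so a sync caller does get() on it and an async caller just hands it back:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the "aggregate Future": wraps the per-subsystem
// futures (RPC, CacheStore, ...) in one Future that completes when all do.
final class AggregateFuture<T> implements Future<List<T>> {
    private final List<Future<T>> parts;

    AggregateFuture(List<Future<T>> parts) { this.parts = new ArrayList<>(parts); }

    @Override public List<T> get() throws InterruptedException, ExecutionException {
        List<T> results = new ArrayList<>();
        for (Future<T> f : parts) results.add(f.get()); // block on each part
        return results;
    }

    @Override public List<T> get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<T> results = new ArrayList<>();
        for (Future<T> f : parts) // share the remaining budget across parts
            results.add(f.get(deadline - System.nanoTime(), TimeUnit.NANOSECONDS));
        return results;
    }

    @Override public boolean cancel(boolean mayInterruptIfRunning) {
        boolean all = true;
        for (Future<T> f : parts) all &= f.cancel(mayInterruptIfRunning);
        return all;
    }

    @Override public boolean isCancelled() {
        for (Future<T> f : parts) if (!f.isCancelled()) return false;
        return !parts.isEmpty();
    }

    @Override public boolean isDone() {
        for (Future<T> f : parts) if (!f.isDone()) return false;
        return true;
    }
}
```

The InvocationInterceptor would then do `new AggregateFuture<>(subsystemFutures).get()` for a sync API call, or return the AggregateFuture itself for an async one.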
> 1) RPC. Async RPC is, IMO, broken at the moment. It is unsafe in that
> it offers no guarantees that the calls are received by the recipient
No, JGroups guarantees message delivery !
How does the caller know this though? castMessage() with GroupRequest.GET_NONE
doesn't provide for a feedback loop.
Besides that, async APIs *are*
by definition fire-and-forget (JMS topics), so IMO this is not broken !
Or do you have something akin to persistent JMS messages in mind ?
No, async (IMO) != fire-and-forget. It's just that they happen offline, potentially in
parallel, and can be checked for at a later point in time. Think NIO.
> and you have no way of knowing.
Yes, but that's the name of the game with *async* RPCs ! If you want to
know, use sync RPCs...
Aha, but I can't do multiple sync RPCs (even to different recipients) in the same
thread of execution, in parallel. I would have to do these sequentially then.
> So RPC should always be synchronous, but wrapped in a Future so that
> it is taken offline and the Future can be checked for success
Who would check on the future, e.g. pseudo code like this:
Future<String> future=cache.putWithFuture("name", "value");
String prev_val=future.get();
doesn't help, as there is no work done before the future is checked.
The benefit is doing stuff like this. Executed on Node1:
// I need guarantees for these calls. I cannot handle the uncertainty of a
// put not succeeding somewhere. But I need them to run in parallel, for
// performance.
Future f1 = cache.putAsync(k1, v1); // k1 is mapped to Node2, Node3
Future f2 = cache.putAsync(k2, v2); // k2 is mapped to Node4, Node5
Future f3 = cache.putAsync(k3, v3); // k3 is mapped to Node6, Node7
f1.get();
f2.get();
f3.get();
The above should be quicker than doing:
cache.put(k1, v1);
cache.put(k2, v2);
cache.put(k3, v3);
since all of RPC, marshalling, cache stores, and locking can happen in parallel in the
former case, better utilizing multicore CPUs, etc etc.
Sync RPCs are a magnitude slower than async ones, so unless you call
1000 sync RPCs, get 1000 futures and then check on the futures, I don't
see the benefits of this. And code like this will be harder to write.
Harder to write, as in implementing it in Infinispan, or for end-users using the API?
> 2) Marshalling happens offline at the moment and a Future is returned
> as it stands, but this could probably be combined with RPC to a single
> Future since this step is logically before RPC, and RPC relies on
> marshalling to complete
Interesting, can you elaborate more ?
So right now we have 4 RPC modes
public enum ResponseMode {
SYNCHRONOUS,
ASYNCHRONOUS,
ASYNCHRONOUS_WITH_SYNC_MARSHALLING,
WAIT_FOR_VALID_RESPONSE;
}
These roughly translate to:
1. SYNCHRONOUS -> GroupRequest.GET_ALL
2. ASYNCHRONOUS -> GroupRequest.GET_NONE
3. ASYNCHRONOUS_WITH_SYNC_MARSHALLING -> GroupRequest.GET_NONE
4. WAIT_FOR_VALID_RESPONSE -> GroupRequest.GET_FIRST
1 is pretty straightforward. With 2, the marshalling of the ReplicableCommand and the
castMessage() call are handed off to an Executor. With 3, the marshalling and
castMessage() happen in the caller's thread - the marshalling will take time but the
castMessage() doesn't block since we use GroupRequest.GET_NONE. 4 is straightforward,
it is a sync call with a RspFilter which returns the moment we get the first valid
response.
What I am proposing is that we change to have just 1 response mode. We *always* use
GroupRequest.GET_ALL - unless an RspFilter is provided in which case it is GET_FIRST.
Both the marshalling and the castMessage() are always handed off to an Executor, the call
hence never blocks, and always returns a Future. (Or is there a better way to do this, to
harness inbuilt asynchronicity in JGroups as Krzysztof suggested?)
This way, propagating all the way back up to the Infinispan public async API, users get a
non-blocking call, with the guarantees of being able to poll the future to ensure the RPC
actually happens successfully. If they care. Or they could throw away the Future, which
would result in an "unreliable async" mode.
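To sketch that single dispatch path (the Transport interface below is a stand-in lambda, not the real JGroups MessageDispatcher API, and all names are illustrative):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the proposed single RPC mode: marshalling plus the synchronous
// (GET_ALL-style) dispatch always run on an executor, so the caller gets a
// Future back immediately and never blocks on the network.
final class AsyncRpcDispatcher {
    interface Transport { List<Object> castMessage(byte[] payload); } // stand-in

    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Transport transport;

    AsyncRpcDispatcher(Transport transport) { this.transport = transport; }

    // Caller never blocks: drop the Future for "unreliable async", or call
    // Future.get() later to confirm the RPC actually happened successfully.
    Future<List<Object>> invokeRemotely(Object command) {
        return executor.submit(() -> {
            byte[] payload = marshal(command);     // marshalling off the caller thread
            return transport.castMessage(payload); // sync dispatch, also offline
        });
    }

    private static byte[] marshal(Object command) {
        return String.valueOf(command).getBytes(); // placeholder marshaller
    }

    void stop() { executor.shutdown(); }
}
```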
Cheers
Manik
--
Manik Surtani
manik(a)jboss.org
Lead, Infinispan
Lead, JBoss Cache
http://www.infinispan.org
http://www.jbosscache.org