[infinispan-dev] distributed fork-join executor prototype

Paolo Romano romano at inesc-id.pt
Mon Mar 4 07:07:08 EST 2013

This sounds really interesting, Matt. In the Cloud-TM project 
(www.cloudtm.eu) we are currently developing a parallel graph-analysis 
algorithm on top of Infinispan's distributed execution framework (DEF). 
I would be very curious to take a look at the framework you developed 
and see how it could be exploited in our application.




Paolo Romano, PhD
Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu)
Senior Researcher @ INESC-ID (www.inesc-id.pt)
Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt)
Rua Alves Redol, 9
1000-059, Lisbon Portugal
Tel. + 351 21 3100300
Fax  + 351 21 3145843
Webpage http://www.gsd.inesc-id.pt/~romanop

On 3/2/13 5:40 PM, matt hoffman wrote:
> Hey guys,
> I've been working on a prototype of integrating Infinispan into our 
> app.  We do a lot of distributed processing across a small cluster, so 
> I've played with Infinispan's existing distributed execution framework 
> (which is nice), as well as using Infinispan alongside a normal 
> message queue to distribute tasks.  But I've also put together a 
> prototype of a new distributed execution framework using fork-join 
> pools that you all might be interested in.  If it sounds like 
> something that would be worthwhile for Infinispan, I can raise a Jira 
> and submit a pull request with what I have so far. I'd need to get the 
> CA and company policy stuff finalized; that might take a couple days. 
> Meanwhile, in case there is any interest, I've described the approach 
> I've taken below.
>
> First, a little background:
> A while back I worked on a side project that integrated a distributed 
> work-stealing algorithm into the standard JDK fork-join queue.  It 
> used JGroups for communication, because it was quick and easy for 
> prototyping. So this week I thought I'd take a stab at porting that 
> over to Infinispan. The algorithm I came up with for Infinispan is a 
> bit less of a work-stealing algorithm, to take advantage of 
> Infinispan's built-in distribution capabilities, but I think it's 
> still fairly efficient.
>
> My basic approach was to take in a cache in the constructor, much like 
> the existing distributed executor, and then create a parallel, 
> DIST-mode cache that uses the same hash & grouping configuration as 
> the original cache. That new parallel cache is the "task cache", and 
> we use that to distribute available tasks across the cluster. It's a 
> distributed cache so that tasks are partitioned across a large 
> cluster, and it uses the hashing config of the original cache and a 
> KeyAffinityService to attempt to distribute the tasks to the same 
> nodes that contain the data being worked on. Nodes use cache listeners 
> to be notified when there is new work available, and the atomic 
> replace() to "check out" the tasks for execution, and "check in" the 
> results.
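
A minimal sketch of the "check out" / "check in" step described above, assuming it is built on the atomic replace(key, oldValue, newValue) from java.util.concurrent.ConcurrentMap (which Infinispan's Cache interface extends). A plain ConcurrentHashMap stands in for the task cache so the example runs without a cluster. The TaskEntry record, the State enum, and the method names are hypothetical and not taken from the prototype. Records need Java 16 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TaskCheckout {
    // Hypothetical task lifecycle; the prototype's actual states are not given.
    enum State { AVAILABLE, CHECKED_OUT, DONE }

    // Immutable value with value-based equals(), which replace(k, old, new) compares against.
    record TaskEntry(State state, String owner, String result) {}

    /** Try to claim a task for the given node; at most one caller can win. */
    static boolean checkOut(ConcurrentMap<String, TaskEntry> taskCache, String taskId, String node) {
        TaskEntry current = taskCache.get(taskId);
        if (current == null || current.state() != State.AVAILABLE) {
            return false;
        }
        // Atomic compare-and-swap: fails if another node changed the entry first.
        return taskCache.replace(taskId, current, new TaskEntry(State.CHECKED_OUT, node, null));
    }

    /** Publish a result; only the node that checked the task out may do so. */
    static boolean checkIn(ConcurrentMap<String, TaskEntry> taskCache, String taskId,
                           String node, String result) {
        TaskEntry current = taskCache.get(taskId);
        if (current == null || current.state() != State.CHECKED_OUT
                || !node.equals(current.owner())) {
            return false;
        }
        return taskCache.replace(taskId, current, new TaskEntry(State.DONE, node, result));
    }

    public static void main(String[] args) {
        // ConcurrentHashMap is a local stand-in for the distributed task cache.
        ConcurrentMap<String, TaskEntry> taskCache = new ConcurrentHashMap<>();
        taskCache.put("task-1", new TaskEntry(State.AVAILABLE, null, null));

        System.out.println(checkOut(taskCache, "task-1", "nodeA")); // nodeA claims the task
        System.out.println(checkOut(taskCache, "task-1", "nodeB")); // nodeB loses the race
        System.out.println(checkIn(taskCache, "task-1", "nodeA", "done"));
    }
}
```

Because the swap only succeeds when the entry still equals the value that was read, two nodes reacting to the same listener event cannot both claim the task.
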
>
> The basic algorithm is something like this:
> For a refresher, a normal FJ pool has a fork() method that takes in a 
> task, and then places that task on an internal queue (actually, one of 
> several queues). When threads are idle, they look to the nearest work 
> queue for work. If that work queue does not have work, they "steal" 
> work from another thread's queue. So in the best case, tasks remain on 
> the same thread as the task that spawned them, so tasks that process 
> the same data as their parents may still have that data in the CPU's 
> cache, etc. There's more to it than that, but that's the basic idea.
>
> This distributed algorithm just adds an extra layer on top for tasks 
> that are marked "distributable" (by extending DistributedFJTask 
> instead of the normal ForkJoinTask). When you call fork() with a 
> DistributedFJTask, it first checks to see if the local pool's work 
> queue is empty. If so, we just go ahead and submit it locally; there's 
> no reason to distribute it. If not, we put the task in the task cache, 
> and let Infinispan distribute it. When a node has no more work to do 
> in its internal fork-join queues, it looks at the task cache and tries 
> to pull work from there.
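
The fork() decision described above could be sketched roughly as follows. This is not the prototype's code: the real version overrides fork() on a DistributedFJTask, which the stock JDK ForkJoinTask does not allow because fork() is final there. Here a static helper plays that role, a ConcurrentMap stands in for the task cache, and the names ForkOrDistribute, fork, and queueEmptyCheck are hypothetical. In a real distributed cache the task would also have to be serializable.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BooleanSupplier;

final class ForkOrDistribute {
    /**
     * Runs the task locally when the local pool has nothing queued,
     * otherwise publishes it to the task cache for any node to pick up.
     *
     * @return true if the task was put in the task cache, false if it was submitted locally
     */
    static boolean fork(String taskId, Runnable task, BooleanSupplier localQueueEmpty,
                        ForkJoinPool localPool, ConcurrentMap<String, Runnable> taskCache) {
        if (localQueueEmpty.getAsBoolean()) {
            // Local workers are idle: no reason to pay for distribution.
            localPool.execute(task);
            return false;
        }
        // Local workers are busy: let the cache spread the task across the cluster.
        taskCache.put(taskId, task);
        return true;
    }

    /** A cheap, inherently racy emptiness check built from ForkJoinPool accessors. */
    static BooleanSupplier queueEmptyCheck(ForkJoinPool pool) {
        return () -> pool.getQueuedTaskCount() == 0 && !pool.hasQueuedSubmissions();
    }
}
```

A caller would typically pass queueEmptyCheck(pool) as the supplier. The check is only a heuristic, since the queue can change between the check and the submit; that seems acceptable here because a wrong guess costs efficiency rather than correctness.
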
>
> So, it isn't really a "work-stealing" algorithm, per se; the 
> distributable tasks are being distributed eagerly using Infinispan's 
> normal cache distribution. But I'm hoping that doing that also makes 
> it easier to handle node failure, since nodes collectively share a 
> common picture of the work to be done.
>
> This approach required one change to the actual FJ classes themselves 
> (in org.infinispan.util.concurrent.jdk8backported). That's probably 
> the most controversial change. I had to make the original 
> ForkJoinTask's fork() method non-final in order to extend it cleanly.  
> There's probably a way around that, but that's the cleanest option I 
> have thought of thus far.
>
> And lastly, it's not done yet: basic task distribution is working, but 
> I haven't tackled failover to any real extent yet. The biggest 
> questions, though, are around what to do with the existing distributed 
> execution interfaces. For example, DistributedTask has a getCallable() 
> method because it assumes it's wrapping a Callable. But ForkJoinTasks 
> don't extend Callable. I could put in a shim to wrap the 
> DistributedFJTasks into Callables for the sake of that method, but I 
> don't know if it's worth it.  Similarly, the 
> DistributedExecutorService interface exposes a lot of 
> submit-to-specific-address or submit-to-all-addresses methods, which 
> are an odd fit here since tasks are distributed via their own cache.  
> Even if I used a KeyAffinityService to target the task to the given 
> Address, it might get picked up by another node that shares that same 
> hash. But I can add a direct-to-single-Address capability if that 
> seems worthwhile. Alternatively, I can just use entirely different 
> interfaces (DistributedFJExecutorService, DistributedFJTask?).
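
For reference, the Callable shim mentioned above can be very small when written against the standard JDK classes. The sketch below uses java.util.concurrent.ForkJoinTask rather than the backported classes or DistributedFJTask, and the names ForkJoinCallableShim, asCallable, and Sum are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class ForkJoinCallableShim {
    /** Wraps a ForkJoinTask so it can be handed to an API that expects a Callable. */
    static <V> Callable<V> asCallable(ForkJoinTask<V> task) {
        // invoke() performs the task, waits for completion, and returns its result.
        // A ForkJoinTask is normally run once, so the Callable should be called once too.
        return task::invoke;
    }

    /** Trivial example task: sums an array without splitting the work. */
    static final class Sum extends RecursiveTask<Integer> {
        private final int[] values;

        Sum(int[] values) {
            this.values = values;
        }

        @Override
        protected Integer compute() {
            int total = 0;
            for (int v : values) {
                total += v;
            }
            return total;
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> callable = asCallable(new Sum(new int[] {1, 2, 3}));
        System.out.println(callable.call()); // prints 6
    }
}
```

The JDK already covers the opposite direction with ForkJoinTask.adapt(Callable), so a shim like this would only be needed to satisfy getCallable().
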
>
> Thoughts?  Concerns?  Glaring issues?
