[infinispan-dev] distributed fork-join executor prototype

Paolo Romano romano at inesc-id.pt
Fri May 10 19:23:49 EDT 2013


Very interesting stuff Matt!

I find the design that leverages Infinispan for distributing the
tasks - while maximizing locality and exploiting Infinispan's
fault-tolerance capabilities - modular and elegant. It will be
interesting to evaluate whether there is significant overhead with
respect to a purely JGroups-based mechanism.

We will be looking at your code, and may perhaps come up with some
use-cases.

Cheers,

     Paolo

On 5/9/13 10:52 AM, Manik Surtani wrote:
> Hi Matt.
>
> Thanks for resurrecting and sorry for not responding on the original 
> thread.
>
> Interesting discussions.  I would intuitively agree with Bela were it 
> not for the fact that, as you said, you're actually storing data, 
> you require high availability of that data during server failure, and 
> you may chain tasks (as in a local fork-join) and want to maintain 
> server affinity across related tasks.
>
> You could achieve all of this with JGroups but would end up 
> re-implementing certain bits of Infinispan, specifically:
> - task failover
> - task cancelling
> - consistent-hash-based node selection for delivery, ensuring affinity 
> across data and tasks
>
> So I think it is a pretty valid use case to make use of what 
> Infinispan already has.
>
> I'll have a look at your sources and comment more.  I'm sure Paolo and 
> others at Cloud-TM will do the same.
>
> Cheers
> Manik
>
> On 9 May 2013, at 01:50, Matt Hoffman <matt at mhoffman.org> wrote:
>
>> Resurrecting this topic:  I've put some sample code on my fork here: 
>> https://github.com/matthoffman/infinispan/tree/dfj
>> There's a README there that offers an overview. I hope to do some 
>> performance comparisons over the weekend, but in the meantime, the 
>> code is there if you're curious.
>>
>> I'm mainly interested in whether this is the kind of thing that 
>> Infinispan would want to adopt, as an alternative to the current 
>> distributed executor (much like the ForkJoinPool in the JDK is an 
>> alternative to a traditional thread pool).
>> If so, the next steps would be to do some performance tests, pick one 
>> implementation to move forward with (out of the 3 prototypes on that 
>> branch) and clean it up sufficiently to be considered for a pull 
>> request.
>>
>> If it's not something that Infinispan is interested in, then I'll 
>> change approaches and make it more generic, so that it isn't 
>> Infinispan-specific and can be used with other transports. I'm open 
>> to either option... I think there are pros and cons either way.
>>
>>
>> Thanks!
>>
>>
>> matt
>>
>>
>>
>> On Mon, Mar 4, 2013 at 7:07 AM, Paolo Romano <romano at inesc-id.pt> wrote:
>>
>>     This sounds really interesting Matt. In the Cloud-TM project
>>     (www.cloudtm.eu) we are currently
>>     developing a parallel graph-analysis algorithm on top of
>>     Infinispan's DEF. I would be really curious to take a look at the
>>     framework you developed, and see how it could be exploited in our
>>     application.
>>
>>     Regards,
>>
>>         Paolo
>>
>>
>>     -- 
>>
>>     Paolo Romano, PhD
>>     Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu)
>>     Senior Researcher @ INESC-ID (www.inesc-id.pt)
>>     Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt)
>>     Rua Alves Redol, 9
>>     1000-059, Lisbon, Portugal
>>     Tel. +351 21 3100300
>>     Fax +351 21 3145843
>>     Webpage: http://www.gsd.inesc-id.pt/~romanop
>>
>>
>>
>>     On 3/2/13 5:40 PM, matt hoffman wrote:
>>>
>>>     Hey guys,
>>>
>>>     I've been working on a prototype of integrating Infinispan into
>>>     our app.  We do a lot of distributed processing across a small
>>>     cluster, so I've played with Infinispan's existing distributed
>>>     execution framework (which is nice), as well as using Infinispan
>>>     alongside a normal message queue to distribute tasks.  But I've
>>>     also put together a prototype of a new distributed execution
>>>     framework using fork-join pools that you all might be interested
>>>     in.  If it sounds like something that would be worthwhile for
>>>     Infinispan, I can raise a Jira and submit a pull request with
>>>     what I have so far. I'd need to get the CA and company policy
>>>     stuff finalized; that might take a couple days. Meanwhile, in
>>>     case there is any interest, I've described the approach I've
>>>     taken below.
>>>
>>>     First, a little background:
>>>
>>>     A while back I worked on a side project that integrated a
>>>     distributed work-stealing algorithm into the standard JDK
>>>     fork-join pool.  It used JGroups for communication, because it
>>>     was quick and easy for prototyping. So this week I thought I'd
>>>     take a stab at porting that over to Infinispan. The algorithm I
>>>     came up with for Infinispan is a bit less of a work-stealing
>>>     algorithm, to take advantage of Infinispan's built-in
>>>     distribution capabilities, but I think it's still fairly efficient.
>>>
>>>     My basic approach was to take in a cache in the constructor,
>>>     much like the existing distributed executor, and then create a
>>>     parallel, DIST-mode cache that uses the same hash & grouping
>>>     configuration as the original cache. That new parallel cache is
>>>     the "task cache", and we use that to distribute available tasks
>>>     across the cluster. It's a distributed cache so that tasks are
>>>     partitioned across a large cluster, and it uses the hashing
>>>     config of the original cache and a KeyAffinityService to attempt
>>>     to distribute the tasks to the same nodes that contain the data
>>>     being worked on. Nodes use cache listeners to be notified when
>>>     new work is available, and use atomic replace() calls to "check
>>>     out" tasks for execution and to "check in" the results.
>>>
>>>     The basic algorithm is something like this:
>>>
>>>     For a refresher, a normal FJ pool has a fork() method that takes
>>>     in a task, and then places that task on an internal queue
>>>     (actually, one of several queues). When threads are idle, they
>>>     look to the nearest work queue for work. If that work queue does
>>>     not have work, they "steal" work from another thread's queue. 
>>>     So in the best case, tasks remain on the same thread as the task
>>>     that spawned them, which means tasks that process the same data
>>>     as their parents may still have that data in the CPU's cache, etc.
>>>     There's more to it than that, but that's the basic idea.
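>>>
>>>     For a concrete picture, plain JDK fork-join usage looks like
>>>     this (standard library code, nothing prototype-specific):
>>>
>>>         import java.util.concurrent.RecursiveTask;
>>>
>>>         // fork() pushes the subtask onto the current worker's queue;
>>>         // idle workers steal it if the owner stays busy.
>>>         class SumTask extends RecursiveTask<Long> {
>>>             private final long[] data;
>>>             private final int lo, hi;
>>>
>>>             SumTask(long[] data, int lo, int hi) {
>>>                 this.data = data; this.lo = lo; this.hi = hi;
>>>             }
>>>
>>>             @Override
>>>             protected Long compute() {
>>>                 if (hi - lo <= 1000) {   // small enough: just compute
>>>                     long sum = 0;
>>>                     for (int i = lo; i < hi; i++) sum += data[i];
>>>                     return sum;
>>>                 }
>>>                 int mid = (lo + hi) >>> 1;
>>>                 SumTask left = new SumTask(data, lo, mid);
>>>                 left.fork();             // enqueue on this worker's queue
>>>                 long right = new SumTask(data, mid, hi).compute();
>>>                 return right + left.join();
>>>             }
>>>         }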
>>>
>>>     This distributed algorithm just adds an extra layer on top for
>>>     tasks that are marked "distributable" (by extending
>>>     DistributedFJTask instead of the normal ForkJoinTask). When you
>>>     call fork() with a DistributedFJTask, it first checks to see if
>>>     the local pool's work queue is empty. If so, we just go ahead
>>>     and submit it locally; there's no reason to distribute it. If
>>>     not, we put the task in the task cache, and let Infinispan
>>>     distribute it. When a node has no more work to do in its
>>>     internal fork-join queues, it looks at the task cache and tries
>>>     to pull work from there.
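>>>
>>>     In rough pseudo-Java, the fork() override looks something like
>>>     the following. This is a sketch only: it assumes the backported
>>>     ForkJoinTask whose fork() has been made non-final (see below),
>>>     and taskCache(), keyFor() and serialize() are illustrative
>>>     stand-ins, not the prototype's actual API:
>>>
>>>         import org.infinispan.Cache;
>>>         import org.infinispan.util.concurrent.jdk8backported.ForkJoinPool;
>>>         import org.infinispan.util.concurrent.jdk8backported.ForkJoinTask;
>>>
>>>         public abstract class DistributedFJTask<V> extends ForkJoinTask<V> {
>>>
>>>             // stand-ins for whatever the prototype actually wires in
>>>             protected abstract Cache<String, byte[]> taskCache();
>>>             protected abstract String keyFor(ForkJoinTask<?> task);
>>>             protected abstract byte[] serialize(ForkJoinTask<?> task);
>>>
>>>             @Override
>>>             public ForkJoinTask<V> fork() {
>>>                 ForkJoinPool pool = getPool(); // current pool, or null
>>>                 if (pool == null || pool.getQueuedTaskCount() == 0) {
>>>                     // local workers are idle: distributing would only
>>>                     // add overhead, so submit to the local pool as usual
>>>                     return super.fork();
>>>                 }
>>>                 // local queues are busy: publish the task and let the
>>>                 // task cache's consistent hashing pick the executing node
>>>                 taskCache().put(keyFor(this), serialize(this));
>>>                 return this;
>>>             }
>>>         }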
>>>
>>>     So, it isn't really a "work-stealing" algorithm, per se; the
>>>     distributable tasks are being distributed eagerly using
>>>     Infinispan's normal cache distribution. But I'm hoping that
>>>     doing that also makes it easier to handle node failure, since
>>>     nodes collectively share a common picture of the work to be done.
>>>
>>>     This approach required one change to the actual FJ classes
>>>     themselves (in org.infinispan.util.concurrent.jdk8backported).
>>>     That's probably the most controversial change. I had to make the
>>>     original ForkJoinTask's fork() method non-final in order to
>>>     extend it cleanly. There's probably a way around that, but
>>>     that's the cleanest option I have thought of thus far.
>>>
>>>     And lastly, it's not done yet: basic task distribution is
>>>     working, but I haven't tackled failover to any real extent yet.
>>>     The biggest questions, though, are around what to do with the
>>>     existing distributed execution interfaces. For example,
>>>     DistributedTask has a getCallable() method because it assumes
>>>     it's wrapping a Callable. But ForkJoinTasks don't extend
>>>     Callable. I could put in a shim to wrap the DistributedFJTasks
>>>     into Callables for the sake of that method, but I don't know if
>>>     it's worth it.  Similarly, the DistributedExecutorService
>>>     interface exposes a lot of submit-to-specific-address or
>>>     submit-to-all-addresses methods, which are an odd fit here since
>>>     tasks are distributed via their own cache.  Even if I used a
>>>     KeyAffinityService to target the task to the given Address, it
>>>     might get picked up by another node that shares that same
>>>     hash. But I can add a direct-to-single-Address capability if
>>>     that seems worthwhile. Alternately, I can just use entirely
>>>     different interfaces (DistributedFJExecutorService,
>>>     DistributedFJTask?).
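>>>
>>>     For what it's worth, the shim itself would be small - something
>>>     like this purely illustrative adapter (FJTaskCallable is not a
>>>     real class in the prototype):
>>>
>>>         import java.io.Serializable;
>>>         import java.util.concurrent.Callable;
>>>
>>>         final class FJTaskCallable<V> implements Callable<V>, Serializable {
>>>             private final DistributedFJTask<V> task;
>>>
>>>             FJTaskCallable(DistributedFJTask<V> task) {
>>>                 this.task = task;
>>>             }
>>>
>>>             @Override
>>>             public V call() throws Exception {
>>>                 // invoke() executes the task and returns its result,
>>>                 // rethrowing any execution failure
>>>                 return task.invoke();
>>>             }
>>>         }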
>>>
>>>     Thoughts?  Concerns?  Glaring issues?
>>>
>>
>
> --
> Manik Surtani
> manik at jboss.org
> twitter.com/maniksurtani
>
> Platform Architect, JBoss Data Grid
> http://red.ht/data-grid
