[infinispan-dev] distributed fork-join executor prototype
Paolo Romano
romano at inesc-id.pt
Fri May 10 19:23:49 EDT 2013
Very interesting stuff, Matt!
I find the design that leverages Infinispan for distributing the
tasks, while maximizing locality and exploiting Infinispan's
fault-tolerance capabilities, modular and elegant. It will be
interesting to evaluate whether there is a significant overhead with
respect to a purely JGroups-based mechanism.
We will be looking at your code, and may come up with some
use-cases.
Cheers,
Paolo
On 5/9/13 10:52 AM, Manik Surtani wrote:
> Hi Matt.
>
> Thanks for resurrecting and sorry for not responding on the original
> thread.
>
> Interesting discussions. I would intuitively agree with Bela were it
> not for the fact that, as you said, you're actually storing data,
> require high availability of that data during server failure, and may
> possibly chain tasks (as per a local fork-join) and want to maintain
> server affinity across related tasks.
>
> You could achieve all of this with JGroups but would end up
> re-implementing certain bits of Infinispan, specifically:
> - task failover
> - task cancelling
> - consistent hash based node selection for delivery, ensuring affinity
> across data and tasks
>
> So I think it is a pretty valid use case to make use of what
> Infinispan already has.
>
> I'll have a look at your sources and comment more. I'm sure Paolo and
> others at Cloud-TM will do the same.
>
> Cheers
> Manik
>
> On 9 May 2013, at 01:50, Matt Hoffman <matt at mhoffman.org> wrote:
>
>> Resurrecting this topic: I've put some sample code on my fork here:
>> https://github.com/matthoffman/infinispan/tree/dfj
>> There's a README there that offers an overview. I hope to do some
>> performance comparisons over the weekend, but in the meantime, the
>> code is there if you're curious.
>>
>> I'm mainly interested in whether this is the type of thing that
>> Infinispan would want as an alternative to the current
>> distributed executor (much like the ForkJoinPool in the JDK is an
>> alternative to a traditional thread pool).
>> If so, the next steps would be to do some performance tests, pick one
>> implementation to move forward with (out of the three prototypes on
>> that branch), and clean it up sufficiently to be considered for a pull
>> request.
>>
>> If it's not something that Infinispan is interested in, then I'll
>> change approaches and make it more generic, so that it isn't
>> Infinispan-specific and can be used with other transports. I'm open
>> to either option... I think there are pros and cons either way.
>>
>>
>> Thanks!
>>
>>
>> matt
>>
>>
>>
>> On Mon, Mar 4, 2013 at 7:07 AM, Paolo Romano <romano at inesc-id.pt> wrote:
>>
>> This sounds really interesting, Matt. In the Cloud-TM project
>> (www.cloudtm.eu) we are currently
>> developing a parallel graph-analysis algorithm on top of
>> Infinispan's DEF. I would be really curious to take a look at the
>> framework you developed, and see how it could be exploited in our
>> application.
>>
>> Regards,
>>
>> Paolo
>>
>>
>> --
>>
>> Paolo Romano, PhD
>> Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu)
>> Senior Researcher @ INESC-ID (www.inesc-id.pt)
>> Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt)
>> Rua Alves Redol, 9
>> 1000-059, Lisbon, Portugal
>> Tel. +351 21 3100300
>> Fax +351 21 3145843
>> Webpage: http://www.gsd.inesc-id.pt/~romanop
>>
>>
>>
>> On 3/2/13 5:40 PM, matt hoffman wrote:
>>>
>>> Hey guys,
>>>
>>> I've been working on a prototype of integrating Infinispan into
>>> our app. We do a lot of distributed processing across a small
>>> cluster, so I've played with Infinispan's existing distributed
>>> execution framework (which is nice), as well as using Infinispan
>>> alongside a normal message queue to distribute tasks. But I've
>>> also put together a prototype of a new distributed execution
>>> framework using fork-join pools that you all might be interested
>>> in. If it sounds like something that would be worthwhile for
>>> Infinispan, I can raise a Jira and submit a pull request with
>>> what I have so far. I'd need to get the CA and company policy
>>> stuff finalized; that might take a couple days. Meanwhile, in
>>> case there is any interest, I've described the approach I've
>>> taken below.
>>>
>>> First, a little background:
>>>
>>> A while back I worked on a side project that integrated a
>>> distributed work-stealing algorithm into the standard JDK
>>> fork-join queue. It used JGroups for communication, because it
>>> was quick and easy for prototyping. So this week I thought I'd
>>> take a stab at porting that over to Infinispan. The algorithm I
>>> came up with for Infinispan is a bit less of a work-stealing
>>> algorithm, to take advantage of Infinispan's built-in
>>> distribution capabilities, but I think it's still fairly efficient.
>>>
>>> My basic approach was to take in a cache in the constructor,
>>> much like the existing distributed executor, and then create a
>>> parallel, DIST-mode cache that uses the same hash & grouping
>>> configuration as the original cache. That new parallel cache is
>>> the "task cache", and we use that to distribute available tasks
>>> across the cluster. It's a distributed cache so that tasks are
>>> partitioned across a large cluster, and it uses the hashing
>>> config of the original cache and a KeyAffinityService to attempt
>>> to distribute the tasks to the same nodes that contain the data
>>> being worked on. Nodes use cache listeners to be notified when
>>> there is new work available, and use the atomic replace() to
>>> "check out" tasks for execution and to "check in" the results.
>>>
>>> The basic algorithm is something like this:
>>>
>>> For a refresher, a normal FJ pool has a fork() method that takes
>>> in a task, and then places that task on an internal queue
>>> (actually, one of several queues). When threads are idle, they
>>> look to the nearest work queue for work. If that work queue does
>>> not have work, they "steal" work from another thread's queue.
>>> In the best case, tasks remain on the same thread as the task
>>> that spawned them, so tasks that process the same data as their
>>> parents may still have that data in the CPU's cache, etc.
>>> There's more to it than that, but that's the basic idea.
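>>>
>>> For reference, that standard behavior with the plain JDK classes
>>> looks like the recursive sum below; fork() queues one half for
>>> potential stealing while the current thread computes the other
>>> half. Nothing here is Infinispan-specific:
>>>
>>>     import java.util.Arrays;
>>>     import java.util.concurrent.ForkJoinPool;
>>>     import java.util.concurrent.RecursiveTask;
>>>
>>>     public class SumTask extends RecursiveTask<Long> {
>>>         private final long[] data;
>>>         private final int from, to;
>>>
>>>         SumTask(long[] data, int from, int to) {
>>>             this.data = data; this.from = from; this.to = to;
>>>         }
>>>
>>>         @Override protected Long compute() {
>>>             if (to - from <= 1000) {        // small enough: just compute
>>>                 long sum = 0;
>>>                 for (int i = from; i < to; i++) sum += data[i];
>>>                 return sum;
>>>             }
>>>             int mid = (from + to) >>> 1;
>>>             SumTask left = new SumTask(data, from, mid);
>>>             left.fork();                    // push onto this worker's queue
>>>             long right = new SumTask(data, mid, to).compute();
>>>             return right + left.join();     // run locally or wait if stolen
>>>         }
>>>
>>>         public static void main(String[] args) {
>>>             long[] data = new long[1000000];
>>>             Arrays.fill(data, 1L);
>>>             System.out.println(new ForkJoinPool().invoke(
>>>                     new SumTask(data, 0, data.length)));  // 1000000
>>>         }
>>>     }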
>>>
>>> This distributed algorithm just adds an extra layer on top for
>>> tasks that are marked "distributable" (by extending
>>> DistributedFJTask instead of the normal ForkJoinTask). When you
>>> call fork() with a DistributedFJTask, it first checks to see if
>>> the local pool's work queue is empty. If so, we just go ahead
>>> and submit it locally; there's no reason to distribute it. If
>>> not, we put the task in the task cache, and let Infinispan
>>> distribute it. When a node has no more work to do in its
>>> internal fork-join queues, it looks at the task cache and tries
>>> to pull work from there.
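>>>
>>> In rough sketch form, the fork() decision is something like the
>>> code below. This assumes the backported ForkJoinTask with fork()
>>> made non-final (see the note further down), and the two helper
>>> methods are placeholders for the real queue check and cache write:
>>>
>>>     import org.infinispan.util.concurrent.jdk8backported.ForkJoinTask;
>>>
>>>     public abstract class DistributedFJTask<V> extends ForkJoinTask<V> {
>>>
>>>         @Override
>>>         public ForkJoinTask<V> fork() {
>>>             if (localQueueIsEmpty()) {
>>>                 return super.fork();   // no backlog: keep the task local
>>>             }
>>>             publishToTaskCache(this);  // busy: let Infinispan distribute
>>>             return this;
>>>         }
>>>
>>>         // true if this worker's internal queue has no pending work
>>>         abstract boolean localQueueIsEmpty();
>>>
>>>         // serialize the task into the DIST-mode task cache, keyed via
>>>         // the KeyAffinityService so it lands near the data it touches
>>>         abstract void publishToTaskCache(DistributedFJTask<V> task);
>>>     }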
>>>
>>> So, it isn't really a "work-stealing" algorithm, per se; the
>>> distributable tasks are being distributed eagerly using
>>> Infinispan's normal cache distribution. But I'm hoping that
>>> doing that also makes it easier to handle node failure, since
>>> nodes collectively share a common picture of the work to be done.
>>>
>>> This approach required one change to the actual FJ classes
>>> themselves (in org.infinispan.util.concurrent.jdk8backported).
>>> That's probably the most controversial change. I had to make the
>>> original ForkJoinTask's fork() method non-final in order to
>>> extend it cleanly. There's probably a way around that, but
>>> that's the cleanest option I have thought of thus far.
>>>
>>> And lastly, it's not done yet: basic task distribution is
>>> working, but I haven't tackled failover to any real extent yet.
>>> The biggest questions, though, are around what to do with the
>>> existing distributed execution interfaces. For example,
>>> DistributedTask has a getCallable() method because it assumes
>>> it's wrapping a Callable. But ForkJoinTasks don't extend
>>> Callable. I could put in a shim to wrap the DistributedFJTasks
>>> into Callables for the sake of that method, but I don't know if
>>> it's worth it. Similarly, the DistributedExecutorService
>>> interface exposes a lot of submit-to-specific-address or
>>> submit-to-all-addresses methods, which are an odd fit here since
>>> tasks are distributed via their own cache. Even if I used a
>>> KeyAffinityService to target the task to the given Address, it
>>> might get picked up by another node that shares that same
>>> hash. But I can add a direct-to-single-Address capability
>>> if that seems worthwhile. Alternatively, I can just use entirely
>>> different interfaces (DistributedFJExecutorService,
>>> DistributedFJTask?).
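>>>
>>> Coming back to getCallable(): the shim itself would be small;
>>> something like the adapter below (the adapter class is purely
>>> illustrative, and the question is just whether it pulls its
>>> weight):
>>>
>>>     import java.util.concurrent.Callable;
>>>
>>>     // Wraps a DistributedFJTask so code expecting a Callable works.
>>>     public class FJTaskCallableAdapter<V> implements Callable<V> {
>>>         private final DistributedFJTask<V> task;
>>>
>>>         public FJTaskCallableAdapter(DistributedFJTask<V> task) {
>>>             this.task = task;
>>>         }
>>>
>>>         @Override
>>>         public V call() {
>>>             return task.invoke();  // run the task, wait for its result
>>>         }
>>>     }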
>>>
>>> Thoughts? Concerns? Glaring issues?
>>>
>>>
>>>
>
> --
> Manik Surtani
> manik at jboss.org
> twitter.com/maniksurtani
>
> Platform Architect, JBoss Data Grid
> http://red.ht/data-grid
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev