<html>
  <head>
    <meta content="text/html; charset=ISO-8859-1"
      http-equiv="Content-Type">
  </head>
  <body bgcolor="#FFFFFF" text="#000000">
    <div class="moz-cite-prefix">This sounds really interesting Matt. In
      the Cloud-TM project (<a class="moz-txt-link-abbreviated" href="http://www.cloudtm.eu">www.cloudtm.eu</a>) we are currently developing
      a parallel graph-analysis algorithm on top of Infinispan's DEF. I
      would be really curious to take a look at the framework you
      developed, and see how it could be exploited in our application.<br>
      <br>
      Regards,<br>
      <br>
      &nbsp;&nbsp;&nbsp; Paolo<br>
      <br>
      <br>
      <pre class="moz-signature" cols="72">-- 

Paolo Romano, PhD
Coordinator of the Cloud-TM ICT FP7 Project (<a class="moz-txt-link-abbreviated" href="http://www.cloudtm.eu">www.cloudtm.eu</a>)
Senior Researcher @ INESC-ID (<a class="moz-txt-link-abbreviated" href="http://www.inesc-id.pt">www.inesc-id.pt</a>)
Assistant Professor @ Instituto Superior Tecnico (<a class="moz-txt-link-abbreviated" href="http://www.ist.utl.pt">www.ist.utl.pt</a>)
Rua Alves Redol, 9
1000-059, Lisbon Portugal
Tel. + 351 21 3100300
Fax  + 351 21 3145843
Webpage <a class="moz-txt-link-freetext" href="http://www.gsd.inesc-id.pt/~romanop">http://www.gsd.inesc-id.pt/~romanop</a></pre>
      <br>
      <br>
      On 3/2/13 5:40 PM, matt hoffman wrote:<br>
    </div>
    <blockquote
cite="mid:CADiTnNVsc+on_6amyB-XkSnH6TknZvzJTPFc9Bgu=Euyjt1few@mail.gmail.com"
      type="cite">
      <p>Hey guys,&nbsp;</p>
      <p>I've been working on a prototype of integrating Infinispan into
        our app.&nbsp; We do a lot of distributed processing across a small
        cluster, so I've played with Infinispan's existing distributed
        execution framework (which is nice), as well as using Infinispan
        alongside a normal message queue to distribute tasks.&nbsp; But I've
        also put together a prototype of a new distributed execution
        framework using fork-join pools that you all might be interested
        in.&nbsp; If it sounds like something that would be worthwhile for
        Infinispan, I can raise a Jira and submit a pull request with
        what I have so far.&nbsp;I'd need to get the CA and company policy
        stuff finalized; that might take a couple days. Meanwhile, in
        case there is any interest, I've described the approach I've
        taken below.</p>
      <p>First, a little background:</p>
      <p>A while back I worked on a side project that integrated a
        distributed work-stealing algorithm into the standard JDK
        fork-join queue.&nbsp; It used JGroups for communication, because it
        was quick and easy for prototyping. So this week I thought i'd
        take a stab at porting that over to Infinispan. The algorithm I
        came up with for Infinispan is a bit less of a work-stealing
        algorithm, to take advantage of Infinispan's built-in
        distribution capabilities, but I think it's still fairly
        efficient.&nbsp;</p>
      <p>My basic approach was to take in a cache in the constructor,
        much like the existing distributed executor, and then create a
        parallel, DIST-mode cache that uses the same hash &amp; grouping
        configuration as the original cache. That new parallel cache is
        the "task cache", and we use that to distribute available tasks
        across the cluster. It's a distributed cache so that tasks are
        partitioned across a large cluster, and it uses the hashing
        config of the original cache and a KeyAffinityService to attempt
        to distribute the tasks to the same nodes that contain the data
        being worked on. Nodes use cache listeners to be notified when
        there is new work available, and the atomic replace() to "check
        out" the tasks for execution, and "check in" the results.&nbsp; &nbsp;</p>
      <p>The basic algorithm is something like this:&nbsp;</p>
      <p>For a refresher, a normal FJ pool has a fork() method that
        takes in a task, and then places that task on an internal queue
        (actually, one of several queues). When threads are idle, they
        look to the nearest work queue for work. If that work queue does
        not have work, they "steal" work from another thread's queue.&nbsp;
        So in the best case, tasks remain on the same thread as the task
        that spawned them, so tasks that process the same data as their
        parents may still have that data in the CPU's cache, etc.
        There's more to it than that, but that's the basic idea. &nbsp;</p>
      <p>This distributed algorithm just adds an extra layer on top for
        tasks that are marked "distributable" (by extending
        DistributedFJTask instead of the normal ForkJoinTask). When you
        call fork() with a DistributedFJTask, it first checks to see if
        the local pool's work queue is empty. If so, we just go ahead
        and submit it locally; there's no reason to distribute it. If
        not, we put the task in the task cache, and let Infinispan
        distribute it. When a node has no more work to do in its
        internal fork-join queues, it looks at the task cache and tries
        to pull work from there.&nbsp;</p>
      <p>So, it isn't really a "work-stealing" algorithm, per se; the
        distributable tasks are being distributed eagerly using
        Infinispan's normal cache distribution. But I'm hoping that
        doing that also makes it easier to handle node failure, since
        nodes collectively share a common picture of the work to be
        done.</p>
      <p>This approach required one change to the actual FJ classes
        themselves (in <span>org.infinispan.util.concurrent.jdk8backported).
        </span>That's probably the most controversial change. I had to
        make the original ForkJoinTask's fork() method non-final in
        order to extend it cleanly.&nbsp; There's probably a way around that,
        but that's the cleanest option I have thought of thus far.&nbsp;</p>
      <p>And lastly, it's not done yet: basic task distribution is
        working, but I haven't tackled failover to any real extent yet.
        The biggest questions, though, are around what to do with the
        existing distributed execution interfaces. For example,
        DistributedTask has a getCallable() method because it assumes
        it's wrapping a Callable. But ForkJoinTasks don't extend
        Callable. I could put in a shim to wrap the DistributedFJTasks
        into Callables for the sake of that method, but I don't know if
        it's worth it.&nbsp; Similarly, the DistributedExecutorService
        interface exposes a lot of submit-to-specific-address or
        submit-to-all-addresses methods, which are an odd fit here since
        tasks are distributed via their own cache.&nbsp; Even if I used a
        KeyAffinityService to target the task to the given Address, it
        might get picked up by another node that shares that same
        hash.&nbsp;But I can add in a direct-to-single-Address capability in
        if that seems worthwhile. Alternately, I can just use entirely
        different interfaces (DistributedFJExecutorService,
        DistributedFJTask?).&nbsp;</p>
      <p>Thoughts?&nbsp; Concerns?&nbsp; Glaring issues?&nbsp;</p>
      <br>
      <fieldset class="mimeAttachmentHeader"></fieldset>
      <br>
      <pre wrap="">_______________________________________________
infinispan-dev mailing list
<a class="moz-txt-link-abbreviated" href="mailto:infinispan-dev@lists.jboss.org">infinispan-dev@lists.jboss.org</a>
<a class="moz-txt-link-freetext" href="https://lists.jboss.org/mailman/listinfo/infinispan-dev">https://lists.jboss.org/mailman/listinfo/infinispan-dev</a></pre>
    </blockquote>
    <br>
  </body>
</html>