[infinispan-commits] Infinispan SVN: r1378 - in trunk/core: src/main/java/org/infinispan/remoting/transport/jgroups and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Jan 15 12:56:55 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-01-15 12:56:54 -0500 (Fri, 15 Jan 2010)
New Revision: 1378

Modified:
   trunk/core/pom.xml
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Upgraded to JGroups 2.9.0.Alpha4 and changed some code in the way RPCs are issued and results collected, for better parallelism and a more reliable ANYCAST-like behaviour.

Modified: trunk/core/pom.xml
===================================================================
--- trunk/core/pom.xml	2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/pom.xml	2010-01-15 17:56:54 UTC (rev 1378)
@@ -18,7 +18,7 @@
    <properties>
       <version.jboss.common.core>2.2.14.GA</version.jboss.common.core>
       <version.jboss.marshalling>1.2.0.GA</version.jboss.marshalling>
-      <version.jgroups>2.8.0.GA</version.jgroups>
+      <version.jgroups>2.9.0.Alpha4</version.jgroups>
       <version.jta>1.0.1.GA</version.jta>
       <version.org.jboss.naming>5.0.3.GA</version.org.jboss.naming>
       <version.rhq.pluginAnnotations>1.4.0.B01</version.rhq.pluginAnnotations>

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2010-01-15 17:56:54 UTC (rev 1378)
@@ -31,12 +31,15 @@
 import org.infinispan.remoting.responses.RequestIgnoredResponse;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.transport.DistributedSync;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.TimeoutException;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.Message;
 import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RequestOptions;
 import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.util.Buffer;
@@ -44,14 +47,22 @@
 import org.jgroups.util.RspList;
 
 import java.io.NotSerializableException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.infinispan.util.Util.formatString;
+import static org.infinispan.util.Util.prettyPrintTime;
 
 /**
  * A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
@@ -94,15 +105,12 @@
       return true;
    }
 
-   /**
-    * Similar to {@link #callRemoteMethods(java.util.Vector, org.jgroups.blocks.MethodCall, int, long, boolean, boolean,
-    * org.jgroups.blocks.RspFilter)} except that this version is aware of {@link ReplicableCommand} objects.
-    */
    public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
-                                       boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay, boolean asyncMarshalling)
+                                       boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay, boolean asyncMarshalling,
+                                       boolean broadcast)
          throws NotSerializableException, ExecutionException, InterruptedException {
 
-      ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay);
+      ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay, broadcast);
 
       if (asyncMarshalling) {
          asyncExecutor.submit(task);
@@ -114,6 +122,7 @@
          } catch (Exception e) {
             throw new CacheException(e);
          }
+         if (mode == GroupRequest.GET_NONE) return null; // "Traditional" async.
          if (response.isEmpty() || containsOnlyNulls(response))
             return null;
          else
@@ -197,10 +206,11 @@
       private boolean anycasting;
       private RspFilter filter;
       boolean supportReplay = false;
+      boolean broadcast = false;
 
       private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests,
                               int mode, long timeout,
-                              boolean anycasting, RspFilter filter, boolean supportReplay) {
+                              boolean anycasting, RspFilter filter, boolean supportReplay, boolean broadcast) {
          this.command = command;
          this.oob = oob;
          this.dests = dests;
@@ -209,8 +219,18 @@
          this.anycasting = anycasting;
          this.filter = filter;
          this.supportReplay = supportReplay;
+         this.broadcast = broadcast;
       }
 
+      private Message constructMessage(Buffer buf, Address recipient) {
+         Message msg = new Message();
+         msg.setBuffer(buf);
+         if (oob) msg.setFlag(Message.OOB);
+         if (mode != GroupRequest.GET_NONE) msg.setFlag(Message.DONT_BUNDLE);
+         if (recipient != null) msg.setDest(recipient);
+         return msg;
+      }
+
       public RspList call() throws Exception {
          Buffer buf;
          try {
@@ -220,59 +240,102 @@
             throw new RuntimeException("Failure to marshal argument(s)", e);
          }
 
-         Message msg = new Message();
-         msg.setBuffer(buf);
-         if (oob) msg.setFlag(Message.OOB);
          // Replay capability requires responses from all members!
          int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
- 
+
          // if there is a JOIN in progress, wait for this to complete.
          // See ISPN-83 for more details.  Once ISPN-83 is addressed, this may no longer be needed.
          distributedSync.blockUntilNoJoinsInProgress();
-         
-         //Use JGroups 2.8 feature Message.DONT_BUNDLE flag to override message bundling at the transport level
-         //for asynchronous calls
-         //See ISPN-192 for more details
-         if (mode != GroupRequest.GET_NONE) {
-            msg.setFlag(Message.DONT_BUNDLE);
-         }
 
          if (filter != null) mode = GroupRequest.GET_FIRST;
 
-         RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
-         if (trace) log.trace("responses: {0}", retval);
+         RspList retval = null;
 
-         // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
-         // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
-         // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
-         if (retval == null)
-            throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for "
-                  + command.getClass().getSimpleName() + " not being serializable.");
+         if (broadcast) {
+            retval = castMessage(dests, constructMessage(buf, null), mode, timeout, false, filter);
+         } else {
+            Set<Address> targets = new HashSet<Address>(dests); // should sufficiently randomize order.
+            RequestOptions opts = new RequestOptions();
+            opts.setMode(mode);
+            opts.setTimeout(timeout);
 
-         if (supportReplay) {
-            boolean replay = false;
-            Vector<Address> ignorers = new Vector<Address>();
-            for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
-               Object value = entry.getValue().getValue();
-               if (value instanceof RequestIgnoredResponse) {
-                  ignorers.add(entry.getKey());
-               } else if (value instanceof ExtendedResponse) {
-                  ExtendedResponse extended = (ExtendedResponse) value;
-                  replay |= extended.isReplayIgnoredRequests();
-                  entry.getValue().setValue(extended.getResponse());
+            targets.remove(channel.getAddress()); // just in case
+
+            // if at all possible, try not to use JGroups' ANYCAST for now.  Multiple (parallel) UNICASTs are much faster.
+            if (filter != null) {
+               // This is possibly a remote GET.
+               // TODO this is sub-optimal and sequential (for now), until JGroups provides notifying futures - JGRP-1030
+               for (Address a : targets) {
+                  Object response = sendMessage(constructMessage(buf, a), opts);
+                  filter.isAcceptable(response, a);
+                  if (!filter.needMoreResponses()) {
+                     retval = new RspList(Collections.singleton(new Rsp(a, response)));
+                     break;
+                  }
                }
+
+            } else if (mode == GroupRequest.GET_ALL) {
+               // A SYNC call that needs to go everywhere
+               Map<Address, Future<Object>> futures = new HashMap<Address, Future<Object>>(targets.size());
+
+               for (Address dest : targets) futures.put(dest, sendMessageWithFuture(constructMessage(buf, dest), opts));
+
+               retval = new RspList();
+
+               // a get() on each future will block till that call completes.
+               for (Map.Entry<Address, Future<Object>> entry : futures.entrySet()) {
+                  try {
+                     retval.addRsp(entry.getKey(), entry.getValue().get(timeout, MILLISECONDS));
+                  } catch (java.util.concurrent.TimeoutException te) {
+                     throw new TimeoutException(formatString("Timed out after {0} waiting for a response from {1}",
+                                                             prettyPrintTime(timeout), entry.getKey()));
+                  }
+               }
+
+            } else if (mode == GroupRequest.GET_NONE) {
+               // An ASYNC call.  We don't care about responses.
+               for (Address dest : targets) sendMessageWithFuture(constructMessage(buf, dest), opts);
             }
+         }
 
-            if (replay && ignorers.size() > 0) {
-               //Since we are making a sync call make sure we don't bundle
-               //See ISPN-192 for more details               
-               msg.setFlag(Message.DONT_BUNDLE);
-               
-               if (trace)
-                  log.trace("Replaying message to ignoring senders: " + ignorers);
-               RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
-               if (responses != null)
-                  retval.putAll(responses);
+         // we only bother parsing responses if we are not in ASYNC mode.
+         if (mode != GroupRequest.GET_NONE) {
+
+            if (trace) log.trace("responses: {0}", retval);
+
+            // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
+            // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
+            // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
+            if (retval == null)
+               throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for "
+                     + command.getClass().getSimpleName() + " not being serializable.");
+
+            if (supportReplay) {
+               boolean replay = false;
+               Vector<Address> ignorers = new Vector<Address>();
+               for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
+                  Object value = entry.getValue().getValue();
+                  if (value instanceof RequestIgnoredResponse) {
+                     ignorers.add(entry.getKey());
+                  } else if (value instanceof ExtendedResponse) {
+                     ExtendedResponse extended = (ExtendedResponse) value;
+                     replay |= extended.isReplayIgnoredRequests();
+                     entry.getValue().setValue(extended.getResponse());
+                  }
+               }
+
+               if (replay && ignorers.size() > 0) {
+                  Message msg = constructMessage(buf, null);
+                  //Since we are making a sync call make sure we don't bundle
+                  //See ISPN-192 for more details
+                  msg.setFlag(Message.DONT_BUNDLE);
+
+                  if (trace)
+                     log.trace("Replaying message to ignoring senders: " + ignorers);
+                  RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
+                  if (responses != null)
+                     retval.putAll(responses);
+               }
             }
          }
 

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-01-15 17:56:54 UTC (rev 1378)
@@ -400,7 +400,7 @@
       try {
          RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
                                                         timeout, recipients != null, usePriorityQueue,
-                                                        toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling);
+                                                        toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
 
          if (mode.isAsynchronous()) return Collections.emptyList();// async case
 
@@ -579,6 +579,7 @@
    private Vector<org.jgroups.Address> toJGroupsAddressVector(Collection<Address> list) {
       if (list == null) return null;
       if (list.isEmpty()) return new Vector<org.jgroups.Address>();
+
       // optimize for the single node case
       Vector<org.jgroups.Address> retval = new Vector<org.jgroups.Address>(list.size());
       for (Address a : list) retval.add(toJGroupsAddress(a));



More information about the infinispan-commits mailing list