[infinispan-commits] Infinispan SVN: r1378 - in trunk/core: src/main/java/org/infinispan/remoting/transport/jgroups and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Jan 15 12:56:55 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-15 12:56:54 -0500 (Fri, 15 Jan 2010)
New Revision: 1378
Modified:
trunk/core/pom.xml
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Upgraded to JGroups 2.9.0.Alpha4 and changed some code in the way RPCs are issued and results collected, for better parallelism and a more reliable ANYCAST like behaviour.
Modified: trunk/core/pom.xml
===================================================================
--- trunk/core/pom.xml 2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/pom.xml 2010-01-15 17:56:54 UTC (rev 1378)
@@ -18,7 +18,7 @@
<properties>
<version.jboss.common.core>2.2.14.GA</version.jboss.common.core>
<version.jboss.marshalling>1.2.0.GA</version.jboss.marshalling>
- <version.jgroups>2.8.0.GA</version.jgroups>
+ <version.jgroups>2.9.0.Alpha4</version.jgroups>
<version.jta>1.0.1.GA</version.jta>
<version.org.jboss.naming>5.0.3.GA</version.org.jboss.naming>
<version.rhq.pluginAnnotations>1.4.0.B01</version.rhq.pluginAnnotations>
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2010-01-15 17:56:54 UTC (rev 1378)
@@ -31,12 +31,15 @@
import org.infinispan.remoting.responses.RequestIgnoredResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.DistributedSync;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Buffer;
@@ -44,14 +47,22 @@
import org.jgroups.util.RspList;
import java.io.NotSerializableException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.infinispan.util.Util.formatString;
+import static org.infinispan.util.Util.prettyPrintTime;
/**
* A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
@@ -94,15 +105,12 @@
return true;
}
- /**
- * Similar to {@link #callRemoteMethods(java.util.Vector, org.jgroups.blocks.MethodCall, int, long, boolean, boolean,
- * org.jgroups.blocks.RspFilter)} except that this version is aware of {@link ReplicableCommand} objects.
- */
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
- boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay, boolean asyncMarshalling)
+ boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay, boolean asyncMarshalling,
+ boolean broadcast)
throws NotSerializableException, ExecutionException, InterruptedException {
- ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay);
+ ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay, broadcast);
if (asyncMarshalling) {
asyncExecutor.submit(task);
@@ -114,6 +122,7 @@
} catch (Exception e) {
throw new CacheException(e);
}
+ if (mode == GroupRequest.GET_NONE) return null; // "Traditional" async.
if (response.isEmpty() || containsOnlyNulls(response))
return null;
else
@@ -197,10 +206,11 @@
private boolean anycasting;
private RspFilter filter;
boolean supportReplay = false;
+ boolean broadcast = false;
private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests,
int mode, long timeout,
- boolean anycasting, RspFilter filter, boolean supportReplay) {
+ boolean anycasting, RspFilter filter, boolean supportReplay, boolean broadcast) {
this.command = command;
this.oob = oob;
this.dests = dests;
@@ -209,8 +219,18 @@
this.anycasting = anycasting;
this.filter = filter;
this.supportReplay = supportReplay;
+ this.broadcast = broadcast;
}
+ private Message constructMessage(Buffer buf, Address recipient) {
+ Message msg = new Message();
+ msg.setBuffer(buf);
+ if (oob) msg.setFlag(Message.OOB);
+ if (mode != GroupRequest.GET_NONE) msg.setFlag(Message.DONT_BUNDLE);
+ if (recipient != null) msg.setDest(recipient);
+ return msg;
+ }
+
public RspList call() throws Exception {
Buffer buf;
try {
@@ -220,59 +240,102 @@
throw new RuntimeException("Failure to marshal argument(s)", e);
}
- Message msg = new Message();
- msg.setBuffer(buf);
- if (oob) msg.setFlag(Message.OOB);
// Replay capability requires responses from all members!
int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
-
+
// if there is a JOIN in progress, wait for this to complete.
// See ISPN-83 for more details. Once ISPN-83 is addressed, this may no longer be needed.
distributedSync.blockUntilNoJoinsInProgress();
-
- //Use JGroups 2.8 feature Message.DONT_BUNDLE flag to override message bundling at the transport level
- //for asynchronous calls
- //See ISPN-192 for more details
- if (mode != GroupRequest.GET_NONE) {
- msg.setFlag(Message.DONT_BUNDLE);
- }
if (filter != null) mode = GroupRequest.GET_FIRST;
- RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
- if (trace) log.trace("responses: {0}", retval);
+ RspList retval = null;
- // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
- // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
- // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
- if (retval == null)
- throw new NotSerializableException("RpcDispatcher returned a null. This is most often caused by args for "
- + command.getClass().getSimpleName() + " not being serializable.");
+ if (broadcast) {
+ retval = castMessage(dests, constructMessage(buf, null), mode, timeout, false, filter);
+ } else {
+ Set<Address> targets = new HashSet<Address>(dests); // should sufficiently randomize order.
+ RequestOptions opts = new RequestOptions();
+ opts.setMode(mode);
+ opts.setTimeout(timeout);
- if (supportReplay) {
- boolean replay = false;
- Vector<Address> ignorers = new Vector<Address>();
- for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
- Object value = entry.getValue().getValue();
- if (value instanceof RequestIgnoredResponse) {
- ignorers.add(entry.getKey());
- } else if (value instanceof ExtendedResponse) {
- ExtendedResponse extended = (ExtendedResponse) value;
- replay |= extended.isReplayIgnoredRequests();
- entry.getValue().setValue(extended.getResponse());
+ targets.remove(channel.getAddress()); // just in case
+
+ // if at all possible, try not to use JGroups' ANYCAST for now. Multiple (parallel) UNICASTs are much faster.
+ if (filter != null) {
+ // This is possibly a remote GET.
+ // TODO this is sub-optimal and sequential (for now), until JGroups provides notifying futures - JGRP-1030
+ for (Address a : targets) {
+ Object response = sendMessage(constructMessage(buf, a), opts);
+ filter.isAcceptable(response, a);
+ if (!filter.needMoreResponses()) {
+ retval = new RspList(Collections.singleton(new Rsp(a, response)));
+ break;
+ }
}
+
+ } else if (mode == GroupRequest.GET_ALL) {
+ // A SYNC call that needs to go everywhere
+ Map<Address, Future<Object>> futures = new HashMap<Address, Future<Object>>(targets.size());
+
+ for (Address dest : targets) futures.put(dest, sendMessageWithFuture(constructMessage(buf, dest), opts));
+
+ retval = new RspList();
+
+ // a get() on each future will block till that call completes.
+ for (Map.Entry<Address, Future<Object>> entry : futures.entrySet()) {
+ try {
+ retval.addRsp(entry.getKey(), entry.getValue().get(timeout, MILLISECONDS));
+ } catch (java.util.concurrent.TimeoutException te) {
+ throw new TimeoutException(formatString("Timed out after {0} waiting for a response from {1}",
+ prettyPrintTime(timeout), entry.getKey()));
+ }
+ }
+
+ } else if (mode == GroupRequest.GET_NONE) {
+ // An ASYNC call. We don't care about responses.
+ for (Address dest : targets) sendMessageWithFuture(constructMessage(buf, dest), opts);
}
+ }
- if (replay && ignorers.size() > 0) {
- //Since we are making a sync call make sure we don't bundle
- //See ISPN-192 for more details
- msg.setFlag(Message.DONT_BUNDLE);
-
- if (trace)
- log.trace("Replaying message to ignoring senders: " + ignorers);
- RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
- if (responses != null)
- retval.putAll(responses);
+ // we only bother parsing responses if we are not in ASYNC mode.
+ if (mode != GroupRequest.GET_NONE) {
+
+ if (trace) log.trace("responses: {0}", retval);
+
+ // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
+ // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
+ // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
+ if (retval == null)
+ throw new NotSerializableException("RpcDispatcher returned a null. This is most often caused by args for "
+ + command.getClass().getSimpleName() + " not being serializable.");
+
+ if (supportReplay) {
+ boolean replay = false;
+ Vector<Address> ignorers = new Vector<Address>();
+ for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
+ Object value = entry.getValue().getValue();
+ if (value instanceof RequestIgnoredResponse) {
+ ignorers.add(entry.getKey());
+ } else if (value instanceof ExtendedResponse) {
+ ExtendedResponse extended = (ExtendedResponse) value;
+ replay |= extended.isReplayIgnoredRequests();
+ entry.getValue().setValue(extended.getResponse());
+ }
+ }
+
+ if (replay && ignorers.size() > 0) {
+ Message msg = constructMessage(buf, null);
+ //Since we are making a sync call make sure we don't bundle
+ //See ISPN-192 for more details
+ msg.setFlag(Message.DONT_BUNDLE);
+
+ if (trace)
+ log.trace("Replaying message to ignoring senders: " + ignorers);
+ RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
+ if (responses != null)
+ retval.putAll(responses);
+ }
}
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-01-15 17:22:22 UTC (rev 1377)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-01-15 17:56:54 UTC (rev 1378)
@@ -400,7 +400,7 @@
try {
RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
timeout, recipients != null, usePriorityQueue,
- toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling);
+ toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
if (mode.isAsynchronous()) return Collections.emptyList();// async case
@@ -579,6 +579,7 @@
private Vector<org.jgroups.Address> toJGroupsAddressVector(Collection<Address> list) {
if (list == null) return null;
if (list.isEmpty()) return new Vector<org.jgroups.Address>();
+
// optimize for the single node case
Vector<org.jgroups.Address> retval = new Vector<org.jgroups.Address>(list.size());
for (Address a : list) retval.add(toJGroupsAddress(a));
More information about the infinispan-commits
mailing list