[infinispan-commits] Infinispan SVN: r1364 - trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Jan 12 11:03:19 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-12 11:03:19 -0500 (Tue, 12 Jan 2010)
New Revision: 1364
Modified:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
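Made JGroupsTransport more extensible: most fields are now protected, and channel initialization and startup are extracted into protected methods (initChannel() and startJGroupsChannelIfNeeded()) that subclasses can override.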
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-01-12 15:18:56 UTC (rev 1363)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-01-12 16:03:19 UTC (rev 1364)
@@ -40,6 +40,7 @@
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.util.FileLookup;
+import org.infinispan.util.TypedProperties;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
@@ -94,25 +95,25 @@
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
public static final String CHANNEL_LOOKUP = "channelLookup";
- private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "config-samples/jgroups-udp.xml";
- private boolean startChannel = true, stopChannel = true;
+ protected static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "config-samples/jgroups-udp.xml";
+ protected boolean startChannel = true, stopChannel = true;
- Channel channel;
- boolean createdChannel = false;
- Address address;
- Address physicalAddress;
- volatile List<Address> members = Collections.emptyList();
- volatile boolean coordinator = false;
- final Object membersListLock = new Object(); // guards members
+ protected Channel channel;
+ protected boolean createdChannel = false;
+ protected Address address;
+ protected Address physicalAddress;
+ protected volatile List<Address> members = Collections.emptyList();
+ protected volatile boolean coordinator = false;
+ protected final Object membersListLock = new Object(); // guards members
CommandAwareRpcDispatcher dispatcher;
static final Log log = LogFactory.getLog(JGroupsTransport.class);
static final boolean trace = log.isTraceEnabled();
- GlobalConfiguration c;
- Properties props;
- InboundInvocationHandler inboundInvocationHandler;
- Marshaller marshaller;
- ExecutorService asyncExecutor;
- CacheManagerNotifier notifier;
+ protected GlobalConfiguration c;
+ protected TypedProperties props;
+ protected InboundInvocationHandler inboundInvocationHandler;
+ protected Marshaller marshaller;
+ protected ExecutorService asyncExecutor;
+ protected CacheManagerNotifier notifier;
final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
private final JGroupsDistSync flushTracker = new JGroupsDistSync();
long distributedSyncTimeout;
@@ -145,13 +146,21 @@
}
public void start() {
- props = c.getTransportProperties();
+ props = TypedProperties.toTypedProperties(c.getTransportProperties());
distributedSyncTimeout = c.getDistributedSyncTimeout();
- log.info("Starting JGroups Channel");
+ if (log.isInfoEnabled()) log.info("Starting JGroups Channel");
initChannelAndRPCDispatcher();
+ startJGroupsChannelIfNeeded();
+ // ensure that the channel has FLUSH enabled.
+ // see ISPN-83 for details.
+ if ( channel.getProtocolStack()!= null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
+ throw new ConfigurationException("Flush should be enabled. This is related to https://jira.jboss.org/jira/browse/ISPN-83");
+ }
+
+ protected void startJGroupsChannelIfNeeded() {
if (startChannel) {
try {
channel.connect(c.getClusterName());
@@ -159,12 +168,8 @@
throw new CacheException("Unable to start JGroups Channel", e);
}
}
- log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
-
- // ensure that the channel has FLUSH enabled.
- // see ISPN-83 for details.
- if ( channel.getProtocolStack()!= null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
- throw new ConfigurationException("Flush should be enabled. This is related to https://jira.jboss.org/jira/browse/ISPN-83");
+ address = new JGroupsAddress(channel.getAddress());
+ if (log.isInfoEnabled()) log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
}
public int getViewId() {
@@ -193,7 +198,8 @@
dispatcher = null;
}
- private void initChannelAndRPCDispatcher() throws CacheException {
+
+ protected void initChannel() {
if (channel == null) {
createdChannel = true;
buildChannel();
@@ -209,7 +215,10 @@
}
channel.setOpt(Channel.LOCAL, false);
+ }
+ private void initChannelAndRPCDispatcher() throws CacheException {
+ initChannel();
dispatcher = new CommandAwareRpcDispatcher(channel, this,
asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
More information about the infinispan-commits mailing list