[jboss-cvs] JBoss Messaging SVN: r3307 - in branches/Branch_Stable: src/main/org/jboss/jms/client/container and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 9 15:43:00 EST 2007
Author: timfox
Date: 2007-11-09 15:43:00 -0500 (Fri, 09 Nov 2007)
New Revision: 3307
Modified:
branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Stable/src/main/org/jboss/jms/server/messagecounter/MessageCounterManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1146 http://jira.jboss.org/jira/browse/JBMESSAGING-1142 http://jira.jboss.org/jira/browse/JBMESSAGING-1130
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-09 20:43:00 UTC (rev 3307)
@@ -132,11 +132,11 @@
<!-- Max time to wait for state to arrive when the post office joins the cluster -->
- <attribute name="StateTimeout">5000</attribute>
+ <attribute name="StateTimeout">30000</attribute>
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
- <attribute name="CastTimeout">5000</attribute>
+ <attribute name="CastTimeout">30000</attribute>
<!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-09 20:43:00 UTC (rev 3307)
@@ -132,11 +132,11 @@
<!-- Max time to wait for state to arrive when the post office joins the cluster -->
- <attribute name="StateTimeout">5000</attribute>
+ <attribute name="StateTimeout">300000</attribute>
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
- <attribute name="CastTimeout">50000</attribute>
+ <attribute name="CastTimeout">300000</attribute>
<!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
Modified: branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-09 20:43:00 UTC (rev 3307)
@@ -136,11 +136,11 @@
<!-- Max time to wait for state to arrive when the post office joins the cluster -->
- <attribute name="StateTimeout">5000</attribute>
+ <attribute name="StateTimeout">30000</attribute>
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
- <attribute name="CastTimeout">5000</attribute>
+ <attribute name="CastTimeout">30000</attribute>
<!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
Modified: branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-09 20:43:00 UTC (rev 3307)
@@ -132,11 +132,11 @@
<!-- Max time to wait for state to arrive when the post office joins the cluster -->
- <attribute name="StateTimeout">5000</attribute>
+ <attribute name="StateTimeout">30000</attribute>
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
- <attribute name="CastTimeout">5000</attribute>
+ <attribute name="CastTimeout">30000</attribute>
<!-- Max number of concurrent replications -->
Modified: branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-09 20:43:00 UTC (rev 3307)
@@ -137,11 +137,11 @@
<!-- Max time to wait for state to arrive when the post office joins the cluster -->
- <attribute name="StateTimeout">5000</attribute>
+ <attribute name="StateTimeout">30000</attribute>
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
- <attribute name="CastTimeout">5000</attribute>
+ <attribute name="CastTimeout">30000</attribute>
<!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -25,6 +25,7 @@
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
@@ -109,6 +110,11 @@
priority = producerState.getPriority();
if (trace) { log.trace("Using producer's default priority: " + priority); }
}
+ if (priority < 0 || priority > 9)
+ {
+ throw new MessageFormatException("Invalid message priority (" + priority + "). " +
+ "Valid priorities are 0-9");
+ }
m.setJMSPriority(priority);
if (producerState.isDisableMessageTimestamp())
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -55,6 +55,10 @@
import org.jboss.messaging.util.Version;
import org.jboss.remoting.InvokerLocator;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -86,6 +90,8 @@
private Map delegates;
private Replicator replicator;
+
+ private QueuedExecutor notifyExecutor;
// Constructors ---------------------------------------------------------------------------------
@@ -93,7 +99,8 @@
{
this.serverPeer = serverPeer;
endpoints = new HashMap();
- delegates = new HashMap();
+ delegates = new HashMap();
+ notifyExecutor = new QueuedExecutor(new LinkedQueue());
}
// ConnectionFactoryManager implementation ------------------------------------------------------
@@ -291,115 +298,134 @@
public void stop() throws Exception
{
initialContext.close();
+
+ notifyExecutor.shutdownNow();
log.debug("stopped");
}
// ReplicationListener interface ----------------------------------------------------------------
- public synchronized void notify(ClusterNotification notification)
+ public void notify(final ClusterNotification notification)
{
log.debug(this + " received notification from node " + notification.nodeID );
- try
+ class NotifyRunner implements Runnable
{
- if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
- {
- // We respond to changes in the node-address mapping. This will be replicated whan a
- // node joins / leaves the group. When this happens we need to rebind all connection factories with the new mapping.
+ public void run()
+ {
+ try
+ {
+ if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
+ {
+ // We respond to changes in the node-address mapping. This will be replicated whan a
+ // node joins / leaves the group. When this happens we need to rebind all connection factories with the new mapping.
- Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
+ Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
- // Rebind
+ // Rebind
- for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
- {
- Map.Entry entry = (Map.Entry)i.next();
- String uniqueName = (String)entry.getKey();
+ for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ String uniqueName = (String)entry.getKey();
- Object del = delegates.get(uniqueName);
+ Object del = delegates.get(uniqueName);
- if (del == null)
- {
- throw new IllegalStateException(
- "Cannot find connection factory with name " + uniqueName);
+ if (del == null)
+ {
+ throw new IllegalStateException(
+ "Cannot find connection factory with name " + uniqueName);
+ }
+
+ if (del instanceof ClientClusteredConnectionFactoryDelegate)
+ {
+ ((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
+ }
+ }
}
-
- if (del instanceof ClientClusteredConnectionFactoryDelegate)
+ else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
+ (notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))
{
- ((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
- }
- }
- }
- else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
- (notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))
- {
- log.debug("Updating CF information for " + notification.data);
- // A connection factory has been deployed / undeployed
+ log.debug("Updating CF information for " + notification.data);
+ // A connection factory has been deployed / undeployed
- // NOTE! All connection factories MUST be deployed on all nodes!
- // Otherwise the server might failover onto a node which doesn't have that connection factory deployed
- // so the connection won't be able to recconnect.
+ // NOTE! All connection factories MUST be deployed on all nodes!
+ // Otherwise the server might failover onto a node which doesn't have that connection factory deployed
+ // so the connection won't be able to recconnect.
- String key = (String)notification.data;
+ String key = (String)notification.data;
- String uniqueName = key.substring(Replicator.CF_PREFIX.length());
+ String uniqueName = key.substring(Replicator.CF_PREFIX.length());
- log.debug(this + " received '" + uniqueName + "' connection factory deploy / undeploy");
+ log.debug(this + " received '" + uniqueName + "' connection factory deploy / undeploy");
- ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)delegates.get(uniqueName);
+ ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)delegates.get(uniqueName);
- if (cfd == null)
- {
- //This is ok - connection factory a might be deployed on node A before being deployed on node B so
- //node B might get the notification before it has deployed a itself
- }
- else
- {
- if (cfd instanceof ClientConnectionFactoryDelegate)
- {
- //Non clustered - ignore
+ if (cfd == null)
+ {
+ //This is ok - connection factory a might be deployed on node A before being deployed on node B so
+ //node B might get the notification before it has deployed a itself
+ }
+ else
+ {
+ if (cfd instanceof ClientConnectionFactoryDelegate)
+ {
+ //Non clustered - ignore
- //We still replicate non clustered connection factories since the ClusterPullConnectionFactory
- //is non clustered but needs to be available across the cluster
- }
- else
- {
- ClientClusteredConnectionFactoryDelegate del = (ClientClusteredConnectionFactoryDelegate)cfd;
+ //We still replicate non clustered connection factories since the ClusterPullConnectionFactory
+ //is non clustered but needs to be available across the cluster
+ }
+ else
+ {
+ ClientClusteredConnectionFactoryDelegate del = (ClientClusteredConnectionFactoryDelegate)cfd;
- Map updatedReplicantMap = replicator.get(key);
+ Map updatedReplicantMap = replicator.get(key);
- List newDels = sortDelegatesOnServerID(updatedReplicantMap.values());
+ List newDels = sortDelegatesOnServerID(updatedReplicantMap.values());
- ClientConnectionFactoryDelegate[] delArr =
- (ClientConnectionFactoryDelegate[])newDels.
- toArray(new ClientConnectionFactoryDelegate[newDels.size()]);
+ ClientConnectionFactoryDelegate[] delArr =
+ (ClientConnectionFactoryDelegate[])newDels.
+ toArray(new ClientConnectionFactoryDelegate[newDels.size()]);
- Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
+ Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
- del.setDelegates(delArr);
- del.setFailoverMap(failoverMap);
+ del.setDelegates(delArr);
+ del.setFailoverMap(failoverMap);
- ServerConnectionFactoryEndpoint endpoint =
- (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
+ ServerConnectionFactoryEndpoint endpoint =
+ (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
- if (endpoint == null)
- {
- throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
- }
+ if (endpoint == null)
+ {
+ throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
+ }
- rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
- endpoint.updateClusteredClients(delArr, failoverMap);
+ endpoint.updateClusteredClients(delArr, failoverMap);
+ }
+ }
}
}
+ catch (Exception e)
+ {
+ log.error("Failed to rebind connection factory", e);
+ }
}
}
- catch (Exception e)
+
+ //Run on a different thread to prevent distributed deadlock when multiple nodes are starting together
+ //and deploying connection factories concurrently
+ try
{
- log.error("Failed to rebind connection factory", e);
+ notifyExecutor.execute(new NotifyRunner());
}
+ catch (InterruptedException e)
+ {
+ //Ignore
+ }
}
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/messagecounter/MessageCounterManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/messagecounter/MessageCounterManager.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/messagecounter/MessageCounterManager.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -66,9 +66,7 @@
public synchronized void start()
{
if (started)
- {
- log.warn(this + " already started");
-
+ {
return;
}
@@ -83,11 +81,9 @@
}
public synchronized void stop()
- {
+ {
if (!started)
{
- log.warn(this + " isn't started");
-
return;
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -378,8 +378,6 @@
if (!rs.next())
{
rs.close();
- rs = null;
-
ps.close();
// There is a very small possibility that two threads will
@@ -405,28 +403,31 @@
return 0L;
}
-
- long nextId = rs.getLong(1);
-
- ps.close();
-
- String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
-
- ps = conn.prepareStatement(updateCounterSQL);
-
- ps.setLong(1, nextId + size);
- ps.setString(2, counterName);
-
- int rows = ps.executeUpdate();
-
- if (trace)
+ else
{
- log.trace(JDBCUtil.statementToString(updateCounterSQL,
- new Long(nextId + size), counterName)
- + " updated " + rows + " rows");
+ long nextId = rs.getLong(1);
+
+ rs.close();
+ ps.close();
+
+ String updateCounterSQL = getSQLStatement("UPDATE_COUNTER");
+
+ ps = conn.prepareStatement(updateCounterSQL);
+
+ ps.setLong(1, nextId + size);
+ ps.setString(2, counterName);
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(JDBCUtil.statementToString(updateCounterSQL,
+ new Long(nextId + size), counterName)
+ + " updated " + rows + " rows");
+ }
+
+ return nextId;
}
-
- return nextId;
}
finally
{
@@ -549,10 +550,7 @@
}
rs.close();
- rs = null;
-
ps.close();
- ps = null;
}
}
@@ -1236,8 +1234,6 @@
ps.close();
- ps = null;
-
// Now swap the channel id
ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
@@ -1675,7 +1671,6 @@
}
ps.close();
- ps = null;
ps = conn
.prepareStatement(getSQLStatement("COMMIT_MESSAGE_REF2"));
@@ -1857,8 +1852,7 @@
}
ps.close();
- ps = null;
-
+
ps = conn
.prepareStatement(getSQLStatement("ROLLBACK_MESSAGE_REF2"));
ps.setLong(1, tx.getId());
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -38,6 +38,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.MessagingComponent;
+import org.jboss.util.NestedSQLException;
/**
* Common functionality for messaging components that need to access a database.
@@ -439,17 +440,28 @@
private static final int MAX_TRIES = 25;
protected Connection conn;
+
+ private boolean getConnectionFailed;
public T execute() throws Exception
- {
+ {
Transaction tx = tm.suspend();
try
{
- conn = ds.getConnection();
+ try
+ {
+ conn = ds.getConnection();
- conn.setAutoCommit(false);
-
+ conn.setAutoCommit(false);
+ }
+ catch (Exception e)
+ {
+ getConnectionFailed = true;
+
+ throw e;
+ }
+
T res = doTransaction();
conn.commit();
@@ -498,13 +510,19 @@
}
catch (SQLException e)
{
+ if (getConnectionFailed)
+ {
+ //Do not retry - just throw the exception up
+ throw e;
+ }
+
log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
tries++;
if (tries == MAX_TRIES)
{
log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to excecute transaction");
+ throw new IllegalStateException("Failed to execute transaction");
}
log.warn("Trying again after a pause");
//Now we wait for a random amount of time to minimise risk of deadlock
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -29,6 +29,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.ChannelFactory;
@@ -83,18 +86,12 @@
private Object waitLock = new Object();
- private static final int STOPPED = 1;
-
- private static final int WAITING_FOR_FIRST_VIEW = 2;
+ private AtomicBoolean ready = new AtomicBoolean(false);
- private static final int WAITING_FOR_STATE = 3;
+ private CountDownLatch latch;
- private static final int STARTED = 4;
+ private volatile boolean starting;
- private volatile int startedState;
-
- private volatile Thread viewThread;
-
//We need to process view changes on a different thread, since if we have more than one node running
//in the same VM then the thread that sends the leave message ends up executing the view change on the other node
//We probably don't need this if all nodes are in different VMs
@@ -122,8 +119,6 @@
this.dataChannel = jChannelFactory.createDataChannel();
- this.startedState = STOPPED;
-
// We don't want to receive local messages on any of the channels
controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
@@ -141,8 +136,8 @@
dataChannel.setReceiver(dataReceiver);
- this.startedState = WAITING_FOR_FIRST_VIEW;
-
+ starting = true;
+
controlChannel.connect(groupName);
//The first thing that happens after connect is a view change arrives
@@ -150,30 +145,31 @@
//Then the control messages will start arriving.
//We can guarantee that messages won't arrive until after the state is set because we use
//the FLUSH protocol on the control channel
+
+ boolean first = !(controlChannel.getState(null, stateTimeout));
- //First wait for view
- waitForStateChange(WAITING_FOR_STATE);
-
- log.debug("First view arrived");
-
- //Now wait for state if we are not the first member
-
- if (controlChannel.getState(null, stateTimeout))
+ if (first)
+ {
+ //First member of the group
+
+ //Can now start accepting messages
+
+ ready.set(true);
+
+ latch.countDown();
+
+ starting = false;
+
+ log.debug("We are the first member of the group so no need to wait for state");
+ }
+ else
{
//We are not the first member of the group, so let's wait for state to be got and processed
- waitForStateChange(STARTED);
+ waitForState();
log.debug("State arrived");
- }
- else
- {
- //We are the first member, no need to wait
-
- startedState = STARTED;
-
- log.debug("We are the first member of the group so no need to wait for state");
- }
+ }
//Now connect the data channel.
@@ -182,11 +178,8 @@
public void stop() throws Exception
{
- if (startedState == STOPPED)
- {
- throw new IllegalStateException("Is already stopped");
- }
-
+ ready.set(false);
+
try
{
dataChannel.close();
@@ -211,7 +204,7 @@
currentView = null;
- // Workaround for JGroups
+ // FIXME - Workaround for JGroups FLUSH protocol - it needs time
Thread.sleep(1000);
}
@@ -237,7 +230,7 @@
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
- if (startedState == STARTED)
+ if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -265,7 +258,7 @@
public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
{
- if (startedState == STARTED)
+ if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -296,7 +289,7 @@
public void multicastData(ClusterRequest request) throws Exception
{
- if (startedState == STARTED)
+ if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -308,7 +301,7 @@
public void unicastData(ClusterRequest request, Address address) throws Exception
{
- if (startedState == STARTED)
+ if (ready.get())
{
if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
@@ -318,40 +311,19 @@
}
}
-
- public boolean getState() throws Exception
+ private void waitForState() throws Exception
{
- boolean retrievedState = false;
-
- if (controlChannel.getState(null, stateTimeout))
- {
- //We are not the first member of the group, so let's wait for state to be got and processed
-
- waitForStateChange(STARTED);
-
- retrievedState = true;
- }
- else
- {
- this.startedState = STARTED;
- }
-
- return retrievedState;
- }
-
- private void waitForStateChange(int newState) throws Exception
- {
synchronized (waitLock)
{
long timeRemaining = stateTimeout;
long start = System.currentTimeMillis();
- while (startedState != newState && timeRemaining > 0)
+ while (!ready.get() && timeRemaining > 0)
{
waitLock.wait(stateTimeout);
- if (startedState != newState)
+ if (!ready.get())
{
long waited = System.currentTimeMillis() - start;
@@ -359,7 +331,7 @@
}
}
- if (startedState != newState)
+ if (!ready.get())
{
throw new IllegalStateException("Timed out waiting for state to change");
}
@@ -403,9 +375,9 @@
{
try
{
- if (startedState != STARTED)
+ if (!ready.get())
{
- throw new IllegalStateException("Received control message but group member is not started: " + startedState);
+ throw new IllegalStateException("Received control message but group member is not ready");
}
if (trace) { log.trace(this + ".ControlMessageListener got state"); }
@@ -430,11 +402,6 @@
{
synchronized (waitLock)
{
- if (startedState != WAITING_FOR_STATE)
- {
- throw new IllegalStateException("Received state but started state is " + startedState);
- }
-
try
{
groupListener.setState(bytes);
@@ -444,7 +411,7 @@
log.error("Failed to set state", e);
}
- startedState = STARTED;
+ ready.set(true);
waitLock.notify();
}
@@ -458,7 +425,26 @@
{
public void block()
{
- // NOOP
+ /*
+ Note - we must wait on this latch to prevent the following unlikely but possible race condition:
+ Node 1 starts (first member)
+ Node1 calls getState - this returns false
+ Node2 starts, getstate and sends message to group
+ Node 1 receives message and discards it - but it should have kept it
+ Node 1 sets ready = true
+ The latch blocks any other nodes being able to send messages before it is released
+ */
+ try
+ {
+ if (latch != null && !latch.await(stateTimeout, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for latch to be released");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted");
+ }
}
public void suspect(Address address)
@@ -469,103 +455,69 @@
public void viewAccepted(final View newView)
{
log.debug(this + " got new view " + newView + ", old view is " + currentView);
-
- if (currentView == null)
- {
- //The first view is arriving
-
- if (startedState != WAITING_FOR_FIRST_VIEW)
- {
- throw new IllegalStateException("Got first view but started state is " + startedState);
- }
- }
- else
- {
- if (startedState != STARTED)
- {
- return;
- }
- }
-
- class ViewChangeRunnable implements Runnable
- {
- public void run()
- {
- // JGroups will make sure this method is never called by more than one thread concurrently
-
- View oldView = currentView;
-
- currentView = newView;
-
- try
- {
- // Act on membership change, on both cases when an old member left or a new member joined
-
- if (oldView != null)
- {
- List leftNodes = new ArrayList();
- for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- leftNodes.add(address);
- }
- }
- if (!leftNodes.isEmpty())
- {
- groupListener.nodesLeft(leftNodes);
- }
- }
-
- for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
- {
- groupListener.nodeJoined(address);
- }
- }
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
-
- if (startedState == WAITING_FOR_FIRST_VIEW)
- {
- synchronized (waitLock)
- {
- startedState = WAITING_FOR_STATE;
-
- waitLock.notify();
- }
- }
- }
- }
-
- //Needs to be executed on different thread to avoid deadlock when running invm
- viewThread = new Thread(new ViewChangeRunnable());
-
- viewThread.start();
+
+ // JGroups will make sure this method is never called by more than one thread concurrently
+
+ View oldView = currentView;
+
+ currentView = newView;
+
+ //If the first view shows we are the co-ordinator i.e. first node then we can create a latch
+ //But only the first time and we don't want to do this after ready had been set to true
+ //Otherwise it will never get released!
+ if (newView.size() == 1 && starting &&
+ newView.getMembers().get(0).equals(controlChannel.getLocalAddress()) &&
+ !ready.get())
+ {
+ latch = new CountDownLatch(1);
+ }
+
+ try
+ {
+ // Act on membership change, on both cases when an old member left or a new member joined
+
+ if (oldView != null)
+ {
+ List leftNodes = new ArrayList();
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ leftNodes.add(address);
+ }
+ }
+ if (!leftNodes.isEmpty())
+ {
+ groupListener.nodesLeft(leftNodes);
+ }
+ }
+
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ groupListener.nodeJoined(address);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
}
-
public byte[] getState()
{
// NOOP
return null;
}
}
-
-
-
-
-
-
+
/*
* This class is used to listen for messages on the async channel
*/
@@ -598,9 +550,10 @@
try
{
- if (startedState != STARTED)
+ if (!ready.get())
{
- throw new IllegalStateException("Received data message but member is not started " + startedState);
+ //Ignore
+ return;
}
byte[] bytes = message.getBuffer();
@@ -624,7 +577,6 @@
}
}
-
/*
* This class is used to handle control channel requests
*/
@@ -636,9 +588,11 @@
try
{
- if (startedState != STARTED)
+ if (!ready.get())
{
- throw new IllegalStateException("Received control message but member is not started " + startedState);
+ //Ignore - it's valid that messages might arrive before state is got - in this case it is safe to ignore
+ //those messages
+ return null;
}
byte[] bytes = message.getBuffer();
@@ -655,7 +609,5 @@
throw e2;
}
}
- }
-
-
+ }
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -1430,7 +1430,7 @@
{
Map m = (Map)replicatedData.get(key);
- return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
+ return m == null ? Collections.EMPTY_MAP : new HashMap(m);
}
}
@@ -1973,20 +1973,31 @@
failoverMap.put(theNodeID, fnodeID);
}
- int fid = ((Integer)failoverMap.get(new Integer(thisNodeID))).intValue();
+ Integer i = (Integer)failoverMap.get(new Integer(thisNodeID));
- //if we are the first node in the cluster we don't want to be our own failover node!
-
- if (fid == thisNodeID)
- {
- firstNode = true;
- failoverNodeID = -1;
+ if (i != null)
+ {
+ int fid = i.intValue();
+
+ //if we are the first node in the cluster we don't want to be our own failover node!
+
+ if (fid == thisNodeID)
+ {
+ firstNode = true;
+ failoverNodeID = -1;
+ }
+ else
+ {
+ failoverNodeID = fid;
+ firstNode = false;
+ }
}
else
{
- failoverNodeID = fid;
- firstNode = false;
- }
+ //This can occur if this node joins the group, then another node joins in quick succession before this node
+ //has had time to add its nodeid-address mapping.
+ //This is ok - it wil be shortly followed by another calculation of the map
+ }
log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
}
@@ -2518,6 +2529,8 @@
{
Iterator iter = loadedBindings.values().iterator();
+ log.trace("Loading bindings");
+
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
@@ -2540,6 +2553,7 @@
ClusterRequest request = new BindRequest(info, binding.allNodes);
+ log.trace("Multicasting bind all");
groupMember.multicastControl(request, false);
}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/BrowserTest.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -24,14 +24,15 @@
import java.util.Enumeration;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.InvalidDestinationException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.DeliveryMode;
import org.jboss.jms.destination.JBossQueue;
@@ -292,65 +293,7 @@
removeAllMessages(queue1.getQueueName(), true, 0);
}
- }
-
-
- // Testcase for http://jira.jboss.org/jira/browse/JBMESSAGING-1144
- public void testMessageOrderOnBrowser() throws Exception
- {
-
- Connection conn = cf.createConnection();
- try
- {
- conn.start();
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer sender = session.createProducer(queue1);
-
- TextMessage message = session.createTextMessage();
- message.setText("Normal message");
- sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
- //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
- message.setText("Persistent message");
- sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
- //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
- message.setText("High Priority Persistent message");
- sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
- //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
-
- //message.setText("Expiring Persistent message");
- //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 1);
-
- QueueBrowser browser = session.createBrowser(queue1);
- Enumeration i = browser.getEnumeration();
- //message = (TextMessage)enum.nextElement();
- //if( !message.getText().equals("High Priority Persistent message") )
- // throw new Exception("Queue is not prioritizing messages correctly. Unexpected Message:"+message);
- log.info(message.getText());
-
- message = (TextMessage) i.nextElement();
- //if( !message.getText().equals("Normal message") )
- // throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
- log.info(message.getText());
-
- message = (TextMessage) i.nextElement();
- //if( !message.getText().equals("Persistent message") )
- // throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
- log.info(message.getText());
-
- // if( enum.hasMoreElements() )
- // throw new Exception("Queue does not expire messages correctly. Unexpected Message:"+enum.nextElement());
- }
- finally
- {
- if (conn!=null)
- {
- conn.close();
- }
- }
-
- }
-
+ }
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -40,7 +40,7 @@
{
try
{
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(1, view.size());
@@ -59,13 +59,13 @@
try
{
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
log.info("Started server 0");
ServerManagement.addNotificationListener(0, postOfficeObjectName, listener);
- ServerManagement.start(1, "all");
+ ServerManagement.start(1, "all", false);
log.info("Blocking to receive notification ...");
@@ -91,13 +91,13 @@
{
try
{
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(1, view.size());
assertTrue(view.contains(new Integer(0)));
- ServerManagement.start(1, "all");
+ ServerManagement.start(1, "all", false);
view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(2, view.size());
@@ -121,13 +121,13 @@
{
try
{
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(1, view.size());
assertTrue(view.contains(new Integer(0)));
- ServerManagement.start(1, "all");
+ ServerManagement.start(1, "all", false);
view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(2, view.size());
@@ -139,7 +139,7 @@
assertTrue(view.contains(new Integer(0)));
assertTrue(view.contains(new Integer(1)));
- ServerManagement.start(3, "all");
+ ServerManagement.start(3, "all", false);
view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(3, view.size());
@@ -172,9 +172,9 @@
{
try
{
- ServerManagement.start(0, "all");
- ServerManagement.start(1, "all");
- ServerManagement.start(2, "all");
+ ServerManagement.start(0, "all", false);
+ ServerManagement.start(1, "all", false);
+ ServerManagement.start(2, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(3, view.size());
@@ -201,7 +201,7 @@
// Reuse the "hollow" RMI server 0 to start another cluster node
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(2, view.size());
@@ -211,7 +211,7 @@
// Reuse the "hollow" RMI server 2 to start another cluster node
- ServerManagement.start(2, "all");
+ ServerManagement.start(2, "all", false);
view = ServerManagement.getServer(2).getNodeIDView();
assertEquals(3, view.size());
@@ -237,8 +237,8 @@
{
// Start with a 2 node cluster
- ServerManagement.start(0, "all");
- ServerManagement.start(1, "all");
+ ServerManagement.start(0, "all", false);
+ ServerManagement.start(1, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(2, view.size());
@@ -283,9 +283,9 @@
{
// Start with a 3 node cluster
- ServerManagement.start(0, "all");
- ServerManagement.start(1, "all");
- ServerManagement.start(2, "all");
+ ServerManagement.start(0, "all", false);
+ ServerManagement.start(1, "all", false);
+ ServerManagement.start(2, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(3, view.size());
@@ -358,7 +358,7 @@
{
// Start with a 1 node cluster
- ServerManagement.start(0, "all");
+ ServerManagement.start(0, "all", false);
Set view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(1, view.size());
@@ -367,7 +367,7 @@
ServerManagement.addNotificationListener(0, postOfficeObjectName, clusterEvent);
// start the ninth node, as there is no chance to be started by scripts
- ServerManagement.start(9, "all");
+ ServerManagement.start(9, "all", false);
if (!clusterEvent.viewChanged(30000))
{
@@ -387,7 +387,84 @@
ServerManagement.kill(9);
}
}
+
+ public void testStartServersSimultaneously() throws Exception
+ {
+ final int numServers = 5;
+ // Start numServers nodes concurrently so their group joins overlap; every loop below is bounded by numServers
+ try
+ {
+ class ServerStarter extends Thread
+ {
+ int nodeID;
+ boolean failed;
+ ServerStarter(int nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+ // 'failed' is only read by the test thread after join(), which makes this thread's write visible
+ public void run()
+ {
+ try
+ {
+ log.info("Starting " + nodeID);
+ ServerManagement.start(nodeID, "all", false);
+ // Deploy the same set of destinations on every node
+ ServerManagement.deployQueue("testDistributedQueue1", nodeID);
+ ServerManagement.deployTopic("testDistributedTopic1", nodeID);
+
+ ServerManagement.deployQueue("testDistributedQueue2", nodeID);
+ ServerManagement.deployTopic("testDistributedTopic2", nodeID);
+
+ ServerManagement.deployQueue("testDistributedQueue3", nodeID);
+ ServerManagement.deployTopic("testDistributedTopic3", nodeID);
+ log.info("Done start");
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to start server", t);
+ failed = true;
+ }
+ }
+ }
+ // Launch every starter before joining any of them so the server starts actually overlap
+ ServerStarter[] starters = new ServerStarter[numServers];
+ for (int i = 0; i < numServers; i++)
+ {
+ starters[i] = new ServerStarter(i);
+ starters[i].start();
+ }
+ // Wait for all starters, remembering whether any of them failed
+ boolean failed = false;
+ for (int i = 0; i < numServers; i++)
+ {
+ starters[i].join();
+ if (starters[i].failed)
+ {
+ failed = true;
+ }
+ }
+ // All nodes must have started and node 0 must see the full membership
+ assertFalse(failed);
+ Set view = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(numServers, view.size());
+ }
+ finally
+ {
+ for (int i = numServers - 1; i >=0; i--)
+ {
+ try
+ {
+ ServerManagement.stop(i);
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-11-09 17:00:32 UTC (rev 3306)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-11-09 20:43:00 UTC (rev 3307)
@@ -192,10 +192,10 @@
* When this method correctly completes, the server (local or remote) is started and fully
* operational (the service container and the server peer are created and started).
*/
- public static synchronized void start(int i, String config,
- ServiceAttributeOverrides attrOverrides,
- boolean clearDatabase,
- boolean startMessagingServer) throws Exception
+ public static void start(int i, String config,
+ ServiceAttributeOverrides attrOverrides,
+ boolean clearDatabase,
+ boolean startMessagingServer) throws Exception
{
log.info("Attempting to start server " + i);
@@ -219,7 +219,7 @@
}
}
- public static synchronized void stop() throws Exception
+ public static void stop() throws Exception
{
stop(0);
}
@@ -230,7 +230,7 @@
* @return true if the server was effectively stopped, or false if the server was alreayd stopped
* when the method was invoked.
*/
- public static synchronized boolean stop(int i) throws Exception
+ public static boolean stop(int i) throws Exception
{
if (servers[i] == null)
{
More information about the jboss-cvs-commits
mailing list