[jboss-cvs] JBoss Messaging SVN: r3052 - in trunk: src/main/org/jboss/jms/server and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 24 08:35:34 EDT 2007
Author: timfox
Date: 2007-08-24 08:35:34 -0400 (Fri, 24 Aug 2007)
New Revision: 3052
Modified:
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java
Log:
Some logging changes and small fixes
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-24 12:35:34 UTC (rev 3052)
@@ -144,8 +144,8 @@
<attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
-->
- <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
-
+ <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
+
<attribute name="DataChannelConfig">
<config>
<UDP
@@ -189,12 +189,13 @@
view_ack_collection_timeout="5000"/>
<FC max_credits="2000000" down_thread="false" up_thread="false"
min_threshold="0.10"/>
- <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+
</config>
</attribute>
<!-- JGroups stack configuration to use for the control channel - used for control messages -->
-
+
<attribute name="ControlChannelConfig">
<config>
<UDP
@@ -239,7 +240,8 @@
<pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" flush_timeout="3000"/>
<pbcast.FLUSH down_thread="false" up_thread="false" timeout="20000" auto_flush_conf="false"/>
</config>
- </attribute>
+ </attribute>
+
</mbean>
<!-- Messaging JMS User Manager MBean config
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-08-24 12:35:34 UTC (rev 3052)
@@ -1509,16 +1509,10 @@
//Unbind the destination's queues
- log.info("Destroying destination " + name);
-
- log.info("Got queues " + queues.size());
-
while (iter.hasNext())
{
Queue queue = (Queue)iter.next();
- log.info("Queue is " + queue);
-
queue.removeAllReferences();
//Durable subs need to be removed on all nodes
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-24 12:35:34 UTC (rev 3052)
@@ -44,6 +44,10 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
/**
*
* This class handles the interface with JGroups
@@ -92,6 +96,12 @@
private volatile int startedState;
+ private volatile Thread viewThread;
+
+ //We need to process view changes on a different thread, since if we have more than one node running
+ //in the same VM then the thread that sends the leave message ends up executing the view change on the other node
+ //We probably don't need this if all nodes are in different VMs
+
public GroupMember(String groupName, long stateTimeout, long castTimeout,
JChannelFactory jChannelFactory, RequestTarget requestTarget,
GroupListener groupListener)
@@ -116,7 +126,7 @@
this.dataChannel = jChannelFactory.createDataChannel();
this.startedState = STOPPED;
-
+
// We don't want to receive local messages on any of the channels
controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
@@ -172,9 +182,14 @@
dataChannel.connect(groupName);
}
-
+
public void stop() throws Exception
{
+ if (startedState == STOPPED)
+ {
+ throw new IllegalStateException("Is already stopped");
+ }
+
try
{
dataChannel.close();
@@ -195,11 +210,9 @@
controlChannel = null;
- dataChannel = null;
+ dataChannel = null;
- currentView = null;
-
- startedState = STOPPED;
+ currentView = null;
}
public Address getSyncAddress()
@@ -317,7 +330,7 @@
if (startedState != newState)
{
- throw new IllegalStateException("Timed out waiting for state to arrive");
+ throw new IllegalStateException("Timed out waiting for state to change");
}
}
}
@@ -406,7 +419,7 @@
}
}
}
-
+
/*
* We use this class so we notice when members leave the group
*/
@@ -422,10 +435,10 @@
// NOOP
}
- public void viewAccepted(View newView)
- {
+ public void viewAccepted(final View newView)
+ {
log.debug(this + " got new view " + newView + ", old view is " + currentView);
-
+
if (currentView == null)
{
//The first view is arriving
@@ -435,61 +448,80 @@
throw new IllegalStateException("Got first view but started state is " + startedState);
}
}
-
- // JGroups will make sure this method is never called by more than one thread concurrently
-
- View oldView = currentView;
-
- currentView = newView;
-
- try
- {
- // Act on membership change, on both cases when an old member left or a new member joined
-
- if (oldView != null)
- {
- List leftNodes = new ArrayList();
- for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- leftNodes.add(address);
- }
- }
- if (!leftNodes.isEmpty())
- {
- groupListener.nodesLeft(leftNodes);
- }
- }
-
- for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
- {
- groupListener.nodeJoined(address);
- }
- }
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
-
- if (startedState == WAITING_FOR_FIRST_VIEW)
- {
- synchronized (waitLock)
- {
- startedState = WAITING_FOR_STATE;
-
- waitLock.notify();
- }
- }
+ else
+ {
+ if (startedState != STARTED)
+ {
+ return;
+ }
+ }
+
+ class ViewChangeRunnable implements Runnable
+ {
+ public void run()
+ {
+ // JGroups will make sure this method is never called by more than one thread concurrently
+
+ View oldView = currentView;
+
+ currentView = newView;
+
+ try
+ {
+ // Act on membership change, on both cases when an old member left or a new member joined
+
+ if (oldView != null)
+ {
+ List leftNodes = new ArrayList();
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ leftNodes.add(address);
+ }
+ }
+ if (!leftNodes.isEmpty())
+ {
+ groupListener.nodesLeft(leftNodes);
+ }
+ }
+
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ groupListener.nodeJoined(address);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+
+ if (startedState == WAITING_FOR_FIRST_VIEW)
+ {
+ synchronized (waitLock)
+ {
+ startedState = WAITING_FOR_STATE;
+
+ waitLock.notify();
+ }
+ }
+ }
+ }
+
+ //Needs to be executed on different thread to avoid deadlock when running invm
+ viewThread = new Thread(new ViewChangeRunnable());
+
+ viewThread.start();
}
+
public byte[] getState()
{
@@ -498,6 +530,11 @@
}
}
+
+
+
+
+
/*
* This class is used to listen for messages on the async channel
*/
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/build.xml 2007-08-24 12:35:34 UTC (rev 3052)
@@ -343,7 +343,6 @@
<include name="**/messaging/core/**/${test-mask}.class"/>
<include name="**/jms/**/${test-mask}.class"/>
<include name="**/messaging/util/**/${test-mask}.class"/>
-
<exclude name="**/jms/MemLeakTest.class"/>
<exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
<exclude name="**/jms/XAResourceRecoveryTest.class"/>
Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-08-24 12:35:34 UTC (rev 3052)
@@ -267,17 +267,20 @@
protected void tearDown() throws Exception
{
+ Thread.sleep(2000);
+
pm.reapUnreferencedMessages();
if (this.checkNoMessageData())
{
fail("Message data still exists");
- }
+ }
if (this.checkNoBindingData())
{
fail("Binding data still exists");
}
+
sc.stop();
sc = null;
Modified: trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java 2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java 2007-08-24 12:35:34 UTC (rev 3052)
@@ -410,6 +410,8 @@
// Stop office 2
office2.stop();
+ Thread.sleep(1000);
+
queues = office3.getQueuesForCondition(condition1, false);
assertNotNull(queues);
assertEquals(1, queues.size());
@@ -468,8 +470,9 @@
//Unbind it
+ log.info("Removing queue6 binding");
removed = office1.removeBinding(queue6.getName(), false);
- assertNotNull(removed);
+ assertNotNull(removed);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -558,7 +561,10 @@
assertNotNull(queues);
assertEquals(2, queues.size());
assertTrue(queues.contains(queue8));
- assertTrue(queues.contains(queue9));
+ assertTrue(queues.contains(queue9));
+
+ log.info("at end");
+ //Thread.sleep(10000000);
}
catch (Throwable e)
{
@@ -569,23 +575,37 @@
{
if (office1 != null)
{
- office1.stop();
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
}
if (office2 != null)
{
- office2.stop();
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
}
if (office3 != null)
{
- office3.stop();
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
}
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
}
}
@@ -712,12 +732,7 @@
assertTrue(bindings.isEmpty());
bindings = office3.getAllBindings();
- assertTrue(bindings.isEmpty());
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertTrue(bindings.isEmpty());
}
finally
{
@@ -827,12 +842,7 @@
assertTrue(bindings.isEmpty());
bindings = office3.getAllBindings();
- assertTrue(bindings.isEmpty());
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertTrue(bindings.isEmpty());
}
finally
{
@@ -928,12 +938,7 @@
assertTrue(bindings.isEmpty());
bindings = office3.getAllBindings();
- assertTrue(bindings.isEmpty());
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertTrue(bindings.isEmpty());
}
finally
{
@@ -1034,13 +1039,7 @@
assertTrue(bindings.isEmpty());
bindings = office3.getAllBindings();
- assertTrue(bindings.isEmpty());
-
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertTrue(bindings.isEmpty());
}
finally
{
@@ -1149,12 +1148,7 @@
assertTrue(bindings.isEmpty());
bindings = office3.getAllBindings();
- assertTrue(bindings.isEmpty());
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertTrue(bindings.isEmpty());
}
finally
{
@@ -1290,12 +1284,7 @@
assertGotAll(2, bindings, queue1.getName());
bindings = office3.getAllBindings();
- assertGotAll(3, bindings, queue1.getName());
-
- if (checkNoBindingData())
- {
- fail("data still in database");
- }
+ assertGotAll(3, bindings, queue1.getName());
}
finally
{
@@ -1518,12 +1507,7 @@
receiver4.acknowledge(msgRec3, null);
msgs = queue4.browse(null);
assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
+ assertTrue(msgs.isEmpty());
}
finally
{
@@ -1920,13 +1904,7 @@
msgs = queues[i].browse(null);
assertNotNull(msgs);
assertTrue(msgs.isEmpty());
- }
-
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
+ }
}
finally
{
@@ -2056,11 +2034,6 @@
msgs = queue3.browse(null);
assertNotNull(msgs);
assertTrue(msgs.isEmpty());
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
@@ -2210,11 +2183,6 @@
msgs = queue4.browse(null);
assertNotNull(msgs);
assertTrue(msgs.isEmpty());
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
@@ -2410,11 +2378,6 @@
}
}
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
@@ -2609,11 +2572,6 @@
}
}
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
@@ -2786,13 +2744,6 @@
checkNotGetsMessage(queue0, receiver0);
checkNotGetsMessage(queue2, receiver2);
-
-
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
@@ -2936,13 +2887,6 @@
checkNotGetsMessage(queue0, receiver0);
checkNotGetsMessage(queue2, receiver2);
-
-
-
- if (checkNoMessageData())
- {
- fail("Message data still in database");
- }
}
finally
{
More information about the jboss-cvs-commits
mailing list