[jboss-cvs] JBoss Messaging SVN: r2468 - in trunk: src/main/org/jboss/jms/client/remoting and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 27 11:48:23 EST 2007
Author: timfox
Date: 2007-02-27 11:48:23 -0500 (Tue, 27 Feb 2007)
New Revision: 2468
Modified:
trunk/src/etc/server/default/deploy/messaging-service.xml
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
Log:
Fixed another bug in failover plus a few bits and pieces
Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml 2007-02-27 16:48:23 UTC (rev 2468)
@@ -33,8 +33,8 @@
<attribute name="DefaultExpiryQueue">jboss.messaging.destination:service=Queue,name=ExpiryQueue</attribute>
<attribute name="DefaultRedeliveryDelay">0</attribute>
<attribute name="QueueStatsSamplePeriod">5000</attribute>
- <attribute name="FailoverStartTimeout">3000</attribute>
- <attribute name="FailoverCompleteTimeout">12000</attribute>
+ <attribute name="FailoverStartTimeout">60000</attribute>
+ <attribute name="FailoverCompleteTimeout">300000</attribute>
<attribute name="DefaultMessageCounterHistoryDayLimit">-1</attribute>
<depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -88,7 +88,7 @@
if (parameter instanceof ClientDelivery)
{
ClientDelivery dr = (ClientDelivery)parameter;
-
+
Message msg = dr.getMessage();
MessageProxy proxy = JBossMessage.
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -42,6 +42,7 @@
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
@@ -505,7 +506,7 @@
/**
* Needed for failover
*/
- public void synchronizeWith(MessageCallbackHandler newHandler)
+ public void synchronizeWith(MessageCallbackHandler newHandler, QueuedExecutor sessionExecutor)
{
consumerID = newHandler.consumerID;
@@ -517,6 +518,8 @@
buffer.clear();
+ this.sessionExecutor = sessionExecutor;
+
// need to reset toggle state
serverSending = true;
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -142,7 +142,9 @@
MessageCallbackHandler handler = oldCallbackManager.unregisterHandler(oldConsumerID);
MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(consumerID);
- handler.synchronizeWith(newHandler);
+ SessionState sstate = (SessionState)this.getParent();
+
+ handler.synchronizeWith(newHandler, sstate.getExecutor());
newCallbackManager.registerHandler(consumerID, handler);
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -133,7 +133,7 @@
currentTxId = parent.getResourceManager().createLocalTx();
}
- executor = new QueuedExecutor(new LinkedQueue());
+ createExecutor();
clientAckList = new ArrayList();
@@ -197,6 +197,12 @@
int oldSessionID = sessionID;
sessionID = newState.sessionID;
+
+ // We need to clear anything waiting in the session executor - since there may be messages
+ // from before failover waiting in there and we don't want them to get delivered after failover
+ executor.shutdownAfterProcessingCurrentTask();
+
+ createExecutor();
ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
@@ -333,9 +339,9 @@
else
{
log.debug(this + " no delivery recovery info to send on failover");
- }
+ }
}
-
+
// Public ---------------------------------------------------------------------------------------
/**
@@ -441,6 +447,12 @@
// Private --------------------------------------------------------------------------------------
+ private void createExecutor()
+ {
+ executor = new QueuedExecutor(new LinkedQueue());
+ }
+
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -123,9 +123,11 @@
private Object failoverStatusLock;
- private long failoverStartTimeout = 3000;
+ //Default is 1 minute
+ private long failoverStartTimeout = 60 * 1000;
- private long failoverCompleteTimeout = 12000;
+ //Default is 5 minutes
+ private long failoverCompleteTimeout = 5 * 60 * 1000;
private Map sessions;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -276,8 +276,7 @@
Client callbackClient = callbackHandler.getCallbackClient();
- ClientDelivery del = new ClientDelivery(message, id, deliveryId, ref
- .getDeliveryCount());
+ ClientDelivery del = new ClientDelivery(message, id, deliveryId, ref.getDeliveryCount());
Callback callback = new Callback(del);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-02-27 16:48:23 UTC (rev 2468)
@@ -228,11 +228,10 @@
while (!killer.isDone())
{
TextMessage tm = sessSend.createTextMessage("message " + count);
+ tm.setIntProperty("cnt", count);
prod.send(tm);
- //Thread.sleep(10);
-
if (count % 100 == 0)
{
log.info("sent " + count);
@@ -240,13 +239,9 @@
count++;
}
-
- log.info("sending done");
-
+
t.join();
- log.info("stopping listener");
-
if (killer.failed)
{
fail();
@@ -264,9 +259,19 @@
}
finally
{
+ if (!ServerManagement.isStarted(0))
+ {
+ ServerManagement.start(0, "all");
+ }
+
+ if (!ServerManagement.isStarted(1))
+ {
+ ServerManagement.start(1, "all");
+ }
+
if (conn != null)
{
- log.info("closing connetion");
+ log.info("closing connection");
try
{
conn.close();
@@ -395,7 +400,8 @@
{
this.latch = latch;
}
-
+
+
public void onMessage(Message msg)
{
try
@@ -404,17 +410,14 @@
if (count % 100 == 0)
{
- log.info("Received message " + tm.getText());
+ log.info("Received message " + tm.getText() + " (" + tm + ")");
}
- if (!tm.getText().equals("message " + count))
+ if (tm.getIntProperty("cnt") != count)
{
- log.error("Expected message " + count + " but got " + tm.getText());
-
+ log.error("Wrong message received " + tm.getIntProperty("cnt"));
failed = true;
-
- latch.release();
- }
+ }
count++;
}
More information about the jboss-cvs-commits mailing list