[jboss-cvs] JBoss Messaging SVN: r2495 - in trunk: docs/examples/common/src/org/jboss/example/jms/common and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 28 09:31:20 EST 2007
Author: timfox
Date: 2007-02-28 09:31:19 -0500 (Wed, 28 Feb 2007)
New Revision: 2495
Added:
trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
Modified:
trunk/.classpath
trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java
trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-904
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/.classpath 2007-02-28 14:31:19 UTC (rev 2495)
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
+ <classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
<classpathentry kind="src" path="output/gen-parsers"/>
<classpathentry kind="src" path="docs/examples/common/src"/>
<classpathentry kind="src" path="docs/examples/distributed-topic/src"/>
Modified: trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java
===================================================================
--- trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/docs/examples/common/src/org/jboss/example/jms/common/ExampleSupport.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -184,6 +184,8 @@
setup(null);
}
+
+
protected void setup(InitialContext ic) throws Exception
{
String destinationName;
Modified: trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java
===================================================================
--- trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/docs/examples/common/src/org/jboss/example/jms/common/Util.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -21,6 +21,7 @@
*/
package org.jboss.example.jms.common;
+import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.naming.InitialContext;
@@ -65,7 +66,34 @@
{
deployQueue(jndiName,null);
}
+
+ /**
+ * Programmatically activates the DefaultMessagePullPolicy on the post office.
+ *
+ * The default config ships with the NullMessagePullPolicy, which doesn't do message
+ * redistribution, so the examples have to switch it here. You won't have to do this in
+ * your own programs - you just need to make sure your postoffice MBean config specifies
+ * the DefaultMessagePullPolicy.
+ *
+ * @param ic the initial context handed to lookupMBeanServerProxy
+ * @throws Exception if the lookup or any of the JMX operations fails
+ */
+ public static void activateMessagePullPolicy(InitialContext ic) throws Exception
+ {
+ MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
+ ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+
+ //The post office is stopped before its MessagePullPolicy attribute is replaced
+ mBeanServer.invoke(postOfficeObjectName, "stop", null, null);
+
+ Attribute att = new Attribute("MessagePullPolicy", "org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy");
+ mBeanServer.setAttribute(postOfficeObjectName, att);
+
+ //Restart the post office
+ mBeanServer.invoke(postOfficeObjectName, "start", null, null);
+ }
+
public static void deployQueue(String jndiName, InitialContext ic) throws Exception
{
MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -505,7 +505,7 @@
/**
* Needed for failover
*/
- public void synchronizeWith(MessageCallbackHandler newHandler, QueuedExecutor sessionExecutor)
+ public void synchronizeWith(MessageCallbackHandler newHandler)
{
consumerID = newHandler.consumerID;
@@ -517,11 +517,8 @@
buffer.clear();
- this.sessionExecutor = sessionExecutor;
-
// need to reset toggle state
- serverSending = true;
-
+ serverSending = true;
}
public long getLastDeliveryId()
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -138,10 +138,8 @@
MessageCallbackHandler handler = oldCallbackManager.unregisterHandler(oldConsumerID);
MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(consumerID);
-
- SessionState sstate = (SessionState)this.getParent();
-
- handler.synchronizeWith(newHandler, sstate.getExecutor());
+
+ handler.synchronizeWith(newHandler);
newCallbackManager.registerHandler(consumerID, handler);
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -47,6 +47,7 @@
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
+import org.jboss.messaging.util.ClearableQueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -84,7 +85,7 @@
private Object currentTxId;
// Executor used for executing onMessage methods
- private QueuedExecutor executor;
+ private ClearableQueuedExecutor executor;
private boolean recoverCalled;
@@ -133,7 +134,7 @@
currentTxId = parent.getResourceManager().createLocalTx();
}
- createExecutor();
+ executor = new ClearableQueuedExecutor(new LinkedQueue());
clientAckList = new ArrayList();
@@ -201,10 +202,8 @@
// We need to clear anything waiting in the session executor - since there may be messages
// from before failover waiting in there and we don't want them to get delivered after
// failover.
- executor.shutdownAfterProcessingCurrentTask();
+ executor.clearAllExceptCurrentTask();
- createExecutor();
-
ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
for (Iterator i = getChildren().iterator(); i.hasNext(); )
@@ -268,6 +267,9 @@
if (!isTransacted() || (isXA() && getCurrentTxId() == null))
{
+ // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
+ // since xa sessions no longer fall back to non transacted
+
// Non transacted session or an XA session with no transaction set (it falls back
// to AUTO_ACKNOWLEDGE)
@@ -448,12 +450,6 @@
// Private --------------------------------------------------------------------------------------
- private void createExecutor()
- {
- executor = new QueuedExecutor(new LinkedQueue());
- }
-
-
// Inner classes --------------------------------------------------------------------------------
}
Added: trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A ClearableQueuedExecutor
+ *
+ * This class extends the QueuedExecutor with a method to clear all but the currently
+ * executing task without shutting it down.
+ *
+ * We need this functionality when failing over a session.
+ *
+ * In that case we need to clear all tasks apart from the currently executing one.
+ *
+ * We can't just call shutdownAfterProcessingCurrentTask and then use another instance
+ * after failover since, when failover resumes, the current task and the next delivery
+ * would be executed on different threads and smack into each other.
+ *
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-904
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ClearableQueuedExecutor extends QueuedExecutor
+{
+ /** Creates an executor through the superclass's no-argument constructor. */
+ public ClearableQueuedExecutor()
+ {
+ super();
+ }
+
+ /** Creates an executor, passing the given channel to the superclass constructor. */
+ public ClearableQueuedExecutor(Channel channel)
+ {
+ super(channel);
+ }
+
+ /**
+ * Empties the task queue: polls queue_ with a zero timeout, discarding each entry,
+ * until poll returns null. If the polling is interrupted the loop is abandoned and
+ * the calling thread's interrupt flag is set again.
+ */
+ public void clearAllExceptCurrentTask()
+ {
+ try
+ {
+ Object discarded;
+ do
+ {
+ discarded = queue_.poll(0);
+ }
+ while (discarded != null);
+ }
+ catch (InterruptedException interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
+
Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-02-28 12:50:44 UTC (rev 2494)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-02-28 14:31:19 UTC (rev 2495)
@@ -35,11 +35,10 @@
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
-import junit.framework.TestCase;
-
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.util.ProxyAssertSupport;
import org.jboss.tm.TransactionManagerService;
/**
@@ -53,7 +52,7 @@
* @version <tt>$Revision$</tt>
* $Id$
*/
-public class MessagingTestCase extends TestCase
+public class MessagingTestCase extends ProxyAssertSupport
{
// Constants -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list