[jboss-cvs] JBoss Messaging SVN: r8282 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851: src/main/org/jboss/jms/client/container and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 28 14:29:12 EDT 2011
Author: jbertram
Date: 2011-04-28 14:29:11 -0400 (Thu, 28 Apr 2011)
New Revision: 8282
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
Log:
JBPAPP-6159
Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:8238
+ /branches/Branch_1_4:8237-8238,8245,8257
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-04-28 18:29:11 UTC (rev 8282)
@@ -24,7 +24,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -289,6 +295,7 @@
private int consumeCount;
private boolean firstTime = true;
private volatile Thread onMessageThread;
+ private ExecutorService pool = Executors.newCachedThreadPool();
private long maxRetryChangeRate;
private long retryChangeRateInterval;
@@ -464,6 +471,8 @@
this.listener = null;
}
+
+ pool.shutdownNow();
if (trace) { log.trace(this + " closed"); }
}
@@ -551,25 +560,80 @@
if (!isConnectionConsumer && !ignore)
{
- DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
-
- sessionDelegate.preDeliver(info);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
- //If post deliver didn't succeed and acknowledgement mode is auto_ack
- //That means the ref wasn't acked since it couldn't be found.
- //In order to maintain at most once semantics we must therefore not return
- //the message
-
- ignore = !sessionDelegate.postDeliver();
-
+ if (timeout <= 0)
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ ignore = !sessionDelegate.postDeliver();
+
+ }
+ else
+ {
+ //JBMESSAGING-1850
+ Callable<Boolean> afterReceive = new Callable<Boolean>()
+ {
+ public Boolean call() throws Exception
+ {
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
+
+ return !sessionDelegate.postDeliver();
+ }
+ };
+
+ java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
+
+ long tmUsed = System.currentTimeMillis() - startTimestamp;
+
+ if (tmUsed >= timeout)
+ {
+ log.warn("Timed out before post message processing, discarding message " + m);
+ throw new JMSException("Timed out before post message processing, discarding message " + m);
+ }
+
+ try
+ {
+ ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Interrupted during getting future result.", e);
+ }
+ catch (ExecutionException e)
+ {
+ log.warn("received application exception.", e.getCause());
+ Throwable t = e.getCause();
+ if (t instanceof JMSException)
+ {
+ throw (JMSException)t;
+ }
+ }
+ catch (TimeoutException e)
+ {
+ log.warn("Timed out waiting for post message processing, discarding message " + m);
+ throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+ }
+ }
+
if (trace)
{
- log.trace("Post deliver returned " + !ignore);
+ log.trace("Post deliver returned " + !ignore);
}
-
+
if (!ignore)
{
- m.incDeliveryCount();
+ m.incDeliveryCount();
}
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2011-04-28 18:29:11 UTC (rev 8282)
@@ -383,6 +383,10 @@
strictTck = in.readBoolean();
sendAcksAsync = in.readBoolean();
+
+ maxRetryChangeRate = in.readLong();
+
+ retryChangeRateInterval = in.readLong();
}
public void write(DataOutputStream out) throws Exception
@@ -400,6 +404,10 @@
out.writeBoolean(strictTck);
out.writeBoolean(sendAcksAsync);
+
+ out.writeLong(this.maxRetryChangeRate);
+
+ out.writeLong(this.retryChangeRateInterval);
}
/**
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2011-04-28 15:07:52 UTC (rev 8281)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838_JBMESSAGING-1851/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2011-04-28 18:29:11 UTC (rev 8282)
@@ -23,10 +23,15 @@
package org.jboss.test.messaging.jms.clustering;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.management.ObjectName;
+import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
@@ -217,6 +222,181 @@
}
+ //https://issues.jboss.org/browse/JBMESSAGING-1851
+ public void testChangeRateConfigSettings() throws Exception
+ {
+ Connection c = null;
+
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestChangeRateConfigSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">true</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+ + " <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+ + " </mbean>";
+
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigSettingsFactory");
+ c = cf.createConnection();
+
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long maxRetry = state1.getMaxRetryChangeRate();
+
+ assertEquals(10, maxRetry);
+
+ long retryInterval = state1.getRetryChangeRateInterval();
+
+ assertEquals(2345, retryInterval);
+ }
+ finally
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
+ //https://issues.jboss.org/browse/JBMESSAGING-1851
+ public void testChangeRateConfigSettings2() throws Exception
+ {
+ Connection c = null;
+
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestChangeRateConfigSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">false</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+ + " <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+ + " </mbean>";
+
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigSettingsFactory");
+ c = cf.createConnection();
+
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long maxRetry = state1.getMaxRetryChangeRate();
+
+ assertEquals(10, maxRetry);
+
+ long retryInterval = state1.getRetryChangeRateInterval();
+
+ assertEquals(2345, retryInterval);
+ }
+ finally
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
+ //https://issues.jboss.org/browse/JBMESSAGING-1851
+ public void testChangeRateConfigDefaults() throws Exception
+ {
+ Connection c = null;
+
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigDefaultsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestChangeRateConfigDefaultsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">true</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " </mbean>";
+
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigDefaultsFactory");
+ c = cf.createConnection();
+
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long maxRetry = state1.getMaxRetryChangeRate();
+
+ assertEquals(0, maxRetry);
+
+ long retryInterval = state1.getRetryChangeRateInterval();
+
+ assertEquals(5000, retryInterval);
+ }
+ finally
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
More information about the jboss-cvs-commits
mailing list