[jboss-cvs] JBoss Messaging SVN: r3113 - in trunk: src/etc/xmdesc and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 18 05:44:24 EDT 2007
Author: timfox
Date: 2007-09-18 05:44:24 -0400 (Tue, 18 Sep 2007)
New Revision: 3113
Modified:
trunk/docs/userguide/en/modules/configuration.xml
trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1072
Modified: trunk/docs/userguide/en/modules/configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/configuration.xml 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/docs/userguide/en/modules/configuration.xml 2007-09-18 09:44:24 UTC (rev 3113)
@@ -1332,16 +1332,16 @@
<section id="conf.destination.queue.attributes.redeliverydelay">
<title>RedeliveryDelay</title>
<para>The redelivery delay to be used for this queue. Overrides any value set on the ServerPeer config</para>
- </section>
-
- <section id="conf.destination.queue.attributes.maxdeliveryattempts">
- <title>MaxDeliveryAttempts</title>
- <para>
- The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
- if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
- overrides the value set on the ServerPeer config.
- </para>
</section>
+
+ <section id="conf.destination.queue.attributes.maxdeliveryattempts">
+ <title>MaxDeliveryAttempts</title>
+ <para>
+ The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
+ if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
+ overrides the value set on the ServerPeer config.
+ </para>
+ </section>
<section id="conf.destination.queue.attributes.security">
<title>Destination Security Configuration</title>
@@ -1658,15 +1658,15 @@
<para>The redelivery delay to be used for this topic. Overrides any value set on the ServerPeer config</para>
</section>
- <section id="conf.destination.topic.attributes.maxdeliveryattempts">
- <title>MaxDeliveryAttempts</title>
- <para>
- The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
- if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
- overrides the value set on the ServerPeer config.
- </para>
- </section>
-
+ <section id="conf.destination.topic.attributes.maxdeliveryattempts">
+ <title>MaxDeliveryAttempts</title>
+ <para>
+ The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
+ if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
+ overrides the value set on the ServerPeer config.
+ </para>
+ </section>
+
<section id="conf.destination.topic.attributes.security">
<title>Destination Security Configuration</title>
@@ -2038,6 +2038,8 @@
<attribute name="LoadBalancingFactory">org.acme.MyLoadBalancingFactory</attribute>
<attribute name="PrefetchSize">1000</attribute>
+
+ <attribute name="SlowConsumers">false</attribute>
<attribute name="DefaultTempQueueFullSize">50000</attribute>
@@ -2056,7 +2058,7 @@
<literal>myClientID</literal> and bind the connection factory in two places in
the JNDI tree: <literal>/MyConnectionFactory</literal>
and <literal>/factories/cf</literal>.
- The connection factory overrides the default values for PreFetchSize,
+ The connection factory overrides the default values for PreFetchSize,
DefaultTempQueueFullSize, DefaultTempQueuePageSize, DefaultTempQueueDownCacheSize and
DupsOKBatchSize, SupportsFailover, SupportsLoadBalancing and LoadBalancingFactory.
The connection factory will use the default
@@ -2096,7 +2098,16 @@
The prefetchSize determines the size of this buffer. Larger values give better throughput.
</para>
</section>
+
+ <section id="conf.connectionfactory.attributes.slowconsumers">
+ <title>SlowConsumers</title>
+ <para>
+ If you have very slow consumers, then you probably want to make sure they don't buffer any messages,
+ since buffered messages cannot be consumed by faster consumers while they wait in a slow consumer's buffer.
+ </para>
+ </section>
+
<section id="conf.connectionfactory.attributes.tempqueuepaging">
<title>Temporary queue paging parameters</title>
Modified: trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-09-18 09:44:24 UTC (rev 3113)
@@ -47,6 +47,12 @@
<name>PrefetchSize</name>
<type>int</type>
</attribute>
+
+ <attribute access="read-write" getMethod="isSlowConsumers" setMethod="setSlowConsumers">
+ <description>Does this connection factory create slow consumers? Slow consumers never buffer messages</description>
+ <name>SlowConsumers</name>
+ <type>boolean</type>
+ </attribute>
<attribute access="read-write" getMethod="getDefaultTempQueueFullSize" setMethod="setDefaultTempQueueFullSize">
<description>The default value of paging full size for any temporary queues created for connections from this connection factory</description>
Modified: trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -41,6 +41,7 @@
void registerConnectionFactory(String uniqueName, String clientID, JNDIBindings jndiBindings,
String locatorURI, boolean clientPing,
int prefetchSize,
+ boolean slowConsumers,
int defaultTempQueueFullSize,
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -45,6 +45,8 @@
private int prefetchSize = 150;
+ private boolean slowConsumers;
+
private boolean supportsFailover;
private boolean supportsLoadBalancing;
@@ -145,7 +147,7 @@
connectionFactoryManager.
registerConnectionFactory(getServiceName().getCanonicalName(), clientID, jndiBindings,
- locatorURI, enablePing, prefetchSize,
+ locatorURI, enablePing, prefetchSize, slowConsumers,
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
loadBalancingFactory);
@@ -232,6 +234,16 @@
{
this.prefetchSize = prefetchSize;
}
+
+ public boolean isSlowConsumers()
+ {
+ return slowConsumers;
+ }
+
+ public void setSlowConsumers(boolean slowConsumers)
+ {
+ this.slowConsumers = slowConsumers;
+ }
public String getClientID()
{
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -107,6 +107,7 @@
String locatorURI,
boolean clientPing,
int prefetchSize,
+ boolean slowConsumers,
int defaultTempQueueFullSize,
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
@@ -135,6 +136,7 @@
ServerConnectionFactoryEndpoint endpoint =
new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
jndiBindings, prefetchSize,
+ slowConsumers,
defaultTempQueueFullSize,
defaultTempQueuePageSize,
defaultTempQueueDownCacheSize,
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -85,6 +85,8 @@
private int dupsOKBatchSize;
private boolean supportsFailover;
+
+ private boolean slowConsumers;
/** Cluster Topology on ClusteredConnectionFactories
Information to failover to other connections on clients **/
@@ -106,6 +108,7 @@
String defaultClientID,
JNDIBindings jndiBindings,
int preFetchSize,
+ boolean slowConsumers,
int defaultTempQueueFullSize,
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
@@ -123,6 +126,11 @@
this.defaultTempQueueDownCacheSize = defaultTempQueueDownCacheSize;
this.dupsOKBatchSize = dupsOKBatchSize;
this.supportsFailover = supportsFailover;
+ this.slowConsumers = slowConsumers;
+ if (slowConsumers)
+ {
+ this.prefetchSize = 1;
+ }
}
// ConnectionFactoryDelegate implementation -----------------------------------------------------
@@ -345,6 +353,11 @@
this.delegates = delegates;
this.failoverMap = failoverMap;
}
+
+ public boolean isSlowConsumers()
+ {
+ return slowConsumers;
+ }
public String toString()
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -107,6 +107,8 @@
private boolean replicating;
+ private boolean slow;
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(String id, Queue messageQueue, String queueName,
@@ -151,6 +153,8 @@
this.replicating = replicating;
+ this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+
if (dest.isTopic() && !messageQueue.isRecoverable())
{
// This is a consumer of a non durable topic subscription. We don't need to store
@@ -260,6 +264,17 @@
return delivery;
}
+ if (slow)
+ {
+ //If this is a slow consumer, we do not want to do any message buffering, so we immediately
+ //set clientAccepting to false
+ //When the client has consumed the message it will send a changeRate message which will set
+ //clientAccepting to true again
+ //We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
+ //will be buffered at once due to the asynchronous nature of sending changeRate
+ this.clientAccepting = false;
+ }
+
try
{
sessionEndpoint.handleDelivery(delivery, this);
@@ -270,7 +285,7 @@
this.started = false; // DO NOT return null or the message might get delivered more than once
}
-
+
return delivery;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -284,7 +284,6 @@
public boolean isSupportsBlobSelect()
{
-
if (supportsBlobSelect == null)
{
supportsBlobSelect = getSQLStatement("SUPPORTS_BLOB_ON_SELECT").equals("Y") ? Boolean.TRUE:
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -301,8 +301,6 @@
if (trace) { log.trace("Acknowledged message"); }
}
-
- //if (queue.)
}
catch (Exception e)
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2007-09-18 09:44:24 UTC (rev 3113)
@@ -24,9 +24,14 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
@@ -185,11 +190,11 @@
ObjectName c2 = deployConnector(1235, name2);
ObjectName c3 = deployConnector(1236, name3);
- ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", name1, "/TestConnectionFactory1", "clientid1");
- ObjectName cf2 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory2", name2, "/TestConnectionFactory2", "clientid2");
- ObjectName cf3 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory3", name3, "/TestConnectionFactory3", "clientid3");
+ ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", name1, "/TestConnectionFactory1", "clientid1", false);
+ ObjectName cf2 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory2", name2, "/TestConnectionFactory2", "clientid2", false);
+ ObjectName cf3 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory3", name3, "/TestConnectionFactory3", "clientid3", false);
//Last one shares the same connector
- ObjectName cf4 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory4", name3, "/TestConnectionFactory4", "clientid4");
+ ObjectName cf4 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory4", name3, "/TestConnectionFactory4", "clientid4", false);
JBossConnectionFactory f1 = (JBossConnectionFactory)ic.lookup("/TestConnectionFactory1");
@@ -265,7 +270,7 @@
// Added for http://jira.jboss.org/jira/browse/JBMESSAGING-939
public void testDurableSubscriptionOnPreConfiguredConnectionFactory() throws Exception
{
- ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestDurableCF", "cfTest");
+ ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestDurableCF", "cfTest", false);
ServerManagement.deployTopic("TestSubscriber");
@@ -324,6 +329,163 @@
}
+
+ public void testSlowConsumers() throws Exception
+ {
+ ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactorySlowConsumers",
+ ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestSlowConsumersCF", null, true);
+
+ Connection conn = null;
+
+ try
+ {
+ ConnectionFactory cf = (ConnectionFactory) ic.lookup("/TestSlowConsumersCF");
+
+ conn = cf.createConnection();
+
+ Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Object waitLock = new Object();
+
+ final int numMessages = 500;
+
+ class FastListener implements MessageListener
+ {
+ volatile int processed;
+
+ public void onMessage(Message msg)
+ {
+ processed++;
+
+ TextMessage tm = (TextMessage)msg;
+
+ try
+ {
+ log.info("Fast listener got message " + tm.getText());
+ }
+ catch (JMSException e)
+ {
+ }
+
+ if (processed == numMessages - 1)
+ {
+ synchronized (waitLock)
+ {
+ log.info("Notifying");
+ waitLock.notifyAll();
+ }
+ }
+ }
+ }
+
+ final FastListener fast = new FastListener();
+
+ class SlowListener implements MessageListener
+ {
+
+ public void onMessage(Message msg)
+ {
+ TextMessage tm = (TextMessage)msg;
+
+ try
+ {
+ log.info("Slow listener got message " + tm.getText());
+ }
+ catch (JMSException e)
+ {
+ }
+
+ synchronized (waitLock)
+ {
+ //Should really cope with spurious wakeups
+ while (fast.processed != numMessages - 1)
+ {
+ log.info("Waiting");
+ try
+ {
+ waitLock.wait(20000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ log.info("Waited");
+ }
+ }
+ }
+ }
+
+
+ MessageConsumer cons1 = session1.createConsumer(queue1);
+
+ cons1.setMessageListener(new SlowListener());
+
+ MessageConsumer cons2 = session2.createConsumer(queue1);
+
+ cons2.setMessageListener(fast);
+
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ //All the messages bar one should be consumed by the fast listener - since the slow listener shouldn't buffer any.
+
+ synchronized (waitLock)
+ {
+ //Should really cope with spurious wakeups
+ while (fast.processed != numMessages - 1)
+ {
+ log.info("Waiting");
+ waitLock.wait(20000);
+ log.info("Waited");
+ }
+ }
+
+ assertTrue(fast.processed == numMessages - 1);
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn != null)
+ {
+ log.info("Closing connection");
+ conn.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+
+
+ try
+ {
+ stopService(cf1);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+
+ }
+
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -375,7 +537,7 @@
return on;
}
- private ObjectName deployConnectionFactory(String name, String connectorName, String binding, String clientID) throws Exception
+ private ObjectName deployConnectionFactory(String name, String connectorName, String binding, String clientID, boolean slowConsumers) throws Exception
{
String mbeanConfig =
"<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" +
@@ -391,6 +553,7 @@
" <binding>" + binding + " </binding>\n" +
" </bindings>\n" +
" </attribute>\n" +
+ " <attribute name=\"SlowConsumers\">" + slowConsumers + "</attribute>\n" +
" </mbean>";
ObjectName on = ServerManagement.deploy(mbeanConfig);
More information about the jboss-cvs-commits
mailing list