[jboss-cvs] JBoss Messaging SVN: r3113 - in trunk: src/etc/xmdesc and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 18 05:44:24 EDT 2007


Author: timfox
Date: 2007-09-18 05:44:24 -0400 (Tue, 18 Sep 2007)
New Revision: 3113

Modified:
   trunk/docs/userguide/en/modules/configuration.xml
   trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
   trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1072


Modified: trunk/docs/userguide/en/modules/configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/configuration.xml	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/docs/userguide/en/modules/configuration.xml	2007-09-18 09:44:24 UTC (rev 3113)
@@ -1332,16 +1332,16 @@
                  <section id="conf.destination.queue.attributes.redeliverydelay">
                     <title>RedeliveryDelay</title>
                     <para>The redelivery delay to be used for this queue. Overrides any value set on the ServerPeer config</para>
-                 </section>
-                 
-                 <section id="conf.destination.queue.attributes.maxdeliveryattempts">
-                    <title>MaxDeliveryAttempts</title>
-                    <para>
-                      The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
-                      if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
-                      overrides the value set on the ServerPeer config.
-                    </para>
                  </section>
+                 
+                 <section id="conf.destination.queue.attributes.maxdeliveryattempts">
+                    <title>MaxDeliveryAttempts</title>
+                    <para>
+                      The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
+                      if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
+                      overrides the value set on the ServerPeer config.
+                    </para>
+                 </section>
 
                  <section id="conf.destination.queue.attributes.security">
                     <title>Destination Security Configuration</title>
@@ -1658,15 +1658,15 @@
                     <para>The redelivery delay to be used for this topic. Overrides any value set on the ServerPeer config</para>
                 </section>
 
-                <section id="conf.destination.topic.attributes.maxdeliveryattempts">
-                    <title>MaxDeliveryAttempts</title>
-                    <para>
-                      The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
-                      if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
-                      overrides the value set on the ServerPeer config.
-                    </para>
-                </section>
-
+                <section id="conf.destination.topic.attributes.maxdeliveryattempts">
+                    <title>MaxDeliveryAttempts</title>
+                    <para>
+                      The maximum number of times delivery of a message will be attempted before sending the message to the DLQ,
+                      if configured. If set to -1 (the default), the value from the ServerPeer config is used. Any other setting
+                      overrides the value set on the ServerPeer config.
+                    </para>
+                </section>
+
                 <section id="conf.destination.topic.attributes.security">
                    <title>Destination Security Configuration</title>
 
@@ -2038,6 +2038,8 @@
       &lt;attribute name="LoadBalancingFactory"&gt;org.acme.MyLoadBalancingFactory&lt;/attribute&gt;
           
       &lt;attribute name="PrefetchSize"&gt;1000&lt;/attribute&gt; 
+
+      &lt;attribute name="SlowConsumers"&gt;false&lt;/attribute&gt;
       
       &lt;attribute name="DefaultTempQueueFullSize"&gt;50000&lt;/attribute&gt; 
       
@@ -2056,7 +2058,7 @@
 	        <literal>myClientID</literal> and bind the connection factory in two places in
 	        the JNDI tree: <literal>/MyConnectionFactory</literal>
 	        and <literal>/factories/cf</literal>. 
-	        The connection factory overrides the default values for PreFetchSize,
+	        The connection factory overrides the default values for PreFetchSize, 
 	        DefaultTempQueueFullSize, DefaultTempQueuePageSize, DefaultTempQueueDownCacheSize and
 	        DupsOKBatchSize, SupportsFailover, SupportsLoadBalancing and LoadBalancingFactory.
 	        The connection factory will use the default
@@ -2096,7 +2098,16 @@
 	            The prefetchSize determines the size of this buffer. Larger values give better throughput.
 	           </para>
 	        </section>
+
+               <section id="conf.connectionfactory.attributes.slowconsumers">
+	           <title>SlowConsumers</title>
 	
+	           <para>
+	            If you have very slow consumers, then you probably want to make sure they don't buffer any messages.
+                    Since this can prevent them from being consumed by faster consumers.
+	           </para>
+	        </section>		      
+
 	        <section id="conf.connectionfactory.attributes.tempqueuepaging">
 	           <title>Temporary queue paging parameters</title>
 	

Modified: trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-09-18 09:44:24 UTC (rev 3113)
@@ -47,6 +47,12 @@
       <name>PrefetchSize</name>
       <type>int</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="isSlowConsumers" setMethod="setSlowConsumers">
+      <description>Does this connection factory create slow consumers? Slow consumers never buffer messages</description>
+      <name>SlowConsumers</name>
+      <type>boolean</type>
+   </attribute>   
 
    <attribute access="read-write" getMethod="getDefaultTempQueueFullSize" setMethod="setDefaultTempQueueFullSize">
       <description>The default value of paging full size for any temporary queues created for connections from this connection factory</description>

Modified: trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -41,6 +41,7 @@
    void registerConnectionFactory(String uniqueName, String clientID, JNDIBindings jndiBindings,
                                  String locatorURI, boolean clientPing,
                                  int prefetchSize,
+                                 boolean slowConsumers,
                                  int defaultTempQueueFullSize,
                                  int defaultTempQueuePageSize,
                                  int defaultTempQueueDownCacheSize,

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -45,6 +45,8 @@
    
    private int prefetchSize = 150;
    
+   private boolean slowConsumers;
+   
    private boolean supportsFailover;
    
    private boolean supportsLoadBalancing;
@@ -145,7 +147,7 @@
          
          connectionFactoryManager.
             registerConnectionFactory(getServiceName().getCanonicalName(), clientID, jndiBindings,
-                                      locatorURI, enablePing, prefetchSize,
+                                      locatorURI, enablePing, prefetchSize, slowConsumers,
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,                                      
                                       defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
                                       loadBalancingFactory);
@@ -232,6 +234,16 @@
    {
       this.prefetchSize = prefetchSize;
    }
+   
+   public boolean isSlowConsumers()
+   {
+   	return slowConsumers;
+   }
+   
+   public void setSlowConsumers(boolean slowConsumers)
+   {
+   	this.slowConsumers = slowConsumers;
+   }
 
    public String getClientID()
    {

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -107,6 +107,7 @@
                                                       String locatorURI,
                                                       boolean clientPing,
                                                       int prefetchSize,
+                                                      boolean slowConsumers,
                                                       int defaultTempQueueFullSize,
                                                       int defaultTempQueuePageSize,
                                                       int defaultTempQueueDownCacheSize,
@@ -135,6 +136,7 @@
       ServerConnectionFactoryEndpoint endpoint =
          new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
                                              jndiBindings, prefetchSize,
+                                             slowConsumers,
                                              defaultTempQueueFullSize,
                                              defaultTempQueuePageSize,
                                              defaultTempQueueDownCacheSize,

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -85,6 +85,8 @@
    private int dupsOKBatchSize;
    
    private boolean supportsFailover;
+   
+   private boolean slowConsumers;
 
    /** Cluster Topology on ClusteredConnectionFactories
        Information to failover to other connections on clients **/
@@ -106,6 +108,7 @@
                                           String defaultClientID,
                                           JNDIBindings jndiBindings,
                                           int preFetchSize,
+                                          boolean slowConsumers,
                                           int defaultTempQueueFullSize,
                                           int defaultTempQueuePageSize,
                                           int defaultTempQueueDownCacheSize,
@@ -123,6 +126,11 @@
       this.defaultTempQueueDownCacheSize = defaultTempQueueDownCacheSize;
       this.dupsOKBatchSize = dupsOKBatchSize;
       this.supportsFailover = supportsFailover;
+      this.slowConsumers = slowConsumers;
+      if (slowConsumers)
+      {
+      	this.prefetchSize = 1;
+      }
    }
 
    // ConnectionFactoryDelegate implementation -----------------------------------------------------
@@ -345,6 +353,11 @@
       this.delegates = delegates;
       this.failoverMap = failoverMap;
    }
+   
+   public boolean isSlowConsumers()
+   {
+   	return slowConsumers;
+   }
 
    public String toString()
    {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -107,6 +107,8 @@
    
    private boolean replicating;
    
+   private boolean slow;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(String id, Queue messageQueue, String queueName,
@@ -151,6 +153,8 @@
       
       this.replicating = replicating;
       
+      this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+      
       if (dest.isTopic() && !messageQueue.isRecoverable())
       {
          // This is a consumer of a non durable topic subscription. We don't need to store
@@ -260,6 +264,17 @@
             return delivery;
          }
          
+         if (slow)
+         {
+         	//If this is a slow consumer, we do not want to do any message buffering, so we immediately
+         	//set clientAccepting to false
+         	//When the client has consumed the message it will send a changeRate + message which will set
+         	//clientAccepting to true again
+         	//We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
+         	//will be buffered at once due to the asynchronous nature of sending changeRate
+         	this.clientAccepting = false;
+         }
+         
          try
          {
          	sessionEndpoint.handleDelivery(delivery, this);
@@ -270,7 +285,7 @@
          	
          	this.started = false; // DO NOT return null or the message might get delivered more than once
          }
-                                  
+                          
          return delivery;
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -284,7 +284,6 @@
 
    public boolean isSupportsBlobSelect()
    {
-
       if (supportsBlobSelect == null)
       {
          supportsBlobSelect = getSQLStatement("SUPPORTS_BLOB_ON_SELECT").equals("Y") ? Boolean.TRUE:

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -301,8 +301,6 @@
 				
 				if (trace) { log.trace("Acknowledged message"); }
 			}
-			
-			//if (queue.)
 		}
 		catch (Exception e)
 		{

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2007-09-17 22:01:10 UTC (rev 3112)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2007-09-18 09:44:24 UTC (rev 3113)
@@ -24,9 +24,14 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
@@ -185,11 +190,11 @@
       ObjectName c2 = deployConnector(1235, name2);
       ObjectName c3 = deployConnector(1236, name3);
       
-      ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", name1, "/TestConnectionFactory1", "clientid1");
-      ObjectName cf2 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory2", name2, "/TestConnectionFactory2", "clientid2");
-      ObjectName cf3 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory3", name3, "/TestConnectionFactory3", "clientid3");
+      ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", name1, "/TestConnectionFactory1", "clientid1", false);
+      ObjectName cf2 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory2", name2, "/TestConnectionFactory2", "clientid2", false);
+      ObjectName cf3 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory3", name3, "/TestConnectionFactory3", "clientid3", false);
       //Last one shares the same connector
-      ObjectName cf4 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory4", name3, "/TestConnectionFactory4", "clientid4");
+      ObjectName cf4 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory4", name3, "/TestConnectionFactory4", "clientid4", false);
       
       
       JBossConnectionFactory f1 = (JBossConnectionFactory)ic.lookup("/TestConnectionFactory1");            
@@ -265,7 +270,7 @@
    // Added for http://jira.jboss.org/jira/browse/JBMESSAGING-939
    public void testDurableSubscriptionOnPreConfiguredConnectionFactory() throws Exception
    {
-      ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestDurableCF", "cfTest");
+      ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactory1", ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestDurableCF", "cfTest", false);
 
       ServerManagement.deployTopic("TestSubscriber");
 
@@ -324,6 +329,163 @@
 
    }
 
+   
+   public void testSlowConsumers() throws Exception
+   {
+      ObjectName cf1 = deployConnectionFactory("jboss.messaging.destination:service=TestConnectionFactorySlowConsumers",
+      		                                   ServiceContainer.REMOTING_OBJECT_NAME.getCanonicalName(), "/TestSlowConsumersCF", null, true);
+
+      Connection conn = null;
+
+      try
+      {
+         ConnectionFactory cf = (ConnectionFactory) ic.lookup("/TestSlowConsumersCF");
+         
+         conn = cf.createConnection();
+
+         Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         final Object waitLock = new Object();
+         
+         final int numMessages = 500;
+               
+         class FastListener implements MessageListener
+         {
+         	volatile int processed;
+         	
+         	public void onMessage(Message msg)
+				{
+         		processed++;
+         		
+         		TextMessage tm = (TextMessage)msg;
+         		
+         		try
+         		{
+         			log.info("Fast listener got message " + tm.getText());
+         		}
+               catch (JMSException e)
+               {               	
+               }
+               
+         		if (processed == numMessages - 1)
+         		{
+         			synchronized (waitLock)
+         			{
+         				log.info("Notifying");
+         				waitLock.notifyAll();
+         			}
+         		}
+				}
+         }
+         
+         final FastListener fast = new FastListener();
+         
+         class SlowListener implements MessageListener
+         {
+
+				public void onMessage(Message msg)
+				{
+               TextMessage tm = (TextMessage)msg;
+         		
+               try
+               {
+               	log.info("Slow listener got message " + tm.getText());
+               }
+               catch (JMSException e)
+               {               	
+               }
+					
+               synchronized (waitLock)
+               {
+               	//Should really cope with spurious wakeups
+               	while (fast.processed != numMessages - 1)
+               	{
+               		log.info("Waiting");
+               		try
+               		{
+               			waitLock.wait(20000);
+               		}
+               		catch (InterruptedException e)
+               		{               			
+               		}
+               		log.info("Waited");
+               	}
+               }
+				}         	
+         }
+         
+
+         MessageConsumer cons1 = session1.createConsumer(queue1);
+         
+         cons1.setMessageListener(new SlowListener());
+         
+         MessageConsumer cons2 = session2.createConsumer(queue1);
+         
+         cons2.setMessageListener(fast);
+         
+   
+         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sessSend.createProducer(queue1);
+         
+         conn.start();
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+         	TextMessage tm = sessSend.createTextMessage("message" + i);
+         	
+         	prod.send(tm);
+         }
+         
+         //All the messages bar one should be consumed by the fast listener  - since the slow listener shouldn't buffer any.
+         
+         synchronized (waitLock)
+         {
+         	//Should really cope with spurious wakeups
+         	while (fast.processed != numMessages - 1)
+         	{
+         		log.info("Waiting");
+         		waitLock.wait(20000);
+         		log.info("Waited");
+         	}
+         }
+         
+         assertTrue(fast.processed == numMessages - 1);
+         
+      }
+      finally
+      {
+         try
+         {
+            if (conn != null)
+            {
+            	log.info("Closing connection");
+               conn.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+
+
+         try
+         {
+            stopService(cf1);
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+
+      }
+
+   }
+   
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -375,7 +537,7 @@
       return on;
    }
    
-   private ObjectName deployConnectionFactory(String name, String connectorName, String binding, String clientID) throws Exception
+   private ObjectName deployConnectionFactory(String name, String connectorName, String binding, String clientID, boolean slowConsumers) throws Exception
    {
       String mbeanConfig =
             "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" +
@@ -391,6 +553,7 @@
             "            <binding>" + binding + " </binding>\n" +
             "          </bindings>\n" +
             "       </attribute>\n" +
+            "       <attribute name=\"SlowConsumers\">" + slowConsumers + "</attribute>\n" +            
             " </mbean>";
 
       ObjectName on = ServerManagement.deploy(mbeanConfig);




More information about the jboss-cvs-commits mailing list