[jboss-cvs] JBoss Messaging SVN: r6310 - in trunk: examples/jms/paging/config and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Apr 3 17:58:39 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-03 17:58:38 -0400 (Fri, 03 Apr 2009)
New Revision: 6310

Modified:
   trunk/examples/jms/paging/config/jbm-jms.xml
   trunk/examples/jms/paging/config/jbm-queues.xml
   trunk/examples/jms/paging/readme.html
   trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/schemas/jbm-configuration.xsd
   trunk/tests/src/org/jboss/messaging/tests/integration/client/PagingTest.java
Log:
Adding Paging Test, and fixing a bug on depaging and Queue empty

Modified: trunk/examples/jms/paging/config/jbm-jms.xml
===================================================================
--- trunk/examples/jms/paging/config/jbm-jms.xml	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/examples/jms/paging/config/jbm-jms.xml	2009-04-03 21:58:38 UTC (rev 6310)
@@ -14,5 +14,9 @@
    <queue name="exampleQueue">
       <entry name="/queue/exampleQueue"/>
    </queue>
+   
+   <queue name="pagingQueue">
+      <entry name="/queue/pagingQueue"/>
+   </queue>
 
 </deployment>
\ No newline at end of file

Modified: trunk/examples/jms/paging/config/jbm-queues.xml
===================================================================
--- trunk/examples/jms/paging/config/jbm-queues.xml	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/examples/jms/paging/config/jbm-queues.xml	2009-04-03 21:58:38 UTC (rev 6310)
@@ -11,5 +11,19 @@
       <permission type="consume" roles="guest"/>
       <permission type="send" roles="guest"/>
    </security>
+   
+   <security match="queuejms.pagingQueue">
+      <permission type="createDurableQueue" roles="guest"/>
+      <permission type="deleteDurableQueue" roles="guest"/>
+      <permission type="createTempQueue" roles="guest"/>
+      <permission type="deleteTempQueue" roles="guest"/>
+      <permission type="consume" roles="guest"/>
+      <permission type="send" roles="guest"/>
+   </security>
+   
+   <address-settings match="queuejms.pagingQueue">
+      <max-size-bytes>100000</max-size-bytes>
+      <page-size-bytes>20000</page-size-bytes>
+   </address-settings>
 
 </settings>

Modified: trunk/examples/jms/paging/readme.html
===================================================================
--- trunk/examples/jms/paging/readme.html	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/examples/jms/paging/readme.html	2009-04-03 21:58:38 UTC (rev 6310)
@@ -22,64 +22,97 @@
            <code>InitialContext initialContext = getContext();</code>
         </pre>
 
-        <li>We look-up the JMS queue object from JNDI</li>
+        <li>We look-up the JMS connection factory object from JNDI</li>
         <pre>
-           <code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
+           <code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
         </pre>
 
-        <li>We look-up the JMS connection factory object from JNDI</li>
+        <li>We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number of bytes in memory</li>
         <pre>
-           <code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
+           <code>Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");</code>
         </pre>
 
+        <li>We look-up the JMS queue object from JNDI.</li>
+        <pre>
+           <code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
+        </pre>
+
         <li>We create a JMS connection</li>
         <pre>
            <code>connection = cf.createConnection();</code>
         </pre>
 
-        <li>We create a JMS session. The session is created as non transacted and will auto acknowledge messages.</li>
+        <li>We create a JMS session. The session is created as non transacted. We will use client acknowledgement on this example.</li>
         <pre>
-           <code>Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);</code>
+           <code>Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);</code>
         </pre>
-        
-        <li>We create a JMS message producer on the session. This will be used to send the messages.</li>
-        <pre>
-          <code>MessageProducer messageProducer = session.createProducer(topic);</code>
-       </pre>
 
-        <li>We don't need persistent messages in order to use paging. (This step is optional)</li>
-        <pre><code>
-        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        </code></pre>
 
-        <li>Create a Binary Bytes Message with 10K arbitrary bytes</li>
-        <pre><code>
+         <li>Create a JMS Message Producer for pageQueueAddress</li>
+         <pre><code>
+         MessageProducer pageMessageProducer = session.createProducer(pageQueue);
+         </pre></code>
+         
+         <li>We don't need persistent messages in order to use paging. (This step is optional)</li>
+         <pre><code>
+         pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         </pre></code>
+         
+         <li>Create a Binary Bytes Message with 10K arbitrary bytes</li>
+         <pre><code>
          BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[10 * 1024]);</code>
-       </pre>
-       
-       <li>Send the same message over 30K times, which should be over the max limit configured and imposed by the server. Probably also over to what would fit on the server's memory</li>
-       
-       <pre><code>
+         message.writeBytes(new byte[10 * 1024]);
+         </pre></code>
+         
+
+         <li>Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at ./paging/config/jbm-queues.xml for the config.</li>
+         <pre><code>
+         for (int i = 0; i < 20; i++)
+         {
+            pageMessageProducer.send(message);
+         }         
+         </pre></code>
+         
+         <li>Create a JMS Message Producer</li>
+         <pre><code>
+         MessageProducer messageProducer = session.createProducer(queue);
+         </pre></code>
+         
+         <li>We don't need persistent messages in order to use paging. (This step is optional)</li>
+         <pre><code>
+         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         </pre></code>
+
+         <li>Send the message for about 30K, which should be over the memory limit imposed by the server</li>
+         <pre><code>
          for (int i = 0; i < 30000; i++)
          {
             messageProducer.send(message);
-         }</code>
-       </pre>
-       
-       <li>Create a JMS Consumer</p>
-       <pre><code>
-       MessageConsumer messageConsumer = session.createConsumer(queue);
-       </code></pre>
-       
-       <li> Start the JMS Connection. This step will activate the subscribers to receive messages.</li>
-       <pre><code>
-       connection.start();
-       </code></pre>
+         }
+         </pre></code>
 
+         <li>if you pause the example here, you will several files under ./build/data/paging</li>
+         
+         <pre><code>
+         // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created files just for 
+         </pre></code>
+         
+         
+         <li>Create a JMS Message Consumer</li>
+         <pre><code>
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         </pre></code>
+         
 
-        <li>Receive the messages. It's important to ACK for messages as JBM will not read messages from the paging file system until messages are ACKed or that would lead the server to be OutOfMemory</li>
-        <pre><code>
+         <li>Start the JMS Connection. This step will activate the subscribers to receive messages.</li>
+         <pre><code>
+         connection.start();
+         </pre></code>
+         
+         
+         <li>Receive the messages. It's important to ACK for messages as JBM will not read messages from paging until messages are ACKed</li>
+         
+         <pre><code>
          for (int i = 0; i < 30000; i++)
          {
             message = (BytesMessage)messageConsumer.receive(1000);
@@ -91,23 +124,39 @@
                message.acknowledge();
             }
          }
-        </code></pre>
+         </pre></code>
+         
+         <li>Receive the messages from the Queue names pageQueue. Create the proper consumer for that.</li>
+         <pre><code>
+         messageConsumer.close();
+         messageConsumer = session.createConsumer(pageQueue);
 
+         for (int i = 0; i < 20; i++)
+         {
+            message = (BytesMessage)messageConsumer.receive(1000);
+            
+            System.out.println("Received message " + i + " from pageQueue");
+
+            message.acknowledge();
+         }
+         </pre></code>
+
         <li>And finally, <b>always</b> remember to close your JMS connections after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
 
-        <pre>
-           <code>
-
+         <pre><code>
+      }
       finally
       {
-         if (connection != null)
+         if (initialContext != null)
          {
-            // Step 12. Be sure to close our JMS resources!
+            initialContext.close();
+         }
+         
+         if(connection != null)
+         {
             connection.close();
          }
       }
-           </code>
-        </pre>
 
      </ol>
   </body>

Modified: trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
===================================================================
--- trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java	2009-04-03 21:58:38 UTC (rev 6310)
@@ -34,7 +34,7 @@
 /**
  * A simple JMS Queue example that creates a producer and consumer on a queue and sends then receives a message.
  *
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
  */
 public class PagingExample extends JMSExample
 {
@@ -46,50 +46,69 @@
    public void runExample() throws Exception
    {
       Connection connection = null;
+      
+      InitialContext initialContext = null;
       try
       {
          //Step 1. Create an initial context to perform the JNDI lookup.
-         InitialContext initialContext = getContext();
+         initialContext = getContext();
 
-         //Step 2. Perfom a lookup on the queue
+         //Step 2. Perform a lookup on the Connection Factory
+         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+         // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number of bytes in memory
+         Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");
+         
+         // Step 4. Lookup for a JMS Queue 
          Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
 
-         //Step 3. Perform a lookup on the Connection Factory
-         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
-
-         //Step 4.Create a JMS Connection
+         // Step 5. Create a JMS Connection
          connection = cf.createConnection();
-
-         //Step 5. Create a JMS Session
+         
+         //Step 6. Create a JMS Session
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
-         //Step 6. Create a JMS Message Producer
+         //Step 7. Create a JMS Message Producer for pageQueueAddress
+         MessageProducer pageMessageProducer = session.createProducer(pageQueue);
+         
+         //Step 8. We don't need persistent messages in order to use paging. (This step is optional)
+         pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         //Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
+         BytesMessage message = session.createBytesMessage();
+         message.writeBytes(new byte[10 * 1024]);
+         
+
+         //Step 10. Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at ./paging/config/jbm-queues.xml for the config.
+         for (int i = 0; i < 20; i++)
+         {
+            pageMessageProducer.send(message);
+         }         
+         
+         //Step 11. Create a JMS Message Producer
          MessageProducer messageProducer = session.createProducer(queue);
          
-         //Step 7. We don't need persistent messages in order to use paging. (This step is optional)
+         //Step 12. We don't need persistent messages in order to use paging. (This step is optional)
          messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-         //Step 8. Create a Binary Bytes Message with 10K arbitrary bytes
-         BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[10 * 1024]);
-         
-         //Step 9. Send the message for about 30K, which should be over the memory limit imposed by the server
+         //Step 13. Send the message for about 30K, which should be over the memory limit imposed by the server
          for (int i = 0; i < 30000; i++)
          {
             messageProducer.send(message);
          }
+
+         // Step 14. if you pause this example here, you will several files under ./build/data/paging
+         // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created files just for 
          
-         
-         //Step 10. Create a JMS Message Consumer
+         //Step 15. Create a JMS Message Consumer
          MessageConsumer messageConsumer = session.createConsumer(queue);
          
 
-         //Step 11.  Start the JMS Connection. This step will activate the subscribers to receive messages.
+         //Step 16.  Start the JMS Connection. This step will activate the subscribers to receive messages.
          connection.start();
          
          
-         //Step 12. Receive the messages. 
-         //         It's important to ACK for messages as JBM will not read messages from paging until messages are ACKed or that would lead the server to be OutOfMemory
+         //Step 17. Receive the messages. It's important to ACK for messages as JBM will not read messages from paging until messages are ACKed
          
          for (int i = 0; i < 30000; i++)
          {
@@ -103,13 +122,33 @@
             }
          }
          
-         System.out.println("Received 30000 messages");
+         
+         // Step 18. Receive the messages from the Queue names pageQueue. Create the proper consumer for that
+         messageConsumer.close();
+         messageConsumer = session.createConsumer(pageQueue);
 
-         initialContext.close();
+         for (int i = 0; i < 20; i++)
+         {
+            message = (BytesMessage)messageConsumer.receive(1000);
+            
+            System.out.println("Received message " + i + " from pageQueue");
+
+            message.acknowledge();
+         }
+         
+
+         
+
       }
       finally
       {
-         //Step 12. Be sure to close our JMS resources!
+         // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
+         
+         if (initialContext != null)
+         {
+            initialContext.close();
+         }
+         
          if(connection != null)
          {
             connection.close();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-03 21:58:38 UTC (rev 6310)
@@ -1232,6 +1232,9 @@
                   // If the queue is empty, we need to check if there are pending messages, and throw a warning
                   if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
                   {
+                     // This is just a *request* to depage. Depage will only happens if there is space on the Address and GlobalSize
+                     pagingStore.startDepaging();
+                     
                      log.warn("The Queue " + name +
                               " is empty, however there are pending messages on Paging for the address " +
                               pagingStore.getStoreName() +

Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/src/schemas/jbm-configuration.xsd	2009-04-03 21:58:38 UTC (rev 6310)
@@ -132,7 +132,7 @@
 				<xsd:element name="paging-directory" type="xsd:string"
 					maxOccurs="1" minOccurs="0">
 				</xsd:element>
-				<xsd:element name="paging-global-watermark-size" type="xsd:positiveInteger"
+				<xsd:element name="paging-global-watermark-size" type="xsd:unsignedLong"
 					maxOccurs="1" minOccurs="0">
 				</xsd:element>
 				<xsd:element name="paging-max-global-size-bytes"

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/PagingTest.java	2009-04-03 21:10:42 UTC (rev 6309)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/PagingTest.java	2009-04-03 21:58:38 UTC (rev 6310)
@@ -720,11 +720,7 @@
          assertNull(consumer.receive(100));
 
          assertEquals(0, server.getPostOffice().getPagingManager().getGlobalSize());
-         assertEquals(0, server
-                                         .getPostOffice()
-                                         .getPagingManager()
-                                         .getPageStore(ADDRESS)
-                                         .getAddressSize());
+         assertEquals(0, server.getPostOffice().getPagingManager().getPageStore(ADDRESS).getAddressSize());
 
          for (int i = 0; i < numberOfMessages; i++)
          {
@@ -785,11 +781,7 @@
          session.close();
 
          assertEquals(0, server.getPostOffice().getPagingManager().getGlobalSize());
-         assertEquals(0, server
-                                         .getPostOffice()
-                                         .getPagingManager()
-                                         .getPageStore(ADDRESS)
-                                         .getAddressSize());
+         assertEquals(0, server.getPostOffice().getPagingManager().getPageStore(ADDRESS).getAddressSize());
 
       }
       finally
@@ -893,19 +885,15 @@
 
          for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
          {
-            Queue queue = (Queue)server
-                                                 .getPostOffice()
-                                                 .getBinding(new SimpleString("someQueue" + i))
-                                                 .getBindable();
+            Queue queue = (Queue)server.getPostOffice().getBinding(new SimpleString("someQueue" + i)).getBindable();
 
             assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getMessageCount());
             assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount());
          }
 
-         assertEquals("There are pending messages on the server", 0, server
-                                                                                     .getPostOffice()
-                                                                                     .getPagingManager()
-                                                                                     .getGlobalSize());
+         assertEquals("There are pending messages on the server", 0, server.getPostOffice()
+                                                                           .getPagingManager()
+                                                                           .getGlobalSize());
 
       }
       finally
@@ -1300,6 +1288,139 @@
       }
    }
 
+   public void testPageTwoDestinationsGlobalAndAddresSettings() throws Exception
+   {
+      clearData();
+      
+      SimpleString PAGED_ADDRESS_A = new SimpleString("paged-a");
+      SimpleString PAGED_ADDRESS_GLOBAL = new SimpleString("paged-global");
+
+      Configuration configuration = createDefaultConfig();
+      configuration.setPagingMaxGlobalSizeBytes(104857600);
+
+      System.out.println("getPagingMaxGlobalSizeBytes:" + configuration.getPagingMaxGlobalSizeBytes());
+      System.out.println("getPagingGlobalWatermarkSize:" + configuration.getPagingGlobalWatermarkSize());
+      
+      Map<String, AddressSettings> addresses = new HashMap<String, AddressSettings>();
+
+      addresses.put("#", new AddressSettings());
+
+      AddressSettings pagedDestinationA = new AddressSettings();
+      pagedDestinationA.setPageSizeBytes(20000);
+      pagedDestinationA.setMaxSizeBytes(100000);
+
+      addresses.put(PAGED_ADDRESS_A.toString(), pagedDestinationA);
+
+      MessagingServer server = createServer(true, configuration, addresses);
+      
+      ClientSession session = null;
+      
+      try
+      {
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+        
+         session = sf.createSession(false, true, false);
+
+         session.createQueue(PAGED_ADDRESS_A, PAGED_ADDRESS_A, true);
+
+         session.start();
+
+         session.createQueue(PAGED_ADDRESS_GLOBAL, PAGED_ADDRESS_GLOBAL, true);
+
+         ClientProducer producerA = session.createProducer(PAGED_ADDRESS_A);
+
+         ClientProducer producerGlobal = session.createProducer(PAGED_ADDRESS_GLOBAL);
+
+         int NUMBER_OF_MESSAGES_A = 20;
+
+         int NUMBER_OF_MESSAGES_GLOBAL = 30000;
+
+         ClientMessage msg = session.createClientMessage(false);
+         msg.getBody().writeBytes(new byte[10 * 1024]);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES_A; i++)
+         {
+            producerA.send(msg);
+         }
+
+         session.commit(); // commit was called to clean the buffer only (making sure everything is on the server side)
+
+         assertTrue(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS_A).isPaging());
+         assertFalse(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS_GLOBAL).isPaging());
+
+         System.out.println("AddressA.size = " + server.getPostOffice()
+                                                       .getPagingManager()
+                                                       .getPageStore(PAGED_ADDRESS_A)
+                                                       .getAddressSize() +
+                            " globalSize = " +
+                            server.getPostOffice().getPagingManager().getGlobalSize());
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES_GLOBAL; i++)
+         {
+            producerGlobal.send(msg);
+         }
+
+
+         System.out.println("AddressA.size = " + server.getPostOffice()
+                                                       .getPagingManager()
+                                                       .getPageStore(PAGED_ADDRESS_A)
+                                                       .getAddressSize() +
+                            " globalSize = " +
+                            server.getPostOffice().getPagingManager().getGlobalSize());
+
+         assertTrue(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS_A).isPaging());
+         assertTrue(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS_GLOBAL).isPaging());
+
+         ClientConsumer consumerGlobal = session.createConsumer(PAGED_ADDRESS_GLOBAL);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES_GLOBAL; i++)
+         {
+            msg = consumerGlobal.receive(5000);
+            assertNotNull("Couldn't receive a message on consumerGlobal, iteration = " + i, msg);
+            msg.acknowledge();
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         
+         session.commit();
+         
+         assertNull(consumerGlobal.receiveImmediate());
+
+         ClientConsumer consumerA = session.createConsumer(PAGED_ADDRESS_A);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES_A; i++)
+         {
+            msg = consumerA.receive(5000);
+            assertNotNull("Couldn't receive a message on consumerA, iteration = " + i, msg);
+            msg.acknowledge();
+            session.commit();
+         }
+
+         assertNull(consumerA.receiveImmediate());
+
+         consumerA.close();
+
+         session.commit();
+
+         session.close();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list