[hornetq-commits] JBoss hornetq SVN: r9244 - in trunk: src/main/org/hornetq/core/protocol/core and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 18 12:30:21 EDT 2010


Author: timfox
Date: 2010-05-18 12:30:21 -0400 (Tue, 18 May 2010)
New Revision: 9244

Modified:
   trunk/docs/user-manual/en/paging.xml
   trunk/docs/user-manual/en/thread-pooling.xml
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
   trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
Log:
https://jira.jboss.org/browse/HORNETQ-390 and some tweaks

Modified: trunk/docs/user-manual/en/paging.xml
===================================================================
--- trunk/docs/user-manual/en/paging.xml	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/paging.xml	2010-05-18 16:30:21 UTC (rev 9244)
@@ -155,6 +155,7 @@
             able to continue sending.</para>
         <para>To do this just set the <literal>address-full-policy</literal> to <literal
                 >BLOCK</literal> in the address settings</para>
+        <para>In the default configuration, all addresses are configured to block producers after 10 MiB of data are in the address.</para>
     </section>
     <section>
         <title>Caution with Addresses with Multiple Queues</title>

Modified: trunk/docs/user-manual/en/thread-pooling.xml
===================================================================
--- trunk/docs/user-manual/en/thread-pooling.xml	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/docs/user-manual/en/thread-pooling.xml	2010-05-18 16:30:21 UTC (rev 9244)
@@ -28,14 +28,18 @@
             thread pool for scheduled use. A Java scheduled thread pool cannot be configured to use
             a standard thread pool, otherwise we could use a single thread pool for both scheduled
             and non scheduled activity.</para>
-        <para>When using old (blocking) IO, a separate thread pool is also used to service connections. Since old IO requires a thread per connection
-        it does not make sense to get them from the standard pool as the pool will easily get exhausted if too many connections are made, resulting in the
-        server "hanging" since it has no remaining threads to do anything else.</para>
-        <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to three times the number of cores (or hyper-threads)
-            as reported by Runtime.getRuntime().availableProcessors() for processing incoming packets.
-            If you want to override this value, you can set the number of threads by specifying the parameter <literal>nio-remoting-threads</literal>
-            in the transport configuration. See the
-            <xref linkend="configuring-transports"/> for more information on this.</para>
+        <para>When using old (blocking) IO, a separate thread pool is also used to service
+            connections. Since old IO requires a thread per connection it does not make sense to get
+            them from the standard pool as the pool will easily get exhausted if too many
+            connections are made, resulting in the server "hanging" since it has no remaining
+            threads to do anything else. If you require the server to handle many concurrent
+            connections you should make sure you use NIO, not old IO.</para>
+        <para>When using new IO (NIO), HornetQ will, by default, use a number of threads equal to
+            three times the number of cores (or hyper-threads) as reported by
+            Runtime.getRuntime().availableProcessors() for processing incoming packets. If you want
+            to override this value, you can set the number of threads by specifying the parameter
+                <literal>nio-remoting-threads</literal> in the transport configuration. See the
+                <xref linkend="configuring-transports"/> for more information on this.</para>
         <para>There are also a small number of other places where threads are used directly, we'll
             discuss each in turn.</para>
         <section id="server.scheduled.thread.pool">
@@ -111,8 +115,9 @@
 myFactory.setUseGlobalPools(false);
 myFactory.setScheduledThreadPoolMaxSize(10);
 myFactory.setThreadPoolMaxSize(-1);   </programlisting>
-        <para>If you're using the JMS API, you can set the same parameters on the ClientSessionFactory and use it to create the <literal
-                >ConnectionFactory</literal> instance, for example:</para>
+        <para>If you're using the JMS API, you can set the same parameters on the
+            ClientSessionFactory and use it to create the <literal>ConnectionFactory</literal>
+            instance, for example:</para>
         <programlisting>ConnectionFactory myConnectionFactory = HornetQJMSClient.createConnectionFactory(myFactory);     </programlisting>
         <para>If you're using JNDI to instantiate <literal>HornetQConnectionFactory</literal>
             instances, you can also set these parameters in the <literal>hornetq-jms.xml</literal>

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -165,7 +165,7 @@
 
       try
       {
-         session.close();
+         session.close(true);
       }
       catch (Exception e)
       {
@@ -181,7 +181,7 @@
 
       try
       {
-         session.close();
+         session.close(false);
       }
       catch (Exception e)
       {
@@ -422,7 +422,7 @@
                case SESS_CLOSE:
                {
                   requiresResponse = true;
-                  session.close();
+                  session.close(false);
                   removeConnectionListeners();
                   response = new NullResponseMessage();
                   flush = true;

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -54,7 +54,7 @@
 
    // TODO use same value than HornetQConnection
    private static final String CONNECTION_ID_PROP = "__HQ_CID";
-   
+
    // Attributes ----------------------------------------------------
 
    private final HornetQServer server;
@@ -108,7 +108,7 @@
       this.frameDecoder = new StompFrameDecoder();
       this.executor = server.getExecutorFactory().getExecutor();
    }
-   
+
    // ProtocolManager implementation --------------------------------
 
    public ConnectionEntry createConnectionEntry(final Connection connection)
@@ -131,26 +131,25 @@
       if (frame == null)
       {
          return -1;
-      } 
+      }
       else
       {
          return buffer.readerIndex() - start;
       }
    }
 
-
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
       try
       {
          doHandleBuffer(connection, buffer);
-      } 
+      }
       finally
       {
          server.getStorageManager().clearContext();
       }
    }
-   
+
    private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
       StompConnection conn = (StompConnection)connection;
@@ -162,7 +161,7 @@
          {
             log.trace("received " + request);
          }
-         
+
          String command = request.getCommand();
          StompFrame response = null;
 
@@ -269,6 +268,7 @@
          }
       }
    }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -290,11 +290,12 @@
       }
       if (noLocal)
       {
-         String noLocalFilter = CONNECTION_ID_PROP + " <> '"  + connection.getID().toString() + "'";
+         String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
          if (selector == null)
          {
-            selector = noLocalFilter; 
-         } else
+            selector = noLocalFilter;
+         }
+         else
          {
             selector += " AND " + noLocalFilter;
          }
@@ -325,7 +326,13 @@
       }
       long consumerID = server.getStorageManager().generateUniqueID();
       String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
-      stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName, destination, selector, ack);
+      stompSession.addSubscription(consumerID,
+                                   subscriptionID,
+                                   clientID,
+                                   durableSubscriptionName,
+                                   destination,
+                                   selector,
+                                   ack);
 
       return null;
    }
@@ -423,7 +430,7 @@
       }
 
       StompSession session = getTransactedSession(connection, txID);
-      
+
       if (session == null)
       {
          throw new StompException("No transaction started: " + txID);
@@ -567,15 +574,16 @@
    {
       connection.setValid(false);
 
-      try {
+      try
+      {
          StompSession session = sessions.remove(connection.getID());
          if (session != null)
          {
             try
             {
                session.getSession().rollback(true);
-               session.getSession().close();
-             }
+               session.getSession().close(false);
+            }
             catch (Exception e)
             {
                log.warn(e.getMessage(), e);
@@ -593,7 +601,7 @@
                try
                {
                   serverSession.rollback(true);
-                  serverSession.close();
+                  serverSession.close(false);
                }
                catch (Exception e)
                {

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -28,9 +28,9 @@
 {
    long getID();
 
-   void close() throws Exception;
+   void close(boolean failed) throws Exception;
 
-   List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
+   List<MessageReference> cancelRefs(boolean failed, boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
 
    void setStarted(boolean started);
 

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -107,7 +107,7 @@
 
    void requestProducerCredits(SimpleString address, int credits) throws Exception;
 
-   void close() throws Exception;
+   void close(boolean failed) throws Exception;
 
    void setTransferring(boolean transferring);
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -262,7 +262,7 @@
       return filter;
    }
 
-   public void close() throws Exception
+   public void close(final boolean failed) throws Exception
    {
       setStarted(false);
 
@@ -278,7 +278,7 @@
 
       session.removeConsumer(id);
 
-      LinkedList<MessageReference> refs = cancelRefs(false, null);
+      LinkedList<MessageReference> refs = cancelRefs(failed, false, null);
 
       Iterator<MessageReference> iter = refs.iterator();
 
@@ -356,7 +356,7 @@
       }
    }
 
-   public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+   public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
    {
       boolean performACK = lastConsumedAsDelivered;
 
@@ -374,7 +374,13 @@
             }
             else
             {
-               ref.decrementDeliveryCount();
+               if (!failed)
+               {
+                  //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
+                  //before failure
+                  log.info("decrementing delivery count");
+                  ref.decrementDeliveryCount();
+               }
 
                refs.add(ref);
             }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -241,20 +241,20 @@
       }
    }
 
-   private synchronized void doClose() throws Exception
+   private synchronized void doClose(final boolean failed) throws Exception
    {
       if (tx != null && tx.getXid() == null)
       {
          // We only rollback local txs on close, not XA tx branches
 
-         rollback(false);
+         rollback(failed);
       }
 
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
       for (ServerConsumer consumer : consumersClone)
       {
-         consumer.close();
+         consumer.close(failed);
       }
 
       consumers.clear();
@@ -905,7 +905,7 @@
       setStarted(false);
    }
 
-   public void close()
+   public void close(final boolean failed)
    {
       storageManager.afterCompleteOperations(new IOAsyncTask()
       {
@@ -917,7 +917,7 @@
          {
             try
             {
-               doClose();
+               doClose(failed);
             }
             catch (Exception e)
             {
@@ -933,7 +933,7 @@
 
       if (consumer != null)
       {
-         consumer.close();
+         consumer.close(false);
       }
       else
       {
@@ -1065,7 +1065,7 @@
       {
          ServerSessionImpl.log.warn("Client connection failed, clearing up resources for session " + name);
 
-         close();
+         close(true);
 
          ServerSessionImpl.log.warn("Cleared up resources for session " + name);
       }
@@ -1134,7 +1134,7 @@
             consumer.setStarted(false);
          }
 
-         toCancel.addAll(consumer.cancelRefs(lastMessageAsDelived, theTx));
+         toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
       }
 
       for (MessageReference ref : toCancel)

Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -61,8 +61,7 @@
    {
       assertActiveConnections(0);
 
-      // spawn a JVM that creates a Core client, which waits to receive a test
-      // message
+      // spawn a JVM that creates a Core client, which sends a message
       Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
 
       ClientSession session = sf.createSession(false, true, true);
@@ -72,7 +71,7 @@
 
       session.start();
 
-      // send the message to the queue
+      // receive a message from the queue
       Message messageFromClient = consumer.receive(5000);
       Assert.assertNotNull("no message received", messageFromClient);
       Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
@@ -110,7 +109,42 @@
       // FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
       assertActiveSession(0);
    }
+   
+   public void testCrashClient2() throws Exception
+   {     
+      assertActiveConnections(0);
 
+      ClientSession session = sf.createSession(false, true, true);
+           
+      session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
+      
+      // spawn a JVM that creates a Core client, which sends a message
+      Process p = SpawnedVMSupport.spawnVM(CrashClient2.class.getName());
+      
+      ClientCrashTest.log.debug("waiting for the client VM to crash ...");
+      p.waitFor();
+
+      Assert.assertEquals(9, p.exitValue());
+
+      System.out.println("VM Exited");
+
+      Thread.sleep(3 * ClientCrashTest.CONNECTION_TTL);
+      
+      ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
+      
+      session.start();
+
+      // receive a message from the queue
+      ClientMessage messageFromClient = consumer.receive(10000);
+      Assert.assertNotNull("no message received", messageFromClient);
+      Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
+
+      assertEquals(2, messageFromClient.getDeliveryCount());
+      
+      session.close();
+
+   }
+
    // Package protected ---------------------------------------------
 
    @Override

Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java	2010-05-18 13:38:01 UTC (rev 9243)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java	2010-05-18 16:30:21 UTC (rev 9244)
@@ -23,8 +23,6 @@
 /**
  * Code to be run in an external VM, via main().
  * 
- * This client will open a connection, receive a message and crash.
- *
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 



More information about the hornetq-commits mailing list