[hornetq-commits] JBoss hornetq SVN: r8631 - in trunk: src/main/org/hornetq/core/remoting and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 9 04:52:13 EST 2009


Author: timfox
Date: 2009-12-09 04:52:12 -0500 (Wed, 09 Dec 2009)
New Revision: 8631

Modified:
   trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
   trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
added flush of confirmations on timer

Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java	2009-12-09 09:27:14 UTC (rev 8630)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java	2009-12-09 09:52:12 UTC (rev 8631)
@@ -437,14 +437,13 @@
          {
             session.commit();
          }
-
+         
+         session.close();
+         
          if (useSendAcks)
          {
-            // Must close the session first since this flushes the confirmations
-            session.close();
-
             theLatch.await();
-         }
+         }       
       }
       finally
       {

Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2009-12-09 09:27:14 UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2009-12-09 09:52:12 UTC (rev 8631)
@@ -75,4 +75,6 @@
    boolean checkDataReceived();
 
    void removeAllChannels();
+   
+   void flushConfirmations();
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-09 09:27:14 UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-12-09 09:52:12 UTC (rev 8631)
@@ -364,10 +364,11 @@
    {
       return connection;
    }
-
-   public void flushConfirmations()
+   
+   //Needs to be synchronized since can be called by remoting service timer thread too for timeout flush
+   public synchronized void flushConfirmations()
    {
-      if (receivedBytes != 0)
+      if (resendCache != null && receivedBytes != 0)
       {
          receivedBytes = 0;
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-12-09 09:27:14 UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-12-09 09:52:12 UTC (rev 8631)
@@ -334,6 +334,17 @@
          channels.clear();
       }
    }
+   
+   public void flushConfirmations()
+   {
+      synchronized (transferLock)
+      {
+         for (Channel channel: channels.values())
+         {
+            channel.flushConfirmations();
+         }
+      }
+   }
 
    // Buffer Handler implementation
    // ----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-12-09 09:27:14 UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-12-09 09:52:12 UTC (rev 8631)
@@ -88,7 +88,7 @@
 
    private final ScheduledExecutorService scheduledThreadPool;
 
-   private FailureCheckThread failureCheckThread;
+   private FailureCheckAndFlushThread failureCheckAndFlushThread;
 
    // Static --------------------------------------------------------
 
@@ -184,9 +184,10 @@
          a.start();
       }
 
-      failureCheckThread = new FailureCheckThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
+      //This thread checks connections that need to be closed, and also flushes confirmations
+      failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
 
-      failureCheckThread.start();
+      failureCheckAndFlushThread.start();
 
       started = true;
    }
@@ -220,7 +221,7 @@
          return;
       }
 
-      failureCheckThread.close();
+      failureCheckAndFlushThread.close();
 
       // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
@@ -451,13 +452,13 @@
       }
    }
 
-   private final class FailureCheckThread extends Thread
+   private final class FailureCheckAndFlushThread extends Thread
    {
       private final long pauseInterval;
 
       private volatile boolean closed;
 
-      FailureCheckThread(final long pauseInterval)
+      FailureCheckAndFlushThread(final long pauseInterval)
       {
          super("hornetq-failure-check-thread");
 
@@ -493,15 +494,19 @@
 
             for (ConnectionEntry entry : connections.values())
             {
+               RemotingConnection conn = entry.connection;
+               
+               boolean flush = true;
+               
                if (entry.ttl != -1)
                {
                   if (now >= entry.lastCheck + entry.ttl)
                   {
-                     RemotingConnection conn = entry.connection;
-
                      if (!conn.checkDataReceived())
                      {
                         idsToRemove.add(conn.getID());
+                        
+                        flush = false;
                      }
                      else
                      {
@@ -509,6 +514,14 @@
                      }
                   }
                }
+               
+               if (flush)
+               {
+                  //We flush any confirmations on the connection - this prevents idle bridges for example
+                  //sitting there with many unacked messages
+                                    
+                  conn.flushConfirmations();
+               }
             }
 
             for (Object id : idsToRemove)



More information about the hornetq-commits mailing list