[hornetq-commits] JBoss hornetq SVN: r9457 - in trunk/src/main/org/hornetq: core/paging and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jul 22 09:18:51 EDT 2010


Author: timfox
Date: 2010-07-22 09:18:50 -0400 (Thu, 22 Jul 2010)
New Revision: 9457

Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/paging/PagingStore.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
   trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
Log:
a few tweaks

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-07-22 13:18:50 UTC (rev 9457)
@@ -36,7 +36,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;

Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-07-22 13:18:50 UTC (rev 9457)
@@ -15,7 +15,6 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-07-22 13:18:50 UTC (rev 9457)
@@ -838,6 +838,7 @@
    private void setPagingStore(final ServerMessage message) throws Exception
    {
       PagingStore store = pagingManager.getPageStore(message.getAddress());
+      
       message.setPagingStore(store);
    }
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-07-22 13:18:50 UTC (rev 9457)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.remoting.impl.netty;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
@@ -49,27 +51,30 @@
    private final ConnectionLifeCycleListener listener;
 
    private final boolean batchingEnabled;
-   
+
    private final boolean directDeliver;
-   
-   private HornetQBuffer batchBuffer;
-   
-   private final Object writeLock = new Object();
 
+   private volatile HornetQBuffer batchBuffer;
+
+   private final AtomicBoolean writeLock = new AtomicBoolean(false);
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
+   public NettyConnection(final Channel channel,
+                          final ConnectionLifeCycleListener listener,
+                          boolean batchingEnabled,
+                          boolean directDeliver)
    {
       this.channel = channel;
 
       this.listener = listener;
 
       this.batchingEnabled = batchingEnabled;
-      
+
       this.directDeliver = directDeliver;
-      
+
       listener.connectionCreated(this, ProtocolType.CORE);
    }
 
@@ -127,19 +132,26 @@
    // This is called periodically to flush the batch buffer
    public void checkFlushBatchBuffer()
    {
-      synchronized (writeLock)
+      if (!batchingEnabled)
       {
-         if (!batchingEnabled)
-         {
-            return;
-         }
+         return;
+      }
 
-         if (batchBuffer != null && batchBuffer.readable())
+      if (writeLock.compareAndSet(false, true))
+      {
+         try
          {
-            channel.write(batchBuffer.channelBuffer());
+            if (batchBuffer != null && batchBuffer.readable())
+            {
+               channel.write(batchBuffer.channelBuffer());
 
-            batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+            }
          }
+         finally
+         {
+            writeLock.set(false);
+         }
       }
    }
 
@@ -150,64 +162,71 @@
 
    public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
    {
-      synchronized (writeLock)
+      if (writeLock.compareAndSet(false, true))
       {
-         if (batchBuffer == null && batchingEnabled && batched && !flush)
+         try
          {
-            // Lazily create batch buffer
-
-            batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
-         }
-
-         if (batchBuffer != null)
-         {
-            batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
-            if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+            if (batchBuffer == null && batchingEnabled && batched && !flush)
             {
-               // If the batch buffer is full or it's flush param or not batched then flush the buffer
+               // Lazily create batch buffer
 
-               buffer = batchBuffer;
+               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
             }
-            else
-            {
-               return;
-            }
 
-            if (!batched || flush)
+            if (batchBuffer != null)
             {
-               batchBuffer = null;
-            }
-            else
-            {
-               // Create a new buffer
+               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
 
-               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+               {
+                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
+
+                  buffer = batchBuffer;
+               }
+               else
+               {
+                  return;
+               }
+
+               if (!batched || flush)
+               {
+                  batchBuffer = null;
+               }
+               else
+               {
+                  // Create a new buffer
+
+                  batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+               }
             }
-         }
 
-         ChannelFuture future = channel.write(buffer.channelBuffer());
+            ChannelFuture future = channel.write(buffer.channelBuffer());
 
-         if (flush)
-         {
-            while (true)
+            if (flush)
             {
-               try
+               while (true)
                {
-                  boolean ok = future.await(10000);
+                  try
+                  {
+                     boolean ok = future.await(10000);
 
-                  if (!ok)
+                     if (!ok)
+                     {
+                        NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+                     }
+
+                     break;
+                  }
+                  catch (InterruptedException ignore)
                   {
-                     NettyConnection.log.warn("Timed out waiting for packet to be flushed");
                   }
-
-                  break;
                }
-               catch (InterruptedException ignore)
-               {
-               }
             }
          }
+         finally
+         {
+            writeLock.set(false);
+         }
       }
    }
 
@@ -215,7 +234,7 @@
    {
       return channel.getRemoteAddress().toString();
    }
-   
+
    public boolean isDirectDeliver()
    {
       return directDeliver;

Modified: trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java	2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java	2010-07-22 13:18:50 UTC (rev 9457)
@@ -16,6 +16,8 @@
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 
+import org.hornetq.core.logging.Logger;
+
 /**
  * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
  *
@@ -27,6 +29,8 @@
  */
 public final class OrderedExecutorFactory implements ExecutorFactory
 {
+   private static final Logger log = Logger.getLogger(OrderedExecutorFactory.class);
+
    private final Executor parent;
 
    /**
@@ -97,7 +101,7 @@
                   }
                   catch (Throwable t)
                   {
-                     // eat it!
+                     log.error("Caught unexpected Throwable", t);
                   }
                }
             }



More information about the hornetq-commits mailing list