[hornetq-commits] JBoss hornetq SVN: r9461 - trunk/src/main/org/hornetq/core/remoting/impl/netty.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 23 04:11:28 EDT 2010


Author: timfox
Date: 2010-07-23 04:11:27 -0400 (Fri, 23 Jul 2010)
New Revision: 9461

Modified:
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
Log:
fixed batching

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-07-23 04:51:06 UTC (rev 9460)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2010-07-23 08:11:27 UTC (rev 9461)
@@ -162,72 +162,74 @@
 
    public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
    {
-      if (writeLock.compareAndSet(false, true))
+      while (!writeLock.compareAndSet(false, true))
       {
-         try
+         Thread.yield();
+      }
+
+      try
+      {
+         if (batchBuffer == null && batchingEnabled && batched && !flush)
          {
-            if (batchBuffer == null && batchingEnabled && batched && !flush)
+            // Lazily create batch buffer
+
+            batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+         }
+
+         if (batchBuffer != null)
+         {
+            batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+            if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
             {
-               // Lazily create batch buffer
+               // If the batch buffer is full or it's flush param or not batched then flush the buffer
 
-               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+               buffer = batchBuffer;
             }
+            else
+            {
+               return;
+            }
 
-            if (batchBuffer != null)
+            if (!batched || flush)
             {
-               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+               batchBuffer = null;
+            }
+            else
+            {
+               // Create a new buffer
 
-               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
-               {
-                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
-
-                  buffer = batchBuffer;
-               }
-               else
-               {
-                  return;
-               }
-
-               if (!batched || flush)
-               {
-                  batchBuffer = null;
-               }
-               else
-               {
-                  // Create a new buffer
-
-                  batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
-               }
+               batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
             }
+         }
 
-            ChannelFuture future = channel.write(buffer.channelBuffer());
+         ChannelFuture future = channel.write(buffer.channelBuffer());
 
-            if (flush)
+         if (flush)
+         {
+            while (true)
             {
-               while (true)
+               try
                {
-                  try
-                  {
-                     boolean ok = future.await(10000);
+                  boolean ok = future.await(10000);
 
-                     if (!ok)
-                     {
-                        NettyConnection.log.warn("Timed out waiting for packet to be flushed");
-                     }
-
-                     break;
-                  }
-                  catch (InterruptedException ignore)
+                  if (!ok)
                   {
+                     NettyConnection.log.warn("Timed out waiting for packet to be flushed");
                   }
+
+                  break;
                }
+               catch (InterruptedException ignore)
+               {
+               }
             }
          }
-         finally
-         {
-            writeLock.set(false);
-         }
       }
+      finally
+      {
+         writeLock.set(false);
+      }
    }
 
    public String getRemoteAddress()



More information about the hornetq-commits mailing list