[hornetq-commits] JBoss hornetq SVN: r9461 - trunk/src/main/org/hornetq/core/remoting/impl/netty.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jul 23 04:11:28 EDT 2010
Author: timfox
Date: 2010-07-23 04:11:27 -0400 (Fri, 23 Jul 2010)
New Revision: 9461
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
Log:
fixed batching
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-23 04:51:06 UTC (rev 9460)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-23 08:11:27 UTC (rev 9461)
@@ -162,72 +162,74 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- if (writeLock.compareAndSet(false, true))
+ while (!writeLock.compareAndSet(false, true))
{
- try
+ Thread.yield();
+ }
+
+ try
+ {
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- if (batchBuffer == null && batchingEnabled && batched && !flush)
+ // Lazily create batch buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
+
+ if (batchBuffer != null)
+ {
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
{
- // Lazily create batch buffer
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ buffer = batchBuffer;
}
+ else
+ {
+ return;
+ }
- if (batchBuffer != null)
+ if (!batched || flush)
{
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+ batchBuffer = null;
+ }
+ else
+ {
+ // Create a new buffer
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
- {
- // If the batch buffer is full or it's flush param or not batched then flush the buffer
-
- buffer = batchBuffer;
- }
- else
- {
- return;
- }
-
- if (!batched || flush)
- {
- batchBuffer = null;
- }
- else
- {
- // Create a new buffer
-
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
+ }
- ChannelFuture future = channel.write(buffer.channelBuffer());
+ ChannelFuture future = channel.write(buffer.channelBuffer());
- if (flush)
+ if (flush)
+ {
+ while (true)
{
- while (true)
+ try
{
- try
- {
- boolean ok = future.await(10000);
+ boolean ok = future.await(10000);
- if (!ok)
- {
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
- }
-
- break;
- }
- catch (InterruptedException ignore)
+ if (!ok)
{
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
}
+
+ break;
}
+ catch (InterruptedException ignore)
+ {
+ }
}
}
- finally
- {
- writeLock.set(false);
- }
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
public String getRemoteAddress()
More information about the hornetq-commits
mailing list