Author: clebert.suconic
Date: 2011-10-21 15:24:40 -0400 (Fri, 21 Oct 2011)
New Revision: 11578
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
Log:
format only (no changes)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-10-21 15:34:53 UTC (rev 11577)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-10-21 19:24:40 UTC (rev 11578)
@@ -61,7 +61,7 @@
private volatile HornetQBuffer batchBuffer;
private final Semaphore writeLock = new Semaphore(1);
-
+
private Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
// Static --------------------------------------------------------
@@ -75,7 +75,7 @@
{
this(null, channel, listener, batchingEnabled, directDeliver);
}
-
+
public NettyConnection(final Acceptor acceptor,
final Channel channel,
final ConnectionLifeCycleListener listener,
@@ -178,68 +178,69 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- try {
- writeLock.acquire();
-
try
{
- if (batchBuffer == null && batchingEnabled && batched && !flush)
- {
- // Lazily create batch buffer
+ writeLock.acquire();
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
-
- if (batchBuffer != null)
+ try
{
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- // If the batch buffer is full or it's flush param or not batched then flush the buffer
+ // Lazily create batch buffer
- buffer = batchBuffer;
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
- else
- {
- return;
- }
- if (!batched || flush)
+ if (batchBuffer != null)
{
- batchBuffer = null;
- }
- else
- {
- // Create a new buffer
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ {
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
+
+ buffer = batchBuffer;
+ }
+ else
+ {
+ return;
+ }
+
+ if (!batched || flush)
+ {
+ batchBuffer = null;
+ }
+ else
+ {
+ // Create a new buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
}
- }
- ChannelFuture future = channel.write(buffer.channelBuffer());
+ ChannelFuture future = channel.write(buffer.channelBuffer());
- if (flush)
- {
- while (true)
+ if (flush)
{
- try
+ while (true)
{
- boolean ok = future.await(10000);
+ try
+ {
+ boolean ok = future.await(10000);
- if (!ok)
+ if (!ok)
+ {
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+ }
+
+ break;
+ }
+ catch (InterruptedException ignore)
{
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
}
-
- break;
}
- catch (InterruptedException ignore)
- {
- }
}
}
- }
finally
{
writeLock.release();
@@ -260,20 +261,20 @@
{
return directDeliver;
}
-
+
public void addReadyListener(final ReadyListener listener)
{
readyListeners.add(listener);
}
-
+
public void removeReadyListener(final ReadyListener listener)
{
readyListeners.remove(listener);
}
-
+
public void fireReady(final boolean ready)
{
- for (ReadyListener listener: readyListeners)
+ for (ReadyListener listener : readyListeners)
{
listener.readyForWriting(ready);
}