Author: timfox
Date: 2010-07-22 09:18:50 -0400 (Thu, 22 Jul 2010)
New Revision: 9457
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
Log:
a few tweaks
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -36,7 +36,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -838,6 +838,7 @@
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
+
message.setPagingStore(store);
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -13,6 +13,8 @@
package org.hornetq.core.remoting.impl.netty;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
@@ -49,27 +51,30 @@
private final ConnectionLifeCycleListener listener;
private final boolean batchingEnabled;
-
+
private final boolean directDeliver;
-
- private HornetQBuffer batchBuffer;
-
- private final Object writeLock = new Object();
+ private volatile HornetQBuffer batchBuffer;
+
+ private final AtomicBoolean writeLock = new AtomicBoolean(false);
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
+ public NettyConnection(final Channel channel,
+ final ConnectionLifeCycleListener listener,
+ boolean batchingEnabled,
+ boolean directDeliver)
{
this.channel = channel;
this.listener = listener;
this.batchingEnabled = batchingEnabled;
-
+
this.directDeliver = directDeliver;
-
+
listener.connectionCreated(this, ProtocolType.CORE);
}
@@ -127,19 +132,26 @@
// This is called periodically to flush the batch buffer
public void checkFlushBatchBuffer()
{
- synchronized (writeLock)
+ if (!batchingEnabled)
{
- if (!batchingEnabled)
- {
- return;
- }
+ return;
+ }
- if (batchBuffer != null && batchBuffer.readable())
+ if (writeLock.compareAndSet(false, true))
+ {
+ try
{
- channel.write(batchBuffer.channelBuffer());
+ if (batchBuffer != null && batchBuffer.readable())
+ {
+ channel.write(batchBuffer.channelBuffer());
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
}
@@ -150,64 +162,71 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- synchronized (writeLock)
+ if (writeLock.compareAndSet(false, true))
{
- if (batchBuffer == null && batchingEnabled && batched && !flush)
+ try
{
- // Lazily create batch buffer
-
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
-
- if (batchBuffer != null)
- {
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- // If the batch buffer is full or it's flush param or not batched then flush the buffer
+ // Lazily create batch buffer
- buffer = batchBuffer;
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
- else
- {
- return;
- }
- if (!batched || flush)
+ if (batchBuffer != null)
{
- batchBuffer = null;
- }
- else
- {
- // Create a new buffer
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ {
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
+
+ buffer = batchBuffer;
+ }
+ else
+ {
+ return;
+ }
+
+ if (!batched || flush)
+ {
+ batchBuffer = null;
+ }
+ else
+ {
+ // Create a new buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
}
- }
- ChannelFuture future = channel.write(buffer.channelBuffer());
+ ChannelFuture future = channel.write(buffer.channelBuffer());
- if (flush)
- {
- while (true)
+ if (flush)
{
- try
+ while (true)
{
- boolean ok = future.await(10000);
+ try
+ {
+ boolean ok = future.await(10000);
- if (!ok)
+ if (!ok)
+ {
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+ }
+
+ break;
+ }
+ catch (InterruptedException ignore)
{
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
}
-
- break;
}
- catch (InterruptedException ignore)
- {
- }
}
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
}
@@ -215,7 +234,7 @@
{
return channel.getRemoteAddress().toString();
}
-
+
public boolean isDirectDeliver()
{
return directDeliver;
Modified: trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -16,6 +16,8 @@
import java.util.LinkedList;
import java.util.concurrent.Executor;
+import org.hornetq.core.logging.Logger;
+
/**
* A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
*
@@ -27,6 +29,8 @@
*/
public final class OrderedExecutorFactory implements ExecutorFactory
{
+ private static final Logger log = Logger.getLogger(OrderedExecutorFactory.class);
+
private final Executor parent;
/**
@@ -97,7 +101,7 @@
}
catch (Throwable t)
{
- // eat it!
+ log.error("Caught unexpected Throwable", t);
}
}
}
Show replies by date