Author: clebert.suconic(a)jboss.com
Date: 2011-11-22 16:37:07 -0500 (Tue, 22 Nov 2011)
New Revision: 11743
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
Log:
avoiding dead locks through stomp / close session / exception handling
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22
21:35:33 UTC (rev 11742)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22
21:37:07 UTC (rev 11743)
@@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -66,6 +67,8 @@
private final Object failLock = new Object();
+ private final Executor executor;
+
private volatile boolean dataReceived;
public StompDecoder getDecoder()
@@ -73,7 +76,7 @@
return decoder;
}
- StompConnection(final Acceptor acceptorUsed, final Connection transportConnection,
final StompProtocolManager manager)
+ StompConnection(final Acceptor acceptorUsed, final Connection transportConnection,
final StompProtocolManager manager, final Executor executor)
{
this.transportConnection = transportConnection;
@@ -82,6 +85,8 @@
this.creationTime = System.currentTimeMillis();
this.acceptorUsed = acceptorUsed;
+
+ this.executor = executor;
}
public void addFailureListener(final FailureListener listener)
@@ -322,7 +327,6 @@
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
-
for (final FailureListener listener : listenersClone)
{
try
@@ -343,20 +347,26 @@
{
final List<CloseListener> listenersClone = new
ArrayList<CloseListener>(closeListeners);
- for (final CloseListener listener : listenersClone)
- {
- try
+ // avoiding a dead lock
+ executor.execute(new Runnable(){
+ public void run()
{
- listener.connectionClosed();
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
}
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
+ });
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22
21:35:33 UTC (rev 11742)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22
21:37:07 UTC (rev 11743)
@@ -120,7 +120,7 @@
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final
Connection connection)
{
- StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this,
server.getExecutorFactory().getExecutor());
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must
continue to be sent or connection
// will be timed out and closed!
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22
21:35:33 UTC (rev 11742)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22
21:37:07 UTC (rev 11743)
@@ -96,19 +96,20 @@
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
throws Exception
{
- synchronized (this)
+ if (!active)
{
- if (!active)
- {
- return;
- }
+ return;
+ }
- // We don't want to log this - since it is normal for this to happen during
failover/reconnect
- // and we don't want to spew out stack traces in that event
- // The user has access to this exeception anyway via the HornetQException
initial cause
+ // We don't want to log this - since it is normal for this to happen during
failover/reconnect
+ // and we don't want to spew out stack traces in that event
+ // The user has access to this exeception anyway via the HornetQException initial
cause
- HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
"Netty exception");
- me.initCause(e.getCause());
+ HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
"Netty exception");
+ me.initCause(e.getCause());
+
+ synchronized (listener)
+ {
try
{
listener.connectionException(e.getChannel().getId(), me);