[hornetq-commits] JBoss hornetq SVN: r9083 - in trunk: src/main/org/hornetq/core/protocol/core and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Apr 9 08:06:24 EDT 2010
Author: timfox
Date: 2010-04-09 08:06:23 -0400 (Fri, 09 Apr 2010)
New Revision: 9083
Modified:
trunk/examples/jms/reattach-node/server0/hornetq-jms.xml
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-336
Modified: trunk/examples/jms/reattach-node/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/reattach-node/server0/hornetq-jms.xml 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/examples/jms/reattach-node/server0/hornetq-jms.xml 2010-04-09 12:06:23 UTC (rev 9083)
@@ -27,6 +27,10 @@
to try to reconnect -->
<failover-on-server-shutdown>true</failover-on-server-shutdown>
+ <!-- We need to specify a confirmation-window-size to enable re-attachment, default is -1 which
+ means no re-attachment -->
+ <confirmation-window-size>1048576</confirmation-window-size>
+
</connection-factory>
<!-- This is used by the example to send the management operations, it's not central to the example -->
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-09 12:06:23 UTC (rev 9083)
@@ -147,8 +147,6 @@
{
log.warn("Client connection failed, clearing up resources for session " + session.getName());
- session.runConnectionFailureRunners();
-
try
{
session.close();
@@ -177,7 +175,6 @@
public void connectionClosed()
{
- session.runConnectionFailureRunners();
}
private void addConnectionListeners()
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-04-09 12:06:23 UTC (rev 9083)
@@ -567,8 +567,7 @@
{
session.getSession().rollback(true);
session.getSession().close();
- session.getSession().runConnectionFailureRunners();
- }
+ }
catch (Exception e)
{
log.warn(e.getMessage(), e);
@@ -587,7 +586,6 @@
{
serverSession.rollback(true);
serverSession.close();
- serverSession.runConnectionFailureRunners();
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-04-09 12:06:23 UTC (rev 9083)
@@ -31,8 +31,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.management.ManagementService;
@@ -365,16 +367,32 @@
if (conn != null)
{
- // if the connection has no failure listeners it means the sesssions etc were already closed so this is a clean
- // shutdown, therefore we can destroy the connection
- // otherwise client might have crashed/exited without closing connections so we leave them for connection TTL
+ // Bit of a hack - find a better way to do this
- if (conn.connection.getFailureListeners().isEmpty())
+ List<FailureListener> failureListeners = conn.connection.getFailureListeners();
+
+ boolean empty = true;
+
+ for (FailureListener listener : failureListeners)
{
+ if (listener instanceof ServerSessionPacketHandler)
+ {
+ empty = false;
+
+ break;
+ }
+ }
+
+ // We only destroy the connection if the connection has no sessions attached to it
+ // Otherwise it means the connection has died without the sessions being closed first
+ // so we need to keep them for ttl, in case re-attachment occurs
+ if (empty)
+ {
connections.remove(connectionID);
conn.connection.destroy();
}
+
}
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-09 12:06:23 UTC (rev 9083)
@@ -110,6 +110,4 @@
void close() throws Exception;
void setTransferring(boolean transferring);
-
- void runConnectionFailureRunners();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-09 12:06:23 UTC (rev 9083)
@@ -73,7 +73,7 @@
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession, FailureListener, CloseListener
+public class ServerSessionImpl implements ServerSession, FailureListener
{
// Constants -----------------------------------------------------------------------------
@@ -102,7 +102,7 @@
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
private Transaction tx;
-
+
private final boolean xa;
private final StorageManager storageManager;
@@ -117,8 +117,10 @@
private volatile boolean started = false;
- private final Map<SimpleString, Runnable> failureRunners = new HashMap<SimpleString, Runnable>();
+ // private final Map<SimpleString, Runnable> failureRunners = new HashMap<SimpleString, Runnable>();
+ private final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
+
private final String name;
private final HornetQServer server;
@@ -131,7 +133,7 @@
private final RoutingContext routingContext = new RoutingContextImpl(null);
private final SessionCallback callback;
-
+
private volatile SimpleString defaultAddress;
// Constructors ---------------------------------------------------------------------------------
@@ -182,7 +184,7 @@
{
tx = new TransactionImpl(storageManager);
}
-
+
this.xa = xa;
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
@@ -196,12 +198,10 @@
this.managementAddress = managementAddress;
this.callback = callback;
-
+
this.defaultAddress = defaultAddress;
remotingConnection.addFailureListener(this);
-
- remotingConnection.addCloseListener(this);
}
// ServerSession implementation ----------------------------------------------------------------------------
@@ -272,7 +272,7 @@
}
remotingConnection.removeFailureListener(this);
-
+
callback.closed();
}
@@ -359,9 +359,9 @@
// session is closed.
// It is up to the user to delete the queue when finished with it
- failureRunners.put(name, new Runnable()
+ CloseListener closeListener = new CloseListener()
{
- public void run()
+ public void connectionClosed()
{
try
{
@@ -375,10 +375,57 @@
ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
}
}
- });
+ };
+
+ TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
+
+ remotingConnection.addCloseListener(cleaner);
+ remotingConnection.addFailureListener(cleaner);
+
+ tempQueueCleannerUppers.put(name, cleaner);
}
}
+ private static class TempQueueCleanerUpper implements CloseListener, FailureListener
+ {
+ private final PostOffice postOffice;
+
+ private final SimpleString bindingName;
+
+ TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName)
+ {
+ this.postOffice = postOffice;
+
+ this.bindingName = bindingName;
+ }
+
+ private void run()
+ {
+ try
+ {
+ if (postOffice.getBinding(bindingName) != null)
+ {
+ postOffice.removeBinding(bindingName);
+ }
+ }
+ catch (Exception e)
+ {
+ ServerSessionImpl.log.error("Failed to remove temporary queue " + bindingName);
+ }
+ }
+
+ public void connectionFailed(HornetQException exception)
+ {
+ run();
+ }
+
+ public void connectionClosed()
+ {
+ run();
+ }
+
+ }
+
public void deleteQueue(final SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -390,7 +437,14 @@
server.destroyQueue(name, this);
- failureRunners.remove(name);
+ TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
+
+ if (cleaner != null)
+ {
+ remotingConnection.removeCloseListener(cleaner);
+
+ remotingConnection.removeFailureListener(cleaner);
+ }
}
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
@@ -465,7 +519,7 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
+
if (this.xa && tx == null)
{
throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
@@ -473,11 +527,11 @@
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
-
+
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
+
if (this.xa && tx == null)
{
throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
@@ -921,27 +975,27 @@
currentLargeMessage = msg;
}
-
+
public void send(final ServerMessage message) throws Exception
{
long id = storageManager.generateUniqueID();
SimpleString address = message.getAddress();
-
+
message.setMessageID(id);
message.encodeMessageIDToBuffer();
-
+
if (address == null)
{
if (message.isDurable())
{
- //We need to force a re-encode when the message gets persisted or when it gets reloaded
- //it will have no address
+ // We need to force a re-encode when the message gets persisted or when it gets reloaded
+ // it will have no address
message.setAddress(defaultAddress);
}
else
{
- //We don't want to force a re-encode when the message gets sent to the consumer
+ // We don't want to force a re-encode when the message gets sent to the consumer
message.setAddressTransient(defaultAddress);
}
}
@@ -955,8 +1009,8 @@
else
{
doSend(message);
- }
-
+ }
+
if (defaultAddress == null)
{
defaultAddress = address;
@@ -986,9 +1040,9 @@
}
public void requestProducerCredits(final SimpleString address, final int credits) throws Exception
- {
+ {
PagingStore store = postOffice.getPagingManager().getPageStore(address);
-
+
store.executeRunnableWhenMemoryAvailable(new Runnable()
{
public void run()
@@ -1008,21 +1062,6 @@
}
}
- public void runConnectionFailureRunners()
- {
- for (Runnable runner : failureRunners.values())
- {
- try
- {
- runner.run();
- }
- catch (Throwable t)
- {
- ServerSessionImpl.log.error("Failed to execute failure runner", t);
- }
- }
- }
-
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1032,18 +1071,6 @@
{
ServerSessionImpl.log.warn("Client connection failed, clearing up resources for session " + name);
- for (Runnable runner : failureRunners.values())
- {
- try
- {
- runner.run();
- }
- catch (Throwable t)
- {
- ServerSessionImpl.log.error("Failed to execute failure runner", t);
- }
- }
-
close();
ServerSessionImpl.log.warn("Cleared up resources for session " + name);
@@ -1054,29 +1081,7 @@
}
}
- public void connectionClosed()
- {
- try
- {
- for (Runnable runner : failureRunners.values())
- {
- try
- {
- runner.run();
- }
- catch (Throwable t)
- {
- ServerSessionImpl.log.error("Failed to execute failure runner", t);
- }
- }
- }
- catch (Throwable t)
- {
- ServerSessionImpl.log.error("Failed to fire listeners " + this);
- }
- }
-
// Public
// ----------------------------------------------------------------------------
@@ -1174,7 +1179,7 @@
{
throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
}
-
+
if (tx == null || autoCommitSends)
{
}
More information about the hornetq-commits
mailing list