[jboss-cvs] JBoss Messaging SVN: r4560 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 23 11:56:26 EDT 2008
Author: timfox
Date: 2008-06-23 11:56:26 -0400 (Mon, 23 Jun 2008)
New Revision: 4560
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
Log:
Some changes to cleanup and close
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -50,6 +50,4 @@
boolean isClosed();
Version getServerVersion();
-
- void cleanUp();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -45,7 +45,5 @@
boolean isClosed();
- boolean isDirect();
-
- void cleanUp();
+ boolean isDirect();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -52,6 +52,4 @@
int getMaxRate();
int getInitialWindowSize();
-
- void cleanUp();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -96,8 +96,6 @@
boolean isXA();
- void cleanUp();
-
ClientMessage createClientMessage(final byte type, final boolean durable, final long expiration,
final long timestamp, final byte priority);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -169,11 +169,21 @@
}
}
- public synchronized void cleanUp()
+ public synchronized void cleanUp() throws Exception
{
- cleanUpChildren();
+ if (closed)
+ {
+ return;
+ }
- closed = true;
+ try
+ {
+ cleanUpChildren();
+ }
+ finally
+ {
+ closed = true;
+ }
}
public boolean isClosed()
@@ -208,11 +218,6 @@
return serverVersion;
}
-// public ClientConnectionFactory getConnectionFactory()
-// {
-// return connectionFactory;
-// }
-
// Public ---------------------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -241,13 +246,13 @@
}
}
- private void cleanUpChildren()
+ private void cleanUpChildren() throws Exception
{
//We copy the set of sessions to prevent ConcurrentModificationException which would occur
//when the child trues to remove itself from its parent
- Set<ClientSession> childrenClone = new HashSet<ClientSession>(sessions);
+ Set<ClientSessionInternal> childrenClone = new HashSet<ClientSessionInternal>(sessions);
- for (ClientSession session: childrenClone)
+ for (ClientSessionInternal session: childrenClone)
{
session.cleanUp();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionInternal.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -42,6 +42,8 @@
void removeSession(ClientSessionInternal session);
Set<ClientSession> getSessions();
+
+ void cleanUp() throws Exception;
//For testing only
RemotingConnection getRemotingConnection();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -259,6 +259,8 @@
}
}
+ //TODO - should combine close() and cleanup() functionality in one method - there is currently duplication
+
public void close() throws MessagingException
{
if (closed)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -48,4 +48,6 @@
int getBufferSize();
int getCreditsToSend();
+
+ void cleanUp() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -168,22 +168,19 @@
return;
}
- session.removeProducer(this);
-
- dispatcher.unregister(clientTargetID);
-
- closed = true;
+ doCleanup();
}
public void cleanUp()
{
- session.removeProducer(this);
-
- dispatcher.unregister(clientTargetID);
-
- closed = true;
+ if (closed)
+ {
+ return;
+ }
+
+ doCleanup();
}
-
+
public boolean isClosed()
{
return closed;
@@ -229,6 +226,15 @@
// Private --------------------------------------------------------------------------------------
+ private void doCleanup()
+ {
+ session.removeProducer(this);
+
+ dispatcher.unregister(clientTargetID);
+
+ closed = true;
+ }
+
private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
{
if (address != null)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -36,4 +36,6 @@
void receiveCredits(int credits) throws Exception;
int getAvailableCredits();
+
+ void cleanUp();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -21,25 +21,64 @@
*/
package org.jboss.messaging.core.client.impl;
-import org.jboss.messaging.core.client.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.util.MessagingBuffer;
import org.jboss.messaging.util.MessagingBufferFactory;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -478,11 +517,6 @@
{
closeChildren();
- if (cacheProducers)
- {
- producerCache.clear();
- }
-
//Flush any acks to the server
acknowledgeInternal(false);
@@ -490,11 +524,7 @@
}
finally
{
- executorService.shutdown();
-
- connection.removeSession(this);
-
- closed = true;
+ doCleanup();
}
}
@@ -516,17 +546,23 @@
return new ClientMessageImpl(durable, body);
}
- public synchronized void cleanUp()
+ public synchronized void cleanUp() throws Exception
{
- cleanUpChildren();
-
- executorService.shutdown();
-
- connection.removeSession(this);
-
- closed = true;
+ if (closed)
+ {
+ return;
+ }
+
+ try
+ {
+ cleanUpChildren();
+ }
+ finally
+ {
+ doCleanup();
+ }
}
-
+
public boolean isClosed()
{
return closed;
@@ -992,18 +1028,18 @@
}
}
- private void cleanUpChildren()
+ private void cleanUpChildren() throws Exception
{
- Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers);
+ Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers);
- for (ClientConsumer consumer: consumersClone)
+ for (ClientConsumerInternal consumer: consumersClone)
{
consumer.cleanUp();
}
- Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+ Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
- for (ClientProducer producer: producersClone)
+ for (ClientProducerInternal producer: producersClone)
{
producer.cleanUp();
}
@@ -1016,6 +1052,20 @@
}
}
+ private void doCleanup()
+ {
+ executorService.shutdown();
+
+ connection.removeSession(this);
+
+ if (cacheProducers)
+ {
+ producerCache.clear();
+ }
+
+ closed = true;
+ }
+
// Inner Classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -64,4 +64,6 @@
Set<ClientBrowser> getBrowsers();
Map<SimpleString, ClientProducerInternal> getProducerCache();
+
+ void cleanUp() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -22,15 +22,35 @@
package org.jboss.messaging.jms.client;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -392,7 +412,9 @@
public void sessionDestroyed(long sessionID, MessagingException me)
{
if (me == null)
+ {
return;
+ }
if (exceptionListener != null)
{
@@ -403,7 +425,16 @@
exceptionListener.onException(je);
}
- connection.cleanUp();
+ try
+ {
+ //FIXME - this should not be called from the jms layer
+ //We need cleanup to also occur when core connections are dead, and the user may not be using the jms api
+ ((ClientConnectionInternal)connection).cleanUp();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to cleanup connection", e);
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -35,7 +35,7 @@
{
public static MessagingBuffer createMessagingBuffer(TransportType transportType, int len)
{
- if(transportType == TransportType.TCP)
+ if (transportType == TransportType.TCP)
{
return new IoBufferWrapper(len);
}
@@ -51,7 +51,7 @@
public static MessagingBuffer createMessagingBuffer(MessagingBuffer buffer, int len)
{
- if(buffer instanceof IoBufferWrapper)
+ if (buffer instanceof IoBufferWrapper)
{
return new IoBufferWrapper(len);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java 2008-06-23 14:50:51 UTC (rev 4559)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java 2008-06-23 15:56:26 UTC (rev 4560)
@@ -22,21 +22,41 @@
package org.jboss.messaging.tests.unit.jms.client;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+
import junit.framework.TestCase;
+
import org.easymock.EasyMock;
-import static org.easymock.EasyMock.*;
import org.easymock.IArgumentMatcher;
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.version.Version;
import org.jboss.messaging.jms.client.JBossConnection;
import org.jboss.messaging.tests.util.RandomUtil;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-import javax.jms.*;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -683,7 +703,7 @@
public void testResourcesCleanedUp() throws Exception
{
- ClientConnection clientConn = createStrictMock(ClientConnection.class);
+ ClientConnectionInternal clientConn = createStrictMock(ClientConnectionInternal.class);
FailureListenerMatcher failureListenerMatcher = new FailureListenerMatcher();
EasyMock.reportMatcher(failureListenerMatcher);
clientConn.setRemotingSessionListener(null);
More information about the jboss-cvs-commits
mailing list