[jboss-cvs] JBoss Messaging SVN: r2045 - in trunk: src/main/org/jboss/jms/server and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 24 20:55:38 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-24 20:55:37 -0500 (Wed, 24 Jan 2007)
New Revision: 2045
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnection.java
trunk/src/main/org/jboss/jms/server/ConnectionManager.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java
trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java
trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
minor refactoring in preparation of http://jira.jboss.org/jira/browse/JBMESSAGING-779
Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -220,7 +220,7 @@
// Public ---------------------------------------------------------------------------------------
- public String getRemotingClientSessionId()
+ public String getRemotingClientSessionID()
{
ConnectionState state = (ConnectionState)((ClientConnectionDelegate)delegate).getState();
Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -27,6 +27,9 @@
/**
+ * An interface that allows management of ConnectionEnpoints and their association with remoting
+ * clients.
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
@@ -42,16 +45,16 @@
/**
* @return null if there is no such connection.
*/
- ConnectionEndpoint unregisterConnection(String jmsClientVMId, String remotingClientSessionID);
+ ConnectionEndpoint unregisterConnection(String jmsClientVMID, String remotingClientSessionID);
- void handleClientFailure(String remotingSessionID);
-
- boolean containsSession(String remotingClientSessionID);
+ boolean containsRemotingSession(String remotingClientSessionID);
/**
- * Returns a list of active connections currently maintained by an instance of this manager.
- * The implementation should make a copy of the list to avoid ConcurrentModificationException.
- * The list could be empty, but never null.
+ * Returns a list of active connection endpoints currently maintained by an instance of this
+ * manager. The implementation should make a copy of the list to avoid
+ * ConcurrentModificationException. The list could be empty, but never null.
+ *
+ * @return List<ConnectionEndpoint>
*/
List getActiveConnections();
}
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -109,10 +109,14 @@
if (refCount == 1 && enablePing)
{
+ // TODO Something is not quite right here, we can detect failure even if pinging is not
+ // enabled, for example if we try to send a callback to the client and sending the
+ // calback fails
+
// install the connection listener that listens for failed connections
server.invoke(connectorObjectName, "addConnectionListener",
new Object[] {connectionManager},
- new String[] {"org.jboss.remoting.ConnectionListener"});
+ new String[] {"org.jboss.remoting.ConnectionListener"});
}
// We use the MBean service name to uniquely identify the connection factory
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -55,44 +55,47 @@
// Static ---------------------------------------------------------------------------------------
+ private static boolean trace = log.isTraceEnabled();
+
// Attributes -----------------------------------------------------------------------------------
+ // Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
protected Map jmsClients;
-
- protected Map sessions;
- protected Set activeConnections;
+ // Map<remotingClientSessionID<String> - jmsClientVMID<String>
+ protected Map remotingSessions;
+ // Set<ConnectionEndpoint>
+ protected Set activeConnectionEndpoints;
+
// Constructors ---------------------------------------------------------------------------------
public SimpleConnectionManager()
{
jmsClients = new HashMap();
-
- sessions = new HashMap();
-
- activeConnections = new HashSet();
+ remotingSessions = new HashMap();
+ activeConnectionEndpoints = new HashSet();
}
- // ConnectionManager ----------------------------------------------------------------------------
+ // ConnectionManager implementation -------------------------------------------------------------
- public synchronized void registerConnection(String jmsClientVMId,
+ public synchronized void registerConnection(String jmsClientVMID,
String remotingClientSessionID,
ConnectionEndpoint endpoint)
{
- Map endpoints = (Map)jmsClients.get(jmsClientVMId);
+ Map endpoints = (Map)jmsClients.get(jmsClientVMID);
if (endpoints == null)
{
endpoints = new HashMap();
- jmsClients.put(jmsClientVMId, endpoints);
+ jmsClients.put(jmsClientVMID, endpoints);
}
endpoints.put(remotingClientSessionID, endpoint);
- sessions.put(remotingClientSessionID, jmsClientVMId);
+ remotingSessions.put(remotingClientSessionID, jmsClientVMID);
- activeConnections.add(endpoint);
+ activeConnectionEndpoints.add(endpoint);
log.debug("registered connection " + endpoint + " as " +
Util.guidToString(remotingClientSessionID));
@@ -120,85 +123,29 @@
jmsClients.remove(jmsClientVMId);
}
- sessions.remove(remotingClientSessionID);
+ remotingSessions.remove(remotingClientSessionID);
return e;
}
return null;
}
- public synchronized void handleClientFailure(String remotingSessionID)
+ public synchronized List getActiveConnections()
{
- String jmsClientId = (String)sessions.get(remotingSessionID);
-
- if (jmsClientId != null)
- {
- log.warn("A problem has been detected with the connection to remote client " +
- remotingSessionID + ". It is possible the client has exited without closing " +
- "its connection(s) or there is a network problem. All connection resources " +
- "corresponding to that client process will now be removed.");
+ // I will make a copy to avoid ConcurrentModification
+ ArrayList list = new ArrayList();
+ list.addAll(activeConnectionEndpoints);
+ return list;
+ }
- // Remoting only provides one pinger per invoker, not per connection therefore when the
- // pinger dies we must close ALL the connections corresponding to that jms client id
-
- Map endpoints = (Map)jmsClients.get(jmsClientId);
-
- if (endpoints != null)
- {
- List sces = new ArrayList();
-
- Iterator iter = endpoints.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
-
- sces.add(sce);
- }
-
- //Now close the end points - this will result in a callback into unregisterConnection
- //to remove the data from the jmsClients and sessions maps.
- //Note we do this outside the loop to prevent ConcurrentModificationException
-
- iter = sces.iterator();
-
- while (iter.hasNext())
- {
- ConnectionEndpoint sce = (ConnectionEndpoint)iter.next();
-
- try
- {
- sce.closing();
- sce.close();
- log.debug("cleared up state for connection " + sce);
- }
- catch (JMSException e)
- {
- log.error("Failed to close connection", e);
- }
- }
- }
- }
- }
-
/*
* Used in testing only
*/
- public synchronized boolean containsSession(String remotingClientSessionID)
+ public synchronized boolean containsRemotingSession(String remotingClientSessionID)
{
- return sessions.containsKey(remotingClientSessionID);
+ return remotingSessions.containsKey(remotingClientSessionID);
}
- public synchronized List getActiveConnections()
- {
- // I will make a copy to avoid ConcurrentModification
- ArrayList list = new ArrayList();
- list.addAll(activeConnections);
- return list;
- }
-
/*
* Used in testing only
*/
@@ -207,24 +154,26 @@
return Collections.unmodifiableMap(jmsClients);
}
- // ConnectionListener ---------------------------------------------------------------------------
+ // ConnectionListener implementation ------------------------------------------------------------
/**
* Be aware that ConnectionNotifier uses to call this method with null Throwables.
- * @param t - expect it to be null!
+ *
+ * @param t - plan for it to be null!
*/
public void handleConnectionException(Throwable t, Client client)
{
if (t instanceof ClientDisconnectedException)
{
// This is OK
- if (log.isTraceEnabled()) { log.trace(this + " Notified that client " + client + " has disconnected"); }
+ if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
return;
}
else
{
- if (log.isTraceEnabled()) { log.trace(this + " Failure on client " + client, t); }
+ if (trace) { log.trace(this + " detected failure on client " + client, t); }
}
+
String remotingSessionID = client.getSessionId();
if (remotingSessionID != null)
@@ -247,6 +196,65 @@
// Public ---------------------------------------------------------------------------------------
+ /**
+ * TODO - this method shouldn't be part of the public interace
+ */
+ public synchronized void handleClientFailure(String remotingSessionID)
+ {
+ String jmsClientID = (String)remotingSessions.get(remotingSessionID);
+
+ if (jmsClientID != null)
+ {
+ log.warn("A problem has been detected with the connection to remote client " +
+ remotingSessionID + ". It is possible the client has exited without closing " +
+ "its connection(s) or there is a network problem. All connection resources " +
+ "corresponding to that client process will now be removed.");
+
+ // Remoting only provides one pinger per invoker, not per connection therefore when the
+ // pinger dies we must close ALL the connections corresponding to that jms client id
+
+ Map endpoints = (Map)jmsClients.get(jmsClientID);
+
+ if (endpoints != null)
+ {
+ List sces = new ArrayList();
+
+ Iterator iter = endpoints.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+
+ sces.add(sce);
+ }
+
+ // Now close the end points - this will result in a callback into unregisterConnection
+ // to remove the data from the jmsClients and sessions maps.
+ // Note we do this outside the loop to prevent ConcurrentModificationException
+
+ iter = sces.iterator();
+
+ while (iter.hasNext())
+ {
+ ConnectionEndpoint sce = (ConnectionEndpoint)iter.next();
+
+ try
+ {
+ sce.closing();
+ sce.close();
+ log.debug("cleared up state for connection " + sce);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+ }
+ }
+ }
+
public String toString()
{
return "ConnectionManager[" + Integer.toHexString(hashCode()) + "]";
Modified: trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -40,18 +40,16 @@
*/
public class InjectionAspect
{
- // Constants -----------------------------------------------------
+ // Constants -----------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static --------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes ----------------------------------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors --------------------------------------------------------------------------------
- // Public --------------------------------------------------------
+ // Public --------------------------------------------------------------------------------------
- // Interceptor implementation ------------------------------------
-
public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
@@ -80,33 +78,28 @@
endpoint.setCallbackHandler(handler);
// Then we inject the remoting session id of the client
- String sessionId =
- (String)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.REMOTING_SESSION_ID);
-
+ String sessionId = (String)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.REMOTING_SESSION_ID);
if (sessionId == null)
{
- throw new IllegalStateException("Can't find session id");
+ throw new IllegalStateException("Can't find remoting session ID");
}
- // Then we inject the unique id of the client VM
- String jmsClientVMID =
- (String)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.JMS_CLIENT_VM_ID);
+ // Then we inject the unique ID of the client VM
+ String jmsClientVMID = (String)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.JMS_CLIENT_VM_ID);
if (jmsClientVMID == null)
{
- throw new IllegalStateException("Can't find jms client id");
+ throw new IllegalStateException("Can't find jms client ID");
}
endpoint.setRemotingInformation(jmsClientVMID, sessionId);
// Then we inject the version number from to be used
- Byte ver =
- (Byte)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.VERSION_NUMBER);
-
+ Byte ver = (Byte)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.VERSION_NUMBER);
if (ver == null)
{
throw new IllegalStateException("Can't find version");
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -91,8 +91,8 @@
private String username;
private String password;
- private String remotingClientSessionId;
- private String jmsClientVMId;
+ private String remotingClientSessionID;
+ private String jmsClientVMID;
// the server itself
private ServerPeer serverPeer;
@@ -347,7 +347,7 @@
temporaryDestinations.clear();
}
- cm.unregisterConnection(jmsClientVMId, remotingClientSessionId);
+ cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
JMSDispatcher.instance.unregisterTarget(new Integer(id));
@@ -459,8 +459,10 @@
{
return sm;
}
-
- //IOC
+
+ /**
+ * IOC
+ */
public void setCallbackHandler(ServerInvokerCallbackHandler handler)
{
callbackHandler = handler;
@@ -493,15 +495,16 @@
return callbackHandler;
}
- // IOC
- public void setRemotingInformation(String jmsClientVMId, String remotingClientSessionId)
+ /**
+ * IOC
+ */
+ public void setRemotingInformation(String jmsClientVMID, String remotingClientSessionID)
{
- this.remotingClientSessionId = remotingClientSessionId;
+ this.remotingClientSessionID = remotingClientSessionID;
+ this.jmsClientVMID = jmsClientVMID;
- this.jmsClientVMId = jmsClientVMId;
-
this.serverPeer.getConnectionManager().
- registerConnection(jmsClientVMId, remotingClientSessionId, this);
+ registerConnection(jmsClientVMID, remotingClientSessionID, this);
}
public void setUsingVersion(byte version)
@@ -596,9 +599,9 @@
}
}
- String getRemotingClientSessionId()
+ String getRemotingClientSessionID()
{
- return remotingClientSessionId;
+ return remotingClientSessionID;
}
void sendMessage(JBossMessage msg, Transaction tx) throws Exception
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -250,7 +250,7 @@
}
catch (HandleCallbackException e)
{
- log.error("Failed to handle callback", e);
+ log.error(this + " failed to handle callback", e);
return null;
}
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -163,7 +163,7 @@
public void removeListener(InvokerCallbackHandler callbackHandler)
{
- log.debug("removing callback handler: " + callbackHandler);
+ log.debug("removing callback handler " + callbackHandler);
synchronized(callbackHandlers)
{
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -737,38 +737,37 @@
{
//Have reached maximum size - will drop message
- log.warn("Queue " + this + " has reached maximum size, ref " + ref + " will be dropped");
+ log.warn(this + " has reached maximum size, " + ref + " will be dropped");
return null;
}
- //Each channel has its own copy of the reference
+ // Each channel has its own copy of the reference
ref = ref.copy();
try
{
if (ref.isReliable() && !recoverable)
{
- //Reliable reference in a non recoverable channel-
- //We handle it as a non reliable reference
- //It's important that we set it to non reliable otherwise if the channel
- //pages and is non recoverable a reliable ref will be paged in the database as reliable
- //which makes them hard to remove on server restart.
- //If we always page them as unreliable then it is easy to remove them.
+ // Reliable reference in a non recoverable channel. We handle it as a non reliable
+ // reference. It's important that we set it to non reliable otherwise if the channel
+ // pages and is non recoverable a reliable ref will be paged in the database as reliable
+ // which makes them hard to remove on server restart. If we always page them as
+ // unreliable then it is easy to remove them.
ref.setReliable(false);
}
if (tx == null)
{
- // Don't even attempt synchronous delivery for a reliable message
- // when we have an non-recoverable state that doesn't accept reliable messages. If
- // we do, we may get into the situation where we need to reliably store an active
- // delivery of a reliable message, which in these conditions cannot be done.
+ // Don't even attempt synchronous delivery for a reliable message when we have an
+ // non-recoverable state that doesn't accept reliable messages. If we do, we may get
+ // into the situation where we need to reliably store an active delivery of a reliable
+ // message, which in these conditions cannot be done.
if (ref.isReliable() && !acceptReliableMessages)
{
- log.error("Cannot handle reliable message " + ref
- + " because the channel has a non-recoverable state!");
+ log.error("Cannot handle reliable message " + ref +
+ " because the channel has a non-recoverable state!");
return null;
}
@@ -777,12 +776,12 @@
// Reliable message in a recoverable state - also add to db
if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
- //TODO - this db access could safely be done outside the event loop
+ // TODO - this db access could safely be done outside the event loop
pm.addReference(channelID, ref, null);
}
- //If the ref has a scheduled delivery time then we don't add to the in memory queue
- //instead we create a timeout, so when that time comes delivery will attempted directly
+ // If the ref has a scheduled delivery time then we don't add to the in memory queue
+ // instead we create a timeout, so when that time comes delivery will attempted directly
if (!checkAndSchedule(ref))
{
@@ -791,8 +790,8 @@
addReferenceInMemory(ref);
}
- // We only do delivery if there are receivers that haven't said they don't want
- // any more references.
+ // We only do delivery if there are receivers that haven't said they don't want any
+ // more references.
if (receiversReady)
{
// Prompt delivery
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -62,9 +62,6 @@
import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
/**
- *
- * A DefaultPostOffice
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -75,39 +72,41 @@
*/
public class DefaultPostOffice extends JDBCSupport implements PostOffice
{
+ // Constants ------------------------------------------------------------------------------------
+
private static final Logger log = Logger.getLogger(DefaultPostOffice.class);
-
- private boolean trace = log.isTraceEnabled();
-
- private String officeName;
-
- //This lock protects the condition and name maps
- protected ReadWriteLock lock;
-
- protected MessageStore ms;
-
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ protected MessageStore ms;
protected PersistenceManager pm;
-
protected TransactionRepository tr;
-
+ protected FilterFactory filterFactory;
+ protected ConditionFactory conditionFactory;
+ protected QueuedExecutorPool pool;
protected int currentNodeId;
-
+
// Map <NodeID, Map<queueName, Binding>>
protected Map nameMaps;
-
+
// Map <Condition, Bindings>
protected Map conditionMap;
-
- protected FilterFactory filterFactory;
-
- protected ConditionFactory conditionFactory;
-
- protected QueuedExecutorPool pool;
-
+
+ // this lock protects the condition and name maps
+ protected ReadWriteLock lock;
+
+ private String officeName;
+
+ // Constructors ---------------------------------------------------------------------------------
+
public DefaultPostOffice()
- {
+ {
}
-
+
public DefaultPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
boolean createTablesOnStartup,
int nodeId, String officeName, MessageStore ms,
@@ -115,42 +114,36 @@
TransactionRepository tr, FilterFactory filterFactory,
ConditionFactory conditionFactory,
QueuedExecutorPool pool)
- {
+ {
super (ds, tm, sqlProperties, createTablesOnStartup);
-
+
lock = new ReentrantWriterPreferenceReadWriteLock();
-
+
nameMaps = new LinkedHashMap();
-
- conditionMap = new LinkedHashMap();
-
+
+ conditionMap = new LinkedHashMap();
+
this.currentNodeId = nodeId;
-
- this.officeName = officeName;
-
this.ms = ms;
-
this.pm = pm;
-
this.tr = tr;
-
this.filterFactory = filterFactory;
-
this.conditionFactory = conditionFactory;
-
this.pool = pool;
+ this.officeName = officeName;
+
}
-
- // MessagingComponent implementation --------------------------------
-
+
+ // MessagingComponent implementation ------------------------------------------------------------
+
public void start() throws Exception
{
if (trace) { log.trace(this + " starting"); }
-
+
super.start();
-
+
loadBindings();
-
+
log.debug(this + " started");
}
@@ -162,114 +155,114 @@
public void stop(boolean sendNotification) throws Exception
{
if (trace) { log.trace(this + " stopping"); }
-
+
super.stop();
-
+
log.debug(this + " stopped");
}
-
- // PostOffice implementation ---------------------------------------
+ // PostOffice implementation --------------------------------------------------------------------
+
public String getOfficeName()
{
return officeName;
}
-
+
public Binding bindQueue(Condition condition, Queue queue) throws Exception
{
if (trace) { log.trace(this + " binding queue " + queue.getName() + " with condition " + condition); }
-
+
if (queue.getName() == null)
{
throw new IllegalArgumentException("Queue name is null");
}
-
+
if (condition == null)
{
throw new IllegalArgumentException("Condition is null");
}
-
+
lock.writeLock().acquire();
try
- {
+ {
//We currently only allow one binding per name per node
Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
-
+
Binding binding = null;
-
+
if (nameMap != null)
{
binding = (Binding)nameMap.get(queue.getName());
}
-
+
if (binding != null)
{
throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
}
-
+
binding = new DefaultBinding(currentNodeId, condition, queue, false);
-
+
addBinding(binding);
-
+
if (queue.isRecoverable())
{
- //Need to write the binding to the db
- insertBinding(binding);
+ //Need to write the binding to the db
+ insertBinding(binding);
}
-
- return binding;
+
+ return binding;
}
finally
{
lock.writeLock().release();
}
- }
-
+ }
+
public Binding unbindQueue( String queueName) throws Throwable
{
if (trace) { log.trace(this + " unbinding queue " + queueName); }
-
+
if (queueName == null)
{
throw new IllegalArgumentException("Queue name is null");
}
-
+
lock.writeLock().acquire();
try
- {
+ {
Binding binding = removeBinding(currentNodeId,queueName);
-
+
if (binding.getQueue().isRecoverable())
{
//Need to remove from db too
-
+
deleteBinding(currentNodeId, binding.getQueue().getName());
}
-
- binding.getQueue().removeAllReferences();
-
- return binding;
+
+ binding.getQueue().removeAllReferences();
+
+ return binding;
}
finally
{
lock.writeLock().release();
}
- }
-
+ }
+
public Collection getBindingsForCondition(Condition condition) throws Exception
{
return listBindingsForConditionInternal(condition, true);
- }
-
+ }
+
public Binding getBindingForQueueName(String queueName) throws Exception
{
if (queueName == null)
{
throw new IllegalArgumentException("Queue name is null");
}
-
+
lock.readLock().acquire();
try
@@ -282,163 +275,252 @@
}
}
- /**
- * Internal methods (e.g. failOver) will already hold a lock and will need to call
- * getBindingForQueueNames() without a lock. (Also... I dind't move this method to the protected
- * section of the code as this is related to getBindingForQueueNames).
- */
- protected Binding internalGetBindingForQueueName(String queueName)
+ public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
{
- Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+ if (trace) { log.trace(this + " routing " + ref + " with condition '" + condition + "' " + (tx == null ? "non-transactionally" : " in " + tx)); }
- Binding binding = null;
-
- if (nameMap != null)
- {
- binding = (Binding)nameMap.get(queueName);
- }
-
- return binding;
- }
-
- public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
- {
- if (trace) { log.trace(this + " routing ref " + ref + " with condition " + condition + " and transaction " + tx); }
-
if (ref == null)
{
throw new IllegalArgumentException("Message reference is null");
}
-
+
if (condition == null)
{
throw new IllegalArgumentException("Condition key is null");
}
-
+
boolean routed = false;
-
+
lock.readLock().acquire();
-
+
try
- {
+ {
Bindings bd = (Bindings)conditionMap.get(condition);
-
+
if (bd != null)
- {
+ {
boolean startInternalTx = false;
-
+
if (tx == null && ref.isReliable())
{
if (bd.getDurableCount() > 1)
{
- // When routing a persistent message without a transaction then we may need to start an
- // internal transaction in order to route it.
- // This is so we can guarantee the message is delivered to all or none of the subscriptions.
- // We need to do this if there is more than one durable sub
+ // When routing a persistent message without a transaction then we may need to
+ // start an internal transaction in order to route it. This is so we can guarantee
+ // the message is delivered to all or none of the subscriptions. We need to do
+ // this if there is more than one durable subscription.
startInternalTx = true;
}
}
-
+
if (startInternalTx)
{
tx = tr.createTransaction();
}
-
+
Collection bindings = bd.getAllBindings();
-
+
Iterator iter = bindings.iterator();
-
+
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
-
+
//Sanity check
if (binding.getNodeID() != this.currentNodeId)
{
throw new IllegalStateException("Local post office has foreign bindings!");
}
-
+
Queue queue = binding.getQueue();
-
+
Delivery del = queue.handle(null, ref, tx);
-
+
if (del != null && del.isSelectorAccepted())
{
routed = true;
- }
- }
-
+ }
+ }
+
if (startInternalTx)
{
//TODO - do we need to rollback if an exception is thrown??
tx.commit();
}
}
-
+
return routed;
}
finally
- {
+ {
lock.readLock().release();
}
- }
-
+ }
+
public boolean isLocal()
{
return true;
}
-
+
public Binding getBindingforChannelId(long channelId) throws Exception
{
lock.readLock().acquire();
-
+
try
- {
+ {
Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
-
+
if (nameMap == null)
{
throw new IllegalStateException("Cannot find name map for current node " + currentNodeId);
}
-
+
Binding binding = null;
-
+
for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
{
Binding itemBinding = (Binding)iterbindings.next();
-
+
if (itemBinding.getQueue().getChannelID() == channelId)
{
binding = itemBinding;
break;
}
}
-
+
return binding;
}
finally
{
lock.readLock().release();
- }
+ }
}
-
- // Protected -----------------------------------------------------
-
- protected Collection listBindingsForConditionInternal(Condition condition, boolean localOnly) throws Exception
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String printBindingInformation()
{
+ StringWriter buffer = new StringWriter();
+ PrintWriter out = new PrintWriter(buffer);
+
+ out.println("Ocurrencies of nameMaps:");
+ out.println("<table border=1>");
+ for (Iterator mapIterator = nameMaps.entrySet().iterator();mapIterator.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)mapIterator.next();
+ out.println("<tr><td colspan=3><b>Map on node " + entry.getKey() + "</b></td></tr>");
+ Map valuesOnNode = (Map)entry.getValue();
+
+ out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
+ for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
+ {
+ Map.Entry entry2 = (Map.Entry)valuesIterator.next();
+
+ out.println("<tr>");
+ out.println("<td>" + entry2.getKey() + "</td><td>" + entry2.getValue()+
+ "</td><td>" + entry2.getValue().getClass().getName() + "</td>");
+ out.println("</tr>");
+
+ if (entry2.getValue() instanceof Binding &&
+ ((Binding)entry2.getValue()).getQueue() instanceof PagingFilteredQueue)
+ {
+ PagingFilteredQueue queue =
+ (PagingFilteredQueue)((Binding)entry2.getValue()).getQueue();
+ List undelivered = queue.undelivered(null);
+ if (!undelivered.isEmpty())
+ {
+ out.println("<tr><td>List of undelivered messages on Paging</td>");
+
+ out.println("<td colspan=2><table border=1>");
+ out.println("<tr><td>Reference#</td><td>Message</td></tr>");
+ for (Iterator i = undelivered.iterator();i.hasNext();)
+ {
+ MessageReference reference = (MessageReference)i.next();
+ out.println("<tr><td>" + reference.getInMemoryChannelCount() +
+ "</td><td>" + reference.getMessage() +"</td></tr>");
+ }
+ out.println("</table></td>");
+ out.println("</tr>");
+ }
+ }
+ }
+ }
+
+ out.println("</table>");
+ out.println("<br>Ocurrencies of conditionMap:");
+ out.println("<table border=1>");
+ out.println("<tr><td>EntryName</td><td>Value</td>");
+
+ for (Iterator iterConditions = conditionMap.entrySet().iterator();iterConditions.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterConditions.next();
+ out.println("<tr><td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td></tr>");
+
+ if (entry.getValue() instanceof Bindings)
+ {
+ out.println("<tr><td>Binding Information:</td><td>");
+ out.println("<table border=1>");
+ out.println("<tr><td>Binding</td><td>Queue</td></tr>");
+ Bindings bindings = (Bindings)entry.getValue();
+ for (Iterator i = bindings.getAllBindings().iterator();i.hasNext();)
+ {
+
+ Binding binding = (Binding)i.next();
+ out.println("<tr><td>" + binding + "</td><td>" + binding.getQueue() +
+ "</td></tr>");
+ }
+ out.println("</table></td></tr>");
+ }
+ }
+ out.println("</table>");
+
+ return buffer.toString();
+ }
+
+ public String toString()
+ {
+ return "DefaultPostOffice[" + Integer.toHexString(hashCode()) + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ /**
+ * Internal methods (e.g. failOver) will already hold a lock and will need to call
+ * getBindingForQueueNames() without a lock. (Also... I dind't move this method to the protected
+ * section of the code as this is related to getBindingForQueueNames).
+ */
+ protected Binding internalGetBindingForQueueName(String queueName)
+ {
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+
+ Binding binding = null;
+
+ if (nameMap != null)
+ {
+ binding = (Binding)nameMap.get(queueName);
+ }
+
+ return binding;
+ }
+
+ protected Collection listBindingsForConditionInternal(Condition condition, boolean localOnly)
+ throws Exception
+ {
if (condition == null)
{
throw new IllegalArgumentException("Condition is null");
}
-
+
lock.readLock().acquire();
-
+
try
{
//We should only list the bindings for the local node
-
- Bindings cb = (Bindings)conditionMap.get(condition);
-
+
+ Bindings cb = (Bindings)conditionMap.get(condition);
+
if (cb == null)
{
return Collections.EMPTY_LIST;
@@ -446,21 +528,21 @@
else
{
List list = new ArrayList();
-
+
Collection bindings = cb.getAllBindings();
-
+
Iterator iter = bindings.iterator();
-
+
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
-
+
if (!localOnly || (binding.getNodeID() == this.currentNodeId))
{
list.add(binding);
}
}
-
+
return list;
}
}
@@ -469,13 +551,12 @@
lock.readLock().release();
}
}
-
+
//FIXME - this is not quite right
- //We should not load any bindings at startup - since then we do not have to create any queues internally
- //Creating queues is problematic since there are params we do not know until destination deploy time
- //e.g. paging params, maxsize etc.
- //This means we have to load the queues disabled and then set the params and re-activate them
- //which is not clean
+ // We should not load any bindings at startup - since then we do not have to create any queues
+ // internally. Creating queues is problematic since there are params we do not know until
+ // destination deploy time e.g. paging params, maxsize etc. This means we have to load the queues
+ // disabled and then set the params and re-activate them which is not clean.
protected void loadBindings() throws Exception
{
lock.writeLock().acquire();
@@ -484,29 +565,29 @@
PreparedStatement ps = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
-
+
ps.setString(1, officeName);
rs = ps.executeQuery();
-
+
while (rs.next())
{
int nodeID = rs.getInt(1);
String queueName = rs.getString(2);
String conditionText = rs.getString(3);
String selector = rs.getString(4);
-
+
if (rs.wasNull())
{
selector = null;
}
-
+
long channelID = rs.getLong(5);
boolean failed = rs.getString(6).equals("Y");
@@ -527,12 +608,12 @@
//Don't load other nodes binding
}
else
- {
+ {
Binding binding = createBinding(nodeID, condition, queueName, channelID,
selector, true, failed, failedNodeID);
-
+
log.debug(this + " loaded from database " + binding);
-
+
binding.getQueue().deactivate();
addBinding(binding);
}
@@ -564,7 +645,7 @@
protected Binding createBinding(int nodeID, Condition condition, String queueName,
long channelID, String filterString, boolean durable,
boolean failed, Integer failedNodeID) throws Exception
- {
+ {
Filter filter = filterFactory.createFilter(filterString);
return createBinding(nodeID, condition, queueName, channelID,
filter, durable, failed, failedNodeID);
@@ -590,30 +671,30 @@
throw new IllegalStateException("This is a non clustered post office - should not " +
"have bindings from different nodes!");
}
-
+
return new DefaultBinding(nodeID, condition, queue, failed);
}
-
+
protected void insertBinding(Binding binding) throws Exception
{
Connection conn = null;
- PreparedStatement ps = null;
+ PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
-
+
String filterString =
binding.getQueue().getFilter() == null ?
null : binding.getQueue().getFilter().getFilterString();
-
+
ps.setString(1, officeName);
ps.setInt(2, currentNodeId);
ps.setString(3, binding.getQueue().getName());
- ps.setString(4, binding.getCondition().toText());
+ ps.setString(4, binding.getCondition().toText());
if (filterString != null)
{
ps.setString(5, filterString);
@@ -648,28 +729,28 @@
conn.close();
}
wrap.end();
- }
+ }
}
-
+
protected boolean deleteBinding(int parameterNodeId, String queueName) throws Exception
{
if (parameterNodeId<0) parameterNodeId=this.currentNodeId;
Connection conn = null;
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
-
+
ps.setString(1, this.officeName);
ps.setInt(2, parameterNodeId);
ps.setString(3, queueName);
int rows = ps.executeUpdate();
-
+
return rows == 1;
}
finally
@@ -683,104 +764,24 @@
conn.close();
}
wrap.end();
- }
+ }
}
- public String printBindingInformation()
- {
- StringWriter buffer = new StringWriter();
- PrintWriter out = new PrintWriter(buffer);
-
- out.println("Ocurrencies of nameMaps:");
- out.println("<table border=1>");
- for (Iterator mapIterator = nameMaps.entrySet().iterator();mapIterator.hasNext();)
- {
- Map.Entry entry = (Map.Entry)mapIterator.next();
- out.println("<tr><td colspan=3><b>Map on node " + entry.getKey() + "</b></td></tr>");
- Map valuesOnNode = (Map)entry.getValue();
-
- out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
- for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
- {
- Map.Entry entry2 = (Map.Entry)valuesIterator.next();
-
- out.println("<tr>");
- out.println("<td>" + entry2.getKey() + "</td><td>" + entry2.getValue()+ "</td><td>" + entry2.getValue().getClass().getName() + "</td>");
- out.println("</tr>");
-
- if (entry2.getValue() instanceof Binding && ((Binding)entry2.getValue()).getQueue() instanceof PagingFilteredQueue)
- {
- PagingFilteredQueue queue = (PagingFilteredQueue)((Binding)entry2.getValue()).getQueue();
- List undelivered = queue.undelivered(null);
- if (!undelivered.isEmpty())
- {
- out.println("<tr><td>List of undelivered messages on Paging</td>");
-
- out.println("<td colspan=2><table border=1>");
- out.println("<tr><td>Reference#</td><td>Message</td></tr>");
- for (Iterator iterUndelivered = undelivered.iterator();iterUndelivered.hasNext();)
- {
- MessageReference reference = (MessageReference)iterUndelivered.next();
- out.println("<tr><td>" + reference.getInMemoryChannelCount() + "</td><td>" + reference.getMessage() +"</td></tr>");
- }
- out.println("</table></td>");
- out.println("</tr>");
- }
- }
- //out.println(" bindingName=" +entry2.getKey() + " value = " + entry2.getValue() + " valueClass=" + entry2.getValue().getClass().getName());
- }
- }
- out.println("</table>");
-
-
-
- out.println("<br>Ocurrencies of conditionMap:");
-
- out.println("<table border=1>");
- out.println("<tr><td>EntryName</td><td>Value</td>");
- for (Iterator iterConditions = conditionMap.entrySet().iterator();iterConditions.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterConditions.next();
- out.println("<tr><td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td></tr>");
-
- if (entry.getValue() instanceof Bindings)
- {
- out.println("<tr><td>Binding Information:</td><td>");
- out.println("<table border=1>");
- out.println("<tr><td>Binding</td><td>Queue</td></tr>");
- Bindings bindings = (Bindings)entry.getValue();
- for (Iterator iterBindings = bindings.getAllBindings().iterator();iterBindings.hasNext();)
- {
-
- Binding binding = (Binding)iterBindings.next();
- out.println("<tr><td>" + binding + "</td><td>" + binding.getQueue() + "</td></tr>");
- }
- out.println("</table></td></tr>");
- }
- }
- out.println("</table>");
-
-
- return buffer.toString();
-
-
- }
-
protected void addBinding(Binding binding)
{
addToNameMap(binding);
addToConditionMap(binding);
- }
-
+ }
+
protected Binding removeBinding(int nodeId, String queueName)
{
Binding binding = removeFromNameMap(nodeId, queueName);
-
+
removeFromConditionMap(binding);
-
+
return binding;
}
-
+
protected void addToNameMap(Binding binding)
{
Integer nodeID = new Integer(binding.getNodeID());
@@ -796,84 +797,84 @@
if (trace) { log.trace(this + " added " + binding + " to name map"); }
}
-
+
protected void addToConditionMap(Binding binding)
{
Condition condition = binding.getCondition();
-
+
Bindings bindings = (Bindings)conditionMap.get(condition);
-
+
if (bindings == null)
{
bindings = new DefaultBindings();
-
+
conditionMap.put(condition, bindings);
}
-
+
bindings.addBinding(binding);
if (trace) { log.trace(this + " added " + binding + " to condition map"); }
}
-
+
protected Binding removeFromNameMap(int nodeId, String queueName)
{
if (queueName == null)
{
throw new IllegalArgumentException("Queue name is null");
}
-
+
Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
+
if (nameMap == null)
{
throw new IllegalArgumentException("Cannot find any bindings for node Id: " + nodeId);
}
-
- Binding binding = null;
-
+
+ Binding binding = null;
+
if (nameMap != null)
{
binding = (Binding)nameMap.remove(queueName);
}
-
+
if (binding == null)
{
throw new IllegalArgumentException("Name map does not contain binding for " + queueName);
}
-
+
if (nameMap.isEmpty())
{
nameMaps.remove(new Integer(nodeId));
}
-
+
return binding;
}
-
+
protected void removeFromConditionMap(Binding binding)
{
Bindings bindings = (Bindings)conditionMap.get(binding.getCondition());
-
+
if (bindings == null)
{
throw new IllegalStateException("Cannot find condition bindings for " +
binding.getCondition());
}
-
+
boolean removed = bindings.removeBinding(binding);
-
+
if (!removed)
{
throw new IllegalStateException("Cannot find binding in condition binding list");
- }
-
+ }
+
if (bindings.isEmpty())
{
conditionMap.remove(binding.getCondition());
- }
- }
+ }
+ }
protected Map getDefaultDMLStatements()
- {
+ {
Map map = new LinkedHashMap();
map.put("INSERT_BINDING",
@@ -904,7 +905,7 @@
return map;
}
-
+
protected Map getDefaultDDLStatements()
{
Map map = new LinkedHashMap();
@@ -915,8 +916,12 @@
"FAILED_NODE_ID INTEGER)");
return map;
}
-
- // Private -------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -33,9 +33,6 @@
import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
/**
- *
- * A CallbackFailureTest.
- *
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
@@ -43,39 +40,38 @@
*/
public class CallbackFailureTest extends MessagingTestCase
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
protected Server localServer;
-
protected Server remoteServer;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public CallbackFailureTest(String name)
{
super(name);
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public void setUp() throws Exception
{
super.setUp();
- //Start the local server
+ // Start the local server
localServer = new LocalTestServer();
- //Start all the services locally
+ // Start all the services locally
localServer.start("all", true);
localServer.deployQueue("Queue", null, false);
- //Connect to the remote server, but don't start a servicecontainer on it
- //We are only using the remote server to open a client connection to the local server
+ // Connect to the remote server, but don't start a servicecontainer on it. We are only using
+ // the remote server to open a client connection to the local server.
ServerManagement.create();
remoteServer = ServerManagement.getServer();
@@ -87,21 +83,26 @@
}
/*
- * Test that when a client callback fails, server side resources for connections are cleaned-up
+ * Test that when a client callback fails, server side resources for connections are cleaned-up.
*/
public void testCallbackFailure() throws Exception
{
- if (!ServerManagement.isRemote()) return;
+ if (!ServerManagement.isRemote())
+ {
+ fail("this test should be run in a remote configuration");
+ }
+
+ // we need to disable exception listener otherwise it will clear up the connection itself
- //We need to disable exception listener otherwise it will clear up the connection itself
+ ObjectName remoteConnectorName = ServiceContainer.REMOTING_OBJECT_NAME;
- ObjectName connectorName = ServiceContainer.REMOTING_OBJECT_NAME;
-
ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
- localServer.getServerPeer().getServer().invoke(connectorName, "removeConnectionListener",
- new Object[] {cm},
- new String[] {"org.jboss.remoting.ConnectionListener"});
+ localServer.getServerPeer().getServer().
+ invoke(remoteConnectorName,
+ "removeConnectionListener",
+ new Object[] {cm},
+ new String[] {"org.jboss.remoting.ConnectionListener"});
InitialContext ic = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
@@ -115,46 +116,47 @@
remoteServer.kill();
- //we have removed the exception listener so the server side resouces shouldn't be cleared up
-
- Thread.sleep(20000);
+ // we have removed the exception listener so the server side resouces shouldn't be cleared up
+
+ log.info("sleeping for 1 min ...");
+ Thread.sleep(10);
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
- //Now we send a message which should prompt delivery to the dead consumer causing
- //an exception which should cause connection cleanup
+ // Now we send a message which should prompt delivery to the dead consumer causing
+ // an exception which should cause connection cleanup
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
-
+
+ // sending just one message should be enough to trigger the failure and client smacking
prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
- prod.send(sess.createMessage());
+
+ log.info("sleeping for 45 secs ...");
+ Thread.sleep(10);
- Thread.sleep(45000);
-
- assertFalse(cm.containsSession(remotingSessionId));
+ assertFalse(cm.containsRemotingSession(remotingSessionId));
+
+ // make sure the message is still in queue
+
+ MessageConsumer cons = sess.createConsumer(queue);
+ Message m = cons.receive(1000);
+ assertNotNull(m);
+
+
+ cons.close();
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
-
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -111,7 +111,7 @@
ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
// Now we should have a client connection from the remote server to the local server
@@ -124,7 +124,7 @@
// See if we still have a connection with this id
//Connection state shouldn't have been cleared up by now
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -113,7 +113,7 @@
ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
// Now we should have a client connection from the remote server to the local server
@@ -124,7 +124,7 @@
Thread.sleep(15000);
// See if we still have a connection with this id
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -115,7 +115,7 @@
ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
// Now we should have a client connection from the remote server to the local server
@@ -126,7 +126,7 @@
Thread.sleep(15000);
// See if we still have a connection with this id
- assertFalse(cm.containsSession(remotingSessionId));
+ assertFalse(cm.containsRemotingSession(remotingSessionId));
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -120,8 +120,8 @@
log.info("server(1) = " + remotingSessionId[1]);
log.info("we have = " + ((SimpleConnectionManager)cm).getClients().size() + " clients registered on SimpleconnectionManager");
- assertFalse(cm.containsSession(remotingSessionId[0]));
- assertTrue(cm.containsSession(remotingSessionId[1]));
+ assertFalse(cm.containsRemotingSession(remotingSessionId[0]));
+ assertTrue(cm.containsRemotingSession(remotingSessionId[1]));
// Now we should have a client connection from the remote server to the local server
remoteServer.kill();
@@ -133,8 +133,8 @@
// See if we still have a connection with this id
//Connection state should have been cleared up by now
- assertFalse(cm.containsSession(remotingSessionId[0]));
- assertFalse(cm.containsSession(remotingSessionId[1]));
+ assertFalse(cm.containsRemotingSession(remotingSessionId[0]));
+ assertFalse(cm.containsRemotingSession(remotingSessionId[1]));
log.info("Servers = " + ((SimpleConnectionManager)cm).getClients().size());
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -113,7 +113,7 @@
ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
// Now we should have a client connection from the remote server to the local server
@@ -124,7 +124,7 @@
Thread.sleep(15000);
// See if we still have a connection with this id
- assertTrue(cm.containsSession(remotingSessionId));
+ assertTrue(cm.containsRemotingSession(remotingSessionId));
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -91,7 +91,7 @@
//Leave the connection unclosed
//Return the remoting client session id for the connection
- return ((JBossConnection)conn).getRemotingClientSessionId();
+ return ((JBossConnection)conn).getRemotingClientSessionID();
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -33,9 +33,6 @@
import org.jboss.test.messaging.tools.jmx.rmi.Command;
/**
- *
- * A CreateHangingConsumerCommand.
- *
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
@@ -46,15 +43,11 @@
private static final long serialVersionUID = -997724797145152821L;
private ConnectionFactory cf;
-
private Queue queue;
-
- private static MessageConsumer consumer;
-
+
public CreateHangingConsumerCommand(ConnectionFactory cf, Queue queue)
{
this.cf = cf;
-
this.queue = queue;
}
@@ -66,21 +59,20 @@
conn.start();
- consumer = sess.createConsumer(queue);
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ cons.setMessageListener(new Listener());
- consumer.setMessageListener(new Listener());
+ // leave the connection unclosed
- //Leave the connection unclosed
-
- //Return the remoting client session id for the connection
- return ((JBossConnection)conn).getRemotingClientSessionId();
+ // return the remoting client session id for the connection
+ return ((JBossConnection)conn).getRemotingClientSessionID();
}
class Listener implements MessageListener
{
public void onMessage(Message m)
{
-
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -85,8 +85,8 @@
conn2.start();
String arrays[] = new String[2];
- arrays[0] = ((JBossConnection)conn1).getRemotingClientSessionId();
- arrays[1] = ((JBossConnection)conn2).getRemotingClientSessionId();
+ arrays[0] = ((JBossConnection)conn1).getRemotingClientSessionID();
+ arrays[1] = ((JBossConnection)conn2).getRemotingClientSessionID();
conn1.close();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -201,12 +201,12 @@
cm.registerConnection("jvm3", "sessionid5", e5);
cm.registerConnection("jvm3", "sessionid6", e6);
- assertTrue(cm.containsSession("sessionid1"));
- assertTrue(cm.containsSession("sessionid2"));
- assertTrue(cm.containsSession("sessionid3"));
- assertTrue(cm.containsSession("sessionid4"));
- assertTrue(cm.containsSession("sessionid5"));
- assertTrue(cm.containsSession("sessionid6"));
+ assertTrue(cm.containsRemotingSession("sessionid1"));
+ assertTrue(cm.containsRemotingSession("sessionid2"));
+ assertTrue(cm.containsRemotingSession("sessionid3"));
+ assertTrue(cm.containsRemotingSession("sessionid4"));
+ assertTrue(cm.containsRemotingSession("sessionid5"));
+ assertTrue(cm.containsRemotingSession("sessionid6"));
ConnectionEndpoint r1 = cm.unregisterConnection("jvm3", "sessionid6");
assertEquals(e6, r1);
@@ -214,21 +214,21 @@
assertNull(cm.unregisterConnection("blah", "blah"));
- assertFalse(cm.containsSession("sessionid6"));
+ assertFalse(cm.containsRemotingSession("sessionid6"));
ConnectionEndpoint r2 = cm.unregisterConnection("jvm3", "sessionid5");
assertEquals(e5, r2);
assertFalse(e5.isClosed());
- assertFalse(cm.containsSession("sessionid5"));
+ assertFalse(cm.containsRemotingSession("sessionid5"));
cm.handleClientFailure("sessionid4");
assertNull(cm.unregisterConnection("jvm2", "sessionid4"));
assertNull(cm.unregisterConnection("jvm2", "sessionid3"));
- assertFalse(cm.containsSession("sessionid4"));
- assertFalse(cm.containsSession("sessionid3"));
+ assertFalse(cm.containsRemotingSession("sessionid4"));
+ assertFalse(cm.containsRemotingSession("sessionid3"));
assertTrue(e3.isClosed());
assertTrue(e4.isClosed());
@@ -241,8 +241,8 @@
assertEquals(e2, r4);
assertFalse(e2.isClosed());
- assertFalse(cm.containsSession("sessionid2"));
- assertFalse(cm.containsSession("sessionid1"));
+ assertFalse(cm.containsRemotingSession("sessionid2"));
+ assertFalse(cm.containsRemotingSession("sessionid1"));
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-25 01:55:37 UTC (rev 2045)
@@ -1224,7 +1224,8 @@
"callbackStore=org.jboss.remoting.callback.BlockingCallbackStore&" +
"clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&" +
"serverSocketClass=org.jboss.jms.server.remoting.ServerSocketWrapper&" +
- "NumberOfRetries=1&NumberOfCallRetries=1";
+ "NumberOfRetries=1&" +
+ "NumberOfCallRetries=1;"
// specific parameters per transport
More information about the jboss-cvs-commits
mailing list