[jboss-cvs] JBoss Messaging SVN: r3390 - in branches/Branch_Stable: src/etc/server/default/deploy and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 30 12:55:48 EST 2007


Author: timfox
Date: 2007-11-30 12:55:48 -0500 (Fri, 30 Nov 2007)
New Revision: 3390

Removed:
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java
Modified:
   branches/Branch_Stable/src/etc/prepare-aop.xml
   branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
   branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java
Log:
Mainly related to JBREM-845 and a few other bits and pieces


Modified: branches/Branch_Stable/src/etc/prepare-aop.xml
===================================================================
--- branches/Branch_Stable/src/etc/prepare-aop.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/prepare-aop.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -18,7 +18,7 @@
   
   <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised->$implementing{org.jboss.jms.server.endpoint.ConnectionFactoryInternalEndpoint}(..))"/>
   <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConnectionAdvised->$implementing{org.jboss.jms.delegate.ConnectionEndpoint}(..))"/>
-  <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.SessionAdvised->$implementing{org.jboss.jms.server.endpoint.SessionInternalEndpoint}(..))"/> 
+  <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.SessionAdvised->$implementing{org.jboss.jms.delegate.SessionEndpoint}(..))"/> 
   <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->$implementing{org.jboss.jms.delegate.ConsumerEndpoint}(..))"/>
   <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.BrowserAdvised->$implementing{org.jboss.jms.delegate.BrowserEndpoint}(..))"/>
 

Modified: branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
    UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
    UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID   
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)      
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)      
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -73,7 +73,7 @@
    UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
    UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-11-30 17:55:48 UTC (rev 3390)
@@ -74,7 +74,7 @@
    UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
    INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)   
    MESSAGE_ID_COLUMN=MESSAGE_ID   
-   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
+   DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)   
    INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
    DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
    SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -41,7 +41,6 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.util.Future;
-import org.jboss.messaging.util.Reorderer;
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
@@ -219,8 +218,7 @@
    private boolean handleFlowControl;
    private long redeliveryDelay;
    private volatile int currentToken;
-   private Reorderer reorderer;
-     
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConsumer(boolean isCC, int ackMode,                                
@@ -228,7 +226,7 @@
                          String queueName,
                          int bufferSize, QueuedExecutor sessionExecutor,
                          int maxDeliveries, boolean shouldAck, boolean handleFlowControl,
-                         long redeliveryDelay, Reorderer reorderer)
+                         long redeliveryDelay)
    {
       if (bufferSize < 1)
       {
@@ -250,14 +248,10 @@
       this.shouldAck = shouldAck;
       this.handleFlowControl = handleFlowControl;
       this.redeliveryDelay = redeliveryDelay;
-      this.reorderer = reorderer;
    }
         
    // Public ---------------------------------------------------------------------------------------
 
-
-   
-   
    public boolean isClosed()
    {
       return closed;
@@ -272,11 +266,6 @@
    {
       ClientDelivery del = (ClientDelivery)message;
       
-      //We need to make sure the deliveries are in the right order - 
-      //Due to the way remoting pooling works we cannot guarantee invocations will use the same TCP connection
-      //Therefore they can be delivered out of order.
-      //We therefore need to re-order them on arrival
-       
       Message msg = del.getMessage();
       
       MessageProxy proxy = JBossMessage.
@@ -287,7 +276,7 @@
       //       a message delivery back to the same client which tries to ack but can't get through
       //       the valve. This won't be necessary when we move to a non blocking transport
       
-      reorderer.handle(new HandleMessageRunnable(currentToken, proxy), del.getDeliveryId());
+      sessionExecutor.execute(new HandleMessageRunnable(currentToken, proxy));
    }
    
    public void setMessageListener(MessageListener listener) throws JMSException

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -36,7 +36,6 @@
 import org.jboss.jms.exception.MessagingShutdownException;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.Reorderer;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -84,7 +83,6 @@
       String consumerID = consumerState.getConsumerID();
       int prefetchSize = consumerState.getBufferSize();
       QueuedExecutor sessionExecutor = sessionState.getExecutor();
-      Reorderer reorderer = sessionState.getReorderer();
       int maxDeliveries = consumerState.getMaxDeliveries();
       long redeliveryDelay = consumerState.getRedeliveryDelay();
       
@@ -112,7 +110,7 @@
          new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
                             sessionDelegate, consumerDelegate, consumerID, queueName,
                             prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
-                            autoFlowControl, redeliveryDelay, reorderer);
+                            autoFlowControl, redeliveryDelay);
       
       sessionState.addCallbackHandler(messageHandler);
       

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -129,6 +129,8 @@
 
       client = conn.getRemotingClient();
       
+      onewayClient = conn.getOnewayClient();
+      
       strictTck = conn.isStrictTck();
    }
 
@@ -140,6 +142,8 @@
 
       client = conn.getRemotingClient();
       
+      onewayClient = conn.getOnewayClient();
+      
       strictTck = conn.isStrictTck();
    }
 
@@ -451,31 +455,18 @@
    }
    
    public void send(JBossMessage m, boolean checkForDuplicates) throws JMSException
-   {   	
-   	long seq;
-   	
-   	if (m.isReliable() || strictTck)
-   	{
-   		seq = -1;
-   	}
-   	else
-   	{
-   		SessionState sstate = (SessionState)state;
-   		
-   		seq = sstate.getNPSendSequence();
-   		
-   		sstate.incNpSendSequence();
-   	}
-   	
-      RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, seq);
+   {   		
+      boolean oneway = !(m.isReliable() || strictTck);
+      
+      RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway);
 
-      if (seq == -1)
-      {      
-      	doInvoke(client, req);
+      if (oneway)
+      {
+         doInvokeOneway(onewayClient, req);
       }
       else
       {
-      	doInvokeOneway(client, req);
+         doInvoke(client, req);
       }
    }
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -83,6 +83,8 @@
    protected transient byte version;
    
    protected transient Client client;
+   
+   protected transient Client onewayClient;
 
    // Static ---------------------------------------------------------------------------------------
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -24,6 +24,7 @@
 import java.security.AccessController;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.jboss.jms.server.ServerPeer;
@@ -126,16 +127,9 @@
             metadata.put(Client.CALLBACK_SERVER_PORT, propertyPort);
          }
          
-         Map params = serverLocator.getParameters();
-         int maxPoolSize = 50;
-         if (params != null)
-         {
-         	String val = (String)params.get(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG);
-         	maxPoolSize = Integer.valueOf((String)val).intValue();
-         }
+         //Callback client pool MUST be size 1 so one way messages don't get out of order
          
-         //Use the same value for the callback server
-         metadata.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, String.valueOf(maxPoolSize));
+         metadata.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
          
          String protocol = serverLocator.getProtocol();
          if ("bisocket".equals(protocol) || "sslbisocket".equals(protocol))
@@ -246,6 +240,7 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private Client client;
+   private Client onewayClient;
    private boolean clientPing;
    private InvokerLocator serverLocator;
    private CallbackManager callbackManager;
@@ -282,7 +277,35 @@
       client = new Client(serverLocator, config);
 
       client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
-
+      
+      config.putAll(serverLocator.getParameters());
+      config.put(Client.ENABLE_LEASE, "false");
+      config.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
+      
+      //Hack - need to change the timeout to make sure a different pool is used
+      config.put("timeout", String.valueOf(Integer.MAX_VALUE));
+      
+      StringBuffer buff = new StringBuffer();
+      buff.append(serverLocator.getProtocol()).append("://");
+      buff.append(serverLocator.getHost()).append(":").append(serverLocator.getPort());
+      buff.append("/?");      
+      Iterator iter = config.entrySet().iterator();
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+         String key = (String)entry.getKey();
+         String val = (String)entry.getValue();
+         buff.append(key).append("=").append(val);
+         if (iter.hasNext())
+         {
+            buff.append("&");
+         }
+      }
+ 
+      onewayClient = new Client(new InvokerLocator(buff.toString()), config);
+      
+      onewayClient.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
+                       
       if (log.isTraceEnabled()) { log.trace(this + " created client"); }
 
       callbackManager = new CallbackManager();
@@ -293,6 +316,7 @@
          public Object run() throws Exception
          {
             client.connect();
+            onewayClient.connect();
             return null;
          }
       });
@@ -304,7 +328,10 @@
 
       client.setMarshaller(new JMSWireFormat());
       client.setUnMarshaller(new JMSWireFormat());
-
+      
+      onewayClient.setMarshaller(new JMSWireFormat());
+      onewayClient.setUnMarshaller(new JMSWireFormat());
+      
       Map metadata = new HashMap();
       
       metadata.put(InvokerLocator.DATATYPE, "jms");
@@ -345,9 +372,19 @@
       {      	
       	log.trace(this + " failed to disconnect the client", ignore);
       }
+      
+      try
+      {
+         onewayClient.disconnect();
+      }
+      catch (Throwable ignore)
+      {        
+         log.trace(this + " failed to disconnect the client", ignore);
+      }
+      
+      client = null;
+      onewayClient = null;
 
-      client = null;
-      
       log.trace(this + " closed");
    }
 
@@ -355,19 +392,23 @@
    {
       return client;
    }
-
+   
+   public Client getOnewayClient()
+   {
+      return onewayClient;
+   }
+   
    public CallbackManager getCallbackManager()
    {
       return callbackManager;
    }
 
-
    public boolean isStrictTck()
    {
        return strictTck;
    }
 
-    public synchronized boolean isFailed()
+   public synchronized boolean isFailed()
    {
       return failed;
    }
@@ -394,6 +435,15 @@
       	log.trace(this + " failed to set disconnect timeout", ignore);
       }
       
+      try
+      {
+         onewayClient.setDisconnectTimeout(0);
+      }
+      catch (Throwable ignore)
+      {        
+         log.trace(this + " failed to set disconnect timeout", ignore);
+      }
+       
       stop();
    }
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -47,7 +47,6 @@
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.ClearableQueuedExecutor;
-import org.jboss.messaging.util.Reorderer;
 import org.jboss.messaging.util.Version;
 
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -112,8 +111,6 @@
    
    private long npSendSequence;
    
-   private Reorderer reorderer;
-   
    // Constructors ---------------------------------------------------------------------------------
 
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -153,8 +150,6 @@
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
-      
-      this.reorderer = new ReceiveReorderer();
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -339,7 +334,6 @@
       }
 
       List recoveryInfos = new ArrayList();
-      long maxDeliveryID = 0;
       if (!ackInfos.isEmpty())
       {         
          for (Iterator i = ackInfos.iterator(); i.hasNext(); )
@@ -351,7 +345,6 @@
                                     del.getQueueName());
 
             recoveryInfos.add(recInfo);        
-            maxDeliveryID = Math.max(maxDeliveryID, del.getMessageProxy().getDeliveryId());
          }         
       }
       
@@ -364,13 +357,7 @@
          //like remove from recovery Area refs corresponding to messages in client consumer buffers
          
       	newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
-      	
-      	reorderer.reset(maxDeliveryID + 1);
       }
-      else
-      {
-         reorderer.reset(maxDeliveryID);
-      }    
    }
    
    // Public ---------------------------------------------------------------------------------------
@@ -488,11 +475,6 @@
    	npSendSequence++;
    }
    
-   public Reorderer getReorderer()
-   {
-      return reorderer;
-   }
-   
    public String toString()
    {
       return "SessionState[" + sessionID + "]";
@@ -506,13 +488,5 @@
 
    // Inner classes --------------------------------------------------------------------------------
    
-   private class ReceiveReorderer extends Reorderer
-   {
-      public void execute(Object object) throws Exception
-      {         
-         executor.execute((Runnable)object);   
-      }
-   }
-
 }
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -86,15 +86,12 @@
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.Reorderer;
-import org.jboss.remoting.Client;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
 
 /**
  * The server side representation of a JMS session.
@@ -174,8 +171,6 @@
    
    private LinkedQueue toDeliver = new LinkedQueue();
    
-   private Reorderer reorderer = new SendReorderer();
-      
    private boolean waitingToClose = false;
    
    // Constructors ---------------------------------------------------------------------------------
@@ -337,33 +332,14 @@
    {
       if (trace) log.trace(this + " closing");
       
-      // Wait for last np message to arrive
-      
-      if (sequence != 0)
-      {
-      	reorderer.waitToArrive(sequence); 	
-      }      
-      
       return -1;
    }
  
-   public void send(JBossMessage message, boolean checkForDuplicates) throws JMSException
+   public void send(JBossMessage message, final boolean checkForDuplicates) throws JMSException
    {
-   	throw new IllegalStateException("Should not be handled on the server");
-   }
-     
-   public void send(JBossMessage message, final boolean checkForDuplicates, long thisSequence) throws JMSException
-   {
       try
       {                
-      	if (thisSequence != -1)
-      	{
-      	   reorderer.handle(message, thisSequence);
-      	}
-      	else
-      	{
-      		connectionEndpoint.sendMessage(message, null, checkForDuplicates);
-      	}        
+         connectionEndpoint.sendMessage(message, null, checkForDuplicates);       
       }
       catch (Throwable t)
       {
@@ -590,6 +566,8 @@
          }
          
          deliveryIdSequence = maxDeliveryId + 1;
+         
+       //  log.info("*** set deliveryIdSequence to " + deliveryIdSequence);
       }
       catch (Throwable t)
       {
@@ -1269,14 +1247,14 @@
    //out of order! There maybe some better way of doing this 
    synchronized void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
    {
-   	 long deliveryId = -1;
-   	 
    	 if (trace) { log.trace(this + " handling delivery " + delivery); }
    	 
    	 DeliveryRecord rec = null;
    	 
-   	 deliveryId = deliveryIdSequence++;
+   	 long deliveryId = deliveryIdSequence++;
    	 
+   //	 log.info(System.identityHashCode(this) + " Delivery id is now " + deliveryId);
+   	 
    	 if (trace) { log.trace("Delivery id is now " + deliveryId); }
    	 
    	 //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
@@ -1395,25 +1373,19 @@
       // We send the message to the client on the current thread. The message is written onto the
       // transport and then the thread returns immediately without waiting for a response.
 
-      Client callbackClient = callbackHandler.getCallbackClient();
-
       ClientDelivery del = new ClientDelivery(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
 
       Callback callback = new Callback(del);
-
+      
       try
       {
-         //Note - even though remoting cannot guarantee ordering of one way invocations unless client pool size =1
-         //in which case performance would be crippled - so this is not an option.
-         //We use a reorderer on the client side to re-order deliveries that may have got out of order
-         
          // one way invocation, no acknowledgment sent back by the client
          if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-         
-         callbackHandler.handleCallbackOneway(callback);
+               
+         callbackHandler.handleCallbackOneway(callback);        
                                  
          //We store the delivery id so we know to wait for any deliveries in transit on close
-         consumer.setLastDeliveryID(deliveryID);        
+         consumer.setLastDeliveryID(deliveryID);  
       }
       catch (Throwable t)
       {
@@ -2335,11 +2307,4 @@
       }
    }
    
-   private class SendReorderer extends Reorderer
-   {
-      public void execute(Object object) throws Exception
-      {
-         connectionEndpoint.sendMessage((JBossMessage)object, null, false);
-      }
-   }
 }

Deleted: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.endpoint;
-
-import javax.jms.JMSException;
-
-import org.jboss.jms.delegate.SessionEndpoint;
-import org.jboss.jms.message.JBossMessage;
-
-/**
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>5 Oct 2007
- *
- * $Id: $
- *
- */
-public interface SessionInternalEndpoint extends SessionEndpoint
-{
-	void send(JBossMessage msg, boolean checkForDuplicates, long seq) throws JMSException;   
-}

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -35,7 +35,6 @@
 import org.jboss.jms.destination.JBossTopic;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
-import org.jboss.jms.server.endpoint.SessionInternalEndpoint;
 
 /**
  * The server-side advised instance corresponding to a Session. It is bound to the AOP
@@ -48,7 +47,7 @@
  *
  * $Id$
  */
-public class SessionAdvised extends AdvisedSupport implements SessionInternalEndpoint
+public class SessionAdvised extends AdvisedSupport implements SessionEndpoint
 {
    // Constants -----------------------------------------------------
 	
@@ -79,14 +78,9 @@
 
    public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
    {
-      throw new IllegalStateException("Invocation should not be handle here");
+      ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates);
    }
    
-   public void send(JBossMessage msg, boolean checkForDuplicates, long seq) throws JMSException
-   {
-      ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates, seq);
-   }
-   
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
                                                   boolean connectionConsumer, boolean autoFlowControl) throws JMSException

Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -66,13 +66,11 @@
       ConsumerEndpoint endpoint = 
          (ConsumerEndpoint)Dispatcher.instance.getTarget(objectId);
       
-      if (endpoint == null)
+      if (endpoint != null)
       {
-         throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+         endpoint.changeRate(newRate);
       }
       
-      endpoint.changeRate(newRate);
-      
       return null;
    }
 
@@ -83,22 +81,7 @@
       os.writeFloat(newRate);
       
       os.flush();
-   }
-   
-   // Until we have NBIO transport this needs to be synchronous otherwise we can get out of sync
-   // due to earlier invocations overtaking later invocations.
-   
-//   public Object getPayload()
-//   {
-//      OnewayInvocation oi = new OnewayInvocation(this);
-//
-//      InvocationRequest request =
-//         new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-//                               oi, ONE_WAY_METADATA, null, null);
-//      
-//      return request;     
-//   }
-
+   }     
 }
 
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -47,8 +47,8 @@
    
    private JBossMessage msg;
    private boolean checkForDuplicates;
-   private long sequence;
-   
+   private boolean oneway;
+ 
    public SessionSendRequest()
    {      
    }
@@ -57,12 +57,12 @@
                              byte version,
                              JBossMessage msg,
                              boolean checkForDuplicates,
-                             long sequence)
+                             boolean oneway)
    {
       super(objectId, PacketSupport.REQ_SESSION_SEND, version);
       this.msg = msg;
       this.checkForDuplicates = checkForDuplicates;
-      this.sequence = sequence;
+      this.oneway = oneway;
    }
 
    public void read(DataInputStream is) throws Exception
@@ -77,7 +77,7 @@
 
       checkForDuplicates = is.readBoolean();
       
-      sequence = is.readLong();
+      oneway = is.readBoolean();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -87,21 +87,14 @@
       
       if (advised != null)
       {         
-         advised.send(msg, checkForDuplicates, sequence);
+         advised.send(msg, checkForDuplicates);
       }
       else
       {      	
-      	if (sequence == -1)
-      	{
-      		//Persistent message
-
-      		throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
-      	}
-      	else
-      	{
-      		// Since NP messages are sent one way, there is a possibility the session has closed
-         	//by the time the message arrives, so we ignore this
-      	}
+         if (!oneway)
+         {
+            throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+         }
       }
       
       return null;
@@ -117,14 +110,14 @@
 
       os.writeBoolean(checkForDuplicates);
       
-      os.writeLong(sequence);
+      os.writeBoolean(oneway);
       
       os.flush();
    }
    
    public Object getPayload()
    {
-   	if (sequence != -1)
+   	if (oneway)
    	{	   	
 	   	OnewayInvocation oi = new OnewayInvocation(this);
 	

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -2367,7 +2367,7 @@
       map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID",
               "CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");
       map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY",
-              "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
+              "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");    
       // Message
       map.put("CREATE_MESSAGE",
               "CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), "
@@ -2450,7 +2450,7 @@
               "UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("DELETE_MESSAGE",
-              "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+              "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
       // Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "

Deleted: branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,107 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.util;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.logging.Logger;
-
-/**
- * 
- * A Reorderer
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public abstract class Reorderer
-{
-   private static final Logger log = Logger.getLogger(Reorderer.class);   
-   
-   private static final long WAIT_TIMEOUT = 5 * 1000;
-      
-   private long expectedSequence = 0;
-   
-   private Map<Long, Object> heldBack = new HashMap<Long, Object>();
-   
-   public synchronized void reset(long expectedSequence)
-   {
-      heldBack.clear();
-      
-      this.expectedSequence = expectedSequence;
-   }
-   
-   public synchronized void handle(Object object, long thisSequence) throws Exception
-   {
-      //Need to make sure it is in correct order since np messages are sent
-      //one way so they can arrive out of sequence
-      
-      //This is a workaround to allow us to use one way messages for np messages for performance
-      //reasons
-                   
-      if (thisSequence == expectedSequence)
-      {
-         do
-         {
-            execute(object);
-            
-            expectedSequence++;
-            
-            object = heldBack.remove(expectedSequence);
-            
-         } while (object != null);                
-      }
-      else
-      {
-         //Not the expected one - add it to the map
-         
-         heldBack.put(thisSequence, object);               
-      }
-      
-      notify();      
-   }
-   
-   public synchronized void waitToArrive(long sequence)
-   {          
-      long wait = WAIT_TIMEOUT;
-      
-      while (sequence != expectedSequence && wait > 0)
-      {
-         long start = System.currentTimeMillis(); 
-         try
-         {
-            wait();
-         }
-         catch (InterruptedException e)
-         {                 
-         }
-         wait -= (System.currentTimeMillis() - start);
-      }
-      
-      if (wait <= 0)
-      {
-         log.warn("Timed out waiting for last message");
-      }             
-   }
-   
-   public abstract void execute(Object object) throws Exception;   
-}

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -82,6 +82,10 @@
 	      TextMessage m = session.createTextMessage("message one");
 	
 	      prod.send(m);
+	      
+	      // It's np so give it some time to hit the server before closing
+         
+         Thread.sleep(2000);   
 	
 	      conn.close();
 	
@@ -124,6 +128,10 @@
 	      m.setText("message one");
 	
 	      prod.send(m);
+	      
+	      //It's np so give it some time to hit the server before closing
+	      
+	      Thread.sleep(2000);
 	
 	      conn.close();
 	
@@ -164,6 +172,11 @@
 	      TextMessage m = session.createTextMessage("message one");
 	
 	      prod.send(m);
+	      
+	      // It's np so give it some time to hit the server before closing
+         
+         Thread.sleep(2000);
+   	      
 	
 	      conn.close();
 	
@@ -291,6 +304,11 @@
 	      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 	      TextMessage m = session.createTextMessage("one");
 	      prod.send(m);
+	      
+	      //It's np so give it some time to hit the server before closing
+         
+         Thread.sleep(2000);
+   
 	
 	      conn.close();
 	

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -667,7 +667,7 @@
          JBossMessage msg = new JBossMessage(123);
          
          RequestSupport req =
-            new SessionSendRequest("23", (byte)77, msg, false, 123);
+            new SessionSendRequest("23", (byte)77, msg, false, true);
                  
          testPacket(req, PacketSupport.REQ_SESSION_SEND);                           
       }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -70,9 +70,14 @@
       {
          return;
       }
-
-      connector = new Connector();
-      connector.setInvokerLocator(locator.getLocatorURI());
+      
+      Map config = new HashMap();
+      
+      config.put("onewayThreadPool", "org.jboss.jms.server.remoting.DirectThreadPool");
+      
+      connector = new Connector(locator, config);
+      
+      connector.create();
       connector.start();
    }
 

Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java	2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java	2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,188 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.util.Reorderer;
-import org.jboss.test.messaging.MessagingTestCase;
-
-/**
- * 
- * A ReordererTest
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class ReordererTest extends MessagingTestCase
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   // Static ---------------------------------------------------------------------------------------
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   public ReordererTest(String name)
-   {
-      super(name);
-   }
-
-   // Public ---------------------------------------------------------------------------------------  
-
-   public void testOrder() throws Exception
-   {
-      final List<Integer> list = new ArrayList<Integer>();
-      
-      class MyReorderer extends Reorderer
-      {
-         @Override
-         public void execute(Object object) throws Exception
-         {
-            list.add((Integer)object);
-         }         
-      }
-      
-      Reorderer reorderer = new MyReorderer();
-      
-      reorderer.handle(3, 3);
-      reorderer.handle(1, 1);
-      reorderer.handle(9, 9);
-      reorderer.handle(6, 6);
-      reorderer.handle(0, 0);
-      reorderer.handle(2, 2);
-      reorderer.handle(5, 5);
-      reorderer.handle(8, 8);
-      reorderer.handle(7, 7);
-      reorderer.handle(4, 4);
-   
-      for (int i = 0; i < 10; i++)
-      {
-         assertEquals(Integer.valueOf(i), (Integer)list.get(i));
-      }
-   
-   }
-   
-   public void testHoldBack() throws Exception
-   {
-      final List<Integer> list = new ArrayList<Integer>();
-      
-      class MyReorderer extends Reorderer
-      {
-         @Override
-         public void execute(Object object) throws Exception
-         {
-            list.add((Integer)object);
-         }         
-      }
-      
-      Reorderer reorderer = new MyReorderer();
-      
-      reorderer.handle(3, 3);
-      assertTrue(list.isEmpty());
-      reorderer.handle(1, 1);
-      assertTrue(list.isEmpty());
-      reorderer.handle(9, 9);
-      assertTrue(list.isEmpty());
-      reorderer.handle(6, 6);
-      assertTrue(list.isEmpty());
-      reorderer.handle(0, 0);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(2, list.size());
-      reorderer.handle(2, 2);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(Integer.valueOf(2), (Integer)list.get(2));
-      assertEquals(Integer.valueOf(3), (Integer)list.get(3));
-      assertEquals(4, list.size());
-      reorderer.handle(5, 5);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(Integer.valueOf(2), (Integer)list.get(2));
-      assertEquals(Integer.valueOf(3), (Integer)list.get(3));
-      assertEquals(4, list.size());
-      reorderer.handle(4, 4);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(Integer.valueOf(2), (Integer)list.get(2));
-      assertEquals(Integer.valueOf(3), (Integer)list.get(3));
-      assertEquals(Integer.valueOf(4), (Integer)list.get(4));
-      assertEquals(Integer.valueOf(5), (Integer)list.get(5));
-      assertEquals(Integer.valueOf(6), (Integer)list.get(6));
-      assertEquals(7, list.size());
-      reorderer.handle(8, 8);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(Integer.valueOf(2), (Integer)list.get(2));
-      assertEquals(Integer.valueOf(3), (Integer)list.get(3));
-      assertEquals(Integer.valueOf(4), (Integer)list.get(4));
-      assertEquals(Integer.valueOf(5), (Integer)list.get(5));
-      assertEquals(Integer.valueOf(6), (Integer)list.get(6));
-      assertEquals(7, list.size());
-      reorderer.handle(7, 7);
-      assertEquals(Integer.valueOf(0), (Integer)list.get(0));
-      assertEquals(Integer.valueOf(1), (Integer)list.get(1));
-      assertEquals(Integer.valueOf(2), (Integer)list.get(2));
-      assertEquals(Integer.valueOf(3), (Integer)list.get(3));
-      assertEquals(Integer.valueOf(4), (Integer)list.get(4));
-      assertEquals(Integer.valueOf(5), (Integer)list.get(5));
-      assertEquals(Integer.valueOf(6), (Integer)list.get(6));
-      assertEquals(Integer.valueOf(7), (Integer)list.get(7));
-      assertEquals(Integer.valueOf(8), (Integer)list.get(8));
-      assertEquals(Integer.valueOf(9), (Integer)list.get(9));
-      assertEquals(10, list.size());
-      
-   
-      for (int i = 0; i < 10; i++)
-      {
-         assertEquals(Integer.valueOf(i), (Integer)list.get(i));
-      }
-   
-   }
-   
-   public void testOrderMustStartWithZero() throws Exception
-   {
-      final List<Integer> list = new ArrayList<Integer>();
-      
-      class MyReorderer extends Reorderer
-      {
-         @Override
-         public void execute(Object object) throws Exception
-         {
-            list.add((Integer)object);
-         }         
-      }
-      
-      Reorderer reorderer = new MyReorderer();
-      
-      reorderer.handle(3, 3);
-      reorderer.handle(1, 1);
-      reorderer.handle(9, 9);
-      reorderer.handle(6, 6);
-      reorderer.handle(2, 2);
-      reorderer.handle(5, 5);
-      reorderer.handle(8, 8);
-      reorderer.handle(7, 7);
-      reorderer.handle(4, 4);
-   
-      assertTrue(list.isEmpty());
-   
-   }
-   
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-
-   // Inner classes --------------------------------------------------------------------------------
-   
-   
-}




More information about the jboss-cvs-commits mailing list