[jboss-cvs] JBoss Messaging SVN: r4152 - in trunk: src/main/org/jboss/messaging/core/remoting and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 6 14:00:34 EDT 2008


Author: timfox
Date: 2008-05-06 14:00:33 -0400 (Tue, 06 May 2008)
New Revision: 4152

Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaInspector.java
   trunk/src/main/org/jboss/messaging/core/server/ObjectIDGenerator.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ObjectIDGeneratorImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaInspectorTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerPacketHandlerSupport.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/BrowserTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/util/SpawnedVMSupport.java
Log:
Re-implemented request/response handling


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -83,7 +83,7 @@
       
       try
       {
-         remotingConnection.send(serverTargetID, session.getServerTargetID(), new PacketImpl(CLOSE));
+         remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), new PacketImpl(CLOSE));
       }
       finally
       {
@@ -102,7 +102,7 @@
    {
       checkClosed();
       
-      remotingConnection.send(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_RESET));
+      remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_RESET));
    }
 
    public boolean hasNextMessage() throws MessagingException
@@ -110,7 +110,7 @@
       checkClosed();
       
       SessionBrowserHasNextMessageResponseMessage response =
-         (SessionBrowserHasNextMessageResponseMessage)remotingConnection.send(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_HASNEXTMESSAGE));
+         (SessionBrowserHasNextMessageResponseMessage)remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_HASNEXTMESSAGE));
       
       return response.hasNext();
    }
@@ -120,7 +120,7 @@
       checkClosed();
       
       SessionBrowserNextMessageResponseMessage response =
-         (SessionBrowserNextMessageResponseMessage)remotingConnection.send(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_NEXTMESSAGE));
+         (SessionBrowserNextMessageResponseMessage)remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), new PacketImpl(SESS_BROWSER_NEXTMESSAGE));
       
       return response.getMessage();
    }
@@ -130,7 +130,7 @@
       checkClosed();
       
       SessionBrowserNextMessageBlockResponseMessage response =
-         (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.send(serverTargetID, session.getServerTargetID(), new SessionBrowserNextMessageBlockMessage(maxMessages));
+         (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), new SessionBrowserNextMessageBlockMessage(maxMessages));
       return response.getMessages();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -144,7 +144,7 @@
             new CreateConnectionRequest(clientVersion.getIncrementingVersion(), sessionID, username, password);
          
          CreateConnectionResponse response =
-            (CreateConnectionResponse)remotingConnection.send(0, request);
+            (CreateConnectionResponse)remotingConnection.sendBlocking(0, 0, request);
 
          return new ClientConnectionImpl(response.getConnectionTargetID(), strictTck, remotingConnection,
                defaultConsumerWindowSize, defaultConsumerMaxRate,

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -121,7 +121,8 @@
 
       ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(xa, autoCommitSends, autoCommitAcks);
 
-      ConnectionCreateSessionResponseMessage response = (ConnectionCreateSessionResponseMessage)remotingConnection.send(serverTargetID, request);   
+      ConnectionCreateSessionResponseMessage response =
+         (ConnectionCreateSessionResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);   
 
       ClientSession session =
       	new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers,
@@ -137,14 +138,14 @@
    {
       checkClosed();
        
-      remotingConnection.send(serverTargetID, serverTargetID, new PacketImpl(CONN_START), true);
+      remotingConnection.sendOneWay(serverTargetID, serverTargetID, new PacketImpl(CONN_START));
    }
    
    public void stop() throws MessagingException
    {
       checkClosed();
       
-      remotingConnection.send(serverTargetID, new PacketImpl(CONN_STOP));
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(CONN_STOP));
    }
 
    public void setRemotingSessionListener(final RemotingSessionListener listener) throws MessagingException
@@ -165,7 +166,7 @@
       {
          closeChildren();
          
-         remotingConnection.send(serverTargetID, new PacketImpl(CLOSE));
+         remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(CLOSE));
       }
       finally
       {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -65,6 +65,8 @@
       
    private final long targetID;
    
+   private final long clientTargetID;
+   
    private final ExecutorService sessionExecutor;
    
    private final RemotingConnection remotingConnection;
@@ -100,12 +102,15 @@
     * (and its PacketDispatcher) to a single server.
     */
    public ClientConsumerImpl(final ClientSessionInternal session, final long targetID,
+                             final long clientTargetID,
                              final ExecutorService sessionExecutor,
                              final RemotingConnection remotingConnection,
                              final boolean direct, final int tokenBatchSize)
    {
       this.targetID = targetID;
       
+      this.clientTargetID = clientTargetID;
+      
       this.session = session;
       
       this.sessionExecutor = sessionExecutor;
@@ -256,9 +261,9 @@
          
          receiverThread = null;
 
-         remotingConnection.send(targetID, session.getServerTargetID(), new PacketImpl(CLOSE));
+         remotingConnection.sendBlocking(targetID, session.getServerTargetID(), new PacketImpl(CLOSE));
 
-         remotingConnection.getPacketDispatcher().unregister(targetID);
+         remotingConnection.getPacketDispatcher().unregister(clientTargetID);
       }
       finally
       {
@@ -380,7 +385,7 @@
          {
             tokensToSend = 0;
             
-            remotingConnection.send(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokenBatchSize), true);                  
+            remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokenBatchSize));                  
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -53,6 +53,8 @@
    
    private final long serverTargetID;
    
+   private final long clientTargetID;
+   
    private final ClientSessionInternal session;
    
    private final RemotingConnection remotingConnection;
@@ -72,6 +74,7 @@
    // Constructors ---------------------------------------------------------------------------------
       
    public ClientProducerImpl(final ClientSessionInternal session, final long serverTargetID,
+                             final long clientTargetID,
    		                    final SimpleString address,
    		                    final RemotingConnection remotingConnection, final int windowSize,
    		                    final int maxRate)
@@ -80,6 +83,8 @@
       
       this.serverTargetID = serverTargetID;
       
+      this.clientTargetID = clientTargetID;
+      
       this.address = address;
       
       this.remotingConnection = remotingConnection;
@@ -141,7 +146,14 @@
    		windowSize--;
    	}
    	
-   	remotingConnection.send(serverTargetID, session.getServerTargetID(), message, !msg.isDurable());
+   	if (msg.isDurable())
+   	{
+   	   remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
+   	}
+   	else
+   	{
+   	   remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
+   	}
    	 	   	
    	if (rateLimiter != null)
    	{
@@ -170,7 +182,7 @@
       
       session.removeProducer(this);
       
-      remotingConnection.getPacketDispatcher().unregister(serverTargetID);
+      remotingConnection.getPacketDispatcher().unregister(clientTargetID);
       
       closed = true;
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -214,14 +214,14 @@
 
       SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temporary);
 
-      remotingConnection.send(serverTargetID, request);
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
    }
 
    public void deleteQueue(final SimpleString queueName) throws MessagingException
    {
       checkClosed();
 
-      remotingConnection.send(serverTargetID, new SessionDeleteQueueMessage(queueName));
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new SessionDeleteQueueMessage(queueName));
    }
    
    public SessionQueueQueryResponseMessage queueQuery(final SimpleString queueName) throws MessagingException
@@ -230,7 +230,7 @@
       
       SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
       
-      SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage)remotingConnection.send(serverTargetID, request);
+      SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       
       return response;
    }
@@ -241,7 +241,7 @@
       
       SessionBindingQueryMessage request = new SessionBindingQueryMessage(address);
       
-      SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage)remotingConnection.send(serverTargetID, request);
+      SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       
       return response;
    }
@@ -252,7 +252,7 @@
       
       SessionAddDestinationMessage request = new SessionAddDestinationMessage(address, temporary);
       
-      remotingConnection.send(serverTargetID, request);
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
    }
    
    public void removeDestination(final SimpleString address, final boolean temporary) throws MessagingException
@@ -261,31 +261,34 @@
       
       SessionRemoveDestinationMessage request = new SessionRemoveDestinationMessage(address, temporary);
       
-      remotingConnection.send(serverTargetID, request);  
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);  
    }
    
    public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean noLocal,
                                         final boolean autoDeleteQueue, final boolean direct) throws MessagingException
    {
       checkClosed();
+      
+      long clientTargetID = remotingConnection.getPacketDispatcher().generateID();
     
       SessionCreateConsumerMessage request =
-         new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDeleteQueue,
+         new SessionCreateConsumerMessage(clientTargetID, queueName, filterString, noLocal, autoDeleteQueue,
          		                           defaultConsumerWindowSize, defaultConsumerMaxRate);
       
-      SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.send(serverTargetID, request);
+      SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerTargetID(), executor, remotingConnection, direct, 1);
+         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, direct, 1);
 
       consumers.put(response.getConsumerTargetID(), consumer);
+      
 
-      remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, response.getConsumerTargetID()));
+      remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, clientTargetID));
       
       //Now we send window size tokens to start the consumption
       //We even send it if windowSize == -1, since we need to start the consumer
       
-      remotingConnection.send(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowTokenMessage(response.getWindowSize()), true);
+      remotingConnection.sendOneWay(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowTokenMessage(response.getWindowSize()));
 
       return consumer;
    }
@@ -296,7 +299,7 @@
 
       SessionCreateBrowserMessage request = new SessionCreateBrowserMessage(queueName, filterString);
 
-      SessionCreateBrowserResponseMessage response = (SessionCreateBrowserResponseMessage)remotingConnection.send(serverTargetID, request);
+      SessionCreateBrowserResponseMessage response = (SessionCreateBrowserResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
 
       ClientBrowser browser = new ClientBrowserImpl(response.getBrowserTargetID(), this, remotingConnection);  
 
@@ -323,18 +326,20 @@
 
       if (producer == null)
       {
-      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate);
+         long clientTargetID = remotingConnection.getPacketDispatcher().generateID();
+         
+      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
       	
       	SessionCreateProducerResponseMessage response =
-      		(SessionCreateProducerResponseMessage)remotingConnection.send(serverTargetID, request);
+      		(SessionCreateProducerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       	
       	//maxRate and windowSize can be overridden by the server
       	
-      	producer = new ClientProducerImpl(this, response.getProducerTargetID(), address,
+      	producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
       			                            remotingConnection, response.getWindowSize(),
       			                            response.getMaxRate());  
       	
-      	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, response.getProducerTargetID()));
+      	remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));
       }
 
       producers.add(producer);
@@ -363,7 +368,7 @@
         
       acknowledgeInternal(false);
       
-      remotingConnection.send(serverTargetID, new PacketImpl(SESS_COMMIT));
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(SESS_COMMIT));
       
       lastCommittedID = lastID;
    }
@@ -391,7 +396,7 @@
 
       toAckCount = 0;
 
-      remotingConnection.send(serverTargetID, new PacketImpl(SESS_ROLLBACK));   
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(SESS_ROLLBACK));   
    }
    
    public void acknowledge() throws MessagingException
@@ -409,7 +414,7 @@
       
       if (deliveryExpired)
       {
-         remotingConnection.send(serverTargetID, serverTargetID, new SessionCancelMessage(lastID, true), true);
+         remotingConnection.sendOneWay(serverTargetID, serverTargetID, new SessionCancelMessage(lastID, true));
          
          toAckCount = 0;
       }
@@ -452,7 +457,7 @@
          
          acknowledgeInternal(false);      
          
-         remotingConnection.send(serverTargetID, new PacketImpl(CLOSE));            
+         remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(CLOSE));            
       }
       finally
       {
@@ -513,7 +518,7 @@
 
       //2. cancel all deliveries on server but not in tx
             
-      remotingConnection.send(serverTargetID, new SessionCancelMessage(-1, false));
+      remotingConnection.sendBlocking(serverTargetID, serverTargetID, new SessionCancelMessage(-1, false));
    }
    
    public void removeProducer(final ClientProducerInternal producer)
@@ -541,7 +546,7 @@
       { 
          SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase);
                   
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
          
          if (response.isError())
          {
@@ -582,7 +587,7 @@
          //Need to flush any acks to server first
          acknowledgeInternal(false);
          
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
          
          if (response.isError())
          {
@@ -604,7 +609,7 @@
          //Need to flush any acks to server first
          acknowledgeInternal(false);
                   
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, new SessionXAForgetMessage(xid));
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new SessionXAForgetMessage(xid));
          
          if (response.isError())
          {
@@ -623,7 +628,7 @@
       try
       {                              
          SessionXAGetTimeoutResponseMessage response =
-            (SessionXAGetTimeoutResponseMessage)remotingConnection.send(serverTargetID, new PacketImpl(SESS_XA_GET_TIMEOUT));
+            (SessionXAGetTimeoutResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(SESS_XA_GET_TIMEOUT));
          
          return response.getTimeoutSeconds();
       }
@@ -658,7 +663,7 @@
       {
          SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
          
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
          
          if (response.isError())
          {
@@ -681,7 +686,7 @@
    {
       try
       {
-         SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage)remotingConnection.send(serverTargetID, new PacketImpl(SESS_XA_INDOUBT_XIDS));
+         SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(SESS_XA_INDOUBT_XIDS));
          
          List<Xid> xids = response.getXids();
          
@@ -702,7 +707,7 @@
       {
          SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
          
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
          
          if (response.isError())
          {
@@ -722,7 +727,7 @@
       try
       {                              
          SessionXASetTimeoutResponseMessage response =
-            (SessionXASetTimeoutResponseMessage)remotingConnection.send(serverTargetID, new SessionXASetTimeoutMessage(seconds));
+            (SessionXASetTimeoutResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, new SessionXASetTimeoutMessage(seconds));
          
          return response.isOK();
       }
@@ -762,7 +767,7 @@
             throw new XAException(XAException.XAER_INVAL);
          }
                      
-         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(serverTargetID, packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, packet);
          
          if (response.isError())
          {
@@ -800,8 +805,15 @@
       
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(lastID, !broken);
             
-      remotingConnection.send(serverTargetID, serverTargetID, message, !block);
-      
+      if (block)
+      {
+         remotingConnection.sendBlocking(serverTargetID, serverTargetID, message);
+      }
+      else
+      {
+         remotingConnection.sendOneWay(serverTargetID, serverTargetID, message);
+      }
+            
       acked = true;
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -21,15 +21,9 @@
    
    public long getSessionID();
  
-   /**
-    * Use this method if the packet is to be executed in the context of the targetID (i.e. for
-    * sessions, connections & connections factories)
-    */
-   Packet send(long targetID, Packet packet) throws MessagingException;
-
-   Packet send(long targetID, long executorID, Packet packet) throws MessagingException;
+   Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
    
-   Packet send(long targetID, long executorID, Packet packet, boolean oneWay) throws MessagingException;
+   void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
    
    void setRemotingSessionListener(RemotingSessionListener newListener);
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -21,18 +21,19 @@
   */
 package org.jboss.messaging.core.client.impl;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.jboss.messaging.core.remoting.ConnectorRegistrySingleton.REGISTRY;
 
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
 import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.NIOConnector;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 
 /**
@@ -64,7 +65,7 @@
    private RemotingSessionListener listener;
 
    private transient PacketDispatcher dispatcher;
-
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams, final PacketDispatcher dispatcher) throws Exception
@@ -131,31 +132,70 @@
       return session.getID();
    }
     
-   public Packet send(final long targetID, final Packet packet) throws MessagingException
-   {
-      return send(targetID, targetID, packet);
-   }
-
    /**
     * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
     */
-   public Packet send(final long targetID, final long executorID, final Packet packet) throws MessagingException
+   public Packet sendBlocking(final long targetID, final long executorID, final Packet packet) throws MessagingException
    {
-      return send(targetID, executorID, packet, false);
+      checkConnected();
+      
+      long handlerID = dispatcher.generateID();
+      
+      ResponseHandler handler = new ResponseHandler(handlerID);
+      
+      dispatcher.register(handler);
+      
+      try
+      {  
+         packet.setTargetID(targetID);
+         packet.setExecutorID(executorID);
+         packet.setResponseTargetID(handlerID);
+            
+         try
+         {
+            session.write(packet);
+         }
+         catch (Exception e)
+         {
+            log.error("Caught unexpected exception", e);
+            
+            throw new MessagingException(MessagingException.INTERNAL_ERROR);
+         }
+         
+         Packet response = handler.waitForResponse(1000 * connectionParams.getTimeout());
+         
+         if (response == null)
+         {
+            throw new IllegalStateException("No response received for " + packet);
+         }
+         
+         if (response instanceof MessagingExceptionMessage)
+         {
+            MessagingExceptionMessage message = (MessagingExceptionMessage) response;
+            
+            throw message.getException();
+         }
+         else
+         {
+            return response;
+         } 
+      }
+      finally
+      {
+         dispatcher.unregister(handlerID);
+      }           
    }
    
-   public Packet send(final long targetID, final long executorID, final Packet packet, final boolean oneWay) throws MessagingException
+   public void sendOneWay(final long targetID, final long executorID, final Packet packet) throws MessagingException
    {
       assert packet != null;
 
       packet.setTargetID(targetID);
       packet.setExecutorID(executorID);
       
-      Packet response;
-      
       try
-      {      
-         response = (Packet) send(packet, oneWay);
+      {
+         session.write(packet);
       }
       catch (Exception e)
       {
@@ -163,22 +203,6 @@
          
          throw new MessagingException(MessagingException.INTERNAL_ERROR);
       }
-      
-      if (oneWay == false && response == null)
-      {
-         throw new IllegalStateException("No response received for " + packet);
-      }
-      
-      if (response instanceof MessagingExceptionMessage)
-      {
-         MessagingExceptionMessage message = (MessagingExceptionMessage) response;
-         
-         throw message.getException();
-      }
-      else
-      {
-         return response;
-      } 
    }
    
    public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
@@ -214,22 +238,55 @@
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-
-   private Packet send(final Packet packet, final boolean oneWay) throws Exception
+      
+   private static class ResponseHandler implements PacketHandler
    {
-      assert packet != null;
-      checkConnected();
+      private long id;
+      
+      private Packet response;
+      
+      ResponseHandler(final long id)
+      {
+         this.id = id;
+      }
 
-      if (oneWay)
+      public long getID()
       {
-         session.write(packet);
-         return null;
-      } else 
+         return id;
+      }
+
+      public synchronized void handle(final Packet packet, final PacketSender sender)
       {
-         Packet response = (Packet) session.writeAndBlock(packet, 
-               connectionParams.getTimeout(), SECONDS);
-         return response;
+         this.response = packet;
+         
+         notify();
       }
+      
+      public synchronized Packet waitForResponse(final long timeout)
+      {
+         long toWait = timeout;
+         long start = System.currentTimeMillis();
+
+         while (response == null && toWait > 0)
+         {
+            try
+            {
+               wait(toWait);
+            }
+            catch (InterruptedException e)
+            {
+            }
+            
+            long now = System.currentTimeMillis();
+            
+            toWait -= now - start;
+            
+            start = now;
+         }
+         
+         return response;         
+      }
+      
    }
 
    private void checkConnected() throws MessagingException

Modified: trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -6,11 +6,10 @@
  */
 package org.jboss.messaging.core.remoting;
 
-import java.util.concurrent.TimeUnit;
 
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
  * @version <tt>$Revision$</tt>
  * 
@@ -19,9 +18,7 @@
 {
    long getID();
 
-   void write(Object object) throws Exception;
+   void write(Packet packet) throws Exception;
 
-   Object writeAndBlock(Packet packet, long timeout, TimeUnit timeUnit) throws Exception;
-
    boolean isConnected();
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -9,16 +9,23 @@
 
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
 
-
+/**
+ * 
+ * A Packet
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
 public interface Packet
 {
    // Public --------------------------------------------------------
 
    public static final long NO_ID_SET = -1L;
 
-   void setCorrelationID(long correlationID);
+   void setResponseTargetID(long responseTargetID);
 
-   long getCorrelationID();
+   long getResponseTargetID();
 
    PacketType getType();
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -26,5 +26,7 @@
 
    /** Call filters on a package */
    void callFilters(Packet packet) throws Exception;
+   
+   long generateID();
 
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -6,6 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.impl;
 
+import static org.jboss.messaging.core.remoting.TransportType.INVM;
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
 
 import java.util.Collection;
@@ -88,8 +89,10 @@
       assert location != null;
       String key = location.getLocation();
       
+      log.info("*** Getting connector for " + location);
+      
       if (connectors.containsKey(key))
-      {
+      {         
          NIOConnectorHolder holder = connectors.get(key);
          holder.increment();
          NIOConnector connector = holder.getConnector();
@@ -126,6 +129,11 @@
       {
          connector = new MinaConnector(location, connectionParams, dispatcher);
       }
+      else if (transport == INVM)
+      {
+         PacketDispatcher localDispatcher = localDispatchers.get(key);
+         connector = new INVMConnector(idCounter.getAndIncrement(), dispatcher, localDispatcher);
+      }
 
       if (connector == null)
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -6,13 +6,14 @@
  */
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NO_ID_SET;
+import static org.jboss.messaging.core.remoting.Packet.NO_ID_SET;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Interceptor;
@@ -23,7 +24,8 @@
 import org.jboss.messaging.core.remoting.PacketSender;
 
 /**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>.
  * 
  * @version <tt>$Revision$</tt>
  */
@@ -41,6 +43,8 @@
    private final Map<Long, PacketHandler> handlers;
    public final List<Interceptor> filters;
    private transient PacketHandlerRegistrationListener listener;
+   
+   private final AtomicLong idSequence = new AtomicLong(0);
 
    // Static --------------------------------------------------------
 
@@ -54,6 +58,19 @@
 
    // Public --------------------------------------------------------
 
+   public long generateID()
+   {
+      long id = idSequence.getAndIncrement();
+      
+      if (id == 0)
+      {
+         //ID 0 is reserved for the connection factory handler
+         id = generateID();
+      }
+      
+      return id;
+   }
+   
    public void register(final PacketHandler handler)
    { 
       handlers.put(handler.getID(), handler);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -65,7 +65,7 @@
 
    public void encode(final P packet, final ProtocolEncoderOutput out) throws Exception
    {
-      long correlationID = packet.getCorrelationID();
+      long responseTargetID = packet.getResponseTargetID();
       long targetID = packet.getTargetID();
       long executorID = packet.getExecutorID();
       
@@ -79,7 +79,7 @@
       //The standard header fields
       buf.putInt(messageLength);
       buf.put(packet.getType().byteValue());
-      buf.putLong(correlationID);
+      buf.putLong(responseTargetID);
       buf.putLong(targetID);
       buf.putLong(executorID);
 
@@ -105,7 +105,7 @@
       
       Packet packet = decodeBody(buffer);
 
-      packet.setCorrelationID(correlationID);
+      packet.setResponseTargetID(correlationID);
       packet.setTargetID(targetID);
       packet.setExecutorID(executorID);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -9,8 +9,10 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATECONSUMER;
 import static org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
 
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.util.DataConstants;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -39,7 +41,7 @@
 
    public int getBodyLength(final SessionCreateConsumerMessage packet) throws Exception
    {   	
-   	int bodyLength = SimpleString.sizeofString(packet.getQueueName()) +
+   	int bodyLength = SIZE_LONG + SimpleString.sizeofString(packet.getQueueName()) +
    	SimpleString.sizeofNullableString(packet.getFilterString()) + 2 * SIZE_BOOLEAN + 2 * SIZE_INT;
    	
    	return bodyLength;
@@ -55,6 +57,7 @@
       int windowSize = request.getWindowSize();
       int maxRate = request.getMaxRate();
 
+      out.putLong(request.getClientTargetID());
       out.putSimpleString(queueName);
       out.putNullableSimpleString(filterString);
       out.putBoolean(noLocal);
@@ -67,6 +70,7 @@
    protected SessionCreateConsumerMessage decodeBody(final RemotingBuffer in)
          throws Exception
    {
+      long clientTargetID = in.getLong();
       SimpleString queueName = in.getSimpleString();
       SimpleString filterString = in.getNullableSimpleString();
       boolean noLocal = in.getBoolean();
@@ -74,7 +78,7 @@
       int windowSize = in.getInt();
       int maxRate = in.getInt();
  
-      return new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDelete, windowSize, maxRate);
+      return new SessionCreateConsumerMessage(clientTargetID, queueName, filterString, noLocal, autoDelete, windowSize, maxRate);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -8,9 +8,9 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
 
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.util.DataConstants;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -44,7 +44,7 @@
    {   	
    	SimpleString address = packet.getAddress();
       
-      int bodyLength = SimpleString.sizeofNullableString(address) + 2 * SIZE_INT;
+      int bodyLength = SIZE_LONG + SimpleString.sizeofNullableString(address) + 2 * SIZE_INT;
       
       return bodyLength;
    }
@@ -54,6 +54,7 @@
    {
       SimpleString address = request.getAddress();
      
+      out.putLong(request.getClientTargetID());
       out.putNullableSimpleString(address);
       out.putInt(request.getWindowSize());
       out.putInt(request.getMaxRate());
@@ -63,13 +64,15 @@
    protected SessionCreateProducerMessage decodeBody(final RemotingBuffer in)
          throws Exception
    {
+      long clientTargetID = in.getLong();
+      
       SimpleString address = in.getNullableSimpleString();
       
       int windowSize = in.getInt();
       
       int maxRate = in.getInt();
 
-      return new SessionCreateProducerMessage(address, windowSize, maxRate);
+      return new SessionCreateProducerMessage(clientTargetID, address, windowSize, maxRate);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -69,11 +69,11 @@
       return connected;
    }
 
-   public void write(final Object object) throws Exception
+   public void write(final Packet packet) throws Exception
    {
-      assert object instanceof Packet;
+     // assert packet instanceof Packet;
 
-      serverDispatcher.dispatch((Packet) object,
+      serverDispatcher.dispatch((Packet) packet,
             new PacketSender()
             {
                public void send(Packet response) throws Exception
@@ -94,55 +94,55 @@
             });
    }
 
-   public Object writeAndBlock(final Packet request, long timeout, TimeUnit timeUnit) throws Exception
-   {
-      request.setCorrelationID(correlationCounter++);
-      final Packet[] responses = new Packet[1];
+//   public Object writeAndBlock(final Packet request, long timeout, TimeUnit timeUnit) throws Exception
+//   {
+//      request.setCorrelationID(correlationCounter++);
+//      final Packet[] responses = new Packet[1];
+//
+//      serverDispatcher.dispatch(request,
+//            new PacketSender()
+//            {
+//               public void send(Packet response)
+//               {
+//                  try
+//                  {
+//                     serverDispatcher.callFilters(response);
+//                     // 1st response is used to reply to the blocking request
+//                     if (responses[0] == null)
+//                     {
+//                        responses[0] = response;
+//                     } else 
+//                     // other later responses are dispatched directly to the client
+//                     {
+//                        clientDispatcher.dispatch(response, null);
+//                     }
+//                  }
+//                  catch (Exception e)
+//                  {
+//                     log.warn("An interceptor throwed an exception what caused the packet " + response + " to be ignored", e);
+//                     responses[0] = null;
+//                  }
+//               }
+//
+//               public long getSessionID()
+//               {
+//                  return getID();
+//               }
+//               
+//               public String getRemoteAddress()
+//               {
+//                  return "invm";
+//               }
+//            });
+//
+//      if (responses[0] == null)
+//      {
+//         throw new IllegalStateException("No response received for request " + request);
+//      }
+//
+//      return responses[0];
+//   }
 
-      serverDispatcher.dispatch(request,
-            new PacketSender()
-            {
-               public void send(Packet response)
-               {
-                  try
-                  {
-                     serverDispatcher.callFilters(response);
-                     // 1st response is used to reply to the blocking request
-                     if (responses[0] == null)
-                     {
-                        responses[0] = response;
-                     } else 
-                     // other later responses are dispatched directly to the client
-                     {
-                        clientDispatcher.dispatch(response, null);
-                     }
-                  }
-                  catch (Exception e)
-                  {
-                     log.warn("An interceptor throwed an exception what caused the packet " + response + " to be ignored", e);
-                     responses[0] = null;
-                  }
-               }
-
-               public long getSessionID()
-               {
-                  return getID();
-               }
-               
-               public String getRemoteAddress()
-               {
-                  return "invm";
-               }
-            });
-
-      if (responses[0] == null)
-      {
-         throw new IllegalStateException("No response received for request " + request);
-      }
-
-      return responses[0];
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -124,19 +124,5 @@
       filterChain.addLast("logger", filter);
    }
 
-   static ScheduledExecutorService addBlockingRequestResponseFilter(
-         final DefaultIoFilterChainBuilder filterChain)
-   {
-      assert filterChain != null;
-
-      ScheduledExecutorService executorService = Executors
-            .newScheduledThreadPool(1);
-      RequestResponseFilter filter = new RequestResponseFilter(
-            new MinaInspector(), executorService);
-      filterChain.addLast("reqres", filter);
-
-      return executorService;
-   }
-
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -6,11 +6,8 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addBlockingRequestResponseFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 
 import java.io.IOException;
@@ -19,7 +16,6 @@
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.ConnectFuture;
@@ -31,8 +27,8 @@
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.jboss.messaging.core.client.ConnectionParams;
-import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -65,8 +61,6 @@
 
    private PacketDispatcher dispatcher;
 
-   private ScheduledExecutorService blockingScheduler;
-
    private ExecutorService threadPool;
    
    private IoSession session;
@@ -125,7 +119,6 @@
       }
       addCodecFilter(filterChain);
       // addLoggingFilter(filterChain);
-      blockingScheduler = addBlockingRequestResponseFilter(filterChain);
       addKeepAliveFilter(filterChain, keepAliveFactory, connectionParams.getKeepAliveInterval(),
             connectionParams.getKeepAliveTimeout(), this);
       connector.getSessionConfig().setTcpNoDelay(connectionParams.isTcpNoDelay());
@@ -182,7 +175,6 @@
       CloseFuture closeFuture = session.close().awaitUninterruptibly();
       boolean closed = closeFuture.isClosed();
       
-      blockingScheduler.shutdown();
       connector.removeListener(ioListener);
       connector.dispose();
       threadPool.shutdown();
@@ -204,7 +196,6 @@
       }
       
       connector = null;
-      blockingScheduler = null;
       session = null;
 
       return closed;

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaInspector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaInspector.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaInspector.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -1,71 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
-
-import org.apache.mina.filter.reqres.ResponseInspector;
-import org.apache.mina.filter.reqres.ResponseType;
-import org.jboss.messaging.core.remoting.Packet;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class MinaInspector implements ResponseInspector
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // ResponseInspector implementation ------------------------------
-
-   public Object getRequestId(Object message)
-   {
-      if (!(message instanceof Packet))
-      {
-         return null;
-      }
-      Packet packet = (Packet) message;
-      long id = packet.getCorrelationID();
-      
-      if (id == -1)
-      {
-      	return null;
-      }
-      else
-      {
-      	return id;
-      }
-   }
-
-   public ResponseType getResponseType(Object message)
-   {
-      if (!(message instanceof Packet))
-      {
-         return null;
-      }
-
-      return WHOLE;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -11,8 +11,6 @@
 import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 
 import java.net.InetSocketAddress;
@@ -38,7 +36,6 @@
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
-import org.jboss.messaging.core.server.ConnectionManager;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -171,10 +168,12 @@
          acceptor.addListener(acceptorListener);
       }
       
-      boolean disableInvm = config.isInvmDisabled();
-      if (log.isDebugEnabled())
-         log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
-      if (!disableInvm)
+//      boolean disableInvm = config.isInvmDisabled();
+//      if (log.isDebugEnabled())
+//         log.debug("invm optimization for remoting is " + (disableInvm ? "disabled" : "enabled"));
+     // if (!disableInvm)
+      
+      log.info("Registering:" + config.getLocation());
          REGISTRY.register(config.getLocation(), dispatcher);
 
       started = true;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -6,12 +6,7 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Request;
-import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.Packet;
 
@@ -29,7 +24,7 @@
 
    private final IoSession session;
 
-   private AtomicLong correlationCounter;
+   //private AtomicLong correlationCounter;
    
    // Static --------------------------------------------------------
 
@@ -40,7 +35,7 @@
       assert session != null;
 
       this.session = session;
-      correlationCounter = new AtomicLong(0);
+     // correlationCounter = new AtomicLong(0);
    }
 
    // Public --------------------------------------------------------
@@ -50,20 +45,11 @@
       return session.getId();
    }
 
-   public void write(Object object)
+   public void write(Packet packet)
    {
-      session.write(object);
+      session.write(packet);
    }
 
-   public Object writeAndBlock(Packet packet, long timeout, TimeUnit timeUnit) throws Exception
-   {
-      packet.setCorrelationID(correlationCounter.incrementAndGet());
-      Request req = new Request(packet.getCorrelationID(), packet, timeout, timeUnit);
-      session.write(req);
-      Response response = req.awaitResponse();
-      return response.getMessage();
-   }
-
    public boolean isConnected()
    {
       return session.isConnected();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -17,7 +17,7 @@
 {
 	// Constants -----------------------------------------------------
 
-	private long correlationID = NO_ID_SET;
+	private long responseTargetID = NO_ID_SET;
 
 	private long targetID = NO_ID_SET;
 
@@ -43,14 +43,14 @@
 		return type;
 	}
 
-	public void setCorrelationID(long correlationID)
+	public void setResponseTargetID(long responseTargetID)
 	{
-		this.correlationID = correlationID;
+		this.responseTargetID = responseTargetID;
 	}
 
-	public long getCorrelationID()
+	public long getResponseTargetID()
 	{
-		return correlationID;
+		return responseTargetID;
 	}
 
 	public long getTargetID()
@@ -77,7 +77,7 @@
 	{
 		assert other != null;
 
-		setCorrelationID(other.getCorrelationID());
+		setTargetID(other.getResponseTargetID());
 	}
 
 	/**
@@ -85,7 +85,7 @@
 	 */
 	 public boolean isRequest()
 	 {
-		 return targetID != NO_ID_SET && correlationID != NO_ID_SET;
+		 return targetID != NO_ID_SET && responseTargetID != NO_ID_SET;
 	 }
 
 	 @Override
@@ -99,7 +99,7 @@
 	 protected String getParentString()
 	 {
 		 return "PACKET[type=" + type
-		 + ", correlationID=" + correlationID + ", targetID=" + targetID
+		 + ", responseTargetID=" + responseTargetID + ", targetID=" + targetID
 		 + ", executorID=" + executorID;
 	 }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -20,6 +20,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private final long clientTargetID;
+   
    private final SimpleString queueName;
    
    private final SimpleString filterString;
@@ -36,12 +38,13 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateConsumerMessage(final SimpleString queueName, final SimpleString filterString,
+   public SessionCreateConsumerMessage(final long clientTargetID, final SimpleString queueName, final SimpleString filterString,
    		                              final boolean noLocal, final boolean autoDeleteQueue,
    		                              final int windowSize, final int maxRate)
    {
       super(PacketType.SESS_CREATECONSUMER);
 
+      this.clientTargetID = clientTargetID;
       this.queueName = queueName;
       this.filterString = filterString;
       this.noLocal = noLocal;
@@ -66,6 +69,11 @@
       return buff.toString();
    }
 
+   public long getClientTargetID()
+   {
+      return clientTargetID;
+   }
+   
    public SimpleString getQueueName()
    {
       return queueName;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -20,6 +20,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private final long clientTargetID;
+   
    private final SimpleString address;
    
    private final int windowSize;
@@ -30,10 +32,12 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate)
+   public SessionCreateProducerMessage(final long clientTargetID, final SimpleString address, final int windowSize, final int maxRate)
    {
       super(PacketType.SESS_CREATEPRODUCER);
 
+      this.clientTargetID = clientTargetID;
+      
       this.address = address;
       
       this.windowSize = windowSize;
@@ -53,6 +57,11 @@
       buff.append("]");
       return buff.toString();
    }
+   
+   public long getClientTargetID()
+   {
+      return clientTargetID;
+   }
 
    public SimpleString getAddress()
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -94,6 +94,4 @@
                                              int incrementVersion) throws Exception;
 
    DeploymentManager getDeploymentManager();
-   
-   ObjectIDGenerator getObjectIDGenerator();
 }

Deleted: trunk/src/main/org/jboss/messaging/core/server/ObjectIDGenerator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ObjectIDGenerator.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/ObjectIDGenerator.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server;
-
-/**
- * 
- * A ObjectIDGenerator
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface ObjectIDGenerator
-{
-	long generateID();
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -33,6 +33,8 @@
 {
 	long getID();
 	
+	long getClientTargetID();
+	
 	void close() throws Exception;
 	
 	void setStarted(boolean started) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -105,10 +105,10 @@
 
    void deleteQueue(SimpleString queueName) throws Exception;
 
-   SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName, SimpleString filterString, boolean noLocal,
+   SessionCreateConsumerResponseMessage createConsumer(long clientTargetID, SimpleString queueName, SimpleString filterString, boolean noLocal,
    		                                              boolean autoDeleteQueue, int windowSize, int maxRate) throws Exception;
    
-   SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate) throws Exception;   
+   SessionCreateProducerResponseMessage createProducer(long clientTargetID, SimpleString address, int windowSize, int maxRate) throws Exception;   
 
    SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.deployers.impl.FileDeploymentManager;
 import org.jboss.messaging.core.deployers.impl.QueueSettingsDeployer;
 import org.jboss.messaging.core.deployers.impl.SecurityDeployer;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.memory.MemoryManager;
 import org.jboss.messaging.core.memory.impl.SimpleMemoryManager;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -51,7 +52,6 @@
 import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
 import org.jboss.messaging.core.server.ConnectionManager;
 import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.ObjectIDGenerator;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -60,7 +60,6 @@
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.version.Version;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.util.VersionLoader;
 
 /**
@@ -99,7 +98,6 @@
    private Deployer queueSettingsDeployer;
    private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
    private DeploymentManager deploymentManager = new FileDeploymentManager();
-   private ObjectIDGenerator objectIDGenerator = new ObjectIDGeneratorImpl();
 
    // plugins
 
@@ -336,25 +334,21 @@
       // security my be screwed up, on account of thread local security stack being corrupted.
 
       securityStore.authenticate(username, password);
+      
+      long id = remotingService.getDispatcher().generateID();
 
       final ServerConnection connection =
-         new ServerConnectionImpl(username, password,
+         new ServerConnectionImpl(id, username, password,
                           remotingClientSessionID, clientAddress,
                           remotingService.getDispatcher(), resourceManager, storageManager,
                           queueSettingsRepository,
-                          postOffice, securityStore, connectionManager,
-                          objectIDGenerator);
+                          postOffice, securityStore, connectionManager);
 
       remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
 
       return new CreateConnectionResponse(connection.getID(), version);
    }
    
-   public ObjectIDGenerator getObjectIDGenerator()
-   {
-   	return this.objectIDGenerator;
-   }
-   
    // Public ---------------------------------------------------------------------------------------
 
    // Package protected ----------------------------------------------------------------------------

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/ObjectIDGeneratorImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ObjectIDGeneratorImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ObjectIDGeneratorImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.jboss.messaging.core.server.ObjectIDGenerator;
-
-/**
- * 
- * A ObjectIDGeneratorImpl
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class ObjectIDGeneratorImpl implements ObjectIDGenerator
-{
-   private AtomicLong objectIDGenerator = new AtomicLong(1);
-	
-   public long generateID()
-   {
-   	long id = objectIDGenerator.getAndIncrement();
-   	
-   	if (id == 0)
-   	{
-   		//ID 0 is reserved for the connection factory handler
-   		id = generateID();
-   	}
-   	
-   	return id;
-   }
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -230,7 +230,7 @@
          }
 
          // reply if necessary
-         if (response == null && packet.getCorrelationID() != Packet.NO_ID_SET)
+         if (response == null && packet.getResponseTargetID() != Packet.NO_ID_SET)
          {
             response = new PacketImpl(NULL);               
          }            

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -34,7 +34,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.ConnectionManager;
-import org.jboss.messaging.core.server.ObjectIDGenerator;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.server.ServerSession;
@@ -90,8 +89,6 @@
    private final SecurityStore securityStore;
    
    private final ConnectionManager connectionManager;
-   
-   private final ObjectIDGenerator objectIDGenerator;
 
    private final long createdTime;
          
@@ -106,7 +103,7 @@
 
    // Constructors ---------------------------------------------------------------------------------
       
-   public ServerConnectionImpl(final String username, final String password,
+   public ServerConnectionImpl(final long id, final String username, final String password,
    		                      final long remotingClientSessionID,
    		                      final String clientAddress,
    		                      final PacketDispatcher dispatcher,
@@ -114,10 +111,9 @@
    		                      final StorageManager persistenceManager,
    		                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
    		                      final PostOffice postOffice, final SecurityStore securityStore,
-   		                      final ConnectionManager connectionManager,
-   		                      final ObjectIDGenerator objectIDGenerator)
+   		                      final ConnectionManager connectionManager)
    {
-   	this.id = objectIDGenerator.generateID();
+   	this.id = id;
       
    	this.username = username;
       
@@ -141,8 +137,6 @@
       
       this.connectionManager = connectionManager;
       
-      this.objectIDGenerator = objectIDGenerator;
-      
       started = false;
       
       createdTime = System.currentTimeMillis();
@@ -161,10 +155,10 @@
    		                                                      final boolean autoCommitAcks,
                                                                final PacketSender sender) throws Exception
    {           
+      long id = dispatcher.generateID();
       ServerSession session =
-         new ServerSessionImpl(autoCommitSends, autoCommitAcks, xa, this, resourceManager,
-         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore,
-         		objectIDGenerator);
+         new ServerSessionImpl(id, autoCommitSends, autoCommitAcks, xa, this, resourceManager,
+         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore);
 
       sessions.add(session);
       

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -80,7 +80,7 @@
       }
 
       // reply if necessary
-      if (response == null && packet.getCorrelationID() != Packet.NO_ID_SET)
+      if (response == null && packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          response = new PacketImpl(NULL);               
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -66,6 +66,8 @@
    private final boolean trace = log.isTraceEnabled();
 
    private final long id;
+   
+   private final long clientTargetID;
 
    private final Queue messageQueue;
    
@@ -95,7 +97,7 @@
 
    // Constructors ---------------------------------------------------------------------------------
  
-   ServerConsumerImpl(final long id, final Queue messageQueue, final boolean noLocal, final Filter filter,
+   ServerConsumerImpl(final long id, final long clientTargetID, final Queue messageQueue, final boolean noLocal, final Filter filter,
    		             final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
    		             final long connectionID, final ServerSession sessionEndpoint,
 					       final StorageManager persistenceManager,
@@ -104,6 +106,8 @@
 					       final boolean started)
    {
    	this.id = id;
+   	
+   	this.clientTargetID = clientTargetID;
       
       this.messageQueue = messageQueue;
       
@@ -153,6 +157,11 @@
    	return id;
    }
    
+   public long getClientTargetID()
+   {
+      return clientTargetID;
+   }
+   
    public HandleStatus handle(MessageReference ref) throws Exception
    {
       if (availableTokens != null && availableTokens.get() == 0)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -73,7 +73,7 @@
       }
 
       // reply if necessary
-      if (response == null && packet.getCorrelationID() != Packet.NO_ID_SET)
+      if (response == null && packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          response = new PacketImpl(NULL);               
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerPacketHandlerSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerPacketHandlerSupport.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerPacketHandlerSupport.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -66,7 +66,7 @@
       }
       
       // reply if necessary
-      if (response != null && packet.getCorrelationID() != Packet.NO_ID_SET)
+      if (response != null && packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          response.normalize(packet);
          

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -44,6 +44,8 @@
 	
 	private final long id;
 	
+	private final long clientTargetID;
+	
 	private final ServerSession session;
 	
 	private final SimpleString address;
@@ -56,11 +58,13 @@
 	
 	// Constructors ----------------------------------------------------------------
 	
-	public ServerProducerImpl(final long id, final ServerSession session, final SimpleString address, 
+	public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session, final SimpleString address, 
 			                    final PacketSender sender,
 			                    final FlowController flowController) throws Exception
 	{
 		this.id = id;
+		
+		this.clientTargetID = clientTargetID;
       
 		this.session = session;
 		
@@ -106,7 +110,7 @@
 	{
 		Packet packet = new ProducerReceiveTokensMessage(1);
 		
-		packet.setTargetID(id);
+		packet.setTargetID(clientTargetID);
 		packet.setExecutorID(session.getID());
 		sender.send(packet);		
 	}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -72,7 +72,7 @@
       }
 
       // reply if necessary
-      if (response == null && packet.getCorrelationID() != Packet.NO_ID_SET)
+      if (response == null && packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          response = new PacketImpl(NULL);               
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -58,7 +58,6 @@
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.Delivery;
 import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.ObjectIDGenerator;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.server.ServerConsumer;
@@ -123,8 +122,6 @@
 
    private final SecurityStore securityStore;
 
-   private final ObjectIDGenerator objectIDGenerator;
-
    private final Set<ServerConsumer> consumers = new ConcurrentHashSet<ServerConsumer>();
 
    private final Set<ServerBrowserImpl> browsers = new ConcurrentHashSet<ServerBrowserImpl>();
@@ -144,16 +141,15 @@
    // Constructors
    // ---------------------------------------------------------------------------------
 
-   public ServerSessionImpl(final boolean autoCommitSends,
+   public ServerSessionImpl(final long id, final boolean autoCommitSends,
                             final boolean autoCommitAcks,
                             final boolean xa, final ServerConnection connection,
                             final ResourceManager resourceManager, final PacketSender sender,
                             final PacketDispatcher dispatcher, final StorageManager persistenceManager,
                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-                            final PostOffice postOffice, final SecurityStore securityStore,
-                            final ObjectIDGenerator objectIDGenerator) throws Exception
+                            final PostOffice postOffice, final SecurityStore securityStore) throws Exception
    {
-   	this.id = objectIDGenerator.generateID();
+   	this.id = id;
 
       this.autoCommitSends = autoCommitSends;
 
@@ -180,8 +176,6 @@
 
       this.securityStore = securityStore;
 
-      this.objectIDGenerator = objectIDGenerator;
-
       if (log.isTraceEnabled())
       {
          log.trace("created server session endpoint for " + sender.getRemoteAddress());
@@ -238,7 +232,7 @@
       {
          return HandleStatus.BUSY;
       }
-      Delivery delivery = new DeliveryImpl(ref, id, consumer.getID(), deliveryIDSequence++, sender);
+      Delivery delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), deliveryIDSequence++, sender);
 
       deliveries.add(delivery);
 
@@ -918,7 +912,7 @@
       }
    }
 
-   public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName, final SimpleString filterString,
+   public SessionCreateConsumerResponseMessage createConsumer(final long clientTargetID, final SimpleString queueName, final SimpleString filterString,
                                                               final boolean noLocal, final boolean autoDeleteQueue,
                                                               int windowSize, int maxRate) throws Exception
    {
@@ -948,8 +942,10 @@
 
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
 
+      long id = dispatcher.generateID();
+      
       ServerConsumer consumer =
-      	new ServerConsumerImpl(objectIDGenerator.generateID(), binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+      	new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
                                 this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
 
       dispatcher.register(new ServerConsumerPacketHandler(consumer));
@@ -1029,8 +1025,9 @@
 
       securityStore.check(binding.getAddress().toString(), CheckType.READ, connection);
 
-      ServerBrowserImpl browser = new ServerBrowserImpl(objectIDGenerator.generateID(),
-            this, binding.getQueue(), filterString == null ? null : filterString.toString());
+      long id = dispatcher.generateID();
+      
+      ServerBrowserImpl browser = new ServerBrowserImpl(id, this, binding.getQueue(), filterString == null ? null : filterString.toString());
 
       browsers.add(browser);
 
@@ -1048,7 +1045,7 @@
     * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
     * specified on the queue
     */
-   public SessionCreateProducerResponseMessage createProducer(final SimpleString address, final int windowSize,
+   public SessionCreateProducerResponseMessage createProducer(final long clientTargetID, final SimpleString address, final int windowSize,
    		                                                     final int maxRate) throws Exception
    {
    	FlowController flowController = null;
@@ -1059,8 +1056,10 @@
    	{
    		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
    	}
+   	
+   	long id = dispatcher.generateID();
 
-   	ServerProducerImpl producer = new ServerProducerImpl(objectIDGenerator.generateID(), this, address, sender, flowController);
+   	ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
 
    	producers.add(producer);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -94,7 +94,7 @@
       {
          SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
          
-         response = session.createConsumer(request.getQueueName(), request.getFilterString(),
+         response = session.createConsumer(request.getClientTargetID(), request.getQueueName(), request.getFilterString(),
          		                            request.isNoLocal(), request.isAutoDeleteQueue(),
          		                            request.getWindowSize(), request.getMaxRate());
          break;
@@ -135,7 +135,7 @@
       case SESS_CREATEPRODUCER:
       {
          SessionCreateProducerMessage request = (SessionCreateProducerMessage) packet;
-         response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate());
+         response = session.createProducer(request.getClientTargetID(), request.getAddress(), request.getWindowSize(), request.getMaxRate());
          break;
       }
       case CLOSE:
@@ -241,7 +241,7 @@
       }
       
       // reply if necessary
-      if (response == null && packet.getCorrelationID() != Packet.NO_ID_SET)
+      if (response == null && packet.getResponseTargetID() != Packet.NO_ID_SET)
       {
          response = new PacketImpl(NULL);               
       }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/BrowserTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/BrowserTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/BrowserTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -202,6 +202,8 @@
 
 			final int numMessages = 100;
 
+			log.info("****** sending messages");
+			
 			for (int i = 0; i < numMessages; i++)
 			{
 				Message m = session.createMessage();
@@ -209,7 +211,7 @@
 				producer.send(m);
 			}
 
-			log.trace("Sent messages");
+			log.info(" ****** Sent messages");
 
 			QueueBrowser browser = session.createBrowser(queue1, "test_counter > 30");
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -81,12 +81,14 @@
             
             prod.send(tm);
             
+            log.info("sent message");
+            
             if (i % 10 == 0)
             {
                sess.commit();
             }
          }
-
+         
          // need extra commit for cases in which the last message index is not a multiple of 10
          sess.commit();
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MiscellaneousTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -246,6 +246,7 @@
             // close the connection on the same thread that processed the message
             try
             {
+               log.info("** closing");
                conn.close();
                result.setSuccess();
             }

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaInspectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaInspectorTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaInspectorTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -1,109 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.tests.integration.core.remoting.mina;
-
-import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.NULL;
-
-import java.util.UUID;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.mina.MinaInspector;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class MinaInspectorTest extends TestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   private MinaInspector inspector;
-
-   public void testGetRequestIdForNull()
-   {
-      assertNull(inspector.getRequestId(null));
-   }
-
-   public void testGetRequestIdForNotAbstractPacket()
-   {
-      assertNull(inspector.getRequestId(new Object()));
-   }
-
-   public void testGetRequestIdForAbstractPacketWhichIsNotRequest()
-   {
-      Packet packet = new PacketImpl(NULL);
-      packet.setTargetID(23);
-      assertFalse(packet.isRequest());     
-      assertNull(inspector.getRequestId(packet));
-   }
-
-   public void testGetRequestIdForAbstractPacketWhichIsRequest()
-   {
-      Packet packet = new PacketImpl(NULL);
-      packet.setTargetID(23);
-      packet.setCorrelationID(System.currentTimeMillis());
-      assertTrue(packet.isRequest());
-
-      Object requestID = inspector.getRequestId(packet);
-      assertNotNull(requestID);
-      assertEquals(packet.getCorrelationID(), requestID);
-   }
-
-   public void testGetResponseTypeForNull()
-   {
-      assertNull(inspector.getResponseType(null));
-   }
-
-   public void testGetResponseTypeForNotAbstractPacket()
-   {
-      assertNull(inspector.getResponseType(new Object()));
-   }
-
-   public void testGetResponseTypeForAbstractPacket()
-   {
-      Packet packet = new PacketImpl(NULL);
-
-      assertEquals(WHOLE, inspector.getResponseType(packet));
-   }
-
-   // TestCase overrides --------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      inspector = new MinaInspector();
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      inspector = null;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaSessionTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -40,23 +40,7 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-
-   public void testWriteAndBlockWithTimeout() throws Exception
-   {
-      serverPacketHandler.setSleepTime(1000, MILLISECONDS);
-
-      PacketImpl packet = new TextPacket("testSendBlockingWithTimeout");
-      packet.setTargetID(serverPacketHandler.getID());
-      
-      try
-      {
-         session.writeAndBlock(packet, 500, MILLISECONDS);
-         fail("a Throwable should be thrown");
-      } catch (Throwable t)
-      {
-      }
-   }
-   
+ 
    // ClientTestBase overrides --------------------------------------
    
    @Override

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/SessionTestBase.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -109,37 +109,6 @@
       }
    }
 
-   
-   public void testWriteAndBlock() throws Exception
-   {
-      TextPacket request = new TextPacket("testSendBlocking");
-      request.setTargetID(serverPacketHandler.getID());
-
-      PacketImpl receivedPacket = (PacketImpl) session.writeAndBlock(request, REQRES_TIMEOUT, SECONDS);
-
-      assertNotNull(receivedPacket);
-      assertTrue(receivedPacket instanceof TextPacket);
-      TextPacket response = (TextPacket) receivedPacket;
-      assertEquals(reverse(request.getText()), response.getText());
-   }
-   
-   public void testCorrelationCounter() throws Exception
-   {
-      TextPacket request = new TextPacket("testSendBlocking");
-      request.setTargetID(serverPacketHandler.getID());
-
-      PacketImpl receivedPacket = (PacketImpl) session.writeAndBlock(request, REQRES_TIMEOUT, SECONDS);
-      long correlationID = request.getCorrelationID();
-      
-      assertNotNull(receivedPacket);      
-      assertEquals(request.getCorrelationID(), receivedPacket.getCorrelationID());
-      
-      receivedPacket = (PacketImpl) session.writeAndBlock(request, REQRES_TIMEOUT, SECONDS);
-      assertEquals(correlationID + 1, request.getCorrelationID());
-      assertEquals(correlationID + 1, receivedPacket.getCorrelationID());      
-   }
-
-   
    public void testClientHandlePacketSentByServer() throws Exception
    {
       TestPacketHandler clientHandler = new TestPacketHandler(generateID());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -281,11 +281,11 @@
 
       assertEquals(buffer.get(), packet.getType().byteValue());
 
-      long correlationID = buffer.getLong();
+      long responseTargetID = buffer.getLong();
       long targetID = buffer.getLong();
       long executorID = buffer.getLong();
 
-      assertEquals(packet.getCorrelationID(), correlationID);
+      assertEquals(packet.getResponseTargetID(), responseTargetID);
       assertEquals(packet.getTargetID(), targetID);
       assertEquals(packet.getExecutorID(), executorID);
    }
@@ -366,7 +366,7 @@
    {
       Packet packet = new PacketImpl(NULL);
       long cid = randomLong();
-      packet.setCorrelationID(cid);
+      packet.setResponseTargetID(cid);
       packet.setTargetID(randomLong());
       packet.setExecutorID(randomLong());
       AbstractPacketCodec codec = PacketCodecFactory
@@ -376,7 +376,7 @@
       assertTrue(decodedPacket instanceof PacketImpl);
 
       assertEquals(NULL, decodedPacket.getType());
-      assertEquals(packet.getCorrelationID(), decodedPacket.getCorrelationID());
+      assertEquals(packet.getResponseTargetID(), decodedPacket.getResponseTargetID());
       assertEquals(packet.getTargetID(), decodedPacket.getTargetID());
       assertEquals(packet.getExecutorID(), decodedPacket.getExecutorID());
    }
@@ -393,7 +393,7 @@
       assertTrue(decodedPacket instanceof Ping);
       Ping decodedPing = (Ping) decodedPacket;
       assertEquals(PING, decodedPing.getType());
-      assertEquals(ping.getCorrelationID(), decodedPacket.getCorrelationID());
+      assertEquals(ping.getResponseTargetID(), decodedPacket.getResponseTargetID());
       assertEquals(ping.getTargetID(), decodedPacket.getTargetID());
       assertEquals(ping.getExecutorID(), decodedPacket.getExecutorID());
    }
@@ -547,12 +547,12 @@
    public void testSessionCreateConsumerMessage() throws Exception
    {
       SimpleString destination = new SimpleString("queue.SessionCreateConsumerMessage");
-      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(randomLong(),
             destination, new SimpleString("color = 'red'"), false, false, randomInt(),
             randomInt());
       AbstractPacketCodec codec = new SessionCreateConsumerMessageCodec();
 
-      Packet decodedPacket = encodeAndCheckBytesAndDecode(request, codec,
+      Packet decodedPacket = encodeAndCheckBytesAndDecode(request, codec, request.getClientTargetID(),
             request.getQueueName(), new NullableStringHolder(request.getFilterString()), request
                   .isNoLocal(), request.isAutoDeleteQueue(), request
                   .getWindowSize(), request.getMaxRate());
@@ -560,6 +560,7 @@
       assertTrue(decodedPacket instanceof SessionCreateConsumerMessage);
       SessionCreateConsumerMessage decodedRequest = (SessionCreateConsumerMessage) decodedPacket;
       assertEquals(PacketType.SESS_CREATECONSUMER, decodedRequest.getType());
+      assertEquals(request.getClientTargetID(), decodedRequest.getClientTargetID());
       assertEquals(request.getQueueName(), decodedRequest.getQueueName());
       assertEquals(request.getFilterString(), decodedRequest.getFilterString());
       assertEquals(request.isNoLocal(), decodedRequest.isNoLocal());
@@ -594,16 +595,17 @@
       SimpleString destination = new SimpleString("queue.testSessionCreateProducerMessage");
       int windowSize = randomInt();
       int maxRate = randomInt();
-      SessionCreateProducerMessage request = new SessionCreateProducerMessage(
+      SessionCreateProducerMessage request = new SessionCreateProducerMessage(randomLong(),
             destination, windowSize, maxRate);
       AbstractPacketCodec codec = new SessionCreateProducerMessageCodec();
 
-      Packet decodedPacket = encodeAndCheckBytesAndDecode(request, codec,
-            request.getAddress(), request.getWindowSize(), request.getMaxRate());
+      Packet decodedPacket = encodeAndCheckBytesAndDecode(request, codec, request.getClientTargetID(),
+            new NullableStringHolder(request.getAddress()), request.getWindowSize(), request.getMaxRate());
 
       assertTrue(decodedPacket instanceof SessionCreateProducerMessage);
       SessionCreateProducerMessage decodedRequest = (SessionCreateProducerMessage) decodedPacket;
       assertEquals(SESS_CREATEPRODUCER, decodedRequest.getType());
+      assertEquals(request.getClientTargetID(), decodedRequest.getClientTargetID());
       assertEquals(request.getAddress(), decodedRequest.getAddress());
       assertEquals(request.getWindowSize(), decodedRequest.getWindowSize());
       assertEquals(request.getMaxRate(), decodedRequest.getMaxRate());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/util/SpawnedVMSupport.java	2008-05-06 16:16:37 UTC (rev 4151)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/util/SpawnedVMSupport.java	2008-05-06 18:00:33 UTC (rev 4152)
@@ -68,11 +68,11 @@
 
       String commandLine = sb.toString();
 
-      log.info("command line: " + commandLine);
+      log.trace("command line: " + commandLine);
 
       Process process = Runtime.getRuntime().exec(commandLine);
 
-      log.info("process: " + process);
+      log.trace("process: " + process);
 
       ProcessLogger outputLogger = new ProcessLogger(process.getInputStream(),
             className);




More information about the jboss-cvs-commits mailing list