[jboss-cvs] JBoss Messaging SVN: r4463 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 13 08:56:57 EDT 2008


Author: timfox
Date: 2008-06-13 08:56:57 -0400 (Fri, 13 Jun 2008)
New Revision: 4463

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More tests and tweaks


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -27,5 +27,7 @@
    
    void close() throws MessagingException;
    
-   boolean isClosed();      
+   boolean isClosed();   
+   
+   boolean isDirect();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -56,10 +56,9 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientBrowserImpl(final long serverTargetID, final ClientSessionInternal session,
-   		                   final RemotingConnection remotingConnection)
+   public ClientBrowserImpl(final ClientSessionInternal session, final long serverTargetID)
    {
-      this.remotingConnection = remotingConnection;
+      this.remotingConnection = session.getConnection().getRemotingConnection();
       
       this.serverTargetID = serverTargetID;
       

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -75,6 +75,8 @@
    
    private final boolean direct;
    
+   private final Runner runner = new Runner();
+      
    private volatile Thread receiverThread;
    
    private volatile Thread onMessageThread;
@@ -101,9 +103,7 @@
     * (and its PacketDispatcher) to a single server.
     */
    public ClientConsumerImpl(final ClientSessionInternal session, final long targetID,
-                             final long clientTargetID,
-                             final ExecutorService sessionExecutor,
-                             final RemotingConnection remotingConnection,
+                             final long clientTargetID,                                                   
                              final int clientWindowSize,
                              final boolean direct)
    {
@@ -113,9 +113,9 @@
       
       this.session = session;
       
-      this.sessionExecutor = sessionExecutor;
+      this.sessionExecutor = session.getExecutorService();
       
-      this.remotingConnection = remotingConnection;
+      this.remotingConnection = session.getConnection().getRemotingConnection();
       
       this.clientWindowSize = clientWindowSize;
       
@@ -230,6 +230,8 @@
          throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
       }
         
+      log.info("Setting handler");
+      
    	waitForOnMessageToComplete();   	
    	
       this.handler = handler;
@@ -285,6 +287,11 @@
    {
       return closed;
    }
+   
+   public boolean isDirect()
+   {
+      return direct;
+   }
 
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
@@ -388,6 +395,11 @@
    {
       return buffer.size();
    }
+   
+   public int getCreditsToSend()
+   {
+      return creditsToSend;
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------
@@ -403,7 +415,7 @@
 
    private void queueExecutor()
    {
-      sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+      sessionExecutor.execute(runner);
    }
    
    private void flowControl(final int messageBytes) throws MessagingException
@@ -509,5 +521,14 @@
    
    // Inner classes
    // --------------------------------------------------------------------------------
+   
+   private class Runner implements Runnable
+   {
+      public void run()
+      {
+         callOnMessage();
+      } 
+   }
+   
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -30,4 +30,6 @@
    long getIgnoreDeliveryMark();
    
    int getBufferSize();
+   
+   int getCreditsToSend();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -104,7 +104,7 @@
    
    private final boolean cacheProducers;
    
-   private final ExecutorService executor;
+   private final ExecutorService executorService;
    
    private final RemotingConnection remotingConnection;         
    
@@ -163,7 +163,7 @@
       this.cacheProducers = cacheProducers;
       
       //TODO - we should use OrderedExecutorFactory and a pool here
-      executor = Executors.newSingleThreadExecutor();
+      executorService = Executors.newSingleThreadExecutor();
       
       this.xa = xa;
       
@@ -302,7 +302,7 @@
       }
       
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, clientWindowSize, direct);
+         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, clientWindowSize, direct);
 
       addConsumer(consumer);
       
@@ -330,7 +330,7 @@
 
       SessionCreateBrowserResponseMessage response = (SessionCreateBrowserResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
 
-      ClientBrowser browser = new ClientBrowserImpl(response.getBrowserTargetID(), this, remotingConnection);  
+      ClientBrowser browser = new ClientBrowserImpl(this, response.getBrowserTargetID());  
 
       addBrowser(browser);
 
@@ -508,7 +508,7 @@
       }
       finally
       {
-      	executor.shutdown();
+      	executorService.shutdown();
       	
          connection.removeSession(this);
          
@@ -633,6 +633,11 @@
       return new HashMap<SimpleString, ClientProducerInternal>(producerCache);
    }
    
+   public ExecutorService getExecutorService()
+   {
+      return executorService;
+   }
+   
    // XAResource implementation --------------------------------------------------------------------
    
    public void commit(final Xid xid, final boolean onePhase) throws XAException

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -8,6 +8,7 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientSession;
@@ -48,4 +49,6 @@
    Set<ClientBrowser> getBrowsers();
    
    Map<SimpleString, ClientProducerInternal> getProducerCache();
+   
+   ExecutorService getExecutorService();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -25,15 +25,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionAttributeMap;
 import org.apache.mina.common.IoSessionDataStructureFactory;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestQueue;
-import org.apache.mina.util.CircularQueue;
 
 /**
  * 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -21,11 +21,6 @@
    */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import org.apache.mina.common.*;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.messaging.core.remoting.Acceptor;
-import org.jboss.messaging.core.remoting.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.RemotingService;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 
@@ -33,6 +28,16 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.Acceptor;
+import org.jboss.messaging.core.remoting.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.RemotingService;
+
 /**
  * A Mina TCP Acceptor that supports SSL
  *
@@ -141,7 +146,7 @@
          //register pinger
          if (remotingService.getConfiguration().getKeepAliveInterval() > 0)
          {
-            remotingService.registerPinger(new MinaSession(session, null));
+            remotingService.registerPinger(new MinaSession(session));
          }
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -6,7 +6,26 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
-import org.apache.mina.common.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.jboss.messaging.core.client.ConnectionParams;
@@ -17,19 +36,19 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.ping.Pinger;
 import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.RemotingConnector;
+import org.jboss.messaging.core.remoting.RemotingSession;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+import org.jboss.messaging.core.remoting.TransportType;
 import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt>
@@ -130,7 +149,7 @@
    {
       if (session != null && session.isConnected())
       {
-         return new MinaSession(session, handler);
+         return new MinaSession(session);
       }
 
       threadPool = Executors.newCachedThreadPool();
@@ -151,7 +170,7 @@
       }
       session = future.getSession();
 
-      MinaSession minaSession = new MinaSession(session, handler);
+      MinaSession minaSession = new MinaSession(session);
       //register a handler for dealing with server pings
       dispatcher.register(new PacketHandler()
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -22,25 +22,20 @@
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MinaConnector.class);
-   
-   
+      
    // Attributes ----------------------------------------------------
 
    private final IoSession session;
 
-   private MinaHandler handler;
-   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public MinaSession(IoSession session, MinaHandler handler)
+   public MinaSession(IoSession session)
    {
       assert session != null;
 
       this.session = session;
-  
-      this.handler = handler;
    }
 
    // Public --------------------------------------------------------
@@ -53,15 +48,6 @@
    
    public void write(Packet packet)
    {     
-//      try
-//      {
-//         handler.checkWrite(session);
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("Failed to acquire sem", e);
-//      }
-      
       session.write(packet);
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java	2008-06-13 12:56:57 UTC (rev 4463)
@@ -2082,7 +2082,7 @@
       RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
             
       //In ClientSessionImpl constructor
-      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
           
       final long sessionTargetID = 7617622;      
       final SimpleString queueName = new SimpleString("gyugg");
@@ -2406,7 +2406,7 @@
       
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
            
-      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
       
       EasyMock.expect(conn.getConnectionFactory()).andReturn(cf);
          
@@ -2482,7 +2482,7 @@
       
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
            
-      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
        
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
       
@@ -2501,6 +2501,8 @@
       
       EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
       
+      //EasyMock.expect(conn.getRemotingConnection()).andReturn(value)
+      
       EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
       
       pd.register(new ClientConsumerPacketHandler(null, clientTargetID));
@@ -2549,7 +2551,7 @@
       
       PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
            
-      EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+      EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
       
       EasyMock.expect(conn.getConnectionFactory()).andReturn(cf);
       




More information about the jboss-cvs-commits mailing list