[jboss-cvs] JBoss Messaging SVN: r4204 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 15 08:20:26 EDT 2008


Author: timfox
Date: 2008-05-15 08:20:26 -0400 (Thu, 15 May 2008)
New Revision: 4204

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Don't order on messages received


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -35,7 +35,7 @@
    SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws MessagingException;
    
    ClientConsumer createConsumer(SimpleString queueName, SimpleString filterString, boolean noLocal,
-                                 boolean autoDeleteQueue) throws MessagingException;
+                                 boolean autoDeleteQueue, boolean direct) throws MessagingException;
    
    ClientConsumer createConsumer(SimpleString queueName) throws MessagingException;
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -72,6 +72,8 @@
    
    private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
    
+   private final boolean direct;
+   
    private volatile Thread receiverThread;
    
    private volatile Thread onMessageThread;
@@ -83,6 +85,8 @@
    private volatile long ignoreDeliveryMark = -1;
    
    private volatile int tokensToSend;   
+   
+   
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -100,7 +104,8 @@
                              final long clientTargetID,
                              final ExecutorService sessionExecutor,
                              final RemotingConnection remotingConnection,
-                             final int tokenBatchSize)
+                             final int tokenBatchSize,
+                             final boolean direct)
    {
       this.targetID = targetID;
       
@@ -113,6 +118,8 @@
       this.remotingConnection = remotingConnection;
       
       this.tokenBatchSize = tokenBatchSize;
+      
+      this.direct = direct;
    }
 
    // ClientConsumer implementation
@@ -309,8 +316,7 @@
       
       if (handler != null)
       {
-         //TODO
-         if (true)
+         if (direct)
          {
             //Dispatch it directly on remoting thread
             

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -259,11 +259,11 @@
    
    public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
    {
-      return createConsumer(queueName, null, false, false);
+      return createConsumer(queueName, null, false, false, false);
    }
    
    public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean noLocal,
-                                        final boolean autoDeleteQueue) throws MessagingException
+                                        final boolean autoDeleteQueue, final boolean direct) throws MessagingException
    {
       checkClosed();
       
@@ -278,7 +278,7 @@
       int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
       
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize);
+         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize, direct);
 
       consumers.put(response.getConsumerTargetID(), consumer);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -151,7 +151,9 @@
       }
       
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(dispatcher, threadPool, this, false);
+      //We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
+      //since they are put on the queue in order
+      handler = new MinaHandler(dispatcher, threadPool, this, false, false);
       connector.setHandler(handler);
       InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
       ConnectFuture future = connector.connect(address);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -51,14 +51,15 @@
 
    // Note! must use ConcurrentMap here to avoid race condition
    private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
    public MinaHandler(final PacketDispatcher dispatcher,
-         final ExecutorService executorService,
-         final CleanUpNotifier failureNotifier,
-         final boolean closeSessionOnExceptionCaught)
+                      final ExecutorService executorService,
+                      final CleanUpNotifier failureNotifier,
+                      final boolean closeSessionOnExceptionCaught,
+                      final boolean useExecutor)
    {
       assert dispatcher != null;
       assert executorService != null;
@@ -66,8 +67,14 @@
       this.dispatcher = dispatcher;
       this.failureNotifier = failureNotifier;
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
-
-      this.executorFactory = new OrderedExecutorFactory(executorService);
+      if (useExecutor)
+      {
+         this.executorFactory = new OrderedExecutorFactory(executorService);
+      }
+      else
+      {
+         this.executorFactory = null;
+      }
       this.dispatcher.setListener(this);
    }
 
@@ -112,35 +119,44 @@
    throws Exception
    {
       final Packet packet = (Packet) message;
-      long executorID = packet.getExecutorID();
-
-      Executor executor = executors.get(executorID);
-      if (executor == null)
+      
+      if (executorFactory != null)
       {
-         executor = executorFactory.getOrderedExecutor();
-
-         Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-
-         if (oldExecutor != null)
+         
+         long executorID = packet.getExecutorID();
+   
+         Executor executor = executors.get(executorID);
+         if (executor == null)
          {
-            //Avoid race
-            executor = oldExecutor;
+            executor = executorFactory.getOrderedExecutor();
+   
+            Executor oldExecutor = executors.putIfAbsent(executorID, executor);
+   
+            if (oldExecutor != null)
+            {
+               //Avoid race
+               executor = oldExecutor;
+            }
          }
-      }
-
-      executor.execute(new Runnable()
-      {
-         public void run()
+   
+         executor.execute(new Runnable()
          {
-            try
+            public void run()
             {
-               messageReceivedInternal(session, packet);
-            } catch (Exception e)
-            {
-               log.error("unexpected error", e);
+               try
+               {
+                  messageReceivedInternal(session, packet);
+               } catch (Exception e)
+               {
+                  log.error("unexpected error", e);
+               }
             }
-         }
-      });
+         });
+      }
+      else
+      {
+         messageReceivedInternal(session, packet);
+      }
    }
 
    private final int high = 2000;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -161,7 +161,7 @@
          acceptor.setCloseOnDeactivation(false);
 
          threadPool = Executors.newCachedThreadPool();
-         acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true));
+         acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
          acceptor.bind();
          acceptorListener = new MinaSessionListener();
          acceptor.addListener(acceptorListener);

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -487,7 +487,7 @@
                throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
             }
             
-            consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, noLocal, false);
+            consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, noLocal, false, false);
          }
          else
          {
@@ -508,7 +508,7 @@
                
                session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, false, false);
                
-               consumer = session.createConsumer(queueName, null, noLocal, true);
+               consumer = session.createConsumer(queueName, null, noLocal, true, false);
             }
             else
             {
@@ -570,7 +570,7 @@
                   }                          
                }
                
-               consumer = session.createConsumer(queueName, null, noLocal, false);
+               consumer = session.createConsumer(queueName, null, noLocal, false, false);
             }         
          }
          

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -102,7 +102,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
 
       handler_1 = new TestPacketHandler(23);
       clientDispatcher.register(handler_1);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java	2008-05-15 12:20:26 UTC (rev 4204)
@@ -78,7 +78,7 @@
    {
       clientDispatcher = new PacketDispatcherImpl(null);
       threadPool = Executors.newCachedThreadPool();
-      handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
 
       packetHandler = new TestPacketHandler(23);
       clientDispatcher.register(packetHandler);




More information about the jboss-cvs-commits mailing list