[jboss-cvs] JBoss Messaging SVN: r4170 - in trunk: src/main/org/jboss/messaging/core/client/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 9 12:50:24 EDT 2008


Author: timfox
Date: 2008-05-09 12:50:24 -0400 (Fri, 09 May 2008)
New Revision: 4170

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Revert queue pause and resume, also added attributes to cf


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -35,4 +35,24 @@
    ClientConnection createConnection() throws MessagingException;
    
    ClientConnection createConnection(String username, String password) throws MessagingException;   
+   
+   void setDefaultConsumerWindowSize(int size);
+   
+   int getDefaultConsumerWindowSize();
+   
+   void setDefaultProducerWindowSize(int size);     
+   
+   int getDefaultProducerWindowSize();
+   
+   void setDefaultConsumerMaxRate(int rate);
+   
+   int getDefaultConsumerMaxRate();
+   
+   void setDefaultProducerMaxRate(int rate);
+   
+   int getDefaultProducerMaxRate();
+   
+   Location getLocation();
+   
+   ConnectionParams getConnectionParams();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -70,13 +70,13 @@
  
    private final boolean strictTck;
       
-   private final int defaultConsumerWindowSize;
+   private int defaultConsumerWindowSize;
    
-   private final int defaultConsumerMaxRate;
+   private int defaultConsumerMaxRate;
 
-   private final int defaultProducerWindowSize;
+   private int defaultProducerWindowSize;
    
-   private final int defaultProducerMaxRate;
+   private int defaultProducerMaxRate;
    
    
    // Static ---------------------------------------------------------------------------------------
@@ -180,25 +180,60 @@
    
    // ClientConnectionFactory implementation ---------------------------------------------
 
-	public int getConsumerWindowSize()
+	public int getDefaultConsumerWindowSize()
 	{
 		return defaultConsumerWindowSize;
 	}
+	
+	public void setDefaultConsumerWindowSize(final int size)
+   {
+      defaultConsumerWindowSize = size;
+   }
 
-	public int getProducerWindowSize()
+	public int getDefaultProducerWindowSize()
 	{
 		return defaultProducerWindowSize;
 	}
+			
+	public void setDefaultProducerWindowSize(final int size)
+   {
+      defaultProducerWindowSize = size;
+   }
 
 	public boolean isStrictTck()
 	{
 		return strictTck;
 	}
 
-	public int getMaxProducerRate()
+	public int getDefaultProducerMaxRate()
 	{
 		return defaultProducerMaxRate;
 	}
+	
+	public void setDefaultProducerMaxRate(final int rate)
+	{
+	   this.defaultProducerMaxRate = rate;
+	}
+	
+	public int getDefaultConsumerMaxRate()
+   {
+      return defaultConsumerMaxRate;
+   }
+   
+   public void setDefaultConsumerMaxRate(final int rate)
+   {
+      this.defaultConsumerMaxRate = rate;
+   }
+   
+   public ConnectionParams getConnectionParams()
+   {
+      return connectionParams;
+   }
+   
+   public Location getLocation()
+   {
+      return location;
+   }
 		  
    // Public ---------------------------------------------------------------------------------------
       

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -270,8 +270,10 @@
       
       SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
       
+      int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
+      
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, direct, 1);
+         new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, direct, tokenBatchSize);
 
       consumers.put(response.getConsumerTargetID(), consumer);
       

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -108,8 +108,4 @@
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;
-
-   void stopDelivery();
-
-   void startDelivery();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -57,7 +57,7 @@
 	
 	void setStarted(boolean started) throws Exception;
 	
-	HandleStatus handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+	void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
 	
 	void promptDelivery(Queue queue);
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -48,14 +48,14 @@
 import org.jboss.messaging.util.SimpleString;
 
 /**
- *
+ * 
  * Implementation of a Queue
- *
+ * 
  * TODO use Java 5 concurrent queue
- *
+ * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- *
+ * 
  */
 public class QueueImpl implements Queue
 {
@@ -66,7 +66,7 @@
    private volatile long persistenceID = -1;
 
    private final SimpleString name;
-   
+
    private volatile Filter filter;
 
    private final boolean clustered;
@@ -76,12 +76,13 @@
    private final boolean temporary;
 
    private volatile int maxSize;
-         
+
    private final ScheduledExecutorService scheduledExecutor;
 
-   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(
+         NUM_PRIORITIES);
 
-   private final List<Consumer> consumers  = new ArrayList<Consumer>();
+   private final List<Consumer> consumers = new ArrayList<Consumer>();
 
    private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
 
@@ -96,16 +97,15 @@
    private AtomicInteger messagesAdded = new AtomicInteger(0);
 
    private AtomicInteger deliveringCount = new AtomicInteger(0);
-   
+
    private volatile FlowController flowController;
 
-   private boolean delivering = true; 
-   
-   public QueueImpl(final long persistenceID, final SimpleString name, final Filter filter, final boolean clustered,
-                    final boolean durable, final boolean temporary, final int maxSize,
-                    final ScheduledExecutorService scheduledExecutor)
+   public QueueImpl(final long persistenceID, final SimpleString name,
+         final Filter filter, final boolean clustered, final boolean durable,
+         final boolean temporary, final int maxSize,
+         final ScheduledExecutorService scheduledExecutor)
    {
-   	this.persistenceID = persistenceID;
+      this.persistenceID = persistenceID;
 
       this.name = name;
 
@@ -118,14 +118,15 @@
       this.temporary = temporary;
 
       this.maxSize = maxSize;
-      
+
       this.scheduledExecutor = scheduledExecutor;
-   	
-      direct = true;        	
+
+      direct = true;
    }
-   
-   // Queue implementation -------------------------------------------------------------------
 
+   // Queue implementation
+   // -------------------------------------------------------------------
+
    public boolean isClustered()
    {
       return clustered;
@@ -172,6 +173,7 @@
 
    /*
     * Attempt to deliver all the messages in the queue
+    * 
     * @see org.jboss.messaging.newcore.intf.Queue#deliver()
     */
    public synchronized void deliver()
@@ -202,7 +204,7 @@
          {
             if (iterator == null)
             {
-               //We delivered all the messages - go into direct delivery
+               // We delivered all the messages - go into direct delivery
                direct = true;
 
                promptDelivery = false;
@@ -225,12 +227,13 @@
          }
          else if (status == HandleStatus.BUSY)
          {
-            //All consumers busy - give up
+            // All consumers busy - give up
             break;
          }
          else if (status == HandleStatus.NO_MATCH && iterator == null)
          {
-            //Consumers not all busy - but filter not accepting - iterate back through the queue
+            // Consumers not all busy - but filter not accepting - iterate back
+            // through the queue
             iterator = messageReferences.iterator();
          }
       }
@@ -273,7 +276,7 @@
       {
          ArrayList<MessageReference> list = new ArrayList<MessageReference>();
 
-         for (MessageReference ref: messageReferences.getAll())
+         for (MessageReference ref : messageReferences.getAll())
          {
             if (filter.match(ref.getMessage()))
             {
@@ -287,42 +290,39 @@
 
    public synchronized boolean removeReferenceWithID(final long id)
    {
-   	ListIterator<MessageReference> iterator = messageReferences.iterator();
-   	
-   	boolean removed = false;
-   	
-   	while (iterator.hasNext())
-   	{
-   		MessageReference ref = iterator.next();
-   		
-   		if (ref.getMessage().getMessageID() == id)
-   		{
-   			iterator.remove();
-   			
-   			removed = true;
-   			
-   			break;
-   		}
-   	}
-   	
-   	return removed;
+      ListIterator<MessageReference> iterator = messageReferences.iterator();
+
+      boolean removed = false;
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+
+            removed = true;
+
+            break;
+         }
+      }
+
+      return removed;
    }
-   
+
    public synchronized MessageReference getReference(final long id)
    {
-   	ListIterator<MessageReference> iterator = messageReferences.iterator();
-   	
-   	while (iterator.hasNext())
-   	{
-   		MessageReference ref = iterator.next();
-   		
-   		if (ref.getMessage().getMessageID() == id)
-   		{
-   			return ref;
-   		}
-   	}
-   	
-   	return null;
+      ListIterator<MessageReference> iterator = messageReferences.iterator();
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id) { return ref; }
+      }
+
+      return null;
    }
 
    public long getPersistenceID()
@@ -347,7 +347,8 @@
 
    public synchronized int getMessageCount()
    {
-      return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+      return messageReferences.size() + getScheduledCount()
+            + getDeliveringCount();
    }
 
    public synchronized int getScheduledCount()
@@ -363,13 +364,13 @@
    public void referenceAcknowledged() throws Exception
    {
       deliveringCount.decrementAndGet();
-      
+
       if (flowController != null)
       {
-      	flowController.messageAcknowledged();
+         flowController.messageAcknowledged();
       }
    }
-   
+
    public void referenceCancelled()
    {
       deliveringCount.decrementAndGet();
@@ -384,11 +385,10 @@
    {
       int num = messageReferences.size() + scheduledRunnables.size();
 
-      if (maxSize < num)
-      {
-         throw new IllegalArgumentException("Cannot set maxSize to " + maxSize + " since there are " + num + " refs");
-      }
-      
+      if (maxSize < num) { throw new IllegalArgumentException(
+            "Cannot set maxSize to " + maxSize + " since there are " + num
+                  + " refs"); }
+
       this.maxSize = maxSize;
    }
 
@@ -406,72 +406,61 @@
    {
       return messagesAdded.get();
    }
-    
+
    public void setFlowController(final FlowController flowController)
    {
-   	this.flowController = flowController;
+      this.flowController = flowController;
    }
-   
+
    public FlowController getFlowController()
    {
-   	return flowController;
+      return flowController;
    }
-   
-   public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
-   {
-   	Transaction tx = new TransactionImpl(storageManager, null);
-   	   	   	
-   	ListIterator<MessageReference> iter = messageReferences.iterator();
-   	
-   	while (iter.hasNext())
-   	{
-   		MessageReference ref = iter.next();
-   		
-   		deliveringCount.incrementAndGet();
-   		
-   		tx.addAcknowledgement(ref);
-   		
-   		iter.remove();
-   	}
-   	
-   	synchronized (scheduledRunnables)
-   	{
-   		for (ScheduledDeliveryRunnable runnable: scheduledRunnables)
-   		{
-   			runnable.cancel();
-   			
-   			deliveringCount.incrementAndGet();
-   			
-   			tx.addAcknowledgement(runnable.getReference());
-   		}
-   		
-   		scheduledRunnables.clear();
-   	}
-   		
-   	tx.commit();   	
-   }
 
-   public void stopDelivery()
+   public synchronized void deleteAllReferences(
+         final StorageManager storageManager) throws Exception
    {
-      delivering = false;
-   }
+      Transaction tx = new TransactionImpl(storageManager, null);
 
-   public void startDelivery()
-   {
-      delivering = true;
-      deliver();
+      ListIterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+
+         deliveringCount.incrementAndGet();
+
+         tx.addAcknowledgement(ref);
+
+         iter.remove();
+      }
+
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            runnable.cancel();
+
+            deliveringCount.incrementAndGet();
+
+            tx.addAcknowledgement(runnable.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+
+      tx.commit();
    }
 
-   // Public -----------------------------------------------------------------------------
 
+   // Public
+   // -----------------------------------------------------------------------------
+
    public boolean equals(Object other)
    {
-      if (this == other)
-      {
-         return true;
-      }
+      if (this == other) { return true; }
 
-      QueueImpl qother = (QueueImpl)other;
+      QueueImpl qother = (QueueImpl) other;
 
       return name.equals(qother.name);
    }
@@ -481,14 +470,12 @@
       return name.hashCode();
    }
 
-   // Private ------------------------------------------------------------------------------
-   
+   // Private
+   // ------------------------------------------------------------------------------
+
    private HandleStatus add(final MessageReference ref, final boolean first)
    {
-      if (!checkFull())
-      {
-         return HandleStatus.BUSY;
-      }
+      if (!checkFull()) { return HandleStatus.BUSY; }
 
       if (!first)
       {
@@ -501,13 +488,13 @@
 
          if (direct)
          {
-            //Deliver directly
+            // Deliver directly
 
             HandleStatus status = deliver(ref);
 
             if (status == HandleStatus.HANDLED)
             {
-               //Ok
+               // Ok
             }
             else if (status == HandleStatus.BUSY)
             {
@@ -541,9 +528,12 @@
 
             if (!direct && promptDelivery)
             {
-               //We have consumers with filters which don't match, so we need to prompt delivery every time
-               //a new message arrives - this is why you really shouldn't use filters with queues - in most cases
-               //it's an ant-pattern since it would cause a queue scan on each message
+               // We have consumers with filters which don't match, so we need
+               // to prompt delivery every time
+               // a new message arrives - this is why you really shouldn't use
+               // filters with queues - in most cases
+               // it's an ant-pattern since it would cause a queue scan on each
+               // message
                deliver();
             }
          }
@@ -554,11 +544,15 @@
 
    private boolean checkAndSchedule(final MessageReference ref)
    {
-   	long now = System.currentTimeMillis();
+      long now = System.currentTimeMillis();
 
       if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
       {
-         if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
+         if (trace)
+         {
+            log.trace("Scheduling delivery for " + ref + " to occur at "
+                  + ref.getScheduledDeliveryTime());
+         }
 
          long delay = ref.getScheduledDeliveryTime() - now;
 
@@ -566,7 +560,8 @@
 
          scheduledRunnables.add(runnable);
 
-         Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+         Future<?> future = scheduledExecutor.schedule(runnable, delay,
+               TimeUnit.MILLISECONDS);
 
          runnable.setFuture(future);
 
@@ -579,10 +574,15 @@
    }
 
    private boolean checkFull()
-   {   	
-      if (maxSize != -1 && (deliveringCount.get() + messageReferences.size() + scheduledRunnables.size()) >= maxSize)
+   {
+      if (maxSize != -1
+            && (deliveringCount.get() + messageReferences.size() + scheduledRunnables
+                  .size()) >= maxSize)
       {
-         if (trace) { log.trace(this + " queue is full, rejecting message"); }
+         if (trace)
+         {
+            log.trace(this + " queue is full, rejecting message");
+         }
 
          return false;
       }
@@ -594,7 +594,7 @@
 
    private HandleStatus deliver(final MessageReference reference)
    {
-      if (consumers.isEmpty() || !delivering)
+      if (consumers.isEmpty())
       {
          return HandleStatus.BUSY;
       }
@@ -617,21 +617,19 @@
          }
          catch (Throwable t)
          {
-            //If the consumer throws an exception we remove the consumer
+            // If the consumer throws an exception we remove the consumer
             removeConsumer(consumer);
 
             return HandleStatus.BUSY;
          }
 
-         if (status == null)
-         {
-            throw new IllegalStateException("ClientConsumer.handle() should never return null");
-         }
+         if (status == null) { throw new IllegalStateException(
+               "ClientConsumer.handle() should never return null"); }
 
          if (status == HandleStatus.HANDLED)
          {
             deliveringCount.incrementAndGet();
-            
+
             return HandleStatus.HANDLED;
          }
          else if (status == HandleStatus.NO_MATCH)
@@ -643,21 +641,22 @@
 
          if (pos == startPos)
          {
-            //Tried all of them
+            // Tried all of them
             if (filterRejected)
             {
                return HandleStatus.NO_MATCH;
             }
             else
             {
-               //Give up - all consumers busy
+               // Give up - all consumers busy
                return HandleStatus.BUSY;
             }
          }
       }
    }
 
-   // Inner classes --------------------------------------------------------------------------
+   // Inner classes
+   // --------------------------------------------------------------------------
 
    private class ScheduledDeliveryRunnable implements Runnable
    {
@@ -674,34 +673,37 @@
 
       public synchronized void setFuture(final Future<?> future)
       {
-      	if (cancelled)
-      	{
-      		future.cancel(false);
-      	}
-      	else
-      	{
-      		this.future = future;
-      	}
+         if (cancelled)
+         {
+            future.cancel(false);
+         }
+         else
+         {
+            this.future = future;
+         }
       }
 
       public synchronized void cancel()
       {
-      	if (future != null)
-      	{
-      		future.cancel(false);
-      	}
+         if (future != null)
+         {
+            future.cancel(false);
+         }
 
-      	cancelled = true;
+         cancelled = true;
       }
-      
+
       public MessageReference getReference()
       {
-      	return ref;
+         return ref;
       }
 
       public void run()
       {
-         if (trace) { log.trace("Scheduled delivery timeout " + ref); }
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
 
          synchronized (scheduledRunnables)
          {
@@ -709,9 +711,9 @@
 
             if (!removed)
             {
-            	log.warn("Failed to remove timeout " + this);
-            	
-            	return;
+               log.warn("Failed to remove timeout " + this);
+
+               return;
             }
          }
 
@@ -721,13 +723,17 @@
 
          if (HandleStatus.HANDLED != status)
          {
-            //Add back to the front of the queue
+            // Add back to the front of the queue
 
             addFirst(ref);
          }
          else
          {
-            if (trace) { log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + ref); }
+            if (trace)
+            {
+               log.trace("Delivered scheduled delivery at "
+                     + System.currentTimeMillis() + " for " + ref);
+            }
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -214,15 +214,16 @@
                    
          try
          {
-         	return sessionEndpoint.handleDelivery(ref, this);
+         	sessionEndpoint.handleDelivery(ref, this);
          }
          catch (Exception e)
          {
          	log.error("Failed to handle delivery", e);
          	
          	started = false; // DO NOT return null or the message might get delivered more than once
-            return HandleStatus.BUSY;
          }
+         
+         return HandleStatus.HANDLED;
       }
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -136,8 +136,6 @@
 
    private Transaction tx;
 
-   private ArrayList<Queue> stopped = new ArrayList<Queue>();
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -225,19 +223,13 @@
       dispatcher.unregister(producer.getID());
    }
 
-   public synchronized HandleStatus handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+   public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
-      //if the queue we are delivering to has been stopped then dont deliver!
-      if (stopped.contains(ref.getQueue()))
-      {
-         return HandleStatus.BUSY;
-      }
       Delivery delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), deliveryIDSequence++, sender);
 
       deliveries.add(delivery);
 
       delivery.deliver();
-      return HandleStatus.HANDLED;
    }
 
    public void setStarted(final boolean s) throws Exception
@@ -428,8 +420,7 @@
          tx = new TransactionImpl(persistenceManager, postOffice);
       }
 
-      // Synchronize to prevent any new deliveries arriving during this recovery. also we stop any queues that are having
-      // messages rolled back, if their are any messages in mid delivery then these will not be delivered. 
+      // Synchronize to prevent any new deliveries arriving during this recovery. 
       synchronized (this)
       {
          // Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
@@ -439,28 +430,14 @@
          {
             tx.addAcknowledgement(del.getReference());
          }
-         //stop the queue delivering for all the queues where messages are being rolled back
-         List<MessageReference> acks = tx.getAcknowledgements();
-         for (MessageReference ack : acks)
-         {
-            stopped.add(ack.getQueue());
-         }
-         for (Queue queue : stopped)
-         {
-            queue.stopDelivery();
-         }
+        
          deliveries.clear();
+         
          deliveryIDSequence -= tx.getAcknowledgementsCount();
       }
+      
       tx.rollback(queueSettingsRepository);
-      //once we have done the rollbackwe can restart any queues which will flush any awaiting deliveries
-      ArrayList<Queue> toRestart = new ArrayList<Queue>(stopped);
-      stopped.clear();
-      for (Queue queue : toRestart)
-      {
-         queue.startDelivery();
-      }
-
+      
       tx = new TransactionImpl(persistenceManager, postOffice);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -21,8 +21,6 @@
  */
 package org.jboss.messaging.core.transaction;
 
-import java.util.List;
-
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.server.MessageReference;
@@ -47,8 +45,6 @@
    
    void addMessage(ServerMessage message) throws Exception;
 
-   List<MessageReference> getAcknowledgements();
-
    void addAcknowledgement(MessageReference acknowledgement) throws Exception;
    
    int getAcknowledgementsCount();

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -117,11 +117,6 @@
 		}
 	}
 
-   public List<MessageReference> getAcknowledgements()
-   {
-      return new ArrayList<MessageReference>(acknowledgements);
-   }
-
    public void addAcknowledgement(final MessageReference acknowledgement)
 			throws Exception
 	{

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -152,51 +152,51 @@
          }
       }
    }
-//
-//   public void testSpeed() throws Exception
-//   {
-//      Connection pconn = null;      
-//
-//      try
-//      {
-//         pconn = cf.createConnection();
-//
-//         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-//         MessageProducer p = ps.createProducer(queue1);
-//             
-//         pconn.start();
-//         
-//         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//         
-//         p.setDisableMessageID(true);
-//         p.setDisableMessageTimestamp(true);
-//
-//         final int numMessages = 100000;
-//
-//         long start = System.currentTimeMillis();
-//
-//         BytesMessage msg = ps.createBytesMessage();
-//         
-//         msg.writeBytes(new byte[200]);
-//                           
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            p.send(msg);
-//         }
-//         
-//         long end = System.currentTimeMillis();
-//
-//         double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-//         log.info("rate " + actualRate + " msgs /sec");
-//
-//      }
-//      finally
-//      {
-//         pconn.close();
-//      }
-//   }
+
+   public void testSpeed() throws Exception
+   {
+      Connection pconn = null;      
+
+      try
+      {
+         pconn = cf.createConnection();
+
+         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+         MessageProducer p = ps.createProducer(queue1);
+             
+         pconn.start();
+         
+         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         p.setDisableMessageID(true);
+         p.setDisableMessageTimestamp(true);
+
+         final int numMessages = 100000;
+
+         long start = System.currentTimeMillis();
+
+         BytesMessage msg = ps.createBytesMessage();
+         
+         msg.writeBytes(new byte[200]);
+                           
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(msg);
+         }
+         
+         long end = System.currentTimeMillis();
+
+         double actualRate = 1000 * (double)numMessages / ( end - start);
+
+         log.info("rate " + actualRate + " msgs /sec");
+
+      }
+      finally
+      {
+         pconn.close();
+      }
+   }
 //   
 //   public void testSpeed2() throws Exception
 //   {
@@ -266,75 +266,75 @@
 //      }
 //   }
 //   
-//   public void testSpeed3() throws Exception
-//   {
-//      Connection pconn = null;      
-//
-//      try
-//      {
-//         pconn = cf.createConnection();
-//
-//         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-//         MessageProducer p = ps.createProducer(queue1);
-//         
-//         MessageConsumer cons = ps.createConsumer(queue1);
-//             
-//         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//         
-//         p.setDisableMessageID(true);
-//         p.setDisableMessageTimestamp(true);
-//
-//         final int numMessages = 100000;
-//         
-//         BytesMessage msg = ps.createBytesMessage();
-//         
-//         msg.writeBytes(new byte[1000]);
-//         
-//         final CountDownLatch latch = new CountDownLatch(1);
-//         
-//         class MyListener implements MessageListener
-//         {
-//            int count;
-//
-//            public void onMessage(Message msg)
-//            {
-//               count++;
-//               
-//               if (count == numMessages)
-//               {
-//                  latch.countDown();
-//               }
-//            }            
-//         }
-//         
-//         cons.setMessageListener(new MyListener());
-//         
-//         for (int i = 0; i < numMessages; i++)
-//         {
-//            p.send(msg);
-//         }
-//         
-//         long start = System.currentTimeMillis();
-//
-//         
-//         pconn.start();
-//         
-//         
-//         latch.await();
-//         
-//         long end = System.currentTimeMillis();
-//
-//         double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-//         log.info("rate " + actualRate + " msgs /sec");
-//
-//      }
-//      finally
-//      {
-//         pconn.close();
-//      }
-//   }
+   public void testSpeed3() throws Exception
+   {
+      Connection pconn = null;      
+
+      try
+      {
+         pconn = cf.createConnection();
+
+         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+         MessageProducer p = ps.createProducer(queue1);
+         
+         MessageConsumer cons = ps.createConsumer(queue1);
+             
+         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         p.setDisableMessageID(true);
+         p.setDisableMessageTimestamp(true);
+
+         final int numMessages = 100000;
+         
+         BytesMessage msg = ps.createBytesMessage();
+         
+         msg.writeBytes(new byte[200]);
+         
+         final CountDownLatch latch = new CountDownLatch(1);
+         
+         class MyListener implements MessageListener
+         {
+            int count;
+
+            public void onMessage(Message msg)
+            {
+               count++;
+               
+               if (count == numMessages)
+               {
+                  latch.countDown();
+               }
+            }            
+         }
+         
+         cons.setMessageListener(new MyListener());
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(msg);
+         }
+         
+         long start = System.currentTimeMillis();
+
+         
+         pconn.start();
+         
+         
+         latch.await();
+         
+         long end = System.currentTimeMillis();
+
+         double actualRate = 1000 * (double)numMessages / ( end - start);
+
+         log.info("rate " + actualRate + " msgs /sec");
+
+      }
+      finally
+      {
+         pconn.close();
+      }
+   }
    
    public void testTransactedSendPersistent() throws Exception
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-09 16:50:24 UTC (rev 4170)
@@ -1,5 +1,7 @@
 package org.jboss.messaging.tests.integration;
 
+import java.util.concurrent.CountDownLatch;
+
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.client.ClientConnection;
@@ -9,6 +11,7 @@
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.client.impl.LocationImpl;
@@ -81,9 +84,87 @@
       
       conn.close();
    }
+   
+   public void testCoreClientPerf() throws Exception
+   {
+      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+            
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+      cf.setDefaultConsumerWindowSize(-1);
+      
+      ClientConnection conn = cf.createConnection();
+      
+      ClientSession session = conn.createClientSession(false, true, false, -1, false, false);
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+      
+      ClientProducer producer = session.createProducer(QUEUE);
 
+      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+            System.currentTimeMillis(), (byte) 1);
+      
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false, false, true);
+            
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      final int numMessages = 100000;
+      
+      class MyHandler implements MessageHandler
+      {
+         int count;
 
+         public void onMessage(ClientMessage msg)
+         {
+            count++;
 
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }            
+      }
+
+      consumer.setMessageHandler(new MyHandler());
+      
+      
+      
+      
+      
+      
+      for (int i = 0; i < numMessages; i++)
+      {      
+         producer.send(message);
+      }
+      
+//      long end = System.currentTimeMillis();
+//
+//      double actualRate = 1000 * (double)numMessages / ( end - start);
+//                  
+//      System.out.println("Rate is " + actualRate);
+
+      conn.start();
+      
+      long start = System.currentTimeMillis();
+      
+      //start = System.currentTimeMillis();
+
+      latch.await();
+      
+      long end = System.currentTimeMillis();
+      
+      double actualRate = 1000 * (double)numMessages / ( end - start);
+      
+      System.out.println("Rate is " + actualRate);
+      
+//      
+//      message = consumer.receive(1000);
+//      
+//      assertEquals("testINVMCoreClient", message.getBody().getString());
+//      
+      conn.close();
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list