[jboss-cvs] JBoss Messaging SVN: r6211 - trunk/src/main/org/jboss/messaging/core/client/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 30 05:18:40 EDT 2009


Author: jmesnil
Date: 2009-03-30 05:18:40 -0400 (Mon, 30 Mar 2009)
New Revision: 6211

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
JBMESSAGING-1560: Client-side message prioritization support

* in ClientConsumerImpl, used a PriorityLinkedList instead of Queue as the buffer of messages so that all messages in the buffer have an opportunity to be prioritized before being delivered.

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-30 09:01:24 UTC (rev 6210)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-03-30 09:18:40 UTC (rev 6211)
@@ -12,11 +12,19 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
+
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.list.PriorityLinkedList;
+import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
@@ -27,14 +35,6 @@
 import org.jboss.messaging.utils.Future;
 import org.jboss.messaging.utils.TokenBucketLimiter;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -54,6 +54,8 @@
 
    public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
 
+   public static final int NUM_PRIORITIES = 10;
+
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -69,7 +71,7 @@
 
    private final int ackBatchSize;
 
-   private final Queue<ClientMessageInternal> buffer = new LinkedList<ClientMessageInternal>();
+   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(NUM_PRIORITIES);
 
    private final Runner runner = new Runner();
 
@@ -172,7 +174,7 @@
             
             synchronized (this)
             {
-               while ((stopped || (m = buffer.poll()) == null) &&
+               while ((stopped || (m = buffer.removeFirst()) == null) &&
                       !closed && toWait > 0)
                {
                   if (start == -1)
@@ -374,7 +376,7 @@
       {
          // Execute using executor
 
-         buffer.add(messageToHandle);
+         buffer.addLast(messageToHandle, messageToHandle.getPriority());
          if (!stopped)
          {
             queueExecutor();
@@ -383,7 +385,7 @@
       else
       {
          // Add it to the buffer
-         buffer.add(messageToHandle);
+         buffer.addLast(messageToHandle, messageToHandle.getPriority());
 
          notify();
       }
@@ -623,7 +625,7 @@
 
          synchronized (this)
          {
-            message = buffer.poll();
+            message = buffer.removeFirst();
          }
 
          if (message != null)




More information about the jboss-cvs-commits mailing list