[jboss-cvs] JBoss Messaging SVN: r6211 - trunk/src/main/org/jboss/messaging/core/client/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 30 05:18:40 EDT 2009
Author: jmesnil
Date: 2009-03-30 05:18:40 -0400 (Mon, 30 Mar 2009)
New Revision: 6211
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
JBMESSAGING-1560: Client-side message prioritization support
* In ClientConsumerImpl, use a PriorityLinkedList instead of a Queue as the message buffer, so that all buffered messages have an opportunity to be ordered by priority before being delivered.
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-30 09:01:24 UTC (rev 6210)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-30 09:18:40 UTC (rev 6211)
@@ -12,11 +12,19 @@
package org.jboss.messaging.core.client.impl;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
+
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.list.PriorityLinkedList;
+import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
@@ -27,14 +35,6 @@
import org.jboss.messaging.utils.Future;
import org.jboss.messaging.utils.TokenBucketLimiter;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -54,6 +54,8 @@
public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
+ public static final int NUM_PRIORITIES = 10;
+
// Attributes
// -----------------------------------------------------------------------------------
@@ -69,7 +71,7 @@
private final int ackBatchSize;
- private final Queue<ClientMessageInternal> buffer = new LinkedList<ClientMessageInternal>();
+ private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(NUM_PRIORITIES);
private final Runner runner = new Runner();
@@ -172,7 +174,7 @@
synchronized (this)
{
- while ((stopped || (m = buffer.poll()) == null) &&
+ while ((stopped || (m = buffer.removeFirst()) == null) &&
!closed && toWait > 0)
{
if (start == -1)
@@ -374,7 +376,7 @@
{
// Execute using executor
- buffer.add(messageToHandle);
+ buffer.addLast(messageToHandle, messageToHandle.getPriority());
if (!stopped)
{
queueExecutor();
@@ -383,7 +385,7 @@
else
{
// Add it to the buffer
- buffer.add(messageToHandle);
+ buffer.addLast(messageToHandle, messageToHandle.getPriority());
notify();
}
@@ -623,7 +625,7 @@
synchronized (this)
{
- message = buffer.poll();
+ message = buffer.removeFirst();
}
if (message != null)
More information about the jboss-cvs-commits
mailing list