[jboss-cvs] JBoss Messaging SVN: r4296 - in trunk: examples/jms/src/org/jboss/jms/example and 19 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat May 24 03:07:10 EDT 2008
Author: timfox
Date: 2008-05-24 03:07:10 -0400 (Sat, 24 May 2008)
New Revision: 4296
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
Modified:
trunk/examples/jms/build.xml
trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
trunk/src/config/jbm-configuration.xml
trunk/src/config/queues.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
Log:
Disable MINA write queue blocking, enable producer flow control and various other bits
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/build.xml 2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,11 +42,12 @@
<!--perf props-->
<property name="message.count" value="200000"/>
- <property name="delivery.mode" value="NON_PERSISTENT"/>
+ <property name="message.warmup.count" value="10000"/>
+ <property name="delivery.mode" value="PERSISTENT"/>
<!-- in seconds -->
<property name="sample.period" value="1"/>
- <property name="sess.trans" value="false"/>
- <property name="sess.trans.size" value="1"/>
+ <property name="sess.trans" value="true"/>
+ <property name="sess.trans.size" value="100"/>
<path id="compile.classpath">
<fileset dir="${lib.dir}">
@@ -125,6 +126,7 @@
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<arg value="-l"/>
<arg value="${message.count}"/>
+ <arg value="${message.warmup.count}"/>
<arg value="${delivery.mode}"/>
<arg value="${sample.period}"/>
<arg value="${sess.trans}"/>
@@ -152,6 +154,7 @@
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<arg value="-s"/>
<arg value="${message.count}"/>
+ <arg value="${message.warmup.count}"/>
<arg value="${delivery.mode}"/>
<arg value="${sample.period}"/>
<arg value="${sess.trans}"/>
Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -53,14 +53,16 @@
PerfExample perfExample = new PerfExample();
int noOfMessages = Integer.parseInt(args[1]);
- int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
- long samplePeriod = Long.parseLong(args[3]);
- boolean transacted = Boolean.parseBoolean(args[4]);
+ int noOfWarmupMessages = Integer.parseInt(args[2]);
+ int deliveryMode = args[3].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
+ long samplePeriod = Long.parseLong(args[4]);
+ boolean transacted = Boolean.parseBoolean(args[5]);
log.info("Transacted:" + transacted);
- int transactionBatchSize = Integer.parseInt(args[5]);
+ int transactionBatchSize = Integer.parseInt(args[6]);
PerfParams perfParams = new PerfParams();
perfParams.setNoOfMessagesToSend(noOfMessages);
+ perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
perfParams.setDeliveryMode(deliveryMode);
perfParams.setSamplePeriod(samplePeriod);
perfParams.setSessionTransacted(transacted);
@@ -93,17 +95,14 @@
{
log.info("params = " + perfParams);
init(perfParams.isSessionTransacted());
- // use 10% of the messages to warm up the system
- int warmupMessages = perfParams.getNoOfMessagesToSend() / 10;
- log.info("warming up by sending " + warmupMessages + " messages");
- sendMessages(warmupMessages, perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
- log.info("warmed up");
- // do not take into account messages received during warmup
+ // warm up the system by sending the configured number of warmup messages
+ log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
+ sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
+ log.info("warmed up");
messageCount.set(0);
- int remainingMessages = perfParams.getNoOfMessagesToSend() - warmupMessages;
scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
- sendMessages(remainingMessages, perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
+ sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
scheduler.shutdownNow();
log.info("average: " + (command.getAverage() / perfParams.getSamplePeriod()) + " msg/s");
@@ -208,11 +207,11 @@
class PerfListener implements MessageListener
{
private CountDownLatch countDownLatch;
- PerfParams perfParams;
- boolean started = false;
+ private PerfParams perfParams;
+
+ private boolean started = false;
-
public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
{
this.countDownLatch = countDownLatch;
Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -30,6 +30,7 @@
public class PerfParams implements Serializable
{
int noOfMessagesToSend = 1000;
+ int noOfWarmupMessages;
long samplePeriod = 1; // in seconds
int deliveryMode = DeliveryMode.NON_PERSISTENT;
boolean isSessionTransacted = false;
@@ -44,7 +45,17 @@
{
this.noOfMessagesToSend = noOfMessagesToSend;
}
+
+ public int getNoOfWarmupMessages()
+ {
+ return noOfWarmupMessages;
+ }
+ public void setNoOfWarmupMessages(int noOfWarmupMessages)
+ {
+ this.noOfWarmupMessages = noOfWarmupMessages;
+ }
+
public long getSamplePeriod()
{
return samplePeriod;
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/config/jbm-configuration.xml 2008-05-24 07:07:10 UTC (rev 4296)
@@ -46,7 +46,8 @@
<remoting-writequeue-minbytes>0</remoting-writequeue-minbytes>
- <remoting-writequeue-maxbytes>8192</remoting-writequeue-maxbytes>
+ <!-- Effectively disable this since we're using producer and consumer flow control -->
+ <remoting-writequeue-maxbytes>100000000</remoting-writequeue-maxbytes>
<!-- if ssl is enabled, all remoting-ssl-* properties must be set -->
<remoting-enable-ssl>false</remoting-enable-ssl>
@@ -69,7 +70,7 @@
<create-journal-dir>true</create-journal-dir>
- <journal-type>asyncio</journal-type>
+ <journal-type>nio</journal-type>
<journal-sync>true</journal-sync>
Modified: trunk/src/config/queues.xml
===================================================================
--- trunk/src/config/queues.xml 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/config/queues.xml 2008-05-24 07:07:10 UTC (rev 4296)
@@ -104,4 +104,4 @@
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</queue-settings>
-</deployment>
\ No newline at end of file
+</deployment>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -118,7 +118,7 @@
this.strictTck = false;
this.defaultConsumerWindowSize = 1024 * 1024;
this.defaultConsumerMaxRate = -1;
- this.defaultProducerWindowSize = 1000;
+ this.defaultProducerWindowSize = 1024 * 1024;
this.defaultProducerMaxRate = -1;
this.location = location;
this.defaultSendNonPersistentMessagesBlocking = false;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -31,7 +31,7 @@
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
/**
@@ -84,7 +84,7 @@
private volatile long ignoreDeliveryMark = -1;
- private volatile int tokensToSend;
+ private volatile int creditsToSend;
@@ -384,13 +384,13 @@
{
if (clientWindowSize > 0)
{
- tokensToSend += messageBytes;
+ creditsToSend += messageBytes;
- if (tokensToSend >= clientWindowSize)
+ if (creditsToSend >= clientWindowSize)
{
- remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokensToSend));
+ remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowCreditMessage(creditsToSend));
- tokensToSend = 0;
+ creditsToSend = 0;
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.client.impl;
+import java.util.concurrent.Semaphore;
+
import org.jboss.messaging.core.client.AcknowledgementHandler;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
@@ -63,8 +65,10 @@
//For limit throttling
- private volatile int windowSize;
+ //private AtomicInteger availableCredits = new AtomicInteger(0);
+ private Semaphore availableCredits;
+
//For rate throttling
private final TokenBucketLimiter rateLimiter;
@@ -72,6 +76,8 @@
private final boolean sendNonPersistentMessagesSynchronously;
private final boolean sendPersistentMessagesSynchronously;
+
+ private final boolean creditFlowControl;
// Static ---------------------------------------------------------------------------------------
@@ -80,10 +86,11 @@
public ClientProducerImpl(final ClientSessionInternal session, final long serverTargetID,
final long clientTargetID,
final SimpleString address,
- final RemotingConnection remotingConnection, final int windowSize,
+ final RemotingConnection remotingConnection,
final int maxRate,
final boolean sendNonPersistentMessagesSynchronously,
- final boolean sendPersistentMessagesSynchronously)
+ final boolean sendPersistentMessagesSynchronously,
+ final int initialCredits)
{
this.session = session;
@@ -95,8 +102,6 @@
this.remotingConnection = remotingConnection;
- this.windowSize = windowSize;
-
if (maxRate != -1)
{
this.rateLimiter = new TokenBucketLimiter(maxRate, false);
@@ -108,7 +113,13 @@
this.sendNonPersistentMessagesSynchronously = sendNonPersistentMessagesSynchronously;
- this.sendPersistentMessagesSynchronously = sendPersistentMessagesSynchronously;
+ this.sendPersistentMessagesSynchronously = sendPersistentMessagesSynchronously;
+
+// this.availableCredits.set(initialCredits);
+
+ this.availableCredits = new Semaphore(initialCredits);
+
+ this.creditFlowControl = initialCredits != -1;
}
// ClientProducer implementation ----------------------------------------------------------------
@@ -142,34 +153,19 @@
{
msg.setDestination(this.address);
}
-
- ProducerSendMessage message = new ProducerSendMessage(msg);
+
+ if (rateLimiter != null)
+ {
+ // Rate flow control
+
+ rateLimiter.limit();
+ }
- //TODO flow control disabled for now
-
-// //We only flow control with non-anonymous producers
-// if (address == null)
-// {
-// while (windowSize == 0)
-// {
-// synchronized (this)
-// {
-// try
-// {
-// wait();
-// }
-// catch (InterruptedException e)
-// {
-// }
-// }
-// }
-//
-// windowSize--;
-// }
-
boolean sendBlocking = msg.isDurable() && sendPersistentMessagesSynchronously ||
!msg.isDurable() && sendNonPersistentMessagesSynchronously;
-
+
+ ProducerSendMessage message = new ProducerSendMessage(msg);
+
if (sendBlocking)
{
remotingConnection.sendBlocking(serverTargetID, session.getServerTargetID(), message);
@@ -177,14 +173,36 @@
else
{
remotingConnection.sendOneWay(serverTargetID, session.getServerTargetID(), message);
- }
-
- if (rateLimiter != null)
- {
- // Rate flow control
-
- rateLimiter.limit();
- }
+ }
+
+ //We only flow control with non-anonymous producers
+ if (address == null && creditFlowControl)
+ {
+ try
+ {
+ this.availableCredits.acquire(message.getClientMessage().encodeSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+// while (availableCredits.get() <= 0)
+// {
+// //log.info("**blocked");
+// synchronized (this)
+// {
+// try
+// {
+// wait();
+// }
+// catch (InterruptedException e)
+// {
+// }
+// }
+// }
+//
+// availableCredits.addAndGet(-message.getClientMessage().encodeSize());
+ }
}
public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
@@ -218,11 +236,21 @@
// ClientProducerInternal implementation --------------------------------------------------------
- public synchronized void receiveTokens(int tokens)
+ public void receiveCredits(final int credits)
{
- windowSize += tokens;
-
- notify();
+ // log.info("received credits " + credits);
+
+ this.availableCredits.release(credits);
+
+// int prev = availableCredits.getAndAdd(credits);
+//
+// if (prev <= 0 && prev + credits > 0)
+// {
+// synchronized (this)
+// {
+// notify();
+// }
+// }
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -17,5 +17,5 @@
*/
public interface ClientProducerInternal extends ClientProducer
{
- void receiveTokens(int tokens) throws Exception;
+ void receiveCredits(int credits) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -5,7 +5,7 @@
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
/**
*
@@ -42,9 +42,9 @@
if (type == EmptyPacket.PROD_RECEIVETOKENS)
{
- ProducerReceiveTokensMessage message = (ProducerReceiveTokensMessage) packet;
+ ProducerFlowCreditMessage message = (ProducerFlowCreditMessage) packet;
- clientProducer.receiveTokens(message.getTokens());
+ clientProducer.receiveCredits(message.getTokens());
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -39,7 +39,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
@@ -311,10 +311,10 @@
remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, clientTargetID));
- //Now we send window size tokens to start the consumption
+ //Now we send window size credits to start the consumption
//We even send it if windowSize == -1, since we need to start the consumer
- remotingConnection.sendOneWay(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowTokenMessage(response.getWindowSize()));
+ remotingConnection.sendOneWay(response.getConsumerTargetID(), serverTargetID, new ConsumerFlowCreditMessage(response.getWindowSize()));
return consumer;
}
@@ -367,11 +367,12 @@
//maxRate and windowSize can be overridden by the server
producer = new ClientProducerImpl(this, response.getProducerTargetID(), clientTargetID, address,
- remotingConnection, response.getWindowSize(),
+ remotingConnection,
response.getMaxRate(),
- sendNonPersistentMessagesBlocking, autoCommitSends);
+ sendNonPersistentMessagesBlocking, autoCommitSends,
+ response.getInitialCredits());
- remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));
+ remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, clientTargetID));
}
producers.add(producer);
Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueSettingsDeployer.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,7 +42,7 @@
private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay";
- private static final String MAX_SIZE_NODE_NAME = "max-size";
+ private static final String MAX_SIZE_BYTES_NODE_NAME = "max-size-bytes";
private static final String DISTRIBUTION_POLICY_CLASS_NODE_NAME = "distribution-policy-class";
@@ -99,9 +99,9 @@
{
queueSettings.setRedeliveryDelay(Long.valueOf(child.getTextContent()));
}
- else if (MAX_SIZE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+ else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
- queueSettings.setMaxSize(Integer.valueOf(child.getTextContent()));
+ queueSettings.setMaxSizeBytes(Integer.valueOf(child.getTextContent()));
}
else if (DISTRIBUTION_POLICY_CLASS_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -426,7 +426,6 @@
throw new IllegalStateException("Journal must be loaded first");
}
-
int recordLength = record.encodeSize();
int size = SIZE_ADD_RECORD_TX + recordLength;
@@ -528,7 +527,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
TransactionCallback callback = getTransactionCallback(txID);
callback.countUp();
@@ -585,8 +584,7 @@
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
- }
-
+ }
TransactionNegPos tx = transactionInfos.remove(txID);
@@ -743,7 +741,7 @@
int pos = bb.position();
byte recordType = bb.get();
-
+
switch(recordType)
{
case ADD_RECORD:
@@ -753,7 +751,7 @@
maxMessageID = Math.max(maxMessageID, id);
byte userRecordType = bb.get();
-
+
int size = bb.getInt();
byte[] record = new byte[size];
bb.get(record);
@@ -780,7 +778,7 @@
maxMessageID = Math.max(maxMessageID, id);
byte userRecordType = bb.get();
-
+
int size = bb.getInt();
byte[] record = new byte[size];
bb.get(record);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -58,22 +58,22 @@
// Bindings journal record type
- private static final byte BINDING_RECORD = 1;
+ private static final byte BINDING_RECORD = 21;
- private static final byte DESTINATION_RECORD = 2;
+ private static final byte DESTINATION_RECORD = 22;
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
// Message journal record types
- public static final byte ADD_MESSAGE = 1;
+ public static final byte ADD_MESSAGE = 31;
- public static final byte ACKNOWLEDGE_REF = 2;
+ public static final byte ACKNOWLEDGE_REF = 32;
- public static final byte UPDATE_DELIVERY_COUNT = 3;
+ public static final byte UPDATE_DELIVERY_COUNT = 33;
- public static final byte SET_SCHEDULED_DELIVERY_TIME = 4;
+ public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
private final AtomicLong messageIDSequence = new AtomicLong(0);
@@ -449,16 +449,12 @@
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
- bindingsJournal.load(records, null);
+ long maxID = bindingsJournal.load(records, null);
- long maxID = -1;
-
for (RecordInfo record: records)
{
long id = record.id;
- maxID = Math.max(maxID, id);
-
byte[] data = record.data;
ByteArrayInputStream bais = new ByteArrayInputStream(data);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -35,7 +35,7 @@
{
void messageAcknowledged() throws Exception;
- void messageReceived(ServerProducer producer, int windowSize) throws Exception;
+ void requestAndSendCredits(ServerProducer producer, int windowSize) throws Exception;
- int getInitialTokens(int windowSize, ServerProducer producer);
+ int getInitialCredits(int windowSize, ServerProducer producer) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,14 +21,11 @@
*/
package org.jboss.messaging.core.postoffice.impl;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.util.SimpleString;
@@ -45,7 +42,7 @@
private int lastPot;
- private int tokenPot;
+ private int creditPot;
private final PostOffice postOffice;
@@ -58,17 +55,17 @@
this.address = address;
this.postOffice = postOffice;
-
- fillPot();
}
- public synchronized int getInitialTokens(final int windowSize, final ServerProducer producer)
- {
- int num = Math.min(windowSize, tokenPot);
+ public synchronized int getInitialCredits(final int windowSize, final ServerProducer producer) throws Exception
+ {
+ fillPot();
+
+ int num = Math.min(windowSize, creditPot);
- tokenPot -= num;
+ creditPot -= num;
- if (num == 0)
+ if (num <= 0)
{
//Register producer as a waiter or will never get any messages
@@ -84,28 +81,30 @@
//also don't want to execute the whole method if already waiting
public synchronized void messageAcknowledged() throws Exception
{
- fillPot();
-
- while (tokenPot > 0)
- {
- ServerProducer producer = waitingList.poll();
-
- if (producer == null)
- {
- break;
- }
-
- tokenPot--;
-
- producer.setWaiting(false);
-
- producer.sendCredits();
- }
+// log.info("acking");
+//
+// fillPot();
+//
+// log.info("Filled pot is now " + creditPot);
+//
+// while (creditPot > 0)
+// {
+// ServerProducer producer = waitingList.poll();
+//
+// if (producer == null)
+// {
+// break;
+// }
+//
+// producer.setWaiting(false);
+//
+// producer.requestAndSendCredits();
+// }
}
- public synchronized void messageReceived(final ServerProducer producer, final int windowSize) throws Exception
+ public synchronized void requestAndSendCredits(final ServerProducer producer, final int credits) throws Exception
{
- if (tokenPot == 0)
+ if (creditPot <= 0)
{
if (!producer.isWaiting())
{
@@ -116,48 +115,62 @@
}
else
{
- tokenPot--;
+ int creditsToTake = Math.min(credits, creditPot);
+
+ creditPot -= creditsToTake;
- producer.sendCredits();
+ producer.sendCredits(creditsToTake);
}
}
private void fillPot() throws Exception
{
- List<Binding> bindings = postOffice.getBindingsForAddress(address);
+ //TODO - for now we don't take max size into account
+
+// List<Binding> bindings = postOffice.getBindingsForAddress(address);
+//
+// int minAvailable = Integer.MAX_VALUE;
+//
+// for (Binding binding: bindings)
+// {
+// Queue queue = binding.getQueue();
+//
+// int maxSize = queue.getMaxSizeBytes();
+//
+//
+// //log.info("max size is " + maxSize);
+//
+// int available;
+//
+// if (maxSize == -1)
+// {
+// available = Integer.MAX_VALUE;
+// }
+// else
+// {
+// available = maxSize - queue.getSizeBytes();
+//
+// log.info("Available is " + available);
+// }
+//
+// if (available < 0)
+// {
+// available = 0;
+// }
+//
+// minAvailable = Math.min(available, minAvailable);
+//
+// log.info("min available is " + minAvailable);
+// }
+//
+// log.info("lastpot is " + lastPot);
+// if (minAvailable > lastPot)
+// {
+// creditPot += minAvailable - lastPot;
+//
+// lastPot = minAvailable;
+// }
- int minAvailable = Integer.MAX_VALUE;
-
- for (Binding binding: bindings)
- {
- Queue queue = binding.getQueue();
-
- int maxSize = queue.getMaxSize();
-
- int available;
-
- if (maxSize == -1)
- {
- available = Integer.MAX_VALUE;
- }
- else
- {
- available = maxSize - queue.getMessageCount();
- }
-
- if (available < 0)
- {
- available = 0;
- }
-
- minAvailable = Math.min(available, minAvailable);
- }
-
- if (minAvailable > lastPot)
- {
- tokenPot += minAvailable - lastPot;
-
- lastPot = minAvailable;
- }
+ creditPot = Integer.MAX_VALUE;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -108,7 +108,7 @@
{
if (!temporary)
{
- storageManager.addDestination(address);
+ storageManager.addDestination(address);
}
flowControllers.put(address, new FlowControllerImpl(address, this));
@@ -288,7 +288,7 @@
}
FlowController flowController = flowControllers.get(binding.getAddress());
-
+
binding.getQueue().setFlowController(flowController);
}
@@ -339,7 +339,13 @@
List<SimpleString> dests = new ArrayList<SimpleString>();
storageManager.loadBindings(queueFactory, bindings, dests);
-
+
+ //Destinations must be added first to ensure flow controllers exist before queues are created
+ for (SimpleString destination: destinations)
+ {
+ addDestination(destination, false);
+ }
+
Map<Long, Queue> queues = new HashMap<Long, Queue>();
for (Binding binding: bindings)
@@ -348,9 +354,7 @@
queues.put(binding.getQueue().getPersistenceID(), binding.getQueue());
}
-
- destinations.addAll(dests);
-
+
storageManager.loadMessages(this, queues);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingCodec.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -27,14 +27,14 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -401,7 +401,7 @@
}
case EmptyPacket.CONS_FLOWTOKEN:
{
- packet = new ConsumerFlowTokenMessage();
+ packet = new ConsumerFlowCreditMessage();
break;
}
case EmptyPacket.PROD_SEND:
@@ -411,7 +411,7 @@
}
case EmptyPacket.PROD_RECEIVETOKENS:
{
- packet = new ProducerReceiveTokensMessage();
+ packet = new ProducerFlowCreditMessage();
break;
}
case EmptyPacket.RECEIVE_MSG:
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -169,7 +169,7 @@
}
session = future.getSession();
- ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
+ //ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
/*
getDispatcher().register(pinger);
if (connectionParams.getKeepAliveInterval() > 0)
@@ -177,7 +177,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(1);
scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
}*/
- dispatcher.register(pinger);
+ //dispatcher.register(pinger);
return new MinaSession(session, handler);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -177,58 +177,58 @@
}
}
- @Override
- public synchronized void messageSent(final IoSession session, final Object message) throws Exception
- {
- if (blocked)
- {
- long bytes = session.getScheduledWriteBytes();
+// @Override
+// public synchronized void messageSent(final IoSession session, final Object message) throws Exception
+// {
+// if (blocked)
+// {
+// long bytes = session.getScheduledWriteBytes();
+//
+// if (bytes <= bytesLow)
+// {
+// blocked = false;
+//
+// //Note that we need to notify all since there may be more than one thread waiting on this
+// //E.g. the response from a blocking acknowledge and a delivery
+// notifyAll();
+// }
+// }
+// }
+//
+// public synchronized void checkWrite(final IoSession session) throws Exception
+// {
+// while (session.getScheduledWriteBytes() >= bytesHigh)
+// {
+// blocked = true;
+//
+// long start = System.currentTimeMillis();
+//
+// long toWait = blockTimeout;
+//
+// do
+// {
+// wait(toWait);
+//
+// if (session.getScheduledWriteBytes() < bytesHigh)
+// {
+// break;
+// }
+//
+// long now = System.currentTimeMillis();
+//
+// toWait -= now - start;
+//
+// start = now;
+// }
+// while (toWait > 0);
+//
+// if (toWait <= 0)
+// {
+// throw new IllegalStateException("Timed out waiting for MINA queue to free");
+// }
+// }
+// }
- if (bytes <= bytesLow)
- {
- blocked = false;
-
- //Note that we need to notify all since there may be more than one thread waiting on this
- //E.g. the response from a blocking acknowledge and a delivery
- notifyAll();
- }
- }
- }
-
- public synchronized void checkWrite(final IoSession session) throws Exception
- {
- while (session.getScheduledWriteBytes() >= bytesHigh)
- {
- blocked = true;
-
- long start = System.currentTimeMillis();
-
- long toWait = blockTimeout;
-
- do
- {
- wait(toWait);
-
- if (session.getScheduledWriteBytes() < bytesHigh)
- {
- break;
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
- while (toWait > 0);
-
- if (toWait <= 0)
- {
- throw new IllegalStateException("Timed out waiting for MINA queue to free");
- }
- }
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -246,14 +246,14 @@
{
public void send(Packet p) throws Exception
{
- try
- {
- checkWrite(session);
- }
- catch (Exception e)
- {
- log.error("Failed to acquire sem", e);
- }
+// try
+// {
+// checkWrite(session);
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to acquire sem", e);
+// }
dispatcher.callFilters(p);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -54,14 +54,14 @@
public void write(Packet packet)
{
- try
- {
- handler.checkWrite(session);
- }
- catch (Exception e)
- {
- log.error("Failed to acquire sem", e);
- }
+// try
+// {
+// handler.checkWrite(session);
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to acquire sem", e);
+// }
session.write(packet);
}
Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java (from rev 4286, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowCreditMessage.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class ConsumerFlowCreditMessage extends EmptyPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int credits;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerFlowCreditMessage(final int credits)
+ {
+ super(CONS_FLOWTOKEN);
+
+ this.credits = credits;
+ }
+
+ public ConsumerFlowCreditMessage()
+ {
+ super(CONS_FLOWTOKEN);
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getTokens()
+ {
+ return credits;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putInt(credits);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ credits = buffer.getInt();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", credits=" + credits + "]";
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerFlowTokenMessage.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.util.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class ConsumerFlowTokenMessage extends EmptyPacket
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private int tokens;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConsumerFlowTokenMessage(final int tokens)
- {
- super(CONS_FLOWTOKEN);
-
- this.tokens = tokens;
- }
-
- public ConsumerFlowTokenMessage()
- {
- super(CONS_FLOWTOKEN);
- }
-
- // Public --------------------------------------------------------
-
- public int getTokens()
- {
- return tokens;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putInt(tokens);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- tokens = buffer.getInt();
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", tokens=" + tokens + "]";
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerFlowCreditMessage.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ *
+ * A ProducerFlowCreditMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ProducerFlowCreditMessage extends EmptyPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int credits;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ProducerFlowCreditMessage(final int credits)
+ {
+ super(PROD_RECEIVETOKENS);
+
+ this.credits = credits;
+ }
+
+ public ProducerFlowCreditMessage()
+ {
+ super(PROD_RECEIVETOKENS);
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getTokens()
+ {
+ return credits;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putInt(credits);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ credits = buffer.getInt();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", credits=" + credits);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.util.MessagingBuffer;
-
-/**
- *
- * A ProducerReceiveTokensMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class ProducerReceiveTokensMessage extends EmptyPacket
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private int tokens;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ProducerReceiveTokensMessage(final int tokens)
- {
- super(PROD_RECEIVETOKENS);
-
- this.tokens = tokens;
- }
-
- public ProducerReceiveTokensMessage()
- {
- super(PROD_RECEIVETOKENS);
- }
-
- // Public --------------------------------------------------------
-
- public int getTokens()
- {
- return tokens;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putInt(tokens);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- tokens = buffer.getInt();
- }
-
- @Override
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getParentString());
- buf.append(", tokens=" + tokens);
- buf.append("]");
- return buf.toString();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,7 +21,7 @@
private long producerTargetID;
- private int windowSize;
+ private int initialCredits;
private int maxRate;
@@ -29,13 +29,13 @@
// Constructors --------------------------------------------------
- public SessionCreateProducerResponseMessage(final long producerTargetID, final int windowSize, final int maxRate)
+ public SessionCreateProducerResponseMessage(final long producerTargetID, final int initialCredits, final int maxRate)
{
super(SESS_CREATEPRODUCER_RESP);
this.producerTargetID = producerTargetID;
- this.windowSize = windowSize;
+ this.initialCredits = initialCredits;
this.maxRate = maxRate;
}
@@ -52,9 +52,9 @@
return producerTargetID;
}
- public int getWindowSize()
+ public int getInitialCredits()
{
- return windowSize;
+ return initialCredits;
}
public int getMaxRate()
@@ -65,14 +65,14 @@
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(producerTargetID);
- buffer.putInt(windowSize);
+ buffer.putInt(initialCredits);
buffer.putInt(maxRate);
}
public void decodeBody(final MessagingBuffer buffer)
{
producerTargetID = buffer.getLong();
- windowSize = buffer.getInt();
+ initialCredits = buffer.getInt();
maxRate = buffer.getInt();
}
@@ -82,7 +82,7 @@
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", producerTargetID=" + producerTargetID);
- buf.append(", windowSize=" + windowSize);
+ buf.append(", initialCredits=" + initialCredits);
buf.append(", maxRate=" + maxRate);
buf.append("]");
return buf.toString();
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -78,16 +78,22 @@
int getDeliveringCount();
- void referenceAcknowledged() throws Exception;
+ void referenceAcknowledged(MessageReference ref) throws Exception;
void referenceCancelled();
int getScheduledCount();
- int getMaxSize();
+ // int getMaxSize();
- void setMaxSize(int maxSize);
+ // int getSizeBytes();
+ // void setMaxSize(int maxSize);
+
+ int getMaxSizeBytes();
+
+ int getSizeBytes();
+
DistributionPolicy getDistributionPolicy();
void setDistributionPolicy(DistributionPolicy policy);
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -39,5 +39,5 @@
void setStarted(boolean started) throws Exception;
- void receiveTokens(int tokens) throws Exception;
+ void receiveCredits(int credits) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -16,8 +16,10 @@
void send(ServerMessage msg) throws Exception;
- void sendCredits() throws Exception;
+ void requestAndSendCredits() throws Exception;
+ void sendCredits(int credits) throws Exception;
+
void setWaiting(boolean waiting);
boolean isWaiting();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -354,15 +354,15 @@
remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
CreateConnectionResponse createConnectionResponse = new CreateConnectionResponse(connection.getID(), version);
- if(cleanUpNotifier != null)
- {
- if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
- {
- getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
- ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
- new Thread(clientPinger).start();
- }
- }
+// if(cleanUpNotifier != null)
+// {
+// if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
+// {
+// getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
+// ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
+// new Thread(clientPinger).start();
+// }
+// }
return createConnectionResponse;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -58,7 +58,7 @@
QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, temporary,
- queueSettings.getMaxSize(), scheduledExecutor);
+ queueSettings.getMaxSizeBytes(), scheduledExecutor);
queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -77,8 +77,8 @@
private final boolean temporary;
- private volatile int maxSize;
-
+ private final int maxSizeBytes;
+
private final ScheduledExecutorService scheduledExecutor;
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(
@@ -95,6 +95,8 @@
private boolean promptDelivery;
private int pos;
+
+ private AtomicInteger sizeBytes = new AtomicInteger(0);
private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -109,7 +111,7 @@
public QueueImpl(final long persistenceID, final SimpleString name,
final Filter filter, final boolean clustered, final boolean durable,
- final boolean temporary, final int maxSize,
+ final boolean temporary, final int maxSizeBytes,
final ScheduledExecutorService scheduledExecutor)
{
this.persistenceID = persistenceID;
@@ -124,7 +126,7 @@
this.temporary = temporary;
- this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
this.scheduledExecutor = scheduledExecutor;
@@ -178,9 +180,6 @@
deliver();
}
- // private volatile int count = 0;
-
-
public void deliverAsync(final Executor executor)
{
//Prevent too many executors running at once
@@ -238,11 +237,7 @@
{
if (iterator == null)
{
-// count++;
-// if (count == 500000)
-// {
- messageReferences.removeFirst();
- // }
+ messageReferences.removeFirst();
}
else
{
@@ -385,14 +380,16 @@
return deliveringCount.get();
}
- public void referenceAcknowledged() throws Exception
+ public void referenceAcknowledged(MessageReference ref) throws Exception
{
deliveringCount.decrementAndGet();
+
+ sizeBytes.addAndGet(-ref.getMessage().encodeSize());
- if (flowController != null)
- {
- flowController.messageAcknowledged();
- }
+// if (flowController != null)
+// {
+// flowController.messageAcknowledged();
+// }
}
public void referenceCancelled()
@@ -400,20 +397,14 @@
deliveringCount.decrementAndGet();
}
- public int getMaxSize()
+ public int getMaxSizeBytes()
{
- return maxSize;
+ return maxSizeBytes;
}
-
- public synchronized void setMaxSize(final int maxSize)
+
+ public int getSizeBytes()
{
- int num = messageReferences.size() + scheduledRunnables.size();
-
- if (maxSize < num) { throw new IllegalArgumentException(
- "Cannot set maxSize to " + maxSize + " since there are " + num
- + " refs"); }
-
- this.maxSize = maxSize;
+ return sizeBytes.get();
}
public DistributionPolicy getDistributionPolicy()
@@ -499,19 +490,16 @@
private HandleStatus add(final MessageReference ref, final boolean first)
{
- if (maxSize != -1)
+ if (maxSizeBytes != -1 && sizeBytes.get() >= maxSizeBytes)
{
- int size = deliveringCount.get() + messageReferences.size() + scheduledRunnables.size();
-
- if (size >= maxSize)
- {
- return HandleStatus.BUSY;
- }
+ return HandleStatus.BUSY;
}
if (!first)
{
messagesAdded.incrementAndGet();
+
+ sizeBytes.addAndGet(ref.getMessage().encodeSize());
}
if (checkAndSchedule(ref))
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -90,7 +90,7 @@
private final Object startStopLock = new Object();
- private final AtomicInteger availableTokens;
+ private final AtomicInteger availableCredits;
private boolean started;
@@ -139,11 +139,11 @@
if (enableFlowControl)
{
- availableTokens = new AtomicInteger(0);
+ availableCredits = new AtomicInteger(0);
}
else
{
- availableTokens = null;
+ availableCredits = null;
}
messageQueue.addConsumer(this);
@@ -163,7 +163,7 @@
public HandleStatus handle(MessageReference ref) throws Exception
{
- if (availableTokens != null && availableTokens.get() <= 0)
+ if (availableCredits != null && availableCredits.get() <= 0)
{
return HandleStatus.BUSY;
}
@@ -207,9 +207,9 @@
}
}
- if (availableTokens != null)
+ if (availableCredits != null)
{
- availableTokens.addAndGet(-message.encodeSize());
+ availableCredits.addAndGet(-message.encodeSize());
}
try
@@ -272,13 +272,13 @@
}
}
- public void receiveTokens(final int tokens) throws Exception
+ public void receiveCredits(final int credits) throws Exception
{
- if (availableTokens != null)
+ if (availableCredits != null)
{
- int previous = availableTokens.getAndAdd(tokens);
+ int previous = availableCredits.getAndAdd(credits);
- if (previous <= 0 && (previous + tokens) > 0)
+ if (previous <= 0 && (previous + credits) > 0)
{
promptDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -25,7 +25,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.server.ServerConsumer;
@@ -61,8 +61,8 @@
switch (type)
{
case EmptyPacket.CONS_FLOWTOKEN:
- ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
- consumer.receiveTokens(message.getTokens());
+ ConsumerFlowCreditMessage message = (ConsumerFlowCreditMessage) packet;
+ consumer.receiveCredits(message.getTokens());
break;
case EmptyPacket.CLOSE:
consumer.close();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -21,11 +21,13 @@
*/
package org.jboss.messaging.core.server.impl;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.core.server.ServerSession;
@@ -52,15 +54,21 @@
private final FlowController flowController;
+ private final int windowSize;
+
private final PacketReturner sender;
private volatile boolean waiting;
+ private AtomicInteger creditsToSend = new AtomicInteger(0);
+
// Constructors ----------------------------------------------------------------
- public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session, final SimpleString address,
+ public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session,
+ final SimpleString address,
final PacketReturner sender,
- final FlowController flowController) throws Exception
+ final FlowController flowController,
+ final int windowSize) throws Exception
{
this.id = id;
@@ -72,7 +80,9 @@
this.sender = sender;
- this.flowController = flowController;
+ this.flowController = flowController;
+
+ this.windowSize = windowSize;
}
// ServerProducer implementation --------------------------------------------
@@ -86,26 +96,41 @@
{
session.removeProducer(this);
}
-
+
+
public void send(final ServerMessage message) throws Exception
{
if (this.address != null)
{
//Only do flow control with non anonymous producers
- //TODO - flow control currently disabled
-// if (flowController != null)
-// {
-// flowController.messageReceived(this, 1);
-// }
+ if (flowController != null)
+ {
+ int creds = creditsToSend.addAndGet(message.encodeSize());
+
+ if (creds >= windowSize)
+ {
+ requestAndSendCredits();
+ }
+ }
}
session.send(message);
}
+
+ public void requestAndSendCredits() throws Exception
+ {
+ if (!waiting)
+ {
+ flowController.requestAndSendCredits(this, creditsToSend.get());
+ }
+ }
- public void sendCredits() throws Exception
+ public void sendCredits(final int credits) throws Exception
{
- Packet packet = new ProducerReceiveTokensMessage(1);
+ creditsToSend.addAndGet(-credits);
+
+ Packet packet = new ProducerFlowCreditMessage(credits);
packet.setTargetID(clientTargetID);
packet.setExecutorID(session.getID());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -979,7 +979,7 @@
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSize(),
+ response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSizeBytes(),
queue.getConsumerCount(), queue.getMessageCount(),
filterString, binding.getAddress());
}
@@ -1044,9 +1044,8 @@
* @param address The address to produce too
* @param windowSize - the producer window size to use for flow control.
* Specify -1 to disable flow control completely
- * The actual window size used may be less than the specified window size if the queue's maxSize attribute
- * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
- * specified on the queue
+ * The actual window size used may be less than the specified window size if
+ * it is overridden by any producer-window-size specified on the queue
*/
public SessionCreateProducerResponseMessage createProducer(final long clientTargetID, final SimpleString address, final int windowSize,
final int maxRate) throws Exception
@@ -1055,24 +1054,31 @@
final int maxRateToUse = maxRate;
- // TODO Flow control disabled for now
+ if (address != null)
+ {
+ flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+ }
-// if (address != null)
-// {
-// flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
-// }
-
long id = dispatcher.generateID();
+
+ final int windowToUse = flowController == null ? -1 : windowSize;
+
+ //Server window size is 0.75 client window size for producer flow control (other way round to consumer flow control)
+
+ final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
+
+ ServerProducerImpl producer
+ = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController, serverWindowSize);
- ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
-
producers.add(producer);
dispatcher.register(new ServerProducerPacketHandler(producer));
-
- final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
-
- return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
+
+ //Get some initial credits to send to the producer - we try for windowToUse
+
+ int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+
+ return new SessionCreateProducerResponseMessage(producer.getID(), initialCredits, maxRateToUse);
}
// Public ---------------------------------------------------------------------------------------------
@@ -1104,7 +1110,7 @@
}
}
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref);
}
Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -23,7 +23,6 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.core.settings.Mergeable;
import org.jboss.messaging.util.SimpleString;
@@ -32,6 +31,7 @@
* The Queue Settings that will be used to configure a queue
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
*/
public class QueueSettings implements Mergeable<QueueSettings>
{
@@ -41,14 +41,14 @@
*/
public static final DistributionPolicy DEFAULT_DISTRIBUTION_POLICY = new RoundRobinDistributionPolicy();
public static final Boolean DEFAULT_CLUSTERED = false;
- public static final Integer DEFAULT_MAX_SIZE = -1;
+ public static final Integer DEFAULT_MAX_SIZE_BYTES = -1;
public static final Integer DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
public static final Integer DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
public static final Long DEFAULT_REDELIVER_DELAY = (long) 500;
private Boolean clustered = null;
- private Integer maxSize = null;
+ private Integer maxSizeBytes = null;
private String distributionPolicyClass = null;
private Integer maxDeliveryAttempts = null;
private Integer messageCounterHistoryDayLimit = null;
@@ -71,14 +71,14 @@
this.clustered = clustered;
}
- public Integer getMaxSize()
+ public Integer getMaxSizeBytes()
{
- return maxSize != null?maxSize:DEFAULT_MAX_SIZE;
+ return maxSizeBytes != null ? maxSizeBytes:DEFAULT_MAX_SIZE_BYTES;
}
- public void setMaxSize(Integer maxSize)
+ public void setMaxSizeBytes(Integer maxSizeBytes)
{
- this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
}
public Integer getMaxDeliveryAttempts()
@@ -212,9 +212,9 @@
{
maxDeliveryAttempts = merged.maxDeliveryAttempts;
}
- if(maxSize == null)
+ if(maxSizeBytes == null)
{
- maxSize = merged.maxSize;
+ maxSizeBytes = merged.maxSizeBytes;
}
if(messageCounterHistoryDayLimit == null)
{
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -220,7 +220,7 @@
for (MessageReference reference : acknowledgements)
{
- reference.getQueue().referenceAcknowledged();
+ reference.getQueue().referenceAcknowledged(reference);
}
clear();
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -130,7 +130,7 @@
int consumerWindowSize = 1024 * 1024;
int consumerMaxRate = -1;
- int producerWindowSize = 1000;
+ int producerWindowSize = 1024 * 1024;
int producerMaxRate = -1;
boolean blockOnAcknowledge = false;
boolean sendNonPersistentMessagesSynchronously = false;
@@ -194,9 +194,6 @@
if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(i).getNodeName()))
{
-
- log.info("Creating cf ** with ws:" + producerWindowSize);
-
String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
jmsServerManager.createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck,
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -154,9 +154,9 @@
public boolean destroyQueue(String name) throws Exception
{
- JBossQueue jBossQueue = new JBossQueue(name);
- messagingServerManagement.destroyQueue(new SimpleString(name));
- messagingServerManagement.removeDestination(jBossQueue.getSimpleAddress());
+ // JBossQueue jBossQueue = new JBossQueue(name);
+// messagingServerManagement.destroyQueue(new SimpleString(name));
+// messagingServerManagement.removeDestination(jBossQueue.getSimpleAddress());
List<String> jndiBindings = destinations.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
{
@@ -172,8 +172,8 @@
public boolean destroyTopic(String name) throws Exception
{
- JBossTopic jBossTopic = new JBossTopic(name);
- messagingServerManagement.removeDestination(jBossTopic.getSimpleAddress());
+ // JBossTopic jBossTopic = new JBossTopic(name);
+ // messagingServerManagement.removeDestination(jBossTopic.getSimpleAddress());
List<String> jndiBindings = destinations.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
{
@@ -637,7 +637,7 @@
}
SubscriptionInfo info = new SubscriptionInfo(queue.getName().toString(), queue.isDurable(), subName, clientID,
- queue.getFilter() == null ? null : queue.getFilter().getFilterString().toString(), queue.getMessageCount(), queue.getMaxSize());
+ queue.getFilter() == null ? null : queue.getFilter().getFilterString().toString(), queue.getMessageCount(), queue.getMaxSizeBytes());
subs.add(info);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueSettingsDeployerTest.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -65,7 +65,7 @@
QueueSettings queueSettings = new QueueSettings();
queueSettings.setClustered(false);
queueSettings.setRedeliveryDelay((long) 100);
- queueSettings.setMaxSize(-100);
+ queueSettings.setMaxSizeBytes(-100);
queueSettings.setDistributionPolicyClass("org.jboss.messaging.core.impl.RoundRobinDistributionPolicy");
queueSettings.setMessageCounterHistoryDayLimit(1000);
queueSettings.setDLQ(new SimpleString("DLQtest"));
@@ -111,7 +111,7 @@
if (!queueSettings.isClustered().equals(that.isClustered())) return false;
if (!queueSettings.getDistributionPolicyClass().equals(that.getDistributionPolicyClass())) return false;
if (!queueSettings.getMaxDeliveryAttempts().equals(that.getMaxDeliveryAttempts())) return false;
- if (!queueSettings.getMaxSize().equals(that.getMaxSize())) return false;
+ if (!queueSettings.getMaxSizeBytes().equals(that.getMaxSizeBytes())) return false;
if (!queueSettings.getMessageCounterHistoryDayLimit().equals(that.getMessageCounterHistoryDayLimit()))
return false;
if (!queueSettings.getRedeliveryDelay().equals(that.getRedeliveryDelay())) return false;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -80,13 +80,13 @@
import org.jboss.messaging.core.remoting.impl.mina.MessagingCodec;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -486,7 +486,7 @@
randomLong(), randomInt(), randomInt());
Packet decodedPacket = encodeAndCheckBytesAndDecode(response, response
- .getProducerTargetID(), response.getWindowSize(), response
+ .getProducerTargetID(), response.getInitialCredits(), response
.getMaxRate());
assertTrue(decodedPacket instanceof SessionCreateProducerResponseMessage);
@@ -494,7 +494,7 @@
assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
assertEquals(response.getProducerTargetID(), decodedResponse
.getProducerTargetID());
- assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
+ assertEquals(response.getInitialCredits(), decodedResponse.getInitialCredits());
assertEquals(response.getMaxRate(), decodedResponse.getMaxRate());
}
@@ -518,28 +518,28 @@
public void testConsumerFlowTokenMessage() throws Exception
{
- ConsumerFlowTokenMessage message = new ConsumerFlowTokenMessage(
+ ConsumerFlowCreditMessage message = new ConsumerFlowCreditMessage(
randomInt());
Packet decodedPacket = encodeAndCheckBytesAndDecode(message, message
.getTokens());
- assertTrue(decodedPacket instanceof ConsumerFlowTokenMessage);
- ConsumerFlowTokenMessage decodedMessage = (ConsumerFlowTokenMessage) decodedPacket;
+ assertTrue(decodedPacket instanceof ConsumerFlowCreditMessage);
+ ConsumerFlowCreditMessage decodedMessage = (ConsumerFlowCreditMessage) decodedPacket;
assertEquals(CONS_FLOWTOKEN, decodedMessage.getType());
assertEquals(message.getTokens(), decodedMessage.getTokens());
}
public void testProducerReceiveTokensMessage() throws Exception
{
- ProducerReceiveTokensMessage message = new ProducerReceiveTokensMessage(
+ ProducerFlowCreditMessage message = new ProducerFlowCreditMessage(
randomInt());
Packet decodedPacket = encodeAndCheckBytesAndDecode(message, message
.getTokens());
- assertTrue(decodedPacket instanceof ProducerReceiveTokensMessage);
- ProducerReceiveTokensMessage decodedMessage = (ProducerReceiveTokensMessage) decodedPacket;
+ assertTrue(decodedPacket instanceof ProducerFlowCreditMessage);
+ ProducerFlowCreditMessage decodedMessage = (ProducerFlowCreditMessage) decodedPacket;
assertEquals(PROD_RECEIVETOKENS, decodedMessage.getType());
assertEquals(message.getTokens(), decodedMessage.getTokens());
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueTest.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -114,7 +114,7 @@
assertTrue(queue.isTemporary());
}
- public void testGetSetMaxSize()
+ public void testGetMaxSizeBytes()
{
final int maxSize = 123456;
@@ -124,13 +124,7 @@
assertEquals(id, queue.getPersistenceID());
- assertEquals(maxSize, queue.getMaxSize());
-
- final int maxSize2 = 654321;
-
- queue.setMaxSize(maxSize2);
-
- assertEquals(maxSize2, queue.getMaxSize());
+ assertEquals(maxSize, queue.getMaxSizeBytes());
}
public void testAddRemoveConsumer()
@@ -216,7 +210,7 @@
{
Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
- assertEquals(-1, queue.getMaxSize());
+ assertEquals(-1, queue.getMaxSizeBytes());
}
public void testSimpleAddLast()
@@ -513,9 +507,9 @@
refs.clear();
- for (int i = 0; i < numMessages; i++)
+ for (MessageReference ref: refs)
{
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref);
}
for (int i = 0; i < 2 * numMessages; i++)
@@ -537,11 +531,12 @@
cons1.getReferences().clear();
cons2.getReferences().clear();
- refs.clear();
- for (int i = 0; i < 2 * numMessages; i++)
+
+ for (MessageReference ref: refs)
{
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref);
}
+ refs.clear();
FakeConsumer cons3 = new FakeConsumer();
@@ -572,11 +567,12 @@
cons3.getReferences().clear();
cons2.getReferences().clear();
- refs.clear();
- for (int i = 0; i < 3 * numMessages; i++)
+
+ for (MessageReference ref: refs)
{
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref);
}
+ refs.clear();
for (int i = 0; i < 2 * numMessages; i++)
{
@@ -598,11 +594,12 @@
queue.removeConsumer(cons3);
cons2.getReferences().clear();
- refs.clear();
- for (int i = 0; i < 2 * numMessages; i++)
+
+ for (MessageReference ref: refs)
{
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref);
}
+ refs.clear();
for (int i = 0; i < numMessages; i++)
{
@@ -862,39 +859,7 @@
MessageReference ref = generateReference(queue, i);
assertEquals(HandleStatus.BUSY, queue.addLast(ref));
- }
-
- //Increase the max size
-
- queue.setMaxSize(2 * queue.getMaxSize());
-
- for (int i = 0; i < maxSize; i++)
- {
- MessageReference ref = generateReference(queue, i);
-
- refs.add(ref);
-
- assertEquals(HandleStatus.HANDLED, queue.addLast(ref));
- }
-
- assertEquals(maxSize * 2, queue.getMessageCount());
- assertEquals(0, queue.getScheduledCount());
- assertEquals(0, queue.getDeliveringCount());
-
- //Now try and decrease maxSize
-
- try
- {
- queue.setMaxSize(maxSize);
-
- fail("Should throw exception");
- }
- catch (IllegalArgumentException e)
- {
- //Ok
- }
-
- assertEquals(2 * maxSize, queue.getMaxSize());
+ }
}
public void testWithPriorities()
@@ -1098,7 +1063,7 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref2);
queue.removeConsumer(consumer);
@@ -1206,8 +1171,8 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
- queue.referenceAcknowledged();
- queue.referenceAcknowledged();
+ queue.referenceAcknowledged(ref5);
+ queue.referenceAcknowledged(ref6);
queue.removeConsumer(consumer);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java 2008-05-23 15:49:50 UTC (rev 4295)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java 2008-05-24 07:07:10 UTC (rev 4296)
@@ -42,7 +42,7 @@
assertEquals(queueSettings.isClustered(), Boolean.valueOf(false));
assertEquals(queueSettings.getExpiryQueue(), null);
assertEquals(queueSettings.getMaxDeliveryAttempts(), QueueSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS);
- assertEquals(queueSettings.getMaxSize(), QueueSettings.DEFAULT_MAX_SIZE);
+ assertEquals(queueSettings.getMaxSizeBytes(), QueueSettings.DEFAULT_MAX_SIZE_BYTES);
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), QueueSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT);
assertEquals(queueSettings.getRedeliveryDelay(), QueueSettings.DEFAULT_REDELIVER_DELAY);
@@ -58,7 +58,7 @@
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
- queueSettingsToMerge.setMaxSize(1001);
+ queueSettingsToMerge.setMaxSizeBytes(1001);
queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
queueSettingsToMerge.setRedeliveryDelay((long)1003);
queueSettings.merge(queueSettingsToMerge);
@@ -68,7 +68,7 @@
assertEquals(queueSettings.getDLQ(), DLQ);
assertEquals(queueSettings.getExpiryQueue(), exp);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
- assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+ assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(1003));
}
@@ -83,7 +83,7 @@
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
- queueSettingsToMerge.setMaxSize(1001);
+ queueSettingsToMerge.setMaxSizeBytes(1001);
queueSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
queueSettings.merge(queueSettingsToMerge);
@@ -91,7 +91,7 @@
queueSettingsToMerge2.setClustered(true);
SimpleString exp2 = new SimpleString("testExpiryQueue2");
queueSettingsToMerge2.setExpiryQueue(exp2);
- queueSettingsToMerge2.setMaxSize(2001);
+ queueSettingsToMerge2.setMaxSizeBytes(2001);
queueSettingsToMerge2.setRedeliveryDelay((long)2003);
queueSettings.merge(queueSettingsToMerge2);
@@ -101,7 +101,7 @@
assertEquals(queueSettings.getDLQ(), DLQ);
assertEquals(queueSettings.getExpiryQueue(), exp);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(1000));
- assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+ assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(1002));
assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(2003));
}
@@ -115,7 +115,7 @@
SimpleString exp = new SimpleString("testExpiryQueue");
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
- queueSettingsToMerge.setMaxSize(1001);
+ queueSettingsToMerge.setMaxSizeBytes(1001);
queueSettingsToMerge.setRedeliveryDelay((long)1003);
queueSettings.merge(queueSettingsToMerge);
@@ -126,7 +126,7 @@
queueSettingsToMerge2.setExpiryQueue(exp2);
queueSettingsToMerge2.setDLQ(DLQ2);
queueSettingsToMerge2.setMaxDeliveryAttempts(2000);
- queueSettingsToMerge2.setMaxSize(2001);
+ queueSettingsToMerge2.setMaxSizeBytes(2001);
queueSettingsToMerge2.setMessageCounterHistoryDayLimit(2002);
queueSettingsToMerge2.setRedeliveryDelay((long)2003);
queueSettings.merge(queueSettingsToMerge2);
@@ -137,7 +137,7 @@
assertEquals(queueSettings.getDLQ(), DLQ);
assertEquals(queueSettings.getExpiryQueue(), exp);
assertEquals(queueSettings.getMaxDeliveryAttempts(), Integer.valueOf(2000));
- assertEquals(queueSettings.getMaxSize(), Integer.valueOf(1001));
+ assertEquals(queueSettings.getMaxSizeBytes(), Integer.valueOf(1001));
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), Integer.valueOf(2002));
assertEquals(queueSettings.getRedeliveryDelay(), Long.valueOf(1003));
}
More information about the jboss-cvs-commits
mailing list