[jboss-cvs] JBoss Messaging SVN: r4198 - in trunk: src/main/org/jboss/messaging/util and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 14 15:00:38 EDT 2008
Author: timfox
Date: 2008-05-14 15:00:38 -0400 (Wed, 14 May 2008)
New Revision: 4198
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/util/SimpleString.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Performance tweak
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-14 19:00:38 UTC (rev 4198)
@@ -99,7 +99,7 @@
private AtomicInteger deliveringCount = new AtomicInteger(0);
private volatile FlowController flowController;
-
+
public QueueImpl(final long persistenceID, final SimpleString name,
final Filter filter, final boolean clustered, final boolean durable,
final boolean temporary, final int maxSize,
@@ -475,121 +475,112 @@
private HandleStatus add(final MessageReference ref, final boolean first)
{
- if (!checkFull()) { return HandleStatus.BUSY; }
-
+ if (maxSize != -1)
+ {
+ int size = deliveringCount.get() + messageReferences.size() + scheduledRunnables.size();
+
+ if (size >= maxSize)
+ {
+ return HandleStatus.BUSY;
+ }
+ }
+
if (!first)
{
messagesAdded.incrementAndGet();
}
-
- if (!checkAndSchedule(ref))
+
+ if (checkAndSchedule(ref))
{
- boolean add = false;
+ return HandleStatus.HANDLED;
+ }
- if (direct)
- {
- // Deliver directly
+ boolean add = false;
- HandleStatus status = deliver(ref);
+ if (direct)
+ {
+ // Deliver directly
- if (status == HandleStatus.HANDLED)
- {
- // Ok
- }
- else if (status == HandleStatus.BUSY)
- {
- add = true;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- add = true;
- }
+ HandleStatus status = deliver(ref);
- if (add)
- {
- direct = false;
- }
+ if (status == HandleStatus.HANDLED)
+ {
+ // Ok
}
- else
+ else if (status == HandleStatus.BUSY)
{
add = true;
}
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ add = true;
+ }
if (add)
{
- if (first)
- {
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
- }
- else
- {
- messageReferences.addLast(ref, ref.getMessage().getPriority());
- }
-
- if (!direct && promptDelivery)
- {
- // We have consumers with filters which don't match, so we need
- // to prompt delivery every time
- // a new message arrives - this is why you really shouldn't use
- // filters with queues - in most cases
- // it's an ant-pattern since it would cause a queue scan on each
- // message
- deliver();
- }
+ direct = false;
}
}
+ else
+ {
+ add = true;
+ }
- return HandleStatus.HANDLED;
- }
-
- private boolean checkAndSchedule(final MessageReference ref)
- {
- long now = System.currentTimeMillis();
-
- if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
+ if (add)
{
- if (trace)
+ if (first)
{
- log.trace("Scheduling delivery for " + ref + " to occur at "
- + ref.getScheduledDeliveryTime());
+ // messageReferences.addFirst(ref, ref.getMessage().getPriority());
}
+ else
+ {
+ // messageReferences.addLast(ref, ref.getMessage().getPriority());
+ }
- long delay = ref.getScheduledDeliveryTime() - now;
+ if (!direct && promptDelivery)
+ {
+ // We have consumers with filters which don't match, so we need
+ // to prompt delivery every time
+ // a new message arrives - this is why you really shouldn't use
+ // filters with queues - in most cases
+ // it's an ant-pattern since it would cause a queue scan on each
+ // message
+ deliver();
+ }
+ }
- ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
- scheduledRunnables.add(runnable);
-
- Future<?> future = scheduledExecutor.schedule(runnable, delay,
- TimeUnit.MILLISECONDS);
-
- runnable.setFuture(future);
-
- return true;
- }
- else
- {
- return false;
- }
+ return HandleStatus.HANDLED;
}
- private boolean checkFull()
+ private boolean checkAndSchedule(final MessageReference ref)
{
- if (maxSize != -1
- && (deliveringCount.get() + messageReferences.size() + scheduledRunnables
- .size()) >= maxSize)
- {
- if (trace)
+ long deliveryTime = ref.getScheduledDeliveryTime();
+
+ if (deliveryTime != 0 && scheduledExecutor != null)
+ {
+ long now = System.currentTimeMillis();
+
+ if (deliveryTime > now)
{
- log.trace(this + " queue is full, rejecting message");
+ if (trace)
+ {
+ log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+ }
+
+ long delay = deliveryTime - now;
+
+ ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+ scheduledRunnables.add(runnable);
+
+ Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+ runnable.setFuture(future);
+
+ return true;
}
-
- return false;
}
- else
- {
- return true;
- }
+ return false;
}
private HandleStatus deliver(final MessageReference reference)
Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java 2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java 2008-05-14 19:00:38 UTC (rev 4198)
@@ -6,7 +6,6 @@
*/
package org.jboss.messaging.util;
-import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import java.io.Serializable;
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-14 19:00:38 UTC (rev 4198)
@@ -22,15 +22,12 @@
package org.jboss.test.messaging.jms;
import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -55,22 +52,6 @@
// Constructors --------------------------------------------------
- public static void main(String[] args)
- {
- try
- {
- MessageProducerTest test = new MessageProducerTest();
-
- test.setUp();
- test.testSpeed3();
- test.tearDown();
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- }
- }
-
public MessageProducerTest(String name)
{
super(name);
@@ -137,8 +118,6 @@
pconn = cf.createConnection();
cconn = cf.createConnection();
- log.info("** created connections");
-
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = ps.createProducer(queue1);
@@ -169,209 +148,6 @@
}
}
- public void testSpeed() throws Exception
- {
- Connection pconn = null;
-
- try
- {
- pconn = cf.createConnection();
-
- Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- MessageProducer p = ps.createProducer(queue1);
-
- pconn.start();
-
- p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- p.setDisableMessageID(true);
- p.setDisableMessageTimestamp(true);
-
- final int numMessages = 100000;
-
- long start = System.currentTimeMillis();
-
- BytesMessage msg = ps.createBytesMessage();
-
- msg.writeBytes(new byte[200]);
-
- for (int i = 0; i < numMessages; i++)
- {
- p.send(msg);
- }
-
- long end = System.currentTimeMillis();
-
- double actualRate = 1000 * (double)numMessages / ( end - start);
-
- log.info("rate " + actualRate + " msgs /sec");
-
- }
- finally
- {
- pconn.close();
- }
- }
-
- public void testSpeed2() throws Exception
- {
- Connection pconn = null;
-
- try
- {
- pconn = cf.createConnection();
-
- Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- MessageProducer p = ps.createProducer(queue1);
-
- MessageConsumer cons = ps.createConsumer(queue1);
-
- pconn.start();
-
- p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- p.setDisableMessageID(true);
- p.setDisableMessageTimestamp(true);
-
- final int numMessages = 10000;
-
- long start = System.currentTimeMillis();
-
- BytesMessage msg = ps.createBytesMessage();
-
- msg.writeBytes(new byte[1000]);
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements MessageListener
- {
- int count;
-
- public void onMessage(Message msg)
- {
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
- cons.setMessageListener(new MyListener());
-
- for (int i = 0; i < numMessages; i++)
- {
- p.send(msg);
- }
-
- latch.await();
-
- long end = System.currentTimeMillis();
-
- double actualRate = 1000 * (double)numMessages / ( end - start);
-
- log.info("rate " + actualRate + " msgs /sec");
-
- }
- finally
- {
- pconn.close();
- }
- }
-
- public MessageProducerTest()
- {
- super("MessageProducerTest");
- }
-
-
-
- public void testSpeed3() throws Exception
- {
- Connection pconn = null;
-
- try
- {
- pconn = cf.createConnection();
-
- Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- MessageProducer p = ps.createProducer(queue1);
-
- MessageConsumer cons = ps.createConsumer(queue1);
-
- p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- p.setDisableMessageID(true);
- p.setDisableMessageTimestamp(true);
-
- final int numMessages = 100000;
-
- BytesMessage msg = ps.createBytesMessage();
-
- msg.writeBytes(new byte[200]);
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements MessageListener
- {
- int count;
-
- public void onMessage(Message msg)
- {
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
- cons.setMessageListener(new MyListener());
-
- long start = System.currentTimeMillis();
-
-
- for (int i = 0; i < numMessages; i++)
- {
- p.send(msg);
- }
-
-
- long end = System.currentTimeMillis();
-
- double actualRate = 1000 * (double)numMessages / ( end - start);
-
- log.info("send rate " + actualRate + " msgs /sec");
-
- log.info("Sleeping");
-
- Thread.sleep(10000);
-
- log.info("Let's go....");
-
- pconn.start();
-
-
- latch.await();
-
- end = System.currentTimeMillis();
-
- actualRate = 1000 * (double)numMessages / ( end - start);
-
- log.info("consume rate " + actualRate + " msgs /sec");
-
- }
- finally
- {
- pconn.close();
- }
- }
-
public void testTransactedSendPersistent() throws Exception
{
transactedSend(true);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-14 19:00:38 UTC (rev 4198)
@@ -57,35 +57,35 @@
super.tearDown();
}
-//
-//
-// public void testCoreClient() throws Exception
-// {
-// Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-//
-// ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-// ClientConnection conn = cf.createConnection();
-//
-// ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
-// session.createQueue(QUEUE, QUEUE, null, false, false);
-//
-// ClientProducer producer = session.createProducer(QUEUE);
-//
-// ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
-// System.currentTimeMillis(), (byte) 1);
-// message.getBody().putString("testINVMCoreClient");
-// producer.send(message);
-//
-// ClientConsumer consumer = session.createConsumer(QUEUE);
-// conn.start();
-//
-// message = consumer.receive(1000);
-//
-// assertEquals("testINVMCoreClient", message.getBody().getString());
-//
-// conn.close();
-// }
+
+ public void testCoreClient() throws Exception
+ {
+ Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+ ClientConnection conn = cf.createConnection();
+
+ ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ conn.start();
+
+ message = consumer.receive(1000);
+
+ assertEquals("testINVMCoreClient", message.getBody().getString());
+
+ conn.close();
+ }
+
public static void main(String[] args)
{
try
More information about the jboss-cvs-commits
mailing list