Author: timfox
Date: 2009-11-26 08:30:41 -0500 (Thu, 26 Nov 2009)
New Revision: 8413
Modified:
branches/20-optimisation/build-hornetq.xml
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixed some tests and re-instated new HornetQDecoder
Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/build-hornetq.xml 2009-11-26 13:30:41 UTC (rev 8413)
@@ -1223,9 +1223,9 @@
haltonerror="${junit.batchtest.haltonerror}"
failureproperty="tests.failed">
<formatter type="plain"
usefile="${junit.formatter.usefile}"/>
- <fileset dir="${test.classes.dir}">
- <!-- <exclude name="**/integration/http/*" /> -->
+ <fileset dir="${test.classes.dir}">
<include name="${tests.param}"/>
+ <exclude name="**/integration/cluster/reattach/Netty*" />
</fileset>
</batchtest>
</junit>
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -697,7 +697,6 @@
log.trace("Setting up flowControlSize to " +
message.getPacketSize() + " on message = " + clMessage);
}
- // log.info("setting flow control size as " +
message.getPacketSize());
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(clMessage);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -259,9 +259,12 @@
public void setDestination(final SimpleString destination)
{
- this.destination = destination;
+ if (this.destination != destination)
+ {
+ this.destination = destination;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public byte getType()
@@ -276,9 +279,12 @@
public void setDurable(final boolean durable)
{
- this.durable = durable;
+ if (this.durable != durable)
+ {
+ this.durable = durable;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public long getExpiration()
@@ -288,9 +294,12 @@
public void setExpiration(final long expiration)
{
+ if (this.expiration != expiration)
+ {
this.expiration = expiration;
bufferValid = false;
+ }
}
public long getTimestamp()
@@ -300,9 +309,12 @@
public void setTimestamp(final long timestamp)
{
+ if (this.timestamp != timestamp)
+ {
this.timestamp = timestamp;
bufferValid = false;
+ }
}
public byte getPriority()
@@ -312,9 +324,12 @@
public void setPriority(final byte priority)
{
- this.priority = priority;
+ if (this.priority != priority)
+ {
+ this.priority = priority;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public boolean isExpired()
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -387,7 +387,7 @@
{
lastReceivedCommandID++;
- receivedBytes += packet.getPacketSize();
+ receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize)
{
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -111,6 +111,7 @@
requiresResponse = buffer.readBoolean();
buffer.readerIndex(ri);
+
}
// Private -------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -81,9 +81,7 @@
// We must make a copy of the message, otherwise things like returning credits to
the page won't work
// properly on ack, since the original destination will be overwritten
- // TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
-
- log.info("making copy for divert");
+ // TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
ServerMessage copy = message.copy();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -185,10 +185,8 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
- //log.info("handling message");
if (availableCredits != null && availableCredits.get() <= 0)
{
- // log.info("busy");
return HandleStatus.BUSY;
}
@@ -418,7 +416,6 @@
public void receiveCredits(final int credits) throws Exception
{
- // log.info("Receiving credits " + credits);
if (credits == -1)
{
// No flow control
@@ -592,12 +589,9 @@
if (availableCredits != null)
{
- //log.info("Subtracting credits " + packet.getPacketSize());
availableCredits.addAndGet(-packet.getPacketSize());
}
- // log.info("delivered message");
-
}
// Inner classes
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -83,11 +83,6 @@
messageID = id;
}
- public void setType(final byte type)
- {
- this.type = type;
- }
-
public MessageReference createReference(final Queue queue)
{
MessageReference ref = new MessageReferenceImpl(this, queue);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -1574,8 +1574,6 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
-
- //log.info("requesting credits " + credits);
int gotCredits = holder.manager.acquireCredits(credits, new
CreditsAvailableRunnable()
{
@@ -1597,8 +1595,6 @@
}
});
- //log.info("got credits " + gotCredits);
-
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler
handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext
context, final boolean client) throws Exception
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -60,7 +60,7 @@
String operationName = randomString();
String param = randomString();
String[] params = new String[] { randomString(), randomString(), randomString() };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, param,
params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -148,7 +148,7 @@
Object[] params = new Object[] { i, s, d, b, l, map, strArray, maps };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -214,7 +214,7 @@
Object[] params = new Object[] { "hello", map };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26
13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -15,11 +15,14 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -38,6 +41,7 @@
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.tests.util.RandomUtil;
@@ -56,7 +60,7 @@
{
try
{
- new SendTest().runTextMessage();
+ new SendTest().runConsume();
}
catch (Exception e)
{
@@ -70,8 +74,9 @@
{
log.info("*** Starting server");
- System.setProperty("org.hornetq.opt.dontadd", "true");
+ //System.setProperty("org.hornetq.opt.dontadd", "true");
// System.setProperty("org.hornetq.opt.routeblast", "true");
+ System.setProperty("org.hornetq.opt.generatemessages",
"true");
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -210,6 +215,134 @@
server.stop();
}
+ public void runSendConsume() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new
TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ cf.setProducerWindowSize(1024 * 1024);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue",
"jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ conn.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int warmup = 500000;
+
+ final int numMessages = 2000000;
+
+ MessageListener listener = new MessageListener()
+ {
+ int count;
+ public void onMessage(Message message)
+ {
+ count++;
+
+ if (count % 10000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ if (count == numMessages + warmup)
+ {
+ latch.countDown();
+ }
+ }
+ };
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes1 = new byte[] { (byte)'A',
(byte)'B',(byte)'C',(byte)'D'};
+
+ String s = new String(bytes1);
+
+ System.out.println("Str is " + s);
+
+ byte[] bytes = RandomUtil.randomBytes(512);
+
+ String str = new String(bytes);
+
+
+ log.info("Warming up");
+
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ for (int i = 0; i < warmup; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ log.info("** WARMUP DONE");
+
+ tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ latch.countDown();
+
+ sess.close();
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs / sec");
+
+ server.stop();
+ }
+
public void runObjectMessage() throws Exception
{
startServer();
@@ -259,18 +392,24 @@
log.info("sending messages");
+
+
for (int i = 0; i < warmup; i++)
{
- ObjectMessage om = sess.createObjectMessage(str);
+// ObjectMessage om = sess.createObjectMessage(str);
+//
+// prod.send(om);
- prod.send(om);
+ TextMessage tm = sess.createTextMessage(str);
+
+ prod.send(tm);
if (i % 10000 == 0)
{
log.info("sent " + i);
}
- om.setObject(str);
+ //om.setObject(str);
}
log.info("** WARMUP DONE");
@@ -279,18 +418,24 @@
long start = System.currentTimeMillis();
+
+
for (int i = 0; i < numMessages; i++)
- {
- ObjectMessage om = sess.createObjectMessage(str);
+ {
+// ObjectMessage om = sess.createObjectMessage(str);
+//
+// prod.send(om);
- prod.send(om);
+ TextMessage tm = sess.createTextMessage(str);
+
+ prod.send(tm);
if (i % 10000 == 0)
{
log.info("sent " + i);
}
- om.setObject(str);
+ //om.setObject(str);
}
long end = System.currentTimeMillis();
@@ -302,4 +447,120 @@
server.stop();
}
+ public void runConsume() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new
TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue",
"jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int warmup = 50000;
+
+ final int numMessages = 2000000;
+
+ MessageListener listener = new MessageListener()
+ {
+ int count;
+ long start;
+ public void onMessage(Message message)
+ {
+ count++;
+
+ log.info("got message " +
((HornetQMessage)message).getCoreMessage().getMessageID());
+
+ if (count == warmup)
+ {
+ log.info("** WARMED UP");
+
+ start = System.currentTimeMillis();
+ }
+
+ if (count % 10000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ if (count == numMessages + warmup)
+ {
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs /
sec");
+
+ latch.countDown();
+ }
+ }
+ };
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes = RandomUtil.randomBytes(1);
+
+ String str = new String(bytes);
+
+
+ //Load up the queue with messages
+
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ log.info("loading queue with messages");
+
+ for (int i = 0; i < numMessages + warmup; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ log.info("** loaded queue");
+
+ conn.start();
+
+ latch.await();
+
+ sess.close();
+
+ server.stop();
+ }
+
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-26
13:19:27 UTC (rev 8412)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-26
13:30:41 UTC (rev 8413)
@@ -353,7 +353,7 @@
public String getTextMessage(ClientMessage m)
{
- //m.getBodyBuffer().resetReaderIndex();
+ m.getBodyBuffer().resetReaderIndex();
return m.getBodyBuffer().readString();
}