Author: timfox
Date: 2009-11-19 06:23:14 -0500 (Thu, 19 Nov 2009)
New Revision: 8321
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
Log:
optimisation
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -98,20 +98,10 @@
ClientProducer createProducer(SimpleString address, int rate) throws
HornetQException;
- ClientProducer createProducer(SimpleString address,
- int maxRate,
- boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend) throws HornetQException;
-
ClientProducer createProducer(String address) throws HornetQException;
ClientProducer createProducer(String address, int rate) throws HornetQException;
- ClientProducer createProducer(String address,
- int maxRate,
- boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend) throws HornetQException;
-
SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws
HornetQException;
SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws
HornetQException;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -66,8 +66,6 @@
checkCredits(credits);
- log.info("trying to acquire " + credits);
-
semaphore.acquire(credits);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -438,7 +438,7 @@
public ClientProducer createProducer(final SimpleString address, final int maxRate)
throws HornetQException
{
- return createProducer(address, maxRate, blockOnNonPersistentSend,
blockOnPersistentSend);
+ return internalCreateProducer(address, maxRate);
}
public ClientProducer createProducer(final String address, final int rate) throws
HornetQException
@@ -446,22 +446,6 @@
return createProducer(toSimpleString(address), rate);
}
- public ClientProducer createProducer(final SimpleString address,
- final int maxRate,
- final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend) throws
HornetQException
- {
- return internalCreateProducer(address, maxRate, blockOnNonPersistentSend,
blockOnPersistentSend);
- }
-
- public ClientProducer createProducer(final String address,
- final int maxRate,
- final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend) throws
HornetQException
- {
- return createProducer(toSimpleString(address), maxRate, blockOnNonPersistentSend,
blockOnPersistentSend);
- }
-
public XAResource getXAResource()
{
return this;
@@ -1397,9 +1381,7 @@
}
private ClientProducer internalCreateProducer(final SimpleString address,
- final int maxRate,
- final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend)
throws HornetQException
+ final int maxRate) throws
HornetQException
{
checkClosed();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -228,14 +228,6 @@
return session.createProducer();
}
- public ClientProducer createProducer(SimpleString address,
- int maxRate,
- boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend) throws
HornetQException
- {
- return session.createProducer(address, maxRate, blockOnNonPersistentSend,
blockOnPersistentSend);
- }
-
public ClientProducer createProducer(SimpleString address, int rate) throws
HornetQException
{
return session.createProducer(address, rate);
@@ -246,14 +238,6 @@
return session.createProducer(address);
}
- public ClientProducer createProducer(String address,
- int maxRate,
- boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend) throws
HornetQException
- {
- return session.createProducer(address, maxRate, blockOnNonPersistentSend,
blockOnPersistentSend);
- }
-
public ClientProducer createProducer(String address, int rate) throws
HornetQException
{
return session.createProducer(address, rate);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -52,7 +52,7 @@
final boolean logRates)
{
this.journalDir = journalDir;
- log.info("** buffered?" + buffered);
+
if (buffered)
{
timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync,
logRates);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -926,8 +926,6 @@
callback = getSyncCallback(sync);
- // log.info("callback is " + callback);
-
lockAppend.lock();
try
{
@@ -957,7 +955,6 @@
if (callback != null)
{
callback.waitCompletion();
- // log.info("waited completion");
}
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -139,14 +139,14 @@
try
{
int bytesRead = channel.read(bytes);
-
+
if (callback != null)
{
callback.done();
}
-
+
bytes.flip();
-
+
return bytesRead;
}
catch (Exception e)
@@ -160,16 +160,12 @@
}
}
-
- public static AtomicInteger numSyncs = new AtomicInteger(0);
public void sync() throws Exception
{
if (channel != null)
{
channel.force(false);
-
- numSyncs.incrementAndGet();
}
}
@@ -208,7 +204,7 @@
{
throw new NullPointerException("callback parameter need to be set");
}
-
+
try
{
internalWrite(bytes, sync, callback);
@@ -233,8 +229,6 @@
*/
private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOCompletion callback) throws Exception
{
- //log.info("writing " + bytes.limit() + " bytes");
-
position.addAndGet(bytes.limit());
channel.write(bytes);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -95,8 +95,7 @@
// Public --------------------------------------------------------
public TimedBuffer(final int size, final long timeout, final boolean flushOnSync,
final boolean logRates)
- {
- log.info("creating timed buffer, log rates is " + logRates);
+ {
bufferSize = 490 * 1024;
this.logRates = logRates;
if (logRates)
@@ -230,8 +229,6 @@
public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOCompletion callback)
{
- // log.info("timedbuffer addbytes, " + bytes.length + " sync " + sync);
-
if (buffer.writerIndex() == 0)
{
// Resume latch
@@ -253,17 +250,12 @@
if (flushOnSync)
{
- log.info("flushing on sync record added");
-
flush();
}
}
- //log.info("buffer writer index is now " + buffer.writerIndex());
-
if (buffer.writerIndex() == bufferLimit)
{
- log.info("flushing because reached buffer limit");
flush();
}
}
@@ -272,8 +264,6 @@
{
if (buffer.writerIndex() > 0)
{
- //log.info("actually flushing");
-
latchTimer.up();
int pos = buffer.writerIndex();
@@ -321,7 +311,6 @@
{
if (bufferObserver != null)
{
- //log.info("flushing on timer");
flush();
}
}
@@ -352,20 +341,15 @@
{
double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
log.info("Write rate = " + rate + " bytes / sec or " +
(long)(rate / (1024 * 1024)) + " MiB / sec");
- double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
- double numSyncs = 1000 * ((double)NIOSequentialFile.numSyncs.get()) / (now - lastExecution);
+ double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
log.info("Flush rate = " + flushRate + " flushes / sec");
- log.info("numSyncs " + numSyncs);
}
lastExecution = now;
bytesFlushed = 0;
- flushesDone = 0;
-
- NIOSequentialFile.numSyncs.set(0);
-
+ flushesDone = 0;
}
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -145,8 +145,6 @@
{
// return getHeadersAndPropertiesEncodeSize() + SIZE_INT + getBodySize();
- //log.info("getting encode size, writer index is " +
buffer.writerIndex(), new Exception());
-
return buffer.writerIndex() - PACKET_HEADERS_SIZE;
}
@@ -167,27 +165,17 @@
properties.encode(buffer);
}
-// public void encodeBody(final HornetQBuffer buffer)
-// {
-// HornetQBuffer localBody = getBuffer();
-// buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
-// }
-
public void decode(final HornetQBuffer buffer)
{
decodeHeadersAndProperties(buffer);
this.buffer = buffer;
}
-
-
-
+
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
- messageID = buffer.readLong();
- log.info("message id is " + messageID);
- destination = buffer.readSimpleString();
- log.info("destination is " + destination);
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -451,8 +451,6 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
}
- // log.info("calling store msg");
-
// Note that we don't sync, the add reference that comes immediately after will
sync if appropriate
if (message.isLargeMessage())
@@ -469,26 +467,22 @@
}
public void storeReference(final long queueID, final long messageID, final boolean
last) throws Exception
- {
- //log.info("calling store reference " + syncNonTransactional);
+ {
messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
last && syncNonTransactional);
}
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
- {
- log.info("calling acknowledge");
+ {
messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional);
}
public void deleteMessage(final long messageID) throws Exception
- {
- log.info("calling delete message");
+ {
messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
- {
- log.info("calling update sched delivery");
+ {
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
@@ -499,16 +493,14 @@
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
- {
- log.info("calling store dupl id");
+ {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional);
}
public void deleteDuplicateID(long recordID) throws Exception
- {
- log.info("calling delete dupl id");
+ {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -29,8 +29,6 @@
public int isReadyToHandle(final HornetQBuffer buffer)
{
- log.info("calling buffer is ready to handle");
-
if (buffer.readableBytes() < DataConstants.SIZE_INT)
{
return -1;
@@ -38,8 +36,6 @@
int length = buffer.readInt();
- log.info("length is " + length + " readable bytes is " +
buffer.readableBytes());
-
if (buffer.readableBytes() < length)
{
return -1;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -146,8 +146,6 @@
// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
{
- log.info("Sending packet on channel " + packet);
-
synchronized (sendLock)
{
packet.setChannelID(id);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -157,8 +157,6 @@
{
final byte packetType = in.readByte();
- log.info("Packet type is " + packetType);
-
Packet packet;
switch (packetType)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -330,8 +330,6 @@
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
- log.info("decoding packet " + buffer);
-
final Packet packet = decoder.decode(buffer);
if (executor == null || packet.getType() == PacketImpl.PING)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -101,23 +101,13 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- log.info("Encoding session send message, consumer id is " + consumerID +
" delivery count is " + deliveryCount);
-
HornetQBuffer buffer = serverMessage.getBuffer();
- log.info("** DELIVERING writer index is " + buffer.writerIndex());
-
-
- log.info("** WRITING CONSUMER ID AT POS " + buffer.writerIndex());
-
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
-
- // At this point, the rest of the message has already been encoded into the buffer
+
size = buffer.writerIndex();
- log.info("size is " + size);
-
buffer.setIndex(0, 0);
// The standard header fields
@@ -126,6 +116,10 @@
buffer.writeInt(len);
buffer.writeByte(type);
buffer.writeLong(channelID);
+
+ //And fill in the message id, since this was set on the server side so won't already be in the buffer
+ buffer.setIndex(0, buffer.writerIndex() + DataConstants.SIZE_INT);
+ buffer.writeLong(serverMessage.getMessageID());
buffer.setIndex(0, size);
@@ -137,7 +131,7 @@
clientMessage = new ClientMessageImpl();
//fast forward past the size byte
- buffer.readInt();
+ int size = buffer.readInt();
clientMessage.decode(buffer);
@@ -147,26 +141,16 @@
//Now we need to fast forward past the body part
- int size = buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
+ //int size = buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
buffer.setIndex(size, buffer.writerIndex());
- log.info("decoded receive message");
-
- log.info("*** READING CONSUMER ID AT POS " + buffer.readerIndex());
-
consumerID = buffer.readLong();
- log.info("consumer id is " + consumerID);
-
deliveryCount = buffer.readInt();
- log.info("delivery count is " + deliveryCount);
-
clientMessage.setDeliveryCount(deliveryCount);
- //clientMessage.getBuffer().resetReaderIndex();
-
//Reset buffer to beginning of body
buffer.setIndex(bodyBeginning, buffer.writerIndex());
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -80,18 +80,15 @@
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
- log.info("Encoding session send message");
-
HornetQBuffer buffer = sentMessage.getBuffer();
- log.info("ENCODE ** size is " + buffer.writerIndex());
+ int afterBody = buffer.writerIndex();
buffer.writeBoolean(requiresResponse);
// At this point, the rest of the message has already been encoded into the buffer
size = buffer.writerIndex();
-
buffer.setIndex(0, 0);
// The standard header fields
@@ -100,8 +97,10 @@
buffer.writeInt(len);
buffer.writeByte(type);
buffer.writeLong(channelID);
- buffer.writeInt(size);
+ //This last byte we write marks the position of the end of the message body where
we store extra data for the packet
+ buffer.writeInt(afterBody);
+
buffer.setIndex(0, size);
return buffer;
@@ -114,22 +113,17 @@
sentMessage = receivedMessage;
- //fast forward past the size byte
- buffer.readInt();
+ //Read the position of after the body where extra data is stored
+ int afterBody = buffer.readInt();
- log.info("********** server message ");
-
receivedMessage.decode(buffer);
-
- receivedMessage.getBuffer().resetReaderIndex();
-
- requiresResponse = buffer.readBoolean();
- //reset the writer index back one boolean since when we deliver to the client we
will write the extra fields on here
+ buffer.setIndex(afterBody, buffer.writerIndex());
- //buffer.setIndex(0, buffer.writerIndex() - DataConstants.SIZE_BOOLEAN);
-
- log.info("SEND MESSAGE DECODE, WRITER INDEX IS " +
buffer.writerIndex());
+ requiresResponse = buffer.readBoolean();
+
+ receivedMessage.getBuffer().resetReaderIndex();
+
}
public int getRequiredBufferSize()
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -210,7 +210,7 @@
}
final ServerMessage message = ref.getMessage();
-
+
if (filter != null && !filter.match(message))
{
return HandleStatus.NO_MATCH;
@@ -449,7 +449,7 @@
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
-
+
MessageReference ref;
do
{
@@ -584,8 +584,6 @@
availableCredits.addAndGet(-packet.getRequiredBufferSize());
}
- log.info("*** delivering message to client");
-
channel.send(packet);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -1455,20 +1455,18 @@
public void handleSend(final SessionSendMessage packet)
{
- log.info("Got message on server");
-
Packet response = null;
ServerMessage message = packet.getServerMessage();
- log.info("server message is " + message);
-
+ //log.info("Got msg on server");
+
try
{
long id = storageManager.generateUniqueID();
message.setMessageID(id);
-
+
if (message.getDestination().equals(managementAddress))
{
// It's a management message
@@ -1479,7 +1477,10 @@
{
send(message);
}
-
+
+ //log.info("requires response "+ packet.isRequiresResponse());
+
+
if (packet.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -1544,7 +1545,7 @@
currentLargeMessage = null;
}
-
+
if (packet.isRequiresResponse())
{
response = new NullResponseMessage();
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -74,8 +74,6 @@
newBuffer.readInt();
- // log.info("Now ri, wi " + newBuffer.readerIndex() + ", " +
newBuffer.writerIndex());
-
return newBuffer;
}
}
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -119,8 +119,6 @@
public void write(final HornetQBuffer buffer, final boolean flush)
{
- log.info("writing buffer " + buffer.readerIndex() + " writer " + buffer.writerIndex());
-
ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());
if (flush)
Modified:
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -55,14 +55,6 @@
// Constructors --------------------------------------------------
- /*
- * This constructor is used to construct messages prior to sending
- */
-// public HornetQTextMessage()
-// {
-// super(HornetQTextMessage.TYPE);
-// }
-
public HornetQTextMessage(final ClientSession session)
{
super(HornetQTextMessage.TYPE, session);
@@ -113,8 +105,12 @@
// HornetQRAMessage override -----------------------------------------
+ private SimpleString dest = new SimpleString("jms.queue.test_queue");
+
public void doBeforeSend() throws Exception
- {
+ {
+ message.setDestination(dest);
+
message.encodeToBuffer();
message.getBuffer().writeNullableSimpleString(text);
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-19
03:01:55 UTC (rev 8320)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-19
11:23:14 UTC (rev 8321)
@@ -78,10 +78,45 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 1;
+ final int numMessages = 10000;
for (int i = 0; i < numMessages; i++)
{
+ /*
+ * Like this:
+ *
+ * ClientMessage message = producer.createMessage(...);
+ *
+ * message.putStringProperty("foo", "bar");
+ *
+ * message.encodeToBuffer(); [this sets the destination from the producer, and
encodes]
+ *
+ * message.getBuffer().writeString("testINVMCoreClient");
+ *
+ * message.send();
+ *
+ * OR, another option:
+ *
+ * Get rid of client producer altogether,
+ *
+ * Have send direct on the session, and destination must be set explicitly
+ *
+ * e.g.
+ *
+ * ClientMessage message = session.createMessage(...)
+ *
+ * message.putStringProperty("foo", "bar");
+ *
+ * message.setDestination("foo");
+ *
+ * message.encodeToBuffer();
+ *
+ * message.getBuffer().writeString("testINVMCoreClient");
+ *
+ * message.send();
+ *
+ */
+
ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
false,
0,
@@ -90,13 +125,16 @@
message.putStringProperty("foo", "bar");
+ //One way around the setting destination problem is as follows -
+ //Remove destination as an attribute from client producer.
+ //The destination always has to be set explicity before sending a message
+
message.setDestination(QUEUE);
-
+
message.encodeToBuffer();
message.getBuffer().writeString("testINVMCoreClient");
- log.info("sending message " + i);
producer.send(message);
}