[jboss-cvs] JBoss Messaging SVN: r4801 - in branches/Branch_Message_Chunking_new: src/main/org/jboss/messaging/core/message/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 14 05:50:35 EDT 2008
Author: ataylor
Date: 2008-08-14 05:50:34 -0400 (Thu, 14 Aug 2008)
New Revision: 4801
Modified:
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
more tidying up
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -21,27 +21,16 @@
*/
package org.jboss.jms.example;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.InitialContext;
-
import org.jboss.jms.util.PerfParams;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.util.TokenBucketLimiter;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A simple example that can be used to gather basic performance measurements.
*
@@ -169,6 +158,10 @@
producer.setDeliveryMode(deliveryMode);
BytesMessage bytesMessage = session.createBytesMessage();
byte[] payload = new byte[messageSize];
+ for(int i = 0; i < payload.length; i++)
+ {
+ payload[i] = (byte) i;
+ }
bytesMessage.writeBytes(payload);
final int modulo = 2000;
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -49,7 +49,7 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final BytesMessage message = session.createBytesMessage();
- byte[] bytes = new byte[40000];
+ byte[] bytes = new byte[2048];
for(int i = 0; i < bytes.length; i++)
{
bytes[i] = (byte) i;
@@ -57,9 +57,12 @@
byte[] newbytes = new byte[bytes.length];
message.writeBytes(bytes);
log.info("sending message to queue");
+ /*for(int i = 0; i < 100000; i++)
+ {
+ producer.send(message);
+ }*/
producer.send(message);
- //producer.send(message);
-
+ producer.send(message);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -77,7 +80,7 @@
throw new RuntimeException(i+"");
}
}
- /*message2 = (BytesMessage) messageConsumer.receive(50000000);
+ message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
message2.readBytes(newbytes);
//log.info("message = " + new String(newbytes));
@@ -91,22 +94,7 @@
{
throw new RuntimeException(i+"");
}
- }*/
- /*message2 = (BytesMessage) messageConsumer.receive(50000000);
- log.info("message received from queue");
- message2.readBytes(newbytes);
- //log.info("message = " + new String(newbytes));
- for (int i = 0; i < newbytes.length; i++)
- {
- byte newbyte = newbytes[i];
- System.out.print(newbyte);
- System.out.print(" = ");
- System.out.println(bytes[i]);
- if(bytes[i] != newbyte)
- {
- throw new RuntimeException(i+"");
- }
- }*/
+ }
}
catch (Exception e)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -22,14 +22,12 @@
package org.jboss.messaging.core.message.impl;
-import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.remoting.Encoder;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.EncoderImpl;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.SimpleString;
@@ -157,10 +155,11 @@
{
MessagingBuffer buffer = buffers.get(i);
//todo fix horrible hack, there is a mina bug so slicing the buffer doesnt seem to work, so i need to make a copy in case of multiple sends
- IoBuffer copied = IoBuffer.allocate(buffer.limit());
+ /*IoBuffer copied = IoBuffer.allocate(buffer.limit());
copied.put(buffer.array(), 0, buffer.limit());
copied.setAutoExpand(true);
- buff.putBuffer(new IoBufferWrapper(copied));
+ buff.putBuffer(new IoBufferWrapper(copied));*/
+ buff.putBuffer(buffer.slice());
}
}
}
@@ -197,7 +196,7 @@
abstract protected void resetBodyForUse();
- public void encode(Encoder buff)
+ public void encode(Encoder buff)
{
buff.putSimpleString(getDestination());
buff.putByte(getType());
@@ -212,7 +211,14 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
- buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+ try
+ {
+ buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
}
else
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -452,8 +452,8 @@
public void putBuffer(MessagingBuffer body)
{
buffers.add(body);
- buffersSizes.add(body.limit() - 5);
- bodySize += body.limit() - 5;
+ buffersSizes.add(body.limit() - 9);
+ bodySize += body.limit() - 9;
}
public void transferRemainingBuffer(Encoder buffer, int len)
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -305,10 +305,15 @@
MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
packet.encode(buffer, packetIdCreator.getAndIncrement());
-
+ log.info("sending packet type:" + packet.getType());
for (MessagingBuffer buff : packet.getBuffers())
{
- transportConnection.write(buff);
+ if(packet.getType() == 100)
+ {
+ System.out.println(buff.getInt() + ":" + buff.getInt() + ":" + buff.getBoolean() + ":" + buff.getByte());
+ buff.position(0);
+ }
+ transportConnection.write(buff);
}
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -111,6 +111,7 @@
if (lastPacket)
{
packetsToDeliver.get(connectionID).remove(packetId);
+ log.info("handling packet type:" + packet.getType());
packet.decode();
if (executorFactory != null)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-14 09:50:34 UTC (rev 4801)
@@ -30,6 +30,7 @@
import static org.jboss.messaging.util.DataConstants.SIZE_INT;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -130,7 +131,6 @@
public static final byte PROD_RECEIVETOKENS = 101;
public static final byte RECEIVE_MSG = 110;
- private int packetid;
// Static --------------------------------------------------------
@@ -193,16 +193,15 @@
setTargetID(other.getResponseTargetID());
}
- public void encode(MessagingBuffer buffer, int packetId)
- {
- this.packetid = packetId;
+ public void encode(MessagingBuffer buffer,int packetId)
+ {
//The standard header fields
buffers.add(buffer);
//buffersSizes.add(-1);
//length filled at end
getCurrentBuffer().putInt(-1);
//are we the last packet, filled at end.
- getCurrentBuffer().putInt(packetId);
+ getCurrentBuffer().putInt(-1);
getCurrentBuffer().putBoolean(false);
putByte(type);
putInt(commandID);
@@ -219,7 +218,13 @@
{
messagingBuffer.flip();
}
- buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+ // buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+ messagingBuffer.putInt(0, messagingBuffer.limit() - SIZE_INT);
+ messagingBuffer.putInt(SIZE_INT, packetId);
+ if(i == buffers.size() - 1)
+ {
+ messagingBuffer.putBoolean(SIZE_INT * 2, true);
+ }
}
}
@@ -264,20 +269,11 @@
}
public List<MessagingBuffer> getBuffers()
{
- for (int i = 0; i < buffers.size(); i++)
+ List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+ for (int i = 0; i < this.buffers.size(); i++)
{
- MessagingBuffer buffer = buffers.get(i);
- if(buffer.position() != 0)
- {
- buffer.flip();
- }
- buffer.putInt(0, buffersSizes.get(i));
- buffer.putInt(SIZE_INT, packetid);
- if(i == buffers.size() - 1)
- {
- buffer.putBoolean(SIZE_INT * 2, true);
- }
-
+ MessagingBuffer buffer = this.buffers.get(i).slice();
+ buffers.add(buffer);
}
return buffers;
}
More information about the jboss-cvs-commits mailing list