[jboss-cvs] JBoss Messaging SVN: r4814 - in branches/Branch_Message_Chunking_new: examples/jms/src/org/jboss/jms/example and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 18 04:59:08 EDT 2008
Author: ataylor
Date: 2008-08-18 04:59:07 -0400 (Mon, 18 Aug 2008)
New Revision: 4814
Modified:
branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
branches/Branch_Message_Chunking_new/messaging.ipr
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
Log:
more tidying up
Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -49,7 +49,7 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final BytesMessage message = session.createBytesMessage();
- byte[] bytes = new byte[2048];
+ byte[] bytes = new byte[1024];
for(int i = 0; i < bytes.length; i++)
{
bytes[i] = (byte) i;
@@ -62,7 +62,7 @@
producer.send(message);
}*/
producer.send(message);
- producer.send(message);
+ //producer.send(message);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -80,7 +80,7 @@
throw new RuntimeException(i+"");
}
}
- message2 = (BytesMessage) messageConsumer.receive(50000000);
+ /*message2 = (BytesMessage) messageConsumer.receive(50000000);
log.info("message received from queue");
message2.readBytes(newbytes);
//log.info("message = " + new String(newbytes));
@@ -94,7 +94,7 @@
{
throw new RuntimeException(i+"");
}
- }
+ }*/
}
catch (Exception e)
{
Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.ipr 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/messaging.ipr 2008-08-18 08:59:07 UTC (rev 4814)
@@ -463,14 +463,12 @@
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
<root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar!/" />
- <root url="file://$PROJECT_DIR$/examples/jms/config" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$PROJECT_DIR$/thirdparty/jboss/jnpserver/lib/jnpserver-sources.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1-sources.jar!/" />
</SOURCES>
- <jarDirectory url="file://$PROJECT_DIR$/examples/jms/config" recursive="false" />
<jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />
<jarDirectory url="file://$PROJECT_DIR$/tests/etc" recursive="false" />
<jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -40,6 +40,4 @@
long getDeliveryID();
void setDeliveryID(long deliveryID);
-
- void prepareBuffers();
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -103,15 +103,4 @@
{
return this.deliveryID;
}
-
-
- public void prepareBuffers()
- {
- bodySize = 0;
- for (MessagingBuffer buffer : buffers)
- {
- buffersSizes.add(buffer.limit() - 9);
- bodySize += (buffer.limit() - 9);
- }
- }
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -21,63 +21,23 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.*;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.MessagingBuffer;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.Executor;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -531,21 +491,21 @@
public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
{
- MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+ MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
}
public ClientMessage createClientMessage(byte type, boolean durable)
{
- MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+ MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
return new ClientMessageImpl(type, durable, body);
}
public ClientMessage createClientMessage(boolean durable)
{
- MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+ MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
return new ClientMessageImpl(durable, body);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -105,7 +105,7 @@
this.priority = priority;
buffers.add(body);
pos = body.position();
- bodySize = body.limit();
+ //bodySize = body.limit();
//fill in the headers
body.putInt(-1);
body.putInt(-1);
@@ -154,12 +154,14 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
- //todo fix horrible hack, there is a mina bug so slicing the buffer doesnt seem to work, so i need to make a copy in case of multiple sends
- /*IoBuffer copied = IoBuffer.allocate(buffer.limit());
- copied.put(buffer.array(), 0, buffer.limit());
- copied.setAutoExpand(true);
- buff.putBuffer(new IoBufferWrapper(copied));*/
- buff.putBuffer(buffer.slice());
+ //todo it would be nice to always send the same buffer, but this is problematic since we need to change the packetid for each send!!!
+ //buff.putBuffer(buffer.slice());
+
+ MessagingBuffer copy = buffer.createNewBuffer(buffer.limit());
+ copy.putBytes(buffer.array());
+ copy.limit(buffer.limit());
+ copy.flip();
+ buff.putBuffer(copy);
}
}
}
@@ -211,14 +213,7 @@
for (int i = 0; i < buffers.size(); i++)
{
MessagingBuffer buffer = buffers.get(i);
- try
- {
- buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
- }
- catch (Exception e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ buff.putBytes(buffer.array(), PacketImpl.BUFFER_HEADER_SIZE, buffersSizes.get(i));
}
}
else
@@ -254,7 +249,7 @@
/* Timestamp */ SIZE_LONG +
/* Priority */ SIZE_BYTE +
/* PropertySize and Properties */ properties.getEncodeSize() +
- /* BodySize and Body */ SIZE_INT + getBodySize();
+ /* BodySize and Body */ SIZE_INT + getBodySize() + (buffers.size() * PacketImpl.BUFFER_HEADER_SIZE);
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -127,7 +127,7 @@
public void putBytes(byte[] bytes, int i, int i1)
{
- if(getCurrentBuffer().remaining() > (i1 - i))
+ if(getCurrentBuffer().remaining() >= (i1 - i))
{
getCurrentBuffer().putBytes(bytes, i, i1);
pos += (i1 - i);
@@ -429,7 +429,7 @@
for (MessagingBuffer buffer : buffers)
{
buffer.flip();
- bodySize += buffer.limit();
+ bodySize += buffer.limit() - PacketImpl.BUFFER_HEADER_SIZE;
}
pos = 0;
}
@@ -452,16 +452,20 @@
public void putBuffer(MessagingBuffer body)
{
buffers.add(body);
- buffersSizes.add(body.limit() - 9);
- bodySize += body.limit() - 9;
+ buffersSizes.add(body.limit() - PacketImpl.BUFFER_HEADER_SIZE);
+ bodySize += body.limit() - PacketImpl.BUFFER_HEADER_SIZE;
}
public void transferRemainingBuffer(Encoder buffer, int len)
{
- for(int i = bufferPos + 1; i < buffers.size(); i++)
+ while(buffers.size() > bufferPos + 1)
{
+ buffer.putBuffer(buffers.remove(bufferPos + 1));
+ }
+ /*for(int i = bufferPos + 1; i < buffers.size(); i++)
+ {
buffer.putBuffer(buffers.get(i));
- }
+ }*/
}
private void checkReadSpace(int size)
@@ -482,7 +486,7 @@
private void setNextFragment()
{
- MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
buffer.putInt(-1);
buffer.putInt(-1);
buffer.putBoolean(false);
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -302,17 +302,11 @@
throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
}
- MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
packet.encode(buffer, packetIdCreator.getAndIncrement());
- log.info("sending packet type:" + packet.getType());
for (MessagingBuffer buff : packet.getBuffers())
{
- if(packet.getType() == 100)
- {
- System.out.println(buff.getInt() + ":" + buff.getInt() + ":" + buff.getBoolean() + ":" + buff.getByte());
- buff.position(0);
- }
transportConnection.write(buff);
}
}
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -111,7 +111,6 @@
if (lastPacket)
{
packetsToDeliver.get(connectionID).remove(packetId);
- log.info("handling packet type:" + packet.getType());
packet.decode();
if (executorFactory != null)
{
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -50,6 +50,8 @@
public static final long NO_ID_SET = -1L;
public static final int INITIAL_BUFFER_SIZE = 1024;
+
+ public static final int BUFFER_HEADER_SIZE = 9;
private int commandID;
@@ -197,11 +199,11 @@
{
//The standard header fields
buffers.add(buffer);
- //buffersSizes.add(-1);
//length filled at end
getCurrentBuffer().putInt(-1);
+ //the packetid filled at end
+ getCurrentBuffer().putInt(-1);
//are we the last packet, filled at end.
- getCurrentBuffer().putInt(-1);
getCurrentBuffer().putBoolean(false);
putByte(type);
putInt(commandID);
@@ -218,7 +220,6 @@
{
messagingBuffer.flip();
}
- // buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
messagingBuffer.putInt(0, messagingBuffer.limit() - SIZE_INT);
messagingBuffer.putInt(SIZE_INT, packetId);
if(i == buffers.size() - 1)
Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java 2008-08-18 08:59:07 UTC (rev 4814)
@@ -82,7 +82,6 @@
public void encodeBody()
{
- clientMessage.prepareBuffers();
clientMessage.encodeForTransport(this);
}
More information about the jboss-cvs-commits mailing list