[jboss-cvs] JBoss Messaging SVN: r5164 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 21 13:07:40 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-21 13:07:40 -0400 (Tue, 21 Oct 2008)
New Revision: 5164
Modified:
branches/Branch_Chunk_Clebert/.classpath
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Configuration on big-message-size
Modified: branches/Branch_Chunk_Clebert/.classpath
===================================================================
--- branches/Branch_Chunk_Clebert/.classpath 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/.classpath 2008-10-21 17:07:40 UTC (rev 5164)
@@ -8,7 +8,7 @@
<classpathentry kind="src" path="examples/messaging/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
<attributes>
- <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="/work/projects/trunk/native/bin"/>
+ <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="ChunkWork/native/bin"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="tests/jms-tests/src"/>
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -61,6 +61,14 @@
int getProducerMaxRate();
+ int getBigMessageSize();
+
+ void setBigMessageSize(final int bigMessageSize);
+
+ int getBigMessageChunkSize();
+
+ void setBigMessageChunkSize(final int bigMessageChunkSize);
+
boolean isBlockOnPersistentSend();
void setBlockOnPersistentSend(final boolean blocking);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -58,12 +58,6 @@
// Attributes -----------------------------------------------------------------------------------
- // TODO This is temporary, make this configurable somewhere
- public static final int BIG_PACKAGE_SIZE = 10 * 1024;
-
- // TODO This is temporary, make this better
- public static final int CHUNK_SIZE = 10 * 1024;
-
private final SimpleString address;
private final long id;
@@ -91,6 +85,10 @@
private final int initialWindowSize;
private final SimpleString autoGroupId;
+
+ private final int bigMessageSize;
+
+ private final int bigMessageChunkSize;
// Static ---------------------------------------------------------------------------------------
@@ -104,6 +102,8 @@
final boolean blockOnPersistentSend,
final SimpleString autoGroupId,
final int initialCredits,
+ final int bigMessageSize,
+ final int bigMessageChunkSize,
final Channel channel)
{
this.channel = channel;
@@ -121,6 +121,10 @@
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroupId = autoGroupId;
+
+ this.bigMessageSize = bigMessageSize;
+
+ this.bigMessageChunkSize = bigMessageChunkSize;
availableCredits = new Semaphore(initialCredits);
@@ -331,7 +335,7 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
- if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
+ if (msg.getEncodeSize() > bigMessageSize)
{
sendMessageInChunks(sendBlocking, msg);
}
@@ -380,7 +384,7 @@
{
int headerSize = msg.getPropertiesEncodeSize();
- if (headerSize > BIG_PACKAGE_SIZE)
+ if (headerSize > bigMessageSize)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,
"Header size is too big, use the messageBody for large data");
@@ -391,7 +395,7 @@
final int bodySize = msg.getBodySize();
- int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+ int bodyLength = bigMessageSize - headerSize;
MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
@@ -414,7 +418,7 @@
for (int pos = bodyLength; pos < bodySize;)
{
- bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+ bodyLength = Math.min(bodySize - pos, bigMessageChunkSize);
bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
msg.encodeBody(bodyBuffer, pos, bodyLength);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -47,6 +47,12 @@
public static final long DEFAULT_PING_PERIOD = 2000;
+ // Any message beyond this size is considered a big message (to be chunked)
+ public static final int DEFAULT_BIG_MESSAGE_SIZE = 10 * 1024;
+
+ // The size of the chunks that will be sent on big messages
+ public static final int DEFAULT_BIG_MESSAGE_CHUNK_SIZE = 10 * 1024;
+
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
@@ -82,6 +88,10 @@
private volatile long pingPeriod;
private volatile long callTimeout;
+
+ private volatile int bigMessageSize;
+
+ private volatile int bigMessageChunkSize;
private volatile int consumerWindowSize;
@@ -120,6 +130,8 @@
final int consumerMaxRate,
final int producerWindowSize,
final int producerMaxRate,
+ final int bigMessageSize,
+ final int bigMessageChunkSize,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
@@ -141,6 +153,8 @@
this.blockOnAcknowledge = blockOnAcknowledge;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
+ this.bigMessageChunkSize = bigMessageChunkSize;
+ this.bigMessageSize = bigMessageSize;
this.autoGroupId = autoGroupId;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -164,6 +178,8 @@
blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ bigMessageSize = DEFAULT_BIG_MESSAGE_SIZE;
+ bigMessageChunkSize = DEFAULT_BIG_MESSAGE_CHUNK_SIZE;
autoGroupId = DEFAULT_AUTO_GROUP_ID;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -184,6 +200,8 @@
blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ bigMessageSize = DEFAULT_BIG_MESSAGE_SIZE;
+ bigMessageChunkSize = DEFAULT_BIG_MESSAGE_CHUNK_SIZE;
autoGroupId = DEFAULT_AUTO_GROUP_ID;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -379,6 +397,38 @@
return failedOver;
}
+ /**
+ * @return the bigMessageSize
+ */
+ public int getBigMessageSize()
+ {
+ return bigMessageSize;
+ }
+
+ /**
+ * @param bigMessageSize the bigMessageSize to set
+ */
+ public void setBigMessageSize(int bigMessageSize)
+ {
+ this.bigMessageSize = bigMessageSize;
+ }
+
+ /**
+ * @return the bigMessageChunkSize
+ */
+ public int getBigMessageChunkSize()
+ {
+ return bigMessageChunkSize;
+ }
+
+ /**
+ * @param bigMessageChunkSize the bigMessageChunkSize to set
+ */
+ public void setBigMessageChunkSize(int bigMessageChunkSize)
+ {
+ this.bigMessageChunkSize = bigMessageChunkSize;
+ }
+
// ClientSessionFactoryInternal implementation
// ------------------------------------------
@@ -470,6 +520,8 @@
clientVersion.getIncrementingVersion(),
username,
password,
+ bigMessageSize,
+ bigMessageChunkSize,
xa,
autoCommitSends,
autoCommitAcks);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -448,6 +448,8 @@
autoCommitSends && blockOnPersistentSend,
response.getAutoGroupId(),
response.getInitialCredits(),
+ sessionFactory.getBigMessageSize(),
+ sessionFactory.getBigMessageChunkSize(),
channel);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -93,6 +93,8 @@
if (chunk.getHeader() != null)
{
+ // The Header only comes on the first message, so a buffer has to be created on the client
+ // to hold either a file or a big message
MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
currentChunkMessage = clientSession.createLargeMessage(chunk.getTargetID(), header);
@@ -112,6 +114,7 @@
}
else
{
+ // No header.. this is then a continuation of a previous message
ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
currentChunkMessage = currentChunk.get(chunk.getTargetID());
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -46,6 +46,10 @@
private String password;
+ private int bigMessageSize;
+
+ private int bigMessageChunkSize;
+
private boolean xa;
private boolean autoCommitSends;
@@ -58,6 +62,7 @@
public CreateSessionMessage(final String name, final long sessionChannelID,
final int version, final String username, final String password,
+ final int bigMessageSize, final int bigMessageChunkSize,
final boolean xa, final boolean autoCommitSends,
final boolean autoCommitAcks)
{
@@ -73,6 +78,10 @@
this.password = password;
+ this.bigMessageSize = bigMessageSize;
+
+ this.bigMessageChunkSize = bigMessageChunkSize;
+
this.xa = xa;
this.autoCommitSends = autoCommitSends;
@@ -134,6 +143,8 @@
buffer.putInt(version);
buffer.putNullableString(username);
buffer.putNullableString(password);
+ buffer.putInt(bigMessageSize);
+ buffer.putInt(bigMessageChunkSize);
buffer.putBoolean(xa);
buffer.putBoolean(autoCommitSends);
buffer.putBoolean(autoCommitAcks);
@@ -146,6 +157,8 @@
version = buffer.getInt();
username = buffer.getNullableString();
password = buffer.getNullableString();
+ bigMessageSize = buffer.getInt();
+ bigMessageChunkSize = buffer.getInt();
xa = buffer.getBoolean();
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
@@ -178,6 +191,20 @@
return false;
}
+ /**
+ * @return
+ */
+ public int getBigMessageSize()
+ {
+ return bigMessageSize;
+ }
+
+ public int getBigMessageChunkSize()
+ {
+
+ return bigMessageChunkSize;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -69,6 +69,8 @@
String password,
int incrementingVersion,
RemotingConnection remotingConnection,
+ int bigMessageSize,
+ int bigMessageChunkSize,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean xa) throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -50,6 +50,10 @@
String getUsername();
String getPassword();
+
+ int getBigMessageSize();
+
+ int getBigMessageChunkSize();
void removeBrowser(ServerBrowserImpl browser) throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -416,6 +416,8 @@
final String password,
final int incrementingVersion,
final RemotingConnection connection,
+ final int bigMessageSize,
+ final int bigMessageChunkSize,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean xa) throws Exception
@@ -447,6 +449,8 @@
channelID,
username,
password,
+ bigMessageSize,
+ bigMessageChunkSize,
autoCommitSends,
autoCommitAcks,
xa,
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -78,6 +78,8 @@
request.getPassword(),
request.getVersion(),
connection,
+ request.getBigMessageSize(),
+ request.getBigMessageChunkSize(),
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isXA());
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -54,6 +54,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
* @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783 2008-02-25 12:15:14Z timfox $
*/
@@ -67,12 +68,6 @@
// Static
// ---------------------------------------------------------------------------------------
- // TODO This is temporary, make this configurable somewhere
- public static final int BIG_PACKAGE_SIZE = 10 * 1024;
-
- // TODO This is temporary, make this better
- public static final int CHUNK_SIZE = 10 * 1024;
-
// Attributes
// -----------------------------------------------------------------------------------
@@ -84,6 +79,10 @@
private final Filter filter;
+ private final int bigMessageSize;
+
+ private final int bigMessageChunkSize;
+
private final ServerSession session;
private final Object startStopLock = new Object();
@@ -147,6 +146,10 @@
this.channel = channel;
messageQueue.addConsumer(this);
+
+ this.bigMessageSize = session.getBigMessageSize();
+
+ this.bigMessageChunkSize = session.getBigMessageChunkSize();
}
// ServerConsumer implementation
@@ -330,7 +333,7 @@
final int bodySize = message.getBodySize();
- int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+ int bodyLength = bigMessageSize - headerSize;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(message.getPropertiesEncodeSize()));
message.encodeProperties(headerBuffer);
@@ -347,7 +350,7 @@
for (int pos = bodyLength; pos < bodySize;)
{
- bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+ bodyLength = Math.min(bodySize - pos, bigMessageChunkSize);
bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
message.encodeBody(bodyBuffer, pos, bodyLength);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -93,6 +93,10 @@
private final String username;
private final String password;
+
+ private final int bigMessageSize;
+
+ private final int bigMessageChunkSize;
private final boolean autoCommitSends;
@@ -146,6 +150,8 @@
final long id,
final String username,
final String password,
+ final int bigMessageSize,
+ final int bigMessageChunkSize,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean xa,
@@ -166,6 +172,10 @@
this.username = username;
this.password = password;
+
+ this.bigMessageSize = bigMessageSize;
+
+ this.bigMessageChunkSize = bigMessageChunkSize;
this.autoCommitSends = autoCommitSends;
@@ -214,6 +224,16 @@
{
return password;
}
+
+ public int getBigMessageSize()
+ {
+ return bigMessageSize;
+ }
+
+ public int getBigMessageChunkSize()
+ {
+ return bigMessageChunkSize;
+ }
public long getID()
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -68,6 +68,10 @@
private final int producerWindowSize;
private final int producerMaxRate;
+
+ private final int bigMessageSize;
+
+ private final int bigMessageChunkSize;
private final boolean blockOnAcknowledge;
@@ -89,6 +93,8 @@
final int consumerMaxRate,
final int producerWindowSize,
final int producerMaxRate,
+ final int bigMessageSize,
+ final int bigmessageChunkSize,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
@@ -105,6 +111,8 @@
this.producerMaxRate = producerMaxRate;
this.producerWindowSize = producerWindowSize;
this.blockOnAcknowledge = blockOnAcknowledge;
+ this.bigMessageSize = bigMessageSize;
+ this.bigMessageChunkSize = bigmessageChunkSize;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroupId = autoGroupId;
@@ -279,6 +287,8 @@
consumerMaxRate,
producerWindowSize,
producerMaxRate,
+ bigMessageSize,
+ bigMessageChunkSize,
blockOnAcknowledge,
blockOnNonPersistentSend,
blockOnPersistentSend,
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -110,6 +110,7 @@
long pingPeriod, long callTimeout, String clientID,
int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
+ int bigMessageSize, int bigMessageChunkSize,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding)
@@ -121,6 +122,7 @@
long pingPeriod, long callTimeout, String clientID,
int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
+ int bigMessageSize, int bigMessageChunkSize,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend, boolean autoGroupId, List<String> jndiBinding)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -55,7 +55,11 @@
private static final String PRODUCER_WINDOW_SIZE_ELEMENT = "producer-window-size";
private static final String PRODUCER_MAX_RATE_ELEMENT = "producer-max-rate";
-
+
+ private static final String BIG_MESSAGE_ELEMENT = "big-message-size";
+
+ private static final String BIG_MESSAGE_CHUNK_ELEMENT = "big-message-chunk-size";
+
private static final String BLOCK_ON_ACKNOWLEDGE_ELEMENT = "block-on-acknowledge";
private static final String SEND_NP_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-np-messages-synchronously";
@@ -135,6 +139,8 @@
int consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
int producerWindowSize = ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
int producerMaxRate = ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+ int bigMessageSize = ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_SIZE;
+ int bigMessageChunkSize = ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_CHUNK_SIZE;
boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
@@ -171,6 +177,14 @@
{
producerMaxRate = Integer.parseInt(children.item(j).getTextContent().trim());
}
+ else if (BIG_MESSAGE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+ {
+ bigMessageSize = Integer.parseInt(children.item(j).getTextContent().trim());
+ }
+ else if (BIG_MESSAGE_CHUNK_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+ {
+ bigMessageChunkSize = Integer.parseInt(children.item(j).getTextContent().trim());
+ }
else if (CLIENTID_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
clientID = children.item(j).getTextContent().trim();
@@ -395,6 +409,8 @@
consumerMaxRate,
producerWindowSize,
producerMaxRate,
+ bigMessageSize,
+ bigMessageChunkSize,
blockOnAcknowledge,
blockOnNonPersistentSend,
blockOnPersistentSend,
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -22,6 +22,7 @@
package org.jboss.messaging.jms.server.impl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -196,46 +197,35 @@
int consumerMaxRate,
int producerWindowSize,
int producerMaxRate,
+ int bigMessageSize,
+ int bigMessageChunkSize,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend,
boolean autoGroupId,
String jndiBinding) throws Exception
{
- JBossConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- cf = new JBossConnectionFactory(connectorConfig,
- backupConnectorConfig,
- pingPeriod,
- callTimeout,
- clientID,
- dupsOKBatchSize,
- consumerWindowSize,
- consumerMaxRate,
- producerWindowSize,
- producerMaxRate,
- blockOnAcknowledge,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- autoGroupId);
- connectionFactories.put(name, cf);
- }
- if (!bindToJndi(jndiBinding, cf))
- {
- return false;
- }
- if (connectionFactoryBindings.get(name) == null)
- {
- connectionFactoryBindings.put(name, new ArrayList<String>());
- }
- connectionFactoryBindings.get(name).add(jndiBinding);
-
- List<String> bindings = new ArrayList<String>();
+ ArrayList<String> bindings = new ArrayList<String>(1);
bindings.add(jndiBinding);
-
- managementService.registerConnectionFactory(name, cf, bindings);
- return true;
+
+ return createConnectionFactory(name,
+ connectorConfig,
+ backupConnectorConfig,
+ pingPeriod,
+ callTimeout,
+ clientID,
+ dupsOKBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerWindowSize,
+ producerMaxRate,
+ bigMessageSize,
+ bigMessageChunkSize,
+ blockOnAcknowledge,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ autoGroupId,
+ bindings);
}
public boolean createConnectionFactory(String name,
@@ -249,6 +239,8 @@
int consumerMaxRate,
int producerWindowSize,
int producerMaxRate,
+ int bigMessageSize,
+ int bigMessageChunkSize,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend,
@@ -268,6 +260,8 @@
consumerMaxRate,
producerWindowSize,
producerMaxRate,
+ bigMessageSize == -1 ? ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_SIZE : bigMessageSize,
+ bigMessageChunkSize == -1 ? ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_CHUNK_SIZE : bigMessageChunkSize,
blockOnAcknowledge,
blockOnNonPersistentSend,
blockOnPersistentSend,
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -79,6 +79,8 @@
@Parameter(name = "consumerMaxRate", desc = "Consumer's max rate") int consumerMaxRate,
@Parameter(name = "producerWindowSize", desc = "Producer's window size") int producerWindowSize,
@Parameter(name = "producerMaxRate", desc = "Producer's max rate") int producerMaxRate,
+ @Parameter(name = "bigMessageSize", desc = "Size of what is considered a big message requiring sending in chunks") int bigMessageSize,
+ @Parameter(name = "bigMessageChunkSize", desc = "Size of the chunks sent on big messages") int bigMessageChunkSize,
@Parameter(name = "blockOnAcknowledge", desc = "Does acknowlegment block?") boolean blockOnAcknowledge,
@Parameter(name = "blockOnNonPersistentSend", desc = "Does sending non persistent messages block?") boolean blockOnNonPersistentSend,
@Parameter(name = "blockOnPersistentSend", desc = "Does sending persistent messages block") boolean blockOnPersistentSend,
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -71,6 +71,7 @@
long pingPeriod, long callTimeout, String clientID,
int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
+ int bigMessageSize, int bigMessageChunkSize,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding) throws Exception
@@ -81,7 +82,8 @@
boolean created = server.createConnectionFactory(name, connectorConfig,
backupConnectorConfig,
pingPeriod, callTimeout, clientID, dupsOKBatchSize,
- consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate,
+ consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate,
+ bigMessageSize, bigMessageChunkSize,
blockOnAcknowledge, blockOnNonPersistentSend,
blockOnPersistentSend, autoGroupId, jndiBinding);
if (created)
Modified: branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -72,7 +72,7 @@
getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
null,
- 1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/StrictTCKConnectionFactory");
+ 1000, 1024 * 1024, -1, 1000, -1, -1, -1, true, true, true, false, "/StrictTCKConnectionFactory");
cf = (JBossConnectionFactory) getInitialContext().lookup("/StrictTCKConnectionFactory");
}
Modified: branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -39,7 +39,7 @@
getJmsServerManager().createConnectionFactory("testsuitecf",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
- null, 1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/testsuitecf");
+ null, 1000, 1024 * 1024, -1, 1000, -1, -1, -1, true, true, true, false, "/testsuitecf");
cf = (JBossConnectionFactory) getInitialContext().lookup("/testsuitecf");
}
Modified: branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -529,7 +529,7 @@
getJMSServerManager().createConnectionFactory(objectName,
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
clientId, dupsOkBatchSize,
- prefetchSize, -1, 1000, -1, blockOnAcknowledge, true, true, false, jndiBindings);
+ prefetchSize, -1, 1000, -1, -1, -1, blockOnAcknowledge, true, true, false, jndiBindings);
}
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-21 14:13:48 UTC (rev 5163)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-21 17:07:40 UTC (rev 5164)
@@ -142,11 +142,10 @@
session.createQueue(ADDRESS, queue[0], null, true, false);
session.createQueue(ADDRESS, queue[1], null, true, false);
-
int numberOfIntegers = 10000;
-
- FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegers);
-
+
+ FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegers);
+
ClientProducer producer = session.createProducer(ADDRESS);
producer.send(clientFile);
@@ -185,37 +184,37 @@
}
}
-
+
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
-
+
}
-
+
public void testPageOnLargeMessageNullPersistence() throws Exception
{
testPageOnLargeMessage(false, false);
-
+
}
-
+
private void testPageOnLargeMessage(boolean realFiles, boolean sendBlocking) throws Exception
{
-
+
clearData();
Configuration config = createDefaultConfig();
-
+
config.setPagingMaxGlobalSizeBytes(20 * 1024);
config.setPagingDefaultSize(10 * 1024);
-
+
messagingService = createService(realFiles, false, config, new HashMap<String, QueueSettings>());
-
+
messagingService.start();
-
+
final int numberOfIntegers = 256;
-
+
final int numberOfIntegersBigMessage = 10000;
-
+
try
{
ClientSessionFactory sf = createInVMFactory();
@@ -230,7 +229,6 @@
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
ClientProducer producer = session.createProducer(ADDRESS);
@@ -241,7 +239,7 @@
ClientMessage message = null;
MessagingBuffer body = null;
-
+
for (int i = 0; i < 100; i++)
{
MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
@@ -251,7 +249,7 @@
bodyLocal.putInt(j);
}
bodyLocal.flip();
-
+
if (i == 0)
{
body = bodyLocal;
@@ -262,9 +260,9 @@
producer.send(message);
}
-
+
FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
-
+
producer.send(clientFile);
session.close();
@@ -272,15 +270,14 @@
if (realFiles)
{
messagingService.stop();
-
- System.out.println("Acceptor "+ InVMRegistry.instance.getAcceptor(0));
+
InVMRegistry.instance.clear();
-
+
config = createDefaultConfig();
-
+
config.setPagingMaxGlobalSizeBytes(20 * 1024);
config.setPagingDefaultSize(10 * 1024);
-
+
messagingService = createService(true, false, config, new HashMap<String, QueueSettings>());
messagingService.start();
@@ -291,11 +288,11 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
-// if (realFiles)
-// {
-// consumer.setLargeMessagesAsFiles(true);
-// consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
-// }
+ // if (realFiles)
+ // {
+ // consumer.setLargeMessagesAsFiles(true);
+ // consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+ // }
session.start();
@@ -304,11 +301,11 @@
ClientMessage message2 = consumer.receive(10000);
assertNotNull(message2);
-
+
message2.processed();
-
+
System.out.println("msg on client = " + message2.getMessageID());
-
+
assertNotNull(message2);
try
@@ -322,18 +319,17 @@
throw e;
}
}
-
+
consumer.close();
session.close();
-
+
session = sf.createSession(false, true, true, false);
-
+
readMessage(session, ADDRESS, numberOfIntegersBigMessage);
// printBuffer("message received : ", message2.getBody());
-
session.close();
}
finally
@@ -347,21 +343,20 @@
}
}
-
}
private FileClientMessage createLargeClientMessage(ClientSession session, int numberOfIntegers) throws Exception
{
-
+
FileClientMessage clientMessage = session.createFileMessage(true);
-
+
File tmpFile = new File(temporaryDir + "/" + "tmpUpload.data");
-
+
RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
FileChannel channel = random.getChannel();
-
+
ByteBuffer buffer = ByteBuffer.allocate(4);
-
+
for (int i = 0; i < numberOfIntegers; i++)
{
buffer.rewind();
@@ -369,15 +364,15 @@
buffer.rewind();
channel.write(buffer);
}
-
+
channel.close();
random.close();
-
+
clientMessage.setFile(tmpFile);
-
+
return clientMessage;
}
-
+
/**
* @param session
* @param queueToRead
@@ -390,6 +385,8 @@
FileNotFoundException,
IOException
{
+ session.stop();
+
ClientConsumer consumer = session.createConsumer(queueToRead);
consumer.setLargeMessagesAsFiles(true);
@@ -397,11 +394,20 @@
session.start();
- FileClientMessage clientMessage = (FileClientMessage)consumer.receive(4000);
+ ClientMessage clientMessage = consumer.receive(4000);
- assertNotNull(clientMessage);
- File receivedFile = clientMessage.getFile();
+ if (!(clientMessage instanceof FileClientMessage))
+ {
+ System.out.println("Size = " + clientMessage.getBodySize());
+ }
+ assertTrue(clientMessage instanceof FileClientMessage);
+
+ FileClientMessage fileClientMessage = (FileClientMessage)clientMessage;
+
+ assertNotNull(fileClientMessage);
+ File receivedFile = fileClientMessage.getFile();
+
checkFileRead(receivedFile, numberOfIntegers);
clientMessage.processed();
More information about the jboss-cvs-commits
mailing list