[jboss-cvs] JBoss Messaging SVN: r5093 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/config and 20 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 8 23:09:56 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-08 23:09:55 -0400 (Wed, 08 Oct 2008)
New Revision: 5093
Added:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
1st commit
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -50,10 +50,6 @@
{
super();
- this.session = session;
-
- this.consumerID = consumerID;
-
this.deliveryCount = deliveryCount;
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -1,32 +1,46 @@
/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.messaging.core.client.impl;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
import org.jboss.messaging.core.client.AcknowledgementHandler;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiter;
-import java.util.concurrent.Semaphore;
-
/**
* The client-side Producer connectionFactory class.
*
@@ -43,8 +57,12 @@
// Attributes -----------------------------------------------------------------------------------
- private final boolean trace = log.isTraceEnabled();
+ // TODO This is temporary, make this better
+ private static final int BIG_PACKAGE_SIZE = 10 * 1024;
+ // TODO This is temporary, make this better
+ public static final int CHUNK_SIZE = 10 * 1024;
+
private final SimpleString address;
private final long id;
@@ -150,46 +168,46 @@
public void sendManagement(final ClientMessage msg) throws MessagingException
{
checkClosed();
-
+
if (address != null)
{
msg.setDestination(address);
}
else
{
- msg.setDestination(this.address);
+ msg.setDestination(address);
}
-
+
if (rateLimiter != null)
{
// Rate flow control
-
+
rateLimiter.limit();
}
-
+
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-
+
SessionSendManagementMessage message = new SessionSendManagementMessage(id, msg, false);
-
+
if (sendBlocking)
- {
+ {
channel.sendBlocking(message);
}
else
{
channel.send(message);
- }
-
- //We only flow control with non-anonymous producers
+ }
+
+ // We only flow control with non-anonymous producers
if (address == null && creditFlowControl)
{
try
{
- availableCredits.acquire(message.getClientMessage().getEncodeSize());
+ availableCredits.acquire(msg.getEncodeSize());
}
catch (InterruptedException e)
- {
- }
+ {
+ }
}
}
@@ -305,40 +323,88 @@
rateLimiter.limit();
}
- if(autoGroupId != null)
+ if (autoGroupId != null)
{
msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
}
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
- SessionSendMessage message;
- //check to see if this message need to be scheduled.
- if(scheduledDeliveryTime <= 0)
+
+ if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
{
- message = new SessionSendMessage(id, msg, sendBlocking);
+ int headerSize = msg.getPropertiesEncodeSize();
+
+ if (headerSize > BIG_PACKAGE_SIZE)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE,
+ "Header size is too big, use the messageBody for large data");
+ }
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
+ msg.encodeProperties(headerBuffer);
+
+ final int bodySize = msg.getBodyEncodeSize();
+
+ int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+ msg.encodeBody(bodyBuffer, 0, bodyLength);
+
+ SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+ headerBuffer.array(),
+ bodyBuffer.array(),
+ true,
+ true);
+
+ channel.sendBlocking(chunk);
+
+ for (int pos = bodyLength; pos < bodySize;)
+ {
+ bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+ bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+ msg.encodeBody(bodyBuffer, pos, bodyLength);
+
+ chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+
+ channel.sendBlocking(chunk);
+
+ pos += bodyLength;
+ }
+
}
else
{
- message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
- }
+ SessionSendMessage message;
+ //check to see if this message need to be scheduled.
+ if(scheduledDeliveryTime <= 0)
+ {
+ message = new SessionSendMessage(id, msg, sendBlocking);
+ }
+ else
+ {
+ message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
+ }
- if (sendBlocking)
- {
- channel.sendBlocking(message);
+ if (sendBlocking)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.send(message);
+ }
}
- else
- {
- channel.send(message);
- }
// We only flow control with non-anonymous producers
if (address == null && creditFlowControl)
{
try
{
- availableCredits.acquire(message.getClientMessage().getEncodeSize());
+ availableCredits.acquire(msg.getEncodeSize());
}
catch (InterruptedException e)
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -37,8 +37,7 @@
*/
public interface Configuration extends Serializable
{
- // General attributes
- // -------------------------------------------------------------------
+ // General attributes -------------------------------------------------------------------
boolean isClustered();
@@ -92,9 +91,10 @@
void setBackupConnectorConfiguration(TransportConfiguration config);
- // Journal related attributes
- // ------------------------------------------------------------
+ boolean isWildcardRoutingEnabled();
+ // Journal related attributes ------------------------------------------------------------
+
String getBindingsDirectory();
void setBindingsDirectory(String dir);
@@ -103,10 +103,6 @@
void setJournalDirectory(String dir);
- String getPagingDirectory();
-
- void setPagingDirectory(String dir);
-
JournalType getJournalType();
void setJournalType(JournalType type);
@@ -143,9 +139,20 @@
void setCreateJournalDir(boolean create);
+
+ // Paging Properties --------------------------------------------------------------------
+
+ String getPagingDirectory();
+
+ void setPagingDirectory(String dir);
+
long getPagingMaxGlobalSizeBytes();
void setPagingMaxGlobalSizeBytes(long maxGlobalSize);
-
- boolean isWildcardRoutingEnabled();
+
+ // Large Messages Properties ------------------------------------------------------------
+
+ String getLargeMessagesDirectory();
+
+ void setLargeMessagesDirectory(String directory);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -58,6 +58,8 @@
public static final String DEFAULT_JOURNAL_DIR = "data/journal";
public static final String DEFAULT_PAGING_DIR = "data/paging";
+
+ public static final String DEFAULT_LARGEMESSAGES_DIR = "data/largemessages";
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -105,14 +107,19 @@
protected TransportConfiguration backupConnectorConfig;
- // Paging related attributes
+ protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
+ // Paging related attributes ------------------------------------------------------------
+
protected long pagingMaxGlobalSize = -1;
protected String pagingDirectory = DEFAULT_PAGING_DIR;
+
- // Journal related attributes
+ // File related attributes -----------------------------------------------------------
+ protected String largeMessagesDirectory = DEFAULT_LARGEMESSAGES_DIR;
+
protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
protected boolean createBindingsDir = DEFAULT_CREATE_BINDINGS_DIR;
@@ -135,8 +142,6 @@
protected int journalBufferReuseSize = DEFAULT_JOURNAL_REUSE_BUFFER_SIZE;
- protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
-
public boolean isClustered()
{
return clustered;
@@ -401,6 +406,18 @@
{
pagingMaxGlobalSize = maxGlobalSize;
}
+
+
+ public String getLargeMessagesDirectory()
+ {
+ return largeMessagesDirectory;
+ }
+
+ public void setLargeMessagesDirectory(final String directory)
+ {
+ this.largeMessagesDirectory = directory;
+ }
+
@Override
public boolean equals(final Object other)
@@ -424,6 +441,7 @@
cother.isRequireDestinations() == isRequireDestinations() &&
cother.isSecurityEnabled() == isSecurityEnabled() &&
cother.isWildcardRoutingEnabled() == isWildcardRoutingEnabled() &&
+ cother.getLargeMessagesDirectory().equals(getLargeMessagesDirectory()) &&
cother.getBindingsDirectory().equals(getBindingsDirectory()) &&
cother.getJournalDirectory().equals(getJournalDirectory()) &&
cother.getJournalFileSize() == getJournalFileSize() &&
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -306,6 +306,8 @@
// Persistence config
+ largeMessagesDirectory = getString(e, "large-messages-directory", largeMessagesDirectory);
+
bindingsDirectory = getString(e, "bindings-directory", bindingsDirectory);
createBindingsDir = getBoolean(e, "create-bindings-dir", createBindingsDir);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -38,6 +38,8 @@
* Creates the file if it doesn't already exist, then opens it
*/
void open() throws Exception;
+
+ boolean isOpen();
/**
* For certain operations (like loading) we don't need open the file with full maxIO
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -74,6 +74,11 @@
this.maxIO = maxIO;
}
+ public boolean isOpen()
+ {
+ return opened;
+ }
+
public int getAlignment() throws Exception
{
checkOpened();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -77,6 +77,11 @@
{
return fileName;
}
+
+ public boolean isOpen()
+ {
+ return file != null;
+ }
public synchronized void open() throws Exception
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -74,6 +74,26 @@
void decode(MessagingBuffer buffer);
+
+ int getPropertiesEncodeSize();
+
+ void encodeProperties(MessagingBuffer buffer);
+
+ void decodeProperties(MessagingBuffer buffer);
+
+
+ int getBodyEncodeSize();
+
+
+ // Used on Message chunk
+ void encodeBody(MessagingBuffer buffer, int start, int size);
+
+ void encodeBody(MessagingBuffer buffer);
+
+ void decodeBody(MessagingBuffer buffer);
+
+
+
// Properties
// ------------------------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,19 +18,26 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.message.impl;
+import static org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import static org.jboss.messaging.util.DataConstants.*;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
-import java.util.Set;
-
/**
* A concrete implementation of a message
*
@@ -61,7 +68,7 @@
// Attributes ----------------------------------------------------
protected long messageID;
-
+
private SimpleString destination;
private byte type;
@@ -83,7 +90,7 @@
protected MessageImpl()
{
- this.properties = new TypedProperties();
+ properties = new TypedProperties();
}
/**
@@ -95,8 +102,12 @@
* @param priority
* @param body
*/
- protected MessageImpl(final byte type, final boolean durable, final long expiration,
- final long timestamp, final byte priority, MessagingBuffer body)
+ protected MessageImpl(final byte type,
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority,
+ final MessagingBuffer body)
{
this();
this.type = type;
@@ -113,17 +124,17 @@
protected MessageImpl(final MessageImpl other)
{
this();
- this.messageID = other.messageID;
- this.destination = other.destination;
- this.type = other.type;
- this.durable = other.durable;
- this.expiration = other.expiration;
- this.timestamp = other.timestamp;
- this.priority = other.priority;
- this.properties = new TypedProperties(other.properties);
- this.body = other.body;
+ messageID = other.messageID;
+ destination = other.destination;
+ type = other.type;
+ durable = other.durable;
+ expiration = other.expiration;
+ timestamp = other.timestamp;
+ priority = other.priority;
+ properties = new TypedProperties(other.properties);
+ body = other.body;
}
-
+
protected MessageImpl(final long messageID)
{
this();
@@ -132,34 +143,69 @@
// Message implementation ----------------------------------------
- public void encode(MessagingBuffer buff)
+ public void encode(final MessagingBuffer buffer)
{
- buff.putLong(messageID);
- buff.putSimpleString(destination);
- buff.putByte(type);
- buff.putBoolean(durable);
- buff.putLong(expiration);
- buff.putLong(timestamp);
- buff.putByte(priority);
- properties.encode(buff);
- buff.putInt(body.limit());
- buff.putBytes(body.array(), 0, body.limit());
+ encodeProperties(buffer);
+ encodeBody(buffer);
}
public int getEncodeSize()
{
- return SIZE_LONG + /* Destination */ SimpleString.sizeofString(destination) +
- /* Type */ SIZE_BYTE +
- /* Durable */ SIZE_BOOLEAN +
- /* Expiration */ SIZE_LONG +
- /* Timestamp */ SIZE_LONG +
- /* Priority */ SIZE_BYTE +
- /* PropertySize and Properties */ properties.getEncodeSize() +
- /* BodySize and Body */ SIZE_INT + body.limit();
+ return getPropertiesEncodeSize() + getBodyEncodeSize();
}
+ public int getPropertiesEncodeSize()
+ {
+ return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
+ /* Type */SIZE_BYTE +
+ /* Durable */SIZE_BOOLEAN +
+ /* Expiration */SIZE_LONG +
+ /* Timestamp */SIZE_LONG +
+ /* Priority */SIZE_BYTE +
+ /* PropertySize and Properties */properties.getEncodeSize();
+ }
+
+ public int getBodyEncodeSize()
+ {
+ return /* BodySize and Body */SIZE_INT + body.limit();
+ }
+
+
+ public void encodeProperties(MessagingBuffer buffer)
+ {
+ buffer.putLong(messageID);
+ buffer.putSimpleString(destination);
+ buffer.putByte(type);
+ buffer.putBoolean(durable);
+ buffer.putLong(expiration);
+ buffer.putLong(timestamp);
+ buffer.putByte(priority);
+ properties.encode(buffer);
+ }
+
+
+ public void encodeBody(MessagingBuffer buffer)
+ {
+ buffer.putInt(body.limit());
+ buffer.putBytes(body.array(), 0, body.limit());
+ }
+
+ // Used on Message chunk
+ public void encodeBody(MessagingBuffer buffer, int start, int size)
+ {
+ buffer.putBytes(body.array(), start, size);
+ }
+
+
public void decode(final MessagingBuffer buffer)
{
+ decodeProperties(buffer);
+
+ decodeBody(buffer);
+ }
+
+ public void decodeProperties(final MessagingBuffer buffer)
+ {
messageID = buffer.getLong();
destination = buffer.getSimpleString();
type = buffer.getByte();
@@ -167,32 +213,33 @@
expiration = buffer.getLong();
timestamp = buffer.getLong();
priority = buffer.getByte();
+ properties.decode(buffer);
+ }
- properties.decode(buffer);
+ public void decodeBody(final MessagingBuffer buffer)
+ {
int len = buffer.getInt();
-
- //TODO - this can be optimised
+ // TODO - this can be optimised
byte[] bytes = new byte[len];
buffer.getBytes(bytes);
- body = buffer.createNewBuffer(len);
- body.putBytes(bytes);
+ this.body = new ByteBufferWrapper(ByteBuffer.wrap(bytes));;
}
-
+
public long getMessageID()
{
return messageID;
}
-
+
public SimpleString getDestination()
{
return destination;
}
-
- public void setDestination(SimpleString destination)
+
+ public void setDestination(final SimpleString destination)
{
this.destination = destination;
}
-
+
public byte getType()
{
return type;
@@ -202,7 +249,7 @@
{
return durable;
}
-
+
public void setDurable(final boolean durable)
{
this.durable = durable;
@@ -222,12 +269,12 @@
{
return timestamp;
}
-
+
public void setTimestamp(final long timestamp)
{
this.timestamp = timestamp;
}
-
+
public byte getPriority()
{
return priority;
@@ -237,105 +284,105 @@
{
this.priority = priority;
}
-
+
public boolean isExpired()
{
if (expiration == 0)
{
return false;
}
-
+
return System.currentTimeMillis() - expiration >= 0;
}
-
- // Properties
+
+ // Properties
// ---------------------------------------------------------------------------------------
-
+
public void putBooleanProperty(final SimpleString key, final boolean value)
{
properties.putBooleanProperty(key, value);
}
-
+
public void putByteProperty(final SimpleString key, final byte value)
{
properties.putByteProperty(key, value);
}
-
+
public void putBytesProperty(final SimpleString key, final byte[] value)
{
properties.putBytesProperty(key, value);
}
-
+
public void putShortProperty(final SimpleString key, final short value)
{
properties.putShortProperty(key, value);
}
-
+
public void putIntProperty(final SimpleString key, final int value)
{
properties.putIntProperty(key, value);
}
-
+
public void putLongProperty(final SimpleString key, final long value)
{
properties.putLongProperty(key, value);
}
-
+
public void putFloatProperty(final SimpleString key, final float value)
{
properties.putFloatProperty(key, value);
}
-
+
public void putDoubleProperty(final SimpleString key, final double value)
{
properties.putDoubleProperty(key, value);
}
-
+
public void putStringProperty(final SimpleString key, final SimpleString value)
{
properties.putStringProperty(key, value);
}
-
+
public Object getProperty(final SimpleString key)
{
return properties.getProperty(key);
- }
-
+ }
+
public Object removeProperty(final SimpleString key)
{
return properties.removeProperty(key);
}
-
+
public boolean containsProperty(final SimpleString key)
{
return properties.containsProperty(key);
}
-
+
public Set<SimpleString> getPropertyNames()
{
return properties.getPropertyNames();
}
-
+
// Body
// -------------------------------------------------------------------------------------
-
+
public MessagingBuffer getBody()
{
return body;
}
-
+
public void setBody(final MessagingBuffer body)
{
this.body = body;
}
-
+
// Public --------------------------------------------------------
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes -------------------------------------------------
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -30,6 +30,7 @@
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.SimpleString;
@@ -67,6 +68,11 @@
void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
+
+ /** Create an area that will get LargeMessage bytes on the server size*/
+ ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception;
+
+
/** Used to delete non-messaging data (such as PageTransaction and LasPage) */
void storeDeleteTransactional(long txID, long recordID) throws Exception;
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.persistence.impl.journal;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.impl.ClientProducerImpl;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ * A ServerLargeMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created 30-Sep-08 12:02:45 PM
+ *
+ *
+ */
+public class JournalServerLargeMessageImpl extends ServerMessageImpl implements ServerLargeMessage
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final SequentialFile file;
+
+ ByteBuffer headersBuffer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JournalServerLargeMessageImpl(final long id, final SequentialFile file)
+ {
+ super(id);
+ this.file = file;
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerLargeMessage#addBytes(byte[])
+ */
+ public void addBytes(final byte[] bytes) throws Exception
+ {
+ if (!file.isOpen())
+ {
+ file.open();
+ }
+
+ file.write(ByteBuffer.wrap(bytes), false);
+
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer bufferOut)
+ {
+ try
+ {
+ ByteBuffer bufferRead = ByteBuffer.allocate(ClientProducerImpl.CHUNK_SIZE);
+ if (!file.isOpen())
+ {
+ file.open();
+ }
+
+ int bytesRead = 0;
+ file.position(0);
+ do
+ {
+ bufferRead.clear();
+ bytesRead = file.read(bufferRead);
+ bufferRead.flip();
+
+ if (bytesRead > 0)
+ {
+ bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
+ }
+ }
+ while (bytesRead == ClientProducerImpl.CHUNK_SIZE);
+
+ releaseResources();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public int getBodyEncodeSize()
+ {
+ try
+ {
+ if (!file.isOpen())
+ {
+ file.open();
+ }
+
+ return (int)file.size();
+ }
+
+ catch (Exception e)
+ {
+ throw new RuntimeException("Can't get the file size on " + file.getFileName());
+ }
+ }
+
+ @Override
+ public int getMemoryEstimate()
+ {
+ // The body won't be on memory (aways on-file), so we don't consider this for paging
+ return super.getPropertiesEncodeSize();
+ }
+
+ public void releaseResources() throws Exception
+ {
+ if (file.isOpen())
+ {
+ file.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -22,7 +22,24 @@
package org.jboss.messaging.core.persistence.impl.journal;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.journal.EncodingSupport;
@@ -50,6 +67,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -59,17 +77,6 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TimeAndCounterIDGenerator;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* A JournalStorageManager
@@ -83,12 +90,6 @@
{
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
- private static final int SIZE_LONG = 8;
-
- private static final int SIZE_INT = 4;
-
- private static final int SIZE_BYTE = 1;
-
// Bindings journal record type
public static final byte BINDING_RECORD = 21;
@@ -100,6 +101,8 @@
// Message journal record types
+ public static final byte ADD_LARGE_MESSAGE = 30;
+
public static final byte ADD_MESSAGE = 31;
public static final byte ACKNOWLEDGE_REF = 32;
@@ -112,7 +115,7 @@
public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
- //This will produce a unique id **for this node only**
+ // This will produce a unique id **for this node only**
private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
private final AtomicLong bindingIDSequence = new AtomicLong(0);
@@ -121,6 +124,8 @@
private final Journal bindingsJournal;
+ private final SequentialFileFactory largeMessagesFactory;
+
private final ConcurrentMap<SimpleString, Long> destinationIDMap = new ConcurrentHashMap<SimpleString, Long>();
private volatile boolean started;
@@ -191,13 +196,18 @@
"jbm",
config.getJournalMaxAIO(),
config.getJournalBufferReuseSize());
+
+ largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesDirectory());
}
/* This constructor is only used for testing */
- public JournalStorageManager(final Journal messageJournal, final Journal bindingsJournal)
+ public JournalStorageManager(final Journal messageJournal,
+ final Journal bindingsJournal,
+ final SequentialFileFactory largeMessagesFactory)
{
this.messageJournal = messageJournal;
this.bindingsJournal = bindingsJournal;
+ this.largeMessagesFactory = largeMessagesFactory;
}
public long generateUniqueID()
@@ -205,11 +215,32 @@
return idGenerator.generateID();
}
+ /** Create an area that will get LargeMessage bytes on the server size*/
+ public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+ {
+ return new JournalServerLargeMessageImpl(messageID, largeMessagesFactory.createSequentialFile(messageID + ".msg",
+ -1));
+ }
+
// Non transactional operations
public void storeMessage(final ServerMessage message) throws Exception
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+ if (message.getMessageID() <= 0)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+ }
+
+ if (message instanceof ServerLargeMessage)
+ {
+ messageJournal.appendAddRecord(message.getMessageID(),
+ ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding((ServerLargeMessage)message));
+ }
+ else
+ {
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+ }
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
@@ -231,7 +262,23 @@
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
{
- messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+ if (message.getMessageID() <= 0)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+ }
+
+ if (message instanceof ServerLargeMessage)
+ {
+ messageJournal.appendAddRecordTransactional(txID,
+ message.getMessageID(),
+ ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding(((ServerLargeMessage)message)));
+ }
+ else
+ {
+ messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+ }
+
}
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
@@ -332,6 +379,23 @@
switch (recordType)
{
+ case ADD_LARGE_MESSAGE:
+ {
+ ServerLargeMessage largeMessage = this.createLargeMessageStorage(record.id);
+
+ LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+ messageEncoding.decode(buff);
+
+ List<MessageReference> refs = postOffice.route(largeMessage);
+
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
+
+ break;
+ }
case ADD_MESSAGE:
{
ServerMessage message = new ServerMessageImpl(record.id);
@@ -883,7 +947,7 @@
public int getEncodeSize()
{
return SimpleString.sizeofString(queueName) + SimpleString.sizeofString(address) + 1 + // HasFilter?
- ((filter != null) ? SimpleString.sizeofString(filter) : 0);
+ (filter != null ? SimpleString.sizeofString(filter) : 0);
}
}
@@ -917,6 +981,42 @@
}
+ private static class LargeMessageEncoding implements EncodingSupport
+ {
+
+ private final ServerLargeMessage message;
+
+ public LargeMessageEncoding(ServerLargeMessage message)
+ {
+ this.message = message;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#decode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+ */
+ public void decode(final MessagingBuffer buffer)
+ {
+ message.decodeProperties(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#encode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+ */
+ public void encode(final MessagingBuffer buffer)
+ {
+ message.encodeProperties(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return message.getPropertiesEncodeSize();
+ }
+
+ }
+
private static class DeliveryCountUpdateEncoding implements EncodingSupport
{
long queueID;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.IDGenerator;
@@ -145,7 +146,16 @@
public void updateDeliveryCount(MessageReference ref) throws Exception
{
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
+ */
+ public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+ {
+ return new NullStorageServerLargeMessageImpl(messageID);
+ }
+
public long generateUniqueID()
{
//FIXME - this needs to use Howard's ID generator from JBM 1.4
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.persistence.impl.nullpm;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ * A NullStorageServerLargeMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created 30-Sep-08 1:51:42 PM
+ *
+ *
+ */
+public class NullStorageServerLargeMessageImpl extends ServerMessageImpl implements ServerLargeMessage
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public NullStorageServerLargeMessageImpl(final long messageID)
+ {
+ super(messageID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerLargeMessage#release()
+ */
+ public void releaseResources() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerLargeMessage#addBytes(byte[])
+ */
+ public synchronized void addBytes(final byte[] bytes)
+ {
+ MessagingBuffer buffer = this.getBody();
+
+ if (buffer != null)
+ {
+ ByteBuffer newBuffer = ByteBuffer.allocate(buffer.limit() + bytes.length);
+ newBuffer.put(buffer.array());
+ buffer = new ByteBufferWrapper(newBuffer);
+ this.setBody(buffer);
+ }
+ else
+ {
+ buffer = new ByteBufferWrapper(ByteBuffer.allocate(bytes.length));
+ this.setBody(buffer);
+ }
+
+ buffer.putBytes(bytes);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -45,6 +45,8 @@
void decode(MessagingBuffer buffer);
+ int getPacketSize();
+
boolean isRequiresConfirmations();
boolean isReplicateBlocking();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -12,21 +12,6 @@
package org.jboss.messaging.core.remoting.impl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -44,6 +29,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -84,6 +70,37 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -116,6 +133,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -138,21 +156,6 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleIDGenerator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -488,8 +491,8 @@
private void doWrite(final Packet packet)
{
- final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
+ //System.out.println(packet.getClass().getCanonicalName() + " size = " + packet.getPacketSize());
+ final MessagingBuffer buffer = transportConnection.createBuffer(packet.getPacketSize());
packet.encode(buffer);
transportConnection.write(buffer);
@@ -793,6 +796,11 @@
packet = new SessionSendManagementMessage();
break;
}
+ case SESS_CHUNK_SEND:
+ {
+ packet = new SessionSendChunkMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -28,7 +28,9 @@
private static final Logger log = Logger.getLogger(PacketImpl.class);
- public static final int INITIAL_BUFFER_SIZE = 1024;
+ public static final int DEFAULT_PACKET_SIZE = 1024;
+
+ protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
private long channelID;
@@ -159,6 +161,8 @@
public static final byte SESS_SCHEDULED_SEND = 91;
+ public static final byte SESS_CHUNK_SEND = 95;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -168,6 +172,12 @@
// Public --------------------------------------------------------
+
+ public int getPacketSize()
+ {
+ return DEFAULT_PACKET_SIZE;
+ }
+
public byte getType()
{
return type;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.DataConstants;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -95,6 +96,12 @@
return deliveryCount;
}
+
+ public int getPacketSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + serverMessage.getEncodeSize();
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(consumerID);
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,192 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionSendChunkMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long targetID;
+
+ private byte[] header;
+
+ private byte[] body;
+
+ private boolean continues;
+
+ private long messageID = 0;
+
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionSendChunkMessage(final long targetID,
+ final byte[] header,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse)
+ {
+ super(SESS_CHUNK_SEND);
+ this.targetID = targetID;
+ this.header = header;
+ this.body = body;
+ this.continues = continues;
+ this.requiresResponse = requiresResponse;
+ }
+
+ public SessionSendChunkMessage()
+ {
+ super(SESS_CHUNK_SEND);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getTargetID()
+ {
+ return targetID;
+ }
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ public byte[] getHeader()
+ {
+ return header;
+ }
+
+ public byte[] getBody()
+ {
+ return body;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public void setMessageID(final long messageId)
+ {
+ messageID = messageId;
+ }
+
+ public boolean isContinues()
+ {
+ return continues;
+ }
+
+ @Override
+ public int getPacketSize()
+ {
+ return DEFAULT_PACKET_SIZE + DataConstants.SIZE_LONG /* TargetID */+
+ DataConstants.SIZE_INT /* HeaderLength */+
+ (header != null ? header.length : 0) /* Header bytes */+
+ DataConstants.SIZE_INT /* BodyLength */+
+ body.length /* Body bytes */+
+ DataConstants.SIZE_BOOLEAN /* hasContinuations */+
+ DataConstants.SIZE_BOOLEAN /* requiresResponse */+
+ DataConstants.SIZE_BOOLEAN /* has MessageId */+
+ (messageID > 0 ? DataConstants.SIZE_LONG : 0);
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putLong(targetID);
+
+ if (header != null)
+ {
+ buffer.putInt(header.length);
+ buffer.putBytes(header);
+ }
+ else
+ {
+ buffer.putInt(0);
+ }
+
+ buffer.putInt(body.length);
+ buffer.putBytes(body);
+
+ buffer.putBoolean(continues);
+
+ buffer.putBoolean(requiresResponse);
+
+ buffer.putBoolean(messageID > 0);
+
+ if (messageID > 0)
+ {
+ buffer.putLong(messageID);
+ }
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putLong(targetID);
+
+ final int headerLength = buffer.getInt();
+
+ if (headerLength > 0)
+ {
+ header = new byte[headerLength];
+ buffer.getBytes(header);
+ }
+
+ final int bodyLength = buffer.getInt();
+
+ body = new byte[bodyLength];
+ buffer.getBytes(body);
+
+ continues = buffer.getBoolean();
+
+ requiresResponse = buffer.getBoolean();
+
+ final boolean hasMessageID = buffer.getBoolean();
+
+ if (hasMessageID)
+ {
+ messageID = buffer.getLong();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server;
+
+/**
+ * A LargeMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created 30-Sep-08 10:58:04 AM
+ *
+ *
+ */
+public interface ServerLargeMessage extends ServerMessage
+{
+ void addBytes(byte[] bytes) throws Exception;
+
+ /** Close the files if opened */
+ void releaseResources() throws Exception;
+
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,11 +18,10 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server;
-
/**
*
* A ServerProducer
@@ -39,6 +38,12 @@
void send(ServerMessage msg) throws Exception;
+ /** Current LargeMessage being sent in chunks */
+ ServerLargeMessage getCurrentChunk();
+
+ /** Current LargeMessage being sent in chunks */
+ void setCurrentChunk(ServerLargeMessage message);
+
void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
void requestAndSendCredits() throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -145,6 +145,12 @@
void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
+ ServerLargeMessage getCurrentLargeMessage(long producerID);
+
+ ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception;
+
+ void clearCurrentLargeMessage(long producerID);
+
void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
boolean browserHasNextMessage(long browserID) throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.management.ManagementService;
@@ -78,7 +79,7 @@
return new MessagingServiceImpl(server, storageManager, remotingService);
}
- public static MessagingServiceImpl newNioStorageMessagingServer(final Configuration config, String journalDir, String bindingsDir)
+ public static MessagingServiceImpl newNioStorageMessagingServer(final Configuration config, String journalDir, String bindingsDir, String largeMessagesDir)
{
NIOSequentialFileFactory sequentialFileFactory = new NIOSequentialFileFactory(journalDir);
NIOSequentialFileFactory sequentialFileFactory2 = new NIOSequentialFileFactory(bindingsDir);
@@ -92,8 +93,10 @@
config.getJournalMinFiles(), config.isJournalSyncTransactional(),
config.isJournalSyncNonTransactional(), sequentialFileFactory,
"jbm-bindings", "jbm", config.getJournalMaxAIO(), 0);
+
+ SequentialFileFactory largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDir);
- StorageManager storageManager = new JournalStorageManager(msgs, bindings);
+ StorageManager storageManager = new JournalStorageManager(msgs, bindings, largeMessagesFactory);
RemotingService remotingService = new RemotingServiceImpl(config);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,50 +18,52 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
-import org.jboss.messaging.core.logging.Logger;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.util.SimpleString;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
*
* A ServerProducerImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*
*/
public class ServerProducerImpl implements ServerProducer
{
- private static final Logger log = Logger.getLogger(ServerProducerImpl.class);
-
- private final long id;
-
- private final ServerSession session;
-
- private final SimpleString address;
-
- private final FlowController flowController;
-
- private final int windowSize;
-
- private volatile boolean waiting;
-
- private AtomicInteger creditsToSend = new AtomicInteger(0);
-
+ private final long id;
+
+ private final ServerSession session;
+
+ private final SimpleString address;
+
+ private final FlowController flowController;
+
+ private final int windowSize;
+
+ private volatile boolean waiting;
+
+ private final AtomicInteger creditsToSend = new AtomicInteger(0);
+
private final Channel channel;
-
+
+ private ServerLargeMessage currentlargeMessage;
+
// Constructors ----------------------------------------------------------------
public ServerProducerImpl(final long id, final ServerSession session,
@@ -109,35 +111,61 @@
session.sendScheduled(message, scheduledDeliveryTime);
}
+ public ServerLargeMessage getCurrentChunk()
+ {
+ return currentlargeMessage;
+ }
+
+ public void setCurrentChunk(final ServerLargeMessage message)
+ {
+ currentlargeMessage = message;
+ }
+
+
+ public void sendChunk(final byte[] message, final int position, final int totalSize) throws Exception
+ {
+ currentlargeMessage.addBytes(message);
+ if (position + message.length > totalSize)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Invalid ChunkSize sent over the wire");
+ }
+ else if (position + message.length == totalSize)
+ {
+ System.out.println("Sending complete message now");
+ send(currentlargeMessage);
+ this.currentlargeMessage = null;
+ }
+ }
+
public void requestAndSendCredits() throws Exception
- {
- if (!waiting)
- {
- flowController.requestAndSendCredits(this, creditsToSend.get());
- }
- }
+ {
+ if (!waiting)
+ {
+ flowController.requestAndSendCredits(this, creditsToSend.get());
+ }
+ }
- public void sendCredits(final int credits) throws Exception
- {
- creditsToSend.addAndGet(-credits);
-
- Packet packet = new SessionProducerFlowCreditMessage(id, credits);
-
- channel.send( packet);
- }
-
- public void setWaiting(final boolean waiting)
- {
- this.waiting = waiting;
- }
-
- public boolean isWaiting()
- {
- return waiting;
- }
+ public void sendCredits(final int credits) throws Exception
+ {
+ creditsToSend.addAndGet(-credits);
+ Packet packet = new SessionProducerFlowCreditMessage(id, credits);
+ channel.send(packet);
+ }
+ public void setWaiting(final boolean waiting)
+ {
+ this.waiting = waiting;
+ }
+
+ public boolean isWaiting()
+ {
+ return waiting;
+ }
+
+
+
private void doFlowControl(final ServerMessage message) throws Exception
{
if (this.address != null)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -34,12 +34,14 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.core.server.ServerSession;
@@ -86,8 +88,6 @@
// Attributes ----------------------------------------------------------------------------
- private final boolean trace = log.isTraceEnabled();
-
private final long id;
private final String username;
@@ -1092,7 +1092,7 @@
int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
SimpleString groupId = null;
- if(autoGroupId)
+ if (autoGroupId)
{
groupId = simpleStringIdGenerator.generateID();
}
@@ -1169,6 +1169,44 @@
producers.get(producerID).send(message);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerSession#createLargeMessage(long, int, byte[])
+ */
+ public ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception
+ {
+ ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(storageManager.generateUniqueID());
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+
+ largeMessage.decodeProperties(headerBuffer);
+
+ ServerProducer producer = producers.get(producerID);
+
+ producer.setCurrentChunk(largeMessage);
+
+ return largeMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerSession#getCurrentLargeMessage(long)
+ */
+ public ServerLargeMessage getCurrentLargeMessage(long producerID)
+ {
+ ServerProducer producer = producers.get(producerID);
+
+ return producer.getCurrentChunk();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerSession#sendCurrentLargeMessage(long)
+ */
+ public void clearCurrentLargeMessage(long producerID)
+ {
+ ServerProducer producer = producers.get(producerID);
+
+ producer.setCurrentChunk(null);
+ }
+
public void sendScheduledProducerMessage(final long producerID, final ServerMessage message, final long scheduledDeliveryTime) throws Exception
{
producers.get(producerID).sendScheduled(message, scheduledDeliveryTime);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -76,6 +77,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
@@ -91,6 +93,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
@@ -149,7 +152,20 @@
send.getServerMessage().setMessageID(id);
}
}
+ else
+ if (type == SESS_CHUNK_SEND)
+ {
+ SessionSendChunkMessage send = (SessionSendChunkMessage)packet;
+ if (send.getHeader() != null && send.getMessageID() <= 0L)
+ {
+ // must generate message id here, so we know they are in sync
+ long id = storageManager.generateUniqueID();
+
+ send.setMessageID(id);
+ }
+ }
+
Packet response = null;
try
@@ -207,7 +223,10 @@
case SESS_CREATEPRODUCER:
{
SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
- response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
+ response = session.createProducer(request.getAddress(),
+ request.getWindowSize(),
+ request.getMaxRate(),
+ request.isAutoGroupId());
break;
}
case SESS_PROCESSED:
@@ -229,7 +248,7 @@
case SESS_ROLLBACK:
{
session.rollback();
- //Rollback response is handled in the rollback() method
+ // Rollback response is handled in the rollback() method
break;
}
case SESS_XA_COMMIT:
@@ -370,6 +389,37 @@
}
break;
}
+ case SESS_CHUNK_SEND:
+ {
+ SessionSendChunkMessage message = (SessionSendChunkMessage)packet;
+
+ ServerLargeMessage largeMessage = null;
+
+ if (message.getHeader() != null)
+ {
+ largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getHeader());
+ largeMessage.setMessageID(message.getMessageID());
+ }
+ else
+ {
+ largeMessage = session.getCurrentLargeMessage(message.getTargetID());
+ }
+
+
+ largeMessage.addBytes(message.getBody());
+
+
+ if (!message.isContinues())
+ {
+ session.sendProducerMessage(message.getTargetID(), largeMessage);
+ }
+
+ if (message.isRequiresResponse())
+ {
+ response = new NullResponseMessage(false);
+ }
+ break;
+ }
case SESS_SCHEDULED_SEND:
{
SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -1,183 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.integration.base;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
-import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.jms.client.JBossBytesMessage;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-/**
- *
- * Base class with basic utilities on starting up a basic server
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class IntegrationTestBase extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
- protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
-
- protected static final String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName();
- protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
-
- protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/journal";
- protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/bindings";
- protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/page";
- protected MessagingService messagingService;
-
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void clearData()
- {
- File file = new File(journalDir);
- File file2 = new File(bindingsDir);
- File file3 = new File(pageDir);
- deleteDirectory(file);
- file.mkdirs();
- deleteDirectory(file2);
- file2.mkdirs();
- deleteDirectory(file3);
- file3.mkdirs();
- }
-
-
- protected MessagingService createService(boolean realFiles, boolean netty, Configuration configuration, Map<String, QueueSettings> settings)
- {
- TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
-
- if (netty)
- {
- configuration.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
- }
-
- MessagingService service;
-
- if (realFiles)
- {
- service = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
- }
- else
- {
- service = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
- }
-
-
- for (Map.Entry<String, QueueSettings> setting: settings.entrySet())
- {
- service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
- }
-
-
- return service;
- }
-
- protected MessagingService createService(boolean realFiles)
- {
- return createService(realFiles, false, createDefaultConfig(), new HashMap<String, QueueSettings>());
- }
-
-
- protected Configuration createDefaultConfig()
- {
- Configuration configuration = new ConfigurationImpl();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setPagingDirectory(pageDir);
-
- return configuration;
- }
-
-
- protected ClientSessionFactory createInVMFactory()
- {
- return new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- }
-
- protected ClientSessionFactory createNettyFactory()
- {
- return new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
- }
-
- protected ClientMessage createTextMessage(ClientSession session, String s)
- {
- return createTextMessage(session, s, true);
- }
-
- protected ClientMessage createTextMessage(ClientSession session, String s, boolean durable)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
- message.getBody().putString(s);
- message.getBody().flip();
- return message;
- }
-
- protected ClientMessage createBytesMessage(ClientSession session, byte[] b, boolean durable)
- {
- ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
- message.getBody().putBytes(b);
- message.getBody().flip();
- return message;
- }
-
-
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -32,10 +32,10 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.SimpleString;
-public class DestroyConsumerTest extends IntegrationTestBase
+public class DestroyConsumerTest extends ServiceTestBase
{
// Constants -----------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -55,6 +55,7 @@
private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
+ private String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/largemsg";
private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/page";
private MessagingService messagingService;
private ClientSession clientSession;
@@ -83,7 +84,7 @@
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, largeMessagesDir);
//start the server
messagingService.start();
//then we create a client as normal
@@ -1188,7 +1189,7 @@
clientSession = null;
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, largeMessagesDir);
addSettings();
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -23,7 +23,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.SimpleString;
/**
@@ -31,7 +31,7 @@
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*/
-public class MultipleDestinationPagingTest extends IntegrationTestBase
+public class MultipleDestinationPagingTest extends ServiceTestBase
{
// Constants -----------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -103,7 +103,7 @@
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
//start the server
messagingService.start();
//then we create a client as normal
@@ -124,7 +124,7 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
messagingService.start();
sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
@@ -148,7 +148,7 @@
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
//start the server
messagingService.start();
//then we create a client as normal
@@ -171,7 +171,7 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
messagingService.start();
sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
@@ -196,7 +196,7 @@
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
configuration.setPagingMaxGlobalSizeBytes(0);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
//start the server
messagingService.start();
//then we create a client as normal
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -76,7 +76,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
long msgID = 1021092;
@@ -92,7 +92,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long queueID = 1210981;
final long messageID = 101921092;
@@ -110,7 +110,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long messageID = 101921092;
@@ -125,7 +125,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
long msgID = 1021092;
@@ -142,7 +142,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long queueID = 1210981;
final long messageID = 101921092;
@@ -162,7 +162,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long messageID = 101921092;
final long txID = 1209373;
@@ -178,7 +178,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long txID = 1209373;
@@ -196,7 +196,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long txID = 1209373;
@@ -211,7 +211,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long txID = 1209373;
@@ -226,7 +226,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
final long msgID = 120912901;
final long queueID = 1283743;
@@ -254,7 +254,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
messageJournal.load((List<RecordInfo>)EasyMock.anyObject(), (List<PreparedTransactionInfo>)EasyMock.anyObject());
@@ -422,7 +422,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
Queue queue = EasyMock.createStrictMock(Queue.class);
SimpleString queueName = new SimpleString("saiohsiudh");
@@ -493,7 +493,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
Binding binding = EasyMock.createStrictMock(Binding.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -515,7 +515,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
Binding binding = EasyMock.createStrictMock(Binding.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -543,7 +543,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
SimpleString dest = new SimpleString("oaskokas");
@@ -667,7 +667,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
bindingsJournal.load((List<RecordInfo>)EasyMock.anyObject(), (List<PreparedTransactionInfo>)EasyMock.anyObject());
@@ -734,7 +734,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
assertFalse(jsm.isStarted());
bindingsJournal.start();
@@ -788,7 +788,7 @@
Journal messageJournal = EasyMock.createStrictMock(Journal.class);
Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
- JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+ JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
long id = jsm.generateUniqueID();
Added: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java (rev 0)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,216 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.util;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+
+/**
+ *
+ * Base class with basic utilities on starting up a basic server
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class ServiceTestBase extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected boolean realFiles = false;
+
+ protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
+
+ protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
+
+ protected static final String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName();
+
+ protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
+
+ protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/journal";
+
+ protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/bindings";
+
+ protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/page";
+
+ protected String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/large-msg";
+
+ protected MessagingService messagingService;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ if (realFiles)
+ {
+ clearData();
+ }
+ messagingService = createService(realFiles);
+ messagingService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+ }
+
+ protected void clearData()
+ {
+ File file = new File(journalDir);
+ File file2 = new File(bindingsDir);
+ File file3 = new File(pageDir);
+ File file4 = new File(largeMessagesDir);
+ deleteDirectory(file);
+ file.mkdirs();
+ deleteDirectory(file2);
+ file2.mkdirs();
+ deleteDirectory(file3);
+ file3.mkdirs();
+ deleteDirectory(file4);
+ file4.mkdirs();
+ }
+
+ protected MessagingService createService(final boolean realFiles,
+ final boolean netty,
+ final Configuration configuration,
+ final Map<String, QueueSettings> settings)
+ {
+ TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+
+ if (netty)
+ {
+ configuration.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
+ }
+
+ MessagingService service;
+
+ if (realFiles)
+ {
+ service = MessagingServiceImpl.newNioStorageMessagingServer(configuration,
+ journalDir,
+ bindingsDir,
+ largeMessagesDir);
+ }
+ else
+ {
+ service = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
+ }
+
+ for (Map.Entry<String, QueueSettings> setting : settings.entrySet())
+ {
+ service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+
+ return service;
+ }
+
+ protected MessagingService createService(final boolean realFiles)
+ {
+ return createService(realFiles, false, createDefaultConfig(), new HashMap<String, QueueSettings>());
+ }
+
+ protected Configuration createDefaultConfig()
+ {
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(pageDir);
+
+ return configuration;
+ }
+
+ protected ClientSessionFactory createInVMFactory()
+ {
+ return new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ }
+
+ protected ClientSessionFactory createNettyFactory()
+ {
+ return new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+ }
+
+ protected ClientMessage createTextMessage(final ClientSession session, final String s)
+ {
+ return createTextMessage(session, s, true);
+ }
+
+ protected ClientMessage createTextMessage(final ClientSession session, final String s, final boolean durable)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ durable,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.getBody().putString(s);
+ message.getBody().flip();
+ return message;
+ }
+
+ protected ClientMessage createBytesMessage(final ClientSession session, final byte[] b, final boolean durable)
+ {
+ ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE,
+ durable,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.getBody().putBytes(b);
+ message.getBody().flip();
+ return message;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list