JBoss hornetq SVN: r8251 - trunk/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-09 04:46:10 -0500 (Mon, 09 Nov 2009)
New Revision: 8251
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
fixed cluster tests setup
* use the correct transport config params for invm acceptor/connector
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-08 04:14:55 UTC (rev 8250)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-09 09:46:10 UTC (rev 8251)
@@ -1267,18 +1267,17 @@
Map<String, Object> params = generateParams(node, netty);
- TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
-
if (netty)
{
TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
configuration.getAcceptorConfigurations().add(nettytc);
}
+ else
+ {
+ TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+ }
- TransportConfiguration invmtc_c = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
- configuration.getConnectorConfigurations().put(invmtc_c.getName(), invmtc_c);
-
List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
if (netty)
@@ -1291,6 +1290,9 @@
}
else
{
+ TransportConfiguration invmtc_c = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ configuration.getConnectorConfigurations().put(invmtc_c.getName(), invmtc_c);
+
connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
: invmBackuptc.getName()));
}
15 years, 1 month
JBoss hornetq SVN: r8250 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-07 23:14:55 -0500 (Sat, 07 Nov 2009)
New Revision: 8250
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-207 - fixing test
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-07 00:49:27 UTC (rev 8249)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-08 04:14:55 UTC (rev 8250)
@@ -58,22 +58,28 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testPage() throws Exception
{
- internalTestPage(false);
+ internalTestPage(false, false);
}
-
- public void testPageFailBeforeconsume() throws Exception
+
+ public void testPageTransactioned() throws Exception
{
- internalTestPage(true);
+ internalTestPage(true, false);
}
-
- public void internalTestPage(final boolean failBeforeConsume) throws Exception
+
+ public void testPageTransactionedFailBeforeconsume() throws Exception
{
+ internalTestPage(true, true);
+ }
+
+ public void internalTestPage(final boolean transacted, final boolean failBeforeConsume) throws Exception
+ {
ClientSessionFactoryInternal factory = getSessionFactory();
factory.setBlockOnPersistentSend(true);
- ClientSession session = factory.createSession(true, true, 0);
+ factory.setBlockOnAcknowledge(true);
+ ClientSession session = factory.createSession(!transacted, !transacted, 0);
try
{
@@ -88,11 +94,11 @@
{
latch.countDown();
}
-
+
public void beforeReconnect(HornetQException exception)
- {
+ {
}
-
+
}
session.addFailureListener(new MyListener());
@@ -103,7 +109,7 @@
for (int i = 0; i < TOTAL_MESSAGES; i++)
{
- if (i % 10 == 0)
+ if (transacted && i % 10 == 0)
{
session.commit();
}
@@ -114,7 +120,7 @@
}
session.commit();
-
+
if (failBeforeConsume)
{
failSession(session, latch);
@@ -131,7 +137,7 @@
ClientMessage msg = cons.receive(20000);
assertNotNull(msg);
msg.acknowledge();
- if (i % 10 == 0)
+ if (transacted && i % 10 == 0)
{
session.commit();
}
@@ -175,7 +181,6 @@
}
}
-
/**
* @param session
* @param latch
@@ -245,4 +250,5 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
15 years, 1 month
JBoss hornetq SVN: r8249 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 19:49:27 -0500 (Fri, 06 Nov 2009)
New Revision: 8249
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
minor tweaks
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-07 00:33:24 UTC (rev 8248)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-07 00:49:27 UTC (rev 8249)
@@ -22,7 +22,6 @@
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientMessageImpl;
-import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -181,10 +180,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(isNetty());
+ HornetQServer server = createServer(realFiles, isNetty());
- HornetQServer server = createServer(realFiles, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(maxSize);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -194,17 +191,8 @@
server.start();
- ClientSessionFactory sf;
+ ClientSessionFactory sf = createFactory(isNetty());
- if (isNetty())
- {
- sf = createNettyFactory();
- }
- else
- {
- sf = createInVMFactory();
- }
-
sf.setProducerWindowSize(producerWindowSize);
sf.setConsumerWindowSize(consumerWindowSize);
sf.setAckBatchSize(ackBatchSize);
@@ -349,10 +337,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(false);
+ HornetQServer server = createServer(false, isNetty());
- HornetQServer server = createServer(false, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -362,7 +348,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setProducerWindowSize(1024);
sf.setConsumerWindowSize(1024);
@@ -454,10 +440,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(false);
+ HornetQServer server = createServer(false, isNetty());
- HornetQServer server = createServer(false, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -467,7 +451,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setProducerWindowSize(1024);
sf.setConsumerWindowSize(1024);
@@ -530,10 +514,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(false);
+ HornetQServer server = createServer(false, isNetty());
- HornetQServer server = createServer(false, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -543,7 +525,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setProducerWindowSize(1024);
sf.setConsumerWindowSize(1024);
@@ -624,10 +606,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(false);
+ HornetQServer server = createServer(false, isNetty());
- HornetQServer server = createServer(false, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -637,7 +617,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setProducerWindowSize(1024);
sf.setConsumerWindowSize(1024);
@@ -697,10 +677,8 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(false);
+ HornetQServer server = createServer(false, isNetty());
- HornetQServer server = createServer(false, config);
-
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
@@ -710,7 +688,7 @@
server.start();
- ClientSessionFactory sf = createInVMFactory();
+ ClientSessionFactory sf = createFactory(isNetty());
sf.setProducerWindowSize(1024);
sf.setConsumerWindowSize(1024);
15 years, 1 month
JBoss hornetq SVN: r8248 - in trunk: src/main/org/hornetq/core/message and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 19:33:24 -0500 (Fri, 06 Nov 2009)
New Revision: 8248
Added:
trunk/src/main/org/hornetq/core/message/BodyEncoder.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
Removed:
trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-211 - ClientProducer to send LargeMessages and few other tweaks
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -15,10 +15,12 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.SimpleString;
@@ -228,5 +230,8 @@
"]";
}
-
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodyEncoder()
+ */
+
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -17,12 +17,11 @@
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.Channel;
@@ -347,47 +346,59 @@
{
final long bodySize = msg.getLargeBodySize();
- LargeMessageEncodingContext context = new DecodingContext(msg);
+ BodyEncoder context = msg.getBodyEncoder();
- for (int pos = 0; pos < bodySize;)
+ context.open();
+ try
{
- final boolean lastChunk;
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+ for (int pos = 0; pos < bodySize;)
+ {
+ final boolean lastChunk;
- final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+ final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
- msg.encodeBody(bodyBuffer, context, chunkLength);
+ final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
- pos += chunkLength;
+ context.encode(bodyBuffer, chunkLength);
- lastChunk = pos >= bodySize;
+ pos += chunkLength;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
- !lastChunk,
- lastChunk && sendBlocking);
+ lastChunk = pos >= bodySize;
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
+ !lastChunk,
+ lastChunk && sendBlocking);
- try
- {
- credits.acquireCredits(chunk.getRequiredBufferSize());
+ if (sendBlocking && lastChunk)
+ {
+ // When sending it blocking, only the last chunk will be blocking.
+ channel.sendBlocking(chunk);
+ }
+ else
+ {
+ channel.send(chunk);
+ }
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
- catch (InterruptedException e)
- {
- }
}
+ finally
+ {
+ context.close();
+ }
}
/**
+ * TODO: This method could be eliminated and
+ * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
+ * All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
@@ -477,35 +488,4 @@
}
// Inner Classes --------------------------------------------------------------------------------
- class DecodingContext implements LargeMessageEncodingContext
- {
- private final Message message;
-
- private int lastPos = 0;
-
- public DecodingContext(Message message)
- {
- this.message = message;
- }
-
- public void open() throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public int write(ByteBuffer bufferRead) throws Exception
- {
- return -1;
- }
-
- public int write(HornetQBuffer bufferOut, int size)
- {
- bufferOut.writeBytes(message.getBody(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -58,7 +58,7 @@
private final LinkedBlockingQueue<SessionReceiveContinuationMessage> packets = new LinkedBlockingQueue<SessionReceiveContinuationMessage>();
- private SessionReceiveContinuationMessage currentPacket = null;
+ private volatile SessionReceiveContinuationMessage currentPacket = null;
private final long totalSize;
Copied: trunk/src/main/org/hornetq/core/message/BodyEncoder.java (from rev 8244, trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java)
===================================================================
--- trunk/src/main/org/hornetq/core/message/BodyEncoder.java (rev 0)
+++ trunk/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+package org.hornetq.core.message;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Class used to encode message body into buffers.
+ * Used to send large streams over the wire
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * Created Nov 2, 2009
+ */
+public interface BodyEncoder
+{
+ void open() throws HornetQException;
+
+ void close() throws HornetQException;
+
+ int encode(ByteBuffer bufferRead) throws HornetQException;
+
+ int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+}
Deleted: trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -1,20 +0,0 @@
-package org.hornetq.core.message;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Nov 2, 2009
- */
-public interface LargeMessageEncodingContext
-{
- void open() throws Exception;
-
- void close() throws Exception;
-
- int write(ByteBuffer bufferRead) throws Exception;
-
- int write(HornetQBuffer bufferOut, int size);
-}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -78,10 +78,10 @@
boolean isLargeMessage();
long getLargeBodySize();
+
+ /** Used to encode Body over the wire when using large messages */
+ BodyEncoder getBodyEncoder();
- // Used on Message chunk
- void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size);
-
/** Set the InputStream used on a message that will be sent over a producer */
void setBodyInputStream(InputStream stream);
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -19,13 +19,15 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -214,12 +216,6 @@
buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
- // Used on Message chunk side
- public void encodeBody(final HornetQBuffer bufferOut, final LargeMessageEncodingContext context, final int size)
- {
- context.write(bufferOut, size);
- }
-
public void decode(final HornetQBuffer buffer)
{
decodeHeadersAndProperties(buffer);
@@ -663,6 +659,11 @@
{
this.body = body;
}
+
+ public BodyEncoder getBodyEncoder()
+ {
+ return new DecodingContext();
+ }
// Public --------------------------------------------------------
@@ -673,4 +674,36 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+
+ class DecodingContext implements BodyEncoder
+ {
+ private int lastPos = 0;
+
+ public DecodingContext()
+ {
+ }
+
+ public void open()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public int encode(ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
+
+ public int encode(HornetQBuffer bufferOut, int size)
+ {
+ bufferOut.writeBytes(getBody(), lastPos, size);
+ lastPos += size;
+ return size;
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -15,9 +15,11 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
@@ -25,7 +27,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.server.impl.ServerMessageImpl;
/**
@@ -55,7 +57,7 @@
private SequentialFile file;
private long bodySize = -1;
-
+
private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -64,7 +66,7 @@
public FileLargeServerMessage(final JournalStorageManager storageManager)
{
- this.storageManager = storageManager;
+ this.storageManager = storageManager;
}
/**
@@ -72,16 +74,14 @@
* @param copy
* @param fileCopy
*/
- private FileLargeServerMessage(final FileLargeServerMessage copy,
- final SequentialFile fileCopy,
- final long newID)
+ private FileLargeServerMessage(final FileLargeServerMessage copy, final SequentialFile fileCopy, final long newID)
{
super(copy);
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
- setMessageID(newID);
+ setMessageID(newID);
}
// Public --------------------------------------------------------
@@ -92,7 +92,7 @@
public synchronized void addBytes(final byte[] bytes) throws Exception
{
validateFile();
-
+
if (!file.isOpen())
{
file.open();
@@ -103,14 +103,14 @@
bodySize += bytes.length;
}
- public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+ public void encodeBody(final HornetQBuffer bufferOut, BodyEncoder context, int size)
{
try
{
// This could maybe be optimized (maybe reading directly into bufferOut)
ByteBuffer bufferRead = ByteBuffer.allocate(size);
- int bytesRead = context.write(bufferRead);
+ int bytesRead = context.encode(bufferRead);
bufferRead.flip();
@@ -188,13 +188,13 @@
}
}
- public LargeMessageEncodingContext createNewContext()
+ public BodyEncoder getBodyEncoder()
{
return new DecodingContext();
}
private void checkDelete() throws Exception
- {
+ {
if (getRefCount() <= 0)
{
if (linkMessage != null)
@@ -301,9 +301,9 @@
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
ServerMessage newMessage = new FileLargeServerMessage(linkMessage == null ? this
- : (FileLargeServerMessage)linkMessage,
- newfile,
- newID);
+ : (FileLargeServerMessage)linkMessage,
+ newfile,
+ newID);
return newMessage;
}
@@ -334,7 +334,7 @@
{
throw new RuntimeException("MessageID not set on LargeMessage");
}
-
+
file = storageManager.createFileForLargeMessage(getMessageID(), durable);
file.open();
@@ -372,29 +372,62 @@
// Inner classes -------------------------------------------------
- class DecodingContext implements LargeMessageEncodingContext
+ class DecodingContext implements BodyEncoder
{
private SequentialFile cFile;
-
- public void open() throws Exception
+
+ public void open() throws HornetQException
{
- cFile = file.copy();
- cFile.open();
+ try
+ {
+ cFile = file.copy();
+ cFile.open();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+ }
}
- public void close() throws Exception
+ public void close() throws HornetQException
{
- cFile.close();
+ try
+ {
+ cFile.close();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+ }
}
- public int write(ByteBuffer bufferRead) throws Exception
+ public int encode(ByteBuffer bufferRead) throws HornetQException
{
- return cFile.read(bufferRead);
+ try
+ {
+ return cFile.read(bufferRead);
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+ }
}
- public int write(HornetQBuffer bufferOut, int size)
+ public int encode(HornetQBuffer bufferOut, int size) throws HornetQException
{
- return -1;
+ // This could maybe be optimized (maybe reading directly into bufferOut)
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+ int bytesRead = encode(bufferRead);
+
+ bufferRead.flip();
+
+ if (bytesRead > 0)
+ {
+ bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
+ }
+
+ return bytesRead;
}
}
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -16,11 +16,8 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.server.impl.ServerMessageImpl;
-import java.nio.ByteBuffer;
-
/**
* A NullStorageLargeServerMessage
*
@@ -160,49 +157,6 @@
return getHeadersAndPropertiesEncodeSize();
}
- public LargeMessageEncodingContext createNewContext()
- {
- return new DecodingContext();
- }
-
- @Override
- public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
- {
- DecodingContext decodingContext = (DecodingContext) context;
- try
- {
- decodingContext.write(bufferOut, size);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- class DecodingContext implements LargeMessageEncodingContext
- {
- private int lastPos = 0;
-
- public void open() throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public int write(final ByteBuffer bufferRead) throws Exception
- {
- return -1;
- }
-
- public int write(final HornetQBuffer bufferRead, final int size)
- {
- bufferRead.writeBytes(getBody(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -13,7 +13,7 @@
package org.hornetq.core.server;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
/**
* A LargeMessage
@@ -43,6 +43,4 @@
void incrementDelayDeletionCount();
void decrementDelayDeletionCount() throws Exception;
-
- LargeMessageEncodingContext createNewContext();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -24,12 +24,13 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.QueueBinding;
@@ -632,7 +633,7 @@
/** The current position on the message being processed */
private long positionPendingLargeMessage;
- private LargeMessageEncodingContext context;
+ private BodyEncoder context;
public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception
{
@@ -672,7 +673,7 @@
largeMessage.getLargeBodySize(),
ref.getDeliveryCount());
- context = largeMessage.createNewContext();
+ context = largeMessage.getBodyEncoder();
context.open();
@@ -783,7 +784,7 @@
}
}
- private SessionReceiveContinuationMessage createChunkSend(final LargeMessageEncodingContext context)
+ private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder context) throws HornetQException
{
SessionReceiveContinuationMessage chunk;
@@ -793,8 +794,7 @@
HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
- // pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
- largeMessage.encodeBody(bodyBuffer, context, localChunkLen);
+ context.encode(bodyBuffer, localChunkLen);
chunk = new SessionReceiveContinuationMessage(id,
bodyBuffer.array(),
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -31,6 +31,8 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -2080,6 +2082,69 @@
}
}
+ // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
+ public void testSendServerMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(false);
+
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ FileLargeServerMessage fileMessage = new FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+
+ fileMessage.setMessageID(1005);
+
+ for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+ }
+
+ fileMessage.releaseResources();
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ prod.send(fileMessage);
+
+ fileMessage.deleteFile();
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
+
+ for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+
+ msg.acknowledge();
+
+ session.commit();
+
+ }
+ finally
+ {
+ sf.close();
+ server.stop();
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -59,8 +59,7 @@
internaltestSimpleBridge(false, true);
}
- // Commented out by Clebert - I'm investigating this failure.. so I've set as disabled
- public void disabled_testSimpleBridgeLargeMessageNullPersistence() throws Exception
+ public void testSimpleBridgeLargeMessageNullPersistence() throws Exception
{
internaltestSimpleBridge(true, false);
}
@@ -234,8 +233,7 @@
internalTestWithFilter(false, true);
}
- // Commented out by Clebert - I'm investigating this failure.. so I've set as disabled
- public void disabled_testWithFilterLargeMessages() throws Exception
+ public void testWithFilterLargeMessages() throws Exception
{
internalTestWithFilter(true, false);
}
Added: trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ServerLargeMessageTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ServerLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
+ public void testSendServerMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(false);
+
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ FileLargeServerMessage fileMessage = new FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+
+ fileMessage.setMessageID(1005);
+
+ for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+ }
+
+ fileMessage.releaseResources();
+
+ session.createQueue("A", "A");
+
+ ClientProducer prod = session.createProducer("A");
+
+ prod.send(fileMessage);
+
+ fileMessage.deleteFile();
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("A");
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msg.getBodySize(), 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+
+ msg.acknowledge();
+
+ session.commit();
+
+ }
+ finally
+ {
+ sf.close();
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -17,7 +17,6 @@
import junit.framework.TestSuite;
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -44,6 +43,7 @@
super.setUp();
}
+ @Override
protected SequentialFileFactory getFileFactory() throws Exception
{
File file = new File(getTestDir());
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -25,9 +25,9 @@
*/
public class FakeJournalImplTest extends JournalImplTestUnit
{
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return new FakeSequentialFileFactory();
- }
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new FakeSequentialFileFactory();
+ }
}
-
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -32,43 +32,44 @@
public abstract class JournalImplTestUnit extends JournalImplTestBase
{
private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
-
+
+ @Override
protected void tearDown() throws Exception
{
super.tearDown();
-
+
assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
}
-
+
public void testAddUpdateDeleteManyLargeFileSize() throws Exception
{
final int numberAdds = 10000;
-
+
final int numberUpdates = 5000;
-
+
final int numberDeletes = 3000;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024 * 1024, true);
createJournal();
startJournal();
@@ -80,38 +81,38 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testAddUpdateDeleteManySmallFileSize() throws Exception
{
final int numberAdds = 10000;
-
+
final int numberUpdates = 5000;
-
+
final int numberDeletes = 3000;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024, true);
createJournal();
startJournal();
@@ -124,57 +125,53 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testReclaimAndReload() throws Exception
{
setup(2, 10 * 1024 * 1024, false);
createJournal();
startJournal();
load();
-
+
long start = System.currentTimeMillis();
-
-
+
byte[] record = generateRecord(recordLength);
-
+
int NUMBER_OF_RECORDS = 1000;
for (int count = 0; count < NUMBER_OF_RECORDS; count++)
{
journal.appendAddRecord(count, (byte)0, record, false);
-
+
if (count >= NUMBER_OF_RECORDS / 2)
{
journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, false);
}
-
+
if (count % 100 == 0)
{
log.debug("Done: " + count);
}
}
-
+
long end = System.currentTimeMillis();
-
- double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
-
+
+ double rate = 1000 * (double)NUMBER_OF_RECORDS / (end - start);
+
log.debug("Rate of " + rate + " adds/removes per sec");
-
+
log.debug("Reclaim status = " + debugJournal());
-
+
stopJournal();
createJournal();
startJournal();
journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
-
+
assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
-
+
stopJournal();
}
-
-
-}
-
+}
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -29,23 +29,22 @@
*/
public class NIOJournalImplTest extends JournalImplTestUnit
{
- private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(journalDir);
-
- log.debug("deleting directory " + journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new NIOSequentialFileFactory(journalDir);
- }
-
-
-}
+ private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.debug("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -22,7 +22,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.postoffice.Binding;
@@ -458,7 +458,7 @@
}
- public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+ public void encodeBody(HornetQBuffer bufferOut, BodyEncoder context, int size)
{
// To change body of implemented methods use File | Settings | File Templates.
}
@@ -1116,6 +1116,15 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodyEncoder()
+ */
+ public BodyEncoder getBodyEncoder()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeFilter implements Filter
15 years, 1 month
JBoss hornetq SVN: r8247 - trunk/tests/src/org/hornetq/tests/timing/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 15:38:14 -0500 (Fri, 06 Nov 2009)
New Revision: 8247
Added:
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
Removed:
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplTest.java
Log:
tweaks
Copied: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java (from rev 8244, trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplAIOTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.timing.core.journal.impl;
+
+import java.io.File;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class AIOJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(AIOJournalImplTest.class);
+
+ public static TestSuite suite()
+ {
+ return createAIOTestSuite(AIOJournalImplTest.class);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir());
+ }
+
+}
Copied: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java (from rev 8244, trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.timing.core.journal.impl;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class NIOJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.debug("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+
+}
+
Deleted: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplAIOTest.java 2009-11-06 20:34:55 UTC (rev 8246)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplAIOTest.java 2009-11-06 20:38:14 UTC (rev 8247)
@@ -1,58 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.timing.core.journal.impl;
-
-import java.io.File;
-
-import junit.framework.TestSuite;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-
-/**
- *
- * A RealJournalImplTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class RealJournalImplAIOTest extends JournalImplTestUnit
-{
- private static final Logger log = Logger.getLogger(RealJournalImplAIOTest.class);
-
- public static TestSuite suite()
- {
- return createAIOTestSuite(RealJournalImplAIOTest.class);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(getTestDir());
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new AIOSequentialFileFactory(getTestDir());
- }
-
-}
Deleted: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplTest.java 2009-11-06 20:34:55 UTC (rev 8246)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/RealJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.timing.core.journal.impl;
-
-import java.io.File;
-
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-
-/**
- *
- * A RealJournalImplTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public class RealJournalImplTest extends JournalImplTestUnit
-{
- private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(journalDir);
-
- log.debug("deleting directory " + journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new NIOSequentialFileFactory(journalDir);
- }
-
-
-}
-
15 years, 1 month
JBoss hornetq SVN: r8246 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 15:34:55 -0500 (Fri, 06 Nov 2009)
New Revision: 8246
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java
Removed:
trunk/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/RealNIOJournalImplTest.java
Log:
tweaks
Copied: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java (from rev 8244, trunk/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-11-06 20:34:55 UTC (rev 8246)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+/**
+ *
+ * A RealJournalImplTest
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ * If you are running this test in eclipse you should do:
+ * I - Run->Open Run Dialog
+ * II - Find the class on the list (you will find it if you already tried running this testcase before)
+ * III - Add -Djava.library.path=<your project place>/native/src/.libs
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class AIOJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(AIOJournalImplTest.class);
+
+ public static TestSuite suite()
+ {
+ // Ignore tests if AIO is not installed
+ return createAIOTestSuite(AIOJournalImplTest.class);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ if (!AsynchronousFileImpl.isLoaded())
+ {
+ fail(String.format("libAIO is not loaded on %s %s %s",
+ System.getProperty("os.name"),
+ System.getProperty("os.arch"),
+ System.getProperty("os.version")));
+ }
+ }
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir(),
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ 1000000,
+ true,
+ false
+ );
+ }
+
+
+ @Override
+ protected int getAlignment()
+ {
+ return 512;
+ }
+
+}
Copied: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java (from rev 8244, trunk/tests/src/org/hornetq/tests/integration/journal/RealNIOJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java 2009-11-06 20:34:55 UTC (rev 8246)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NIOJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ log.debug("deleting directory " + getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ @Override
+ protected int getAlignment()
+ {
+ return 1;
+ }
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java 2009-11-06 20:25:51 UTC (rev 8245)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java 2009-11-06 20:34:55 UTC (rev 8246)
@@ -1,87 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.journal;
-
-import java.io.File;
-
-import junit.framework.TestSuite;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
-
-/**
- *
- * A RealJournalImplTest
- * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
- * If you are running this test in eclipse you should do:
- * I - Run->Open Run Dialog
- * II - Find the class on the list (you will find it if you already tried running this testcase before)
- * III - Add -Djava.library.path=<your project place>/native/src/.libs
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public class RealAIOJournalImplTest extends JournalImplTestUnit
-{
- private static final Logger log = Logger.getLogger(RealAIOJournalImplTest.class);
-
- public static TestSuite suite()
- {
- // Ignore tests if AIO is not installed
- return createAIOTestSuite(RealAIOJournalImplTest.class);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
- }
-
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(getTestDir());
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
- 1000000,
- true,
- false
- );
- }
-
-
- @Override
- protected int getAlignment()
- {
- return 512;
- }
-
-}
Deleted: trunk/tests/src/org/hornetq/tests/integration/journal/RealNIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/RealNIOJournalImplTest.java 2009-11-06 20:25:51 UTC (rev 8245)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/RealNIOJournalImplTest.java 2009-11-06 20:34:55 UTC (rev 8246)
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.journal;
-
-import java.io.File;
-
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
-
-/**
- *
- * A RealJournalImplTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class RealNIOJournalImplTest extends JournalImplTestUnit
-{
- private static final Logger log = Logger.getLogger(RealNIOJournalImplTest.class);
-
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(getTestDir());
-
- log.debug("deleting directory " + getTestDir());
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new NIOSequentialFileFactory(getTestDir());
- }
-
- @Override
- protected int getAlignment()
- {
- return 1;
- }
-
-}
15 years, 1 month
JBoss hornetq SVN: r8245 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 15:25:51 -0500 (Fri, 06 Nov 2009)
New Revision: 8245
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-207 - removing invalid test
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-06 17:45:51 UTC (rev 8244)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-11-06 20:25:51 UTC (rev 8245)
@@ -58,28 +58,22 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testPage() throws Exception
{
- internalTestPagedTransacted(false, false);
+ internalTestPage(false);
}
-
- public void testPageTransactioned() throws Exception
+ public void testPageFailBeforeconsume() throws Exception
{
- internalTestPagedTransacted(true, false);
+ internalTestPage(true);
}
- public void testPageTransactionedFailBeforeconsume() throws Exception
+ public void internalTestPage(final boolean failBeforeConsume) throws Exception
{
- internalTestPagedTransacted(true, true);
- }
-
- public void internalTestPagedTransacted(final boolean transacted, final boolean failBeforeConsume) throws Exception
- {
ClientSessionFactoryInternal factory = getSessionFactory();
factory.setBlockOnPersistentSend(true);
- ClientSession session = factory.createSession(!transacted, !transacted, 0);
+ ClientSession session = factory.createSession(true, true, 0);
try
{
@@ -109,7 +103,7 @@
for (int i = 0; i < TOTAL_MESSAGES; i++)
{
- if (transacted && i % 10 == 0)
+ if (i % 10 == 0)
{
session.commit();
}
@@ -126,7 +120,6 @@
failSession(session, latch);
}
-
session.start();
ClientConsumer cons = session.createConsumer(ADDRESS);
@@ -138,7 +131,7 @@
ClientMessage msg = cons.receive(20000);
assertNotNull(msg);
msg.acknowledge();
- if (transacted && i % 10 == 0)
+ if (i % 10 == 0)
{
session.commit();
}
@@ -252,5 +245,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
}
15 years, 1 month
JBoss hornetq SVN: r8244 - in trunk: tests/config and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 12:45:51 -0500 (Fri, 06 Nov 2009)
New Revision: 8244
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
Log:
adding cache-on-client parameter to validation test
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06 17:26:10 UTC (rev 8243)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06 17:45:51 UTC (rev 8244)
@@ -430,6 +430,8 @@
reconnectAttempts = DEFAULT_RECONNECT_ATTEMPTS;
failoverOnServerShutdown = DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+
+ cacheLargeMessagesClient = DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
}
public ClientSessionFactoryImpl(final String discoveryAddress, final int discoveryPort)
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-06 17:26:10 UTC (rev 8243)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-06 17:45:51 UTC (rev 8244)
@@ -33,6 +33,7 @@
<retry-interval>5</retry-interval>
<retry-interval-multiplier>6.0</retry-interval-multiplier>
<max-retry-interval>300</max-retry-interval>
+ <cache-large-message-client>true</cache-large-message-client>
</connection-factory>
<queue name="fullConfigurationQueue">
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-11-06 17:26:10 UTC (rev 8243)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-11-06 17:45:51 UTC (rev 8244)
@@ -210,6 +210,7 @@
assertEquals(34, cf.getReconnectAttempts());
assertEquals(5, cf.getRetryInterval());
assertEquals(6.0, cf.getRetryIntervalMultiplier());
+ assertEquals(true, cf.isCacheLargeMessagesClient());
}
for (String binding : queueBindings)
15 years, 1 month
JBoss hornetq SVN: r8243 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 12:26:10 -0500 (Fri, 06 Nov 2009)
New Revision: 8243
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
Fixing JMSBridgeReconnectTest
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06 17:26:10 UTC (rev 8243)
@@ -373,6 +373,8 @@
reconnectAttempts = other.getReconnectAttempts();
failoverOnServerShutdown = other.isFailoverOnServerShutdown();
+
+ cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
}
public ClientSessionFactoryImpl()
15 years, 1 month
JBoss hornetq SVN: r8242 - in trunk: src/main/org/hornetq/core/cluster/impl and 13 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-06 10:59:15 -0500 (Fri, 06 Nov 2009)
New Revision: 8242
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/management/Notification.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/MessageReference.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
Log:
various tweaks, changes to prod flow control, thread names etc
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -26,7 +26,7 @@
{
ClientProducerCredits getCredits(SimpleString destination);
- void receiveCredits(SimpleString destination, int credits);
+ void receiveCredits(SimpleString destination, int credits, int offset);
void reset();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -55,13 +55,13 @@
return credits;
}
- public synchronized void receiveCredits(final SimpleString destination, final int credits)
+ public synchronized void receiveCredits(final SimpleString destination, final int credits, final int offset)
{
ClientProducerCredits cr = producerCredits.get(destination);
if (cr != null)
{
- cr.receiveCredits(credits);
+ cr.receiveCredits(credits, offset);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -22,9 +22,9 @@
*/
public interface ClientProducerCredits
{
- void acquireCredits(final int credits) throws InterruptedException;
+ void acquireCredits(int credits) throws InterruptedException;
- void receiveCredits(final int credits);
+ void receiveCredits(int credits, int offset);
void reset();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -38,6 +38,8 @@
private final ClientSessionInternal session;
private int arriving;
+
+ private int offset;
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString destination,
@@ -58,16 +60,20 @@
checkCredits(windowSize);
}
- public void acquireCredits(final int credits) throws InterruptedException
+ public void acquireCredits(int credits) throws InterruptedException
{
+ // credits += offset;
+
checkCredits(credits);
semaphore.acquire(credits);
}
- public synchronized void receiveCredits(final int credits)
+ public synchronized void receiveCredits(final int credits, final int offset)
{
arriving -= credits;
+
+ this.offset = offset;
semaphore.release(credits);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -977,9 +977,9 @@
return producerCreditManager.getCredits(address);
}
- public void handleReceiveProducerCredits(final SimpleString address, final int credits)
+ public void handleReceiveProducerCredits(final SimpleString address, final int credits, final int offset)
{
- producerCreditManager.receiveCredits(address, credits);
+ producerCreditManager.receiveCredits(address, credits, offset);
}
// CommandConfirmationHandler implementation ------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -71,5 +71,5 @@
ClientProducerCredits getCredits(SimpleString address);
- void handleReceiveProducerCredits(SimpleString address, int credits);
+ void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -84,7 +84,8 @@
{
SessionProducerCreditsMessage message = (SessionProducerCreditsMessage)packet;
- clientSession.handleReceiveProducerCredits(message.getAddress(), message.getCredits());
+ clientSession.handleReceiveProducerCredits(message.getAddress(), message.getCredits(),
+ message.getOffset());
break;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -539,8 +539,8 @@
return session.getCredits(address);
}
- public void handleReceiveProducerCredits(SimpleString address, int credits)
+ public void handleReceiveProducerCredits(SimpleString address, int credits, int offset)
{
- session.handleReceiveProducerCredits(address, credits);
+ session.handleReceiveProducerCredits(address, credits, offset);
}
}
Modified: trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -116,7 +116,7 @@
started = true;
- thread = new Thread(this);
+ thread = new Thread(this, "hornetq-discovery-group-thread-" + name);
thread.setDaemon(true);
@@ -126,7 +126,7 @@
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
- Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props );
+ Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
notificationService.sendNotification(notification );
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -3506,6 +3506,8 @@
private PerfBlast(final int pages)
{
+ super("hornetq-perfblast-thread");
+
this.pages = pages;
}
Modified: trunk/src/main/org/hornetq/core/management/Notification.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Notification.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/management/Notification.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -29,8 +29,10 @@
private final NotificationType type;
private final TypedProperties properties;
+
+ private final String uid;
- public Notification(String uid, final NotificationType type, final TypedProperties properties)
+ public Notification(final String uid, final NotificationType type, final TypedProperties properties)
{
this.uid = uid;
this.type = type;
@@ -46,14 +48,12 @@
{
return properties;
}
-
- private String uid;
-
+
public String getUID()
{
return uid;
}
-
+
@Override
public String toString()
{
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -132,15 +132,15 @@
protected MessageImpl(final MessageImpl other)
{
this();
- this.messageID = other.messageID;
- this.destination = other.destination;
- this.type = other.type;
- this.durable = other.durable;
- this.expiration = other.expiration;
- this.timestamp = other.timestamp;
- this.priority = other.priority;
- this.properties = new TypedProperties(other.properties);
- this.body = other.body;
+ messageID = other.messageID;
+ destination = other.destination;
+ type = other.type;
+ durable = other.durable;
+ expiration = other.expiration;
+ timestamp = other.timestamp;
+ priority = other.priority;
+ properties = new TypedProperties(other.properties);
+ body = other.body;
}
/*
@@ -149,15 +149,15 @@
protected MessageImpl(final Message other)
{
this();
- this.messageID = other.getMessageID();
- this.destination = other.getDestination();
- this.type = other.getType();
- this.durable = other.isDurable();
- this.expiration = other.getExpiration();
- this.timestamp = other.getTimestamp();
- this.priority = other.getPriority();
- this.properties = new TypedProperties(other.getProperties());
- this.body = other.getBody();
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
+ body = other.getBody();
}
protected MessageImpl(final long messageID)
@@ -196,7 +196,7 @@
return body.writerIndex();
}
- public void encodeHeadersAndProperties(HornetQBuffer buffer)
+ public void encodeHeadersAndProperties(final HornetQBuffer buffer)
{
buffer.writeLong(messageID);
buffer.writeSimpleString(destination);
@@ -208,14 +208,14 @@
properties.encode(buffer);
}
- public void encodeBody(HornetQBuffer buffer)
+ public void encodeBody(final HornetQBuffer buffer)
{
HornetQBuffer localBody = getBody();
buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
// Used on Message chunk side
- public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+ public void encodeBody(final HornetQBuffer bufferOut, final LargeMessageEncodingContext context, final int size)
{
context.write(bufferOut, size);
}
@@ -330,7 +330,7 @@
/**
* @param bodyInputStream the bodyInputStream to set
*/
- public void setBodyInputStream(InputStream bodyInputStream)
+ public void setBodyInputStream(final InputStream bodyInputStream)
{
this.bodyInputStream = bodyInputStream;
}
@@ -400,8 +400,8 @@
{
properties.putSimpleStringProperty(key, value);
}
-
- public void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException
+
+ public void putObjectProperty(final SimpleString key, final Object value) throws PropertyConversionException
{
if (value == null)
{
@@ -447,7 +447,7 @@
}
}
- public void putObjectProperty(String key, Object value) throws PropertyConversionException
+ public void putObjectProperty(final String key, final Object value) throws PropertyConversionException
{
putObjectProperty(new SimpleString(key), value);
}
@@ -497,7 +497,7 @@
properties.putSimpleStringProperty(new SimpleString(key), new SimpleString(value));
}
- public void putTypedProperties(TypedProperties otherProps)
+ public void putTypedProperties(final TypedProperties otherProps)
{
properties.putTypedProperties(otherProps);
}
@@ -507,87 +507,87 @@
return properties.getProperty(key);
}
- public Boolean getBooleanProperty(SimpleString key) throws PropertyConversionException
+ public Boolean getBooleanProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getBooleanProperty(key);
}
-
- public Boolean getBooleanProperty(String key) throws PropertyConversionException
+
+ public Boolean getBooleanProperty(final String key) throws PropertyConversionException
{
return properties.getBooleanProperty(new SimpleString(key));
}
- public Byte getByteProperty(SimpleString key) throws PropertyConversionException
+ public Byte getByteProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getByteProperty(key);
}
-
- public Byte getByteProperty(String key) throws PropertyConversionException
+
+ public Byte getByteProperty(final String key) throws PropertyConversionException
{
return properties.getByteProperty(new SimpleString(key));
}
- public byte[] getBytesProperty(SimpleString key) throws PropertyConversionException
+ public byte[] getBytesProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getBytesProperty(key);
}
-
- public byte[] getBytesProperty(String key) throws PropertyConversionException
+
+ public byte[] getBytesProperty(final String key) throws PropertyConversionException
{
return getBytesProperty(new SimpleString(key));
}
- public Double getDoubleProperty(SimpleString key) throws PropertyConversionException
+ public Double getDoubleProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getDoubleProperty(key);
}
- public Double getDoubleProperty(String key) throws PropertyConversionException
+ public Double getDoubleProperty(final String key) throws PropertyConversionException
{
return properties.getDoubleProperty(new SimpleString(key));
}
- public Integer getIntProperty(SimpleString key) throws PropertyConversionException
+ public Integer getIntProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getIntProperty(key);
}
- public Integer getIntProperty(String key) throws PropertyConversionException
+ public Integer getIntProperty(final String key) throws PropertyConversionException
{
return properties.getIntProperty(new SimpleString(key));
}
- public Long getLongProperty(SimpleString key) throws PropertyConversionException
+ public Long getLongProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getLongProperty(key);
}
-
- public Long getLongProperty(String key) throws PropertyConversionException
+
+ public Long getLongProperty(final String key) throws PropertyConversionException
{
return properties.getLongProperty(new SimpleString(key));
}
- public Short getShortProperty(SimpleString key) throws PropertyConversionException
+ public Short getShortProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getShortProperty(key);
}
-
- public Short getShortProperty(String key) throws PropertyConversionException
+
+ public Short getShortProperty(final String key) throws PropertyConversionException
{
return properties.getShortProperty(new SimpleString(key));
}
- public Float getFloatProperty(SimpleString key) throws PropertyConversionException
+ public Float getFloatProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getFloatProperty(key);
}
- public Float getFloatProperty(String key) throws PropertyConversionException
+ public Float getFloatProperty(final String key) throws PropertyConversionException
{
return properties.getFloatProperty(new SimpleString(key));
}
-
- public String getStringProperty(SimpleString key) throws PropertyConversionException
+
+ public String getStringProperty(final SimpleString key) throws PropertyConversionException
{
SimpleString str = getSimpleStringProperty(key);
@@ -600,18 +600,18 @@
return str.toString();
}
}
-
- public String getStringProperty(String key) throws PropertyConversionException
+
+ public String getStringProperty(final String key) throws PropertyConversionException
{
return getStringProperty(new SimpleString(key));
}
-
- public SimpleString getSimpleStringProperty(SimpleString key) throws PropertyConversionException
+
+ public SimpleString getSimpleStringProperty(final SimpleString key) throws PropertyConversionException
{
return properties.getSimpleStringProperty(key);
}
-
- public SimpleString getSimpleStringProperty(String key) throws PropertyConversionException
+
+ public SimpleString getSimpleStringProperty(final String key) throws PropertyConversionException
{
return properties.getSimpleStringProperty(new SimpleString(key));
}
@@ -620,7 +620,7 @@
{
return properties.getProperty(new SimpleString(key));
}
-
+
public Object removeProperty(final SimpleString key)
{
return properties.removeProperty(key);
@@ -648,7 +648,7 @@
public TypedProperties getProperties()
{
- return this.properties;
+ return properties;
}
// Body
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,6 +40,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
import org.hornetq.core.server.impl.ServerProducerCreditManager;
import org.hornetq.core.server.impl.ServerProducerCreditManagerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -291,7 +292,7 @@
public void addSize(final MessageReference reference, final boolean add) throws Exception
{
- long size = reference.getMemoryEstimate();
+ long size = MessageReferenceImpl.getMemoryEstimate();
if (add)
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -959,7 +959,7 @@
{
if (reaperPeriod > 0)
{
- reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
+ reaperThread = new Thread(reaperRunnable, "hornetq-expiry-reaper-thread");
reaperThread.setPriority(reaperPriority);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,20 +28,24 @@
// Attributes ----------------------------------------------------
private int credits;
-
+
private SimpleString address;
+ private int offset;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionProducerCreditsMessage(final int credits, final SimpleString address)
+ public SessionProducerCreditsMessage(final int credits, final SimpleString address, final int offset)
{
super(SESS_PRODUCER_CREDITS);
this.credits = credits;
-
+
this.address = address;
+
+ this.offset = offset;
}
public SessionProducerCreditsMessage()
@@ -55,22 +59,23 @@
{
return credits;
}
-
+
public SimpleString getAddress()
{
return address;
}
-
-// public boolean isRequiresConfirmations()
-// {
-// return false;
-// }
+ public int getOffset()
+ {
+ return offset;
+ }
+
@Override
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeInt(credits);
buffer.writeSimpleString(address);
+ buffer.writeInt(offset);
}
@Override
@@ -78,11 +83,14 @@
{
credits = buffer.readInt();
address = buffer.readSimpleString();
+ offset = buffer.readInt();
}
public int getRequiredBufferSize()
{
- int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT + SimpleString.sizeofString(address);
+ int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ SimpleString.sizeofString(address) +
+ DataConstants.SIZE_INT;
return size;
}
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -444,6 +444,8 @@
FailureCheckThread(final long pauseInterval)
{
+ super("hornetq-failure-check-thread");
+
this.pauseInterval = pauseInterval;
}
Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,8 +40,6 @@
void setScheduledDeliveryTime(long scheduledDeliveryTime);
- int getMemoryEstimate();
-
int getDeliveryCount();
void setDeliveryCount(int deliveryCount);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -185,11 +185,6 @@
return ref.getDeliveryCount();
}
- public int getMemoryEstimate()
- {
- return ref.getMemoryEstimate();
- }
-
public ServerMessage getMessage()
{
return ref.getMessage();
Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -74,7 +74,7 @@
started = true;
- thread = new Thread(new MemoryRunnable());
+ thread = new Thread(new MemoryRunnable(), "hornetq-memory-manager-thread");
thread.setDaemon(true);
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -45,6 +45,13 @@
// Constructors --------------------------------------------------
+ public MessageReferenceImpl()
+ {
+ queue = null;
+
+ message = null;
+ }
+
public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
{
deliveryCount = other.deliveryCount;
@@ -69,7 +76,7 @@
return new MessageReferenceImpl(this, queue);
}
- public int getMemoryEstimate()
+ public static int getMemoryEstimate()
{
// from few tests I have done, deliveryCount and scheduledDelivery will use two longs (because of alignment)
// and each of the references (messages and queue) will use the equivalent to two longs (because of long
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,6 +28,7 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
@@ -112,7 +113,31 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static -------------------------------------------------------------------------------
+
+ private static int offset;
+ static
+ {
+ try
+ {
+ ServerMessage msg = new ServerMessageImpl();
+
+ msg.setBody(ChannelBuffers.dynamicBuffer(0));
+
+ msg.setDestination(new SimpleString("foobar"));
+
+ int es = msg.getEncodeSize();
+
+ int me = msg.getMemoryEstimate();
+
+ offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to initialise mult and offset", e);
+ }
+ }
+
// Attributes ----------------------------------------------------------------------------
private final long id;
@@ -1895,7 +1920,7 @@
return holder;
}
-
+
private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws Exception
{
CreditManagerHolder holder = creditManagerHolders.get(address);
@@ -1903,7 +1928,7 @@
if (holder == null)
{
PagingStore store = postOffice.getPagingManager().getPageStore(address);
-
+
holder = new CreditManagerHolder(store);
creditManagerHolders.put(address, holder);
@@ -1913,14 +1938,14 @@
}
private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
- {
+ {
holder.outstandingCredits += credits;
- Packet packet = new SessionProducerCreditsMessage(credits, address);
+ Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
channel.send(packet);
}
-
+
private void send(final ServerMessage msg) throws Exception
{
// Look up the paging store
Modified: trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
===================================================================
--- trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -270,6 +270,11 @@
protected class Shutdown extends Thread
{
+ public Shutdown()
+ {
+ super("hornetq-shutdown-thread");
+ }
+
public void run()
{
HornetQBootstrapServer.this.shutDown();
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -512,14 +512,7 @@
{
if (connections.remove(connectionID) != null)
{
- // // Execute on different thread to avoid deadlocks
- // new Thread()
- // {
- // public void run()
- // {
listener.connectionDestroyed(connectionID);
- // }
- // }.start();
}
}
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -318,7 +318,7 @@
timeChecker = new BatchTimeChecker();
- checkerThread = new Thread(timeChecker);
+ checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
@@ -1415,7 +1415,7 @@
//In the case of onMessage we can't close the connection from inside the onMessage method
//since it will block waiting for onMessage to complete. In the case of start we want to return
//from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
- Thread t = new Thread(failureHandler);
+ Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
t.start();
}
@@ -1504,6 +1504,11 @@
*/
private final class SourceReceiver extends Thread
{
+ SourceReceiver()
+ {
+ super("jmsbridge-source-receiver-thread");
+ }
+
@Override
public void run()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -12,22 +12,25 @@
*/
package org.hornetq.tests.integration.cluster.failover;
+import java.util.Map;
+
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
*/
public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
{
+ private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
+
protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode)
{
if (servers[node] != null)
15 years, 1 month