JBoss hornetq SVN: r11922 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 07:16:06 -0500 (Wed, 21 Dec 2011)
New Revision: 11922
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
JBPAPP-7628 - ignoring RejectedExecutionException during shutdown
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-21 02:53:58 UTC (rev 11921)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-21 12:16:06 UTC (rev 11922)
@@ -303,17 +303,24 @@
this.executor = executor;
- checkQueueSizeFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+ try
{
- public void run()
+ checkQueueSizeFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable()
{
- // This flag is periodically set to true. This enables the directDeliver flag to be set to true if the queue
- // is empty
- // We don't want to evaluate that on every delivery since that's too expensive
-
- checkDirect = true;
- }
- }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
+ public void run()
+ {
+ // This flag is periodically set to true. This enables the directDeliver flag to be set to true if the queue
+ // is empty
+ // We don't want to evaluate that on every delivery since that's too expensive
+
+ checkDirect = true;
+ }
+ }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
+ }
+ catch (RejectedExecutionException ignored)
+ {
+ // This could happen on a server shutdown
+ }
}
// Bindable implementation -------------------------------------------------------------------------------------
13 years
JBoss hornetq SVN: r11921 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 21:53:58 -0500 (Tue, 20 Dec 2011)
New Revision: 11921
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - testDuplicates
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:47:09 UTC (rev 11920)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:53:58 UTC (rev 11921)
@@ -974,9 +974,19 @@
}
}
}
+
+ public void testSentWithDuplicateIDBridge() throws Exception
+ {
+ internalTestSentWithDuplicateID(true);
+ }
public void testSentWithDuplicateID() throws Exception
{
+ internalTestSentWithDuplicateID(false);
+ }
+
+ private void internalTestSentWithDuplicateID(final boolean isSimulateBridge) throws Exception
+ {
final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
ClientSession session = null;
@@ -1001,7 +1011,14 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
- clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+ if (isSimulateBridge)
+ {
+ clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+ }
+ else
+ {
+ clientFile.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, someDuplicateInfo.getBytes());
+ }
producer.send(clientFile);
}
13 years
JBoss hornetq SVN: r11920 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 21:47:09 -0500 (Tue, 20 Dec 2011)
New Revision: 11920
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - improving deletion of messages after duplicate detection
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 02:47:09 UTC (rev 11920)
@@ -1181,6 +1181,8 @@
{
context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
}
+
+ message.decrementRefCount();
return false;
}
@@ -1196,6 +1198,9 @@
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+
+ message.decrementRefCount();
+
}
byte[] duplicateIDBytes = message.getDuplicateIDBytes();
@@ -1219,6 +1224,8 @@
{
context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage));
}
+
+ message.decrementRefCount();
return false;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:47:09 UTC (rev 11920)
@@ -30,6 +30,7 @@
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
import org.hornetq.core.server.HornetQServer;
@@ -73,7 +74,7 @@
{
return false;
}
-
+
/**
*
*/
@@ -92,34 +93,32 @@
public void testRollbackPartiallyConsumedBuffer() throws Exception
{
- for (int i = 0 ; i < 1; i++)
+ for (int i = 0; i < 1; i++)
{
log.info("#test " + i);
internalTestRollbackPartiallyConsumedBuffer(false);
tearDown();
setUp();
-
+
}
-
+
}
-
+
public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
{
internalTestRollbackPartiallyConsumedBuffer(true);
}
-
-
+
private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
{
final int messageSize = 100 * 1024;
-
final ClientSession session;
try
{
server = createServer(true, isNetty());
-
+
AddressSettings settings = new AddressSettings();
if (redeliveryDelay)
{
@@ -130,7 +129,7 @@
}
}
settings.setMaxDeliveryAttempts(-1);
-
+
server.getAddressSettingsRepository().addMatch("#", settings);
server.start();
@@ -143,35 +142,36 @@
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- for (int i = 0 ; i < 20; i++)
+ for (int i = 0; i < 20; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
-
+
clientFile.putIntProperty("value", i);
-
+
producer.send(clientFile);
}
session.commit();
session.start();
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
final AtomicInteger errors = new AtomicInteger(0);
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
+
consumer.setMessageHandler(new MessageHandler()
{
int counter = 0;
+
public void onMessage(ClientMessage message)
{
message.getBodyBuffer().readByte();
System.out.println("message:" + message);
try
{
- if (counter ++ < 20)
+ if (counter++ < 20)
{
Thread.sleep(100);
System.out.println("Rollback");
@@ -183,7 +183,7 @@
message.acknowledge();
session.commit();
}
-
+
if (counter == 40)
{
latch.countDown();
@@ -197,7 +197,7 @@
}
}
});
-
+
assertTrue(latch.await(40, TimeUnit.SECONDS));
consumer.close();
@@ -975,6 +975,74 @@
}
}
+ public void testSentWithDuplicateID() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ String someDuplicateInfo = "Anything";
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+
+ producer.send(clientFile);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ ClientMessage msg = consumer.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ assertNull(consumer.receiveImmediate());
+
+ session.commit();
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testResendSmallStreamMessage() throws Exception
{
internalTestResendMessage(50000);
@@ -1729,7 +1797,6 @@
100);
}
-
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
@@ -2600,7 +2667,7 @@
}
}
}
-
+
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues() throws Exception
{
@@ -2754,7 +2821,6 @@
}
-
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues2() throws Exception
{
@@ -2796,7 +2862,7 @@
for (int i = 0; i < 100; i++)
{
ClientMessage message = session.createMessage(true);
-
+
message.putIntProperty("msgID", msgId++);
message.putBooleanProperty("TestLarge", false);
@@ -2813,7 +2879,6 @@
producer.send(message);
}
-
for (int i = 0; i < 10; i++)
{
ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
@@ -2830,34 +2895,34 @@
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
session.start();
-
- for (int received = 0 ; received < 5; received++)
+
+ for (int received = 0; received < 5; received++)
{
for (int i = 0; i < 100; i++)
{
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(message2);
-
+
assertFalse(message2.getBooleanProperty("TestLarge"));
-
+
message2.acknowledge();
-
+
Assert.assertNotNull(message2);
}
-
+
for (int i = 0; i < 10; i++)
{
ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(messageLarge);
-
+
assertTrue(messageLarge.getBooleanProperty("TestLarge"));
-
+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
-
+
messageLarge.acknowledge();
-
+
messageLarge.saveToOutputStream(bout);
byte[] body = bout.toByteArray();
assertEquals(numberOfBytesBigMessage, body.length);
@@ -2866,7 +2931,7 @@
assertEquals(getSamplebyte(bi), body[bi]);
}
}
-
+
session.rollback();
}
13 years
JBoss hornetq SVN: r11919 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 21:13:14 -0500 (Tue, 20 Dec 2011)
New Revision: 11919
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 01:27:21 UTC (rev 11918)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 02:13:14 UTC (rev 11919)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -44,7 +46,7 @@
private Topic topic;
- private static int TOTAL_MESSAGES_COUNT = 50000;
+ private static int TOTAL_MESSAGES_COUNT = 20000;
private static int MSG_SIZE = 150 * 1024;
@@ -119,7 +121,7 @@
connection = cf.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
prod = session.createProducer(topic);
-
+
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= messagesCount && !requestForStop; i++)
@@ -195,8 +197,11 @@
private final int numberOfMessages;
private int receiveTimeout = 0;
+
+ private final CountDownLatch consumerCreated;
- LoadConsumer(final String name,
+ LoadConsumer(final CountDownLatch consumerCreated,
+ final String name,
final Topic topic,
final ConnectionFactory cf,
final int receiveTimeout,
@@ -207,6 +212,7 @@
this.topic = topic;
this.receiveTimeout = receiveTimeout;
this.numberOfMessages = numberOfMessages;
+ this.consumerCreated = consumerCreated;
}
public void sendStopRequest()
@@ -231,12 +237,19 @@
try
{
connection = cf.createConnection();
+
connection.setClientID(getName());
+
connection.start();
+
session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+
+ consumerCreated.countDown();
+
int counter = 0;
- int invalidOrderCounter = 0;
+
while (counter < numberOfMessages && !requestForStop && !error)
{
if (counter == 0)
@@ -252,21 +265,9 @@
else
{
counter++;
- // msg.readBytes(new byte[MSG_SIZE]);
if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
{
- if (invalidOrderCounter < 10)
- {
- error = true;
- System.out.println("Invalid messages order! expected: " + counter +
- ", received " +
- msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
- " " +
- topic +
- " - " +
- getName());
- invalidOrderCounter++;
- }
+ error = true;
}
}
if (counter % 10 == 0)
@@ -336,10 +337,13 @@
TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+ CountDownLatch latch = new CountDownLatch(CONSUMERS_COUNT);
for (int i = 0; i < consumers.length; i++)
{
- consumers[i] = new LoadConsumer("consumer " + i,
+ consumers[i] = new LoadConsumer(latch,
+ "consumer " + i,
topic,
cf,
receiveTimeout,
@@ -350,6 +354,9 @@
{
consumer.start();
}
+
+ latch.await();
+
producer.start();
producer.join();
for (LoadConsumer consumer : consumers)
@@ -383,7 +390,7 @@
{
System.out.println(" OK ");
}
-
+
assertFalse(error);
}
catch (Exception e)
13 years
JBoss hornetq SVN: r11918 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 20:27:21 -0500 (Tue, 20 Dec 2011)
New Revision: 11918
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - small change on test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-20 22:17:53 UTC (rev 11917)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 01:27:21 UTC (rev 11918)
@@ -52,7 +52,7 @@
private static final String ATTR_MSG_COUNTER = "msgIdex";
- protected int receiveTimeout = 2000;
+ protected int receiveTimeout = 10000;
private volatile boolean error = false;
@@ -146,6 +146,7 @@
}
catch (Exception e)
{
+ error = true;
e.printStackTrace();
}
finally
@@ -256,6 +257,7 @@
{
if (invalidOrderCounter < 10)
{
+ error = true;
System.out.println("Invalid messages order! expected: " + counter +
", received " +
msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
@@ -366,7 +368,7 @@
{
if (consumer.getReceivedMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
{
- errorMessage = "Producer did not send defined count of messages";
+ errorMessage = "Consumer did not send defined count of messages";
break;
}
}
@@ -381,6 +383,8 @@
{
System.out.println(" OK ");
}
+
+ assertFalse(error);
}
catch (Exception e)
{
13 years
JBoss hornetq SVN: r11917 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 17:17:53 -0500 (Tue, 20 Dec 2011)
New Revision: 11917
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - fixing client flow control on large message
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-20 20:06:45 UTC (rev 11916)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-20 22:17:53 UTC (rev 11917)
@@ -335,7 +335,7 @@
if (expired)
{
m.discardBody();
-
+
session.expire(id, m.getMessageID());
if (clientWindowSize == 0)
@@ -596,10 +596,10 @@
// Flow control for the first packet, we will have others
- flowControl(packet.getPacketSize(), false);
-
ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
+ currentChunkMessage.setFlowControlSize(packet.getPacketSize());
+
currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
File largeMessageCache = null;
@@ -622,8 +622,6 @@
currentChunkMessage.setLargeMessageController(currentLargeMessageController);
}
- currentChunkMessage.setFlowControlSize(0);
-
handleMessage(currentChunkMessage);
}
@@ -756,11 +754,6 @@
{
creditsToSend += messageBytes;
- if (log.isTraceEnabled())
- {
- log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
- }
-
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
@@ -783,9 +776,9 @@
}
else
{
- if (ClientConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ClientConsumerImpl.log.trace("Sending " + messageBytes + " from flow-control");
+ ClientConsumerImpl.log.debug("Sending " + messageBytes + " from flow-control");
}
final int credits = creditsToSend;
@@ -1013,7 +1006,8 @@
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0)
{
- flowControl(message.getFlowControlSize(), true);
+ // on large messages we should discount 1 on the first packets as we need continuity until the last packet
+ flowControl(message.getFlowControlSize(), !message.isLargeMessage());
}
}
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-20 22:17:53 UTC (rev 11917)
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A TestFlowControlOnIgnoreLargeMessageBodyTest
+ *
+ * @author clebertsuconic
+ * @author Pavel Slavice
+ *
+ *
+ */
+public class TestFlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase
+{
+
+ Logger log = Logger.getLogger(TestFlowControlOnIgnoreLargeMessageBodyTest.class);
+
+ private Topic topic;
+
+ private static int TOTAL_MESSAGES_COUNT = 50000;
+
+ private static int MSG_SIZE = 150 * 1024;
+
+ private final int CONSUMERS_COUNT = 5;
+
+ private static final String ATTR_MSG_COUNTER = "msgIdex";
+
+ protected int receiveTimeout = 2000;
+
+ private volatile boolean error = false;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ jmsServer.createTopic(true, "topicIn", "/topic/topicIn");
+ topic = (Topic)context.lookup("/topic/topicIn");
+ }
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return false;
+ }
+
+ /**
+ * LoadProducer
+ */
+ class LoadProducer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private final int messagesCount;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int sentMessages = 0;
+
+ LoadProducer(final String name, final Topic topic, final ConnectionFactory cf, final int messagesCount) throws Exception
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.messagesCount = messagesCount;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ stopped = false;
+ Connection connection = null;
+ Session session = null;
+ MessageProducer prod;
+ log.info("Starting producer for " + topic + " - " + getName());
+ try
+ {
+ connection = cf.createConnection();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ prod = session.createProducer(topic);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 1; i <= messagesCount && !requestForStop; i++)
+ {
+ if (error)
+ {
+ break;
+ }
+ sentMessages++;
+ BytesMessage msg = session.createBytesMessage();
+ msg.setIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER, i);
+ msg.writeBytes(new byte[TestFlowControlOnIgnoreLargeMessageBodyTest.MSG_SIZE]);
+ prod.send(msg);
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ log.info("Address " + topic + " sent " + i + " messages");
+ }
+ }
+ System.out.println("Ending producer for " + topic + " - " + getName() + " messages " + sentMessages);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ stopped = true;
+ }
+
+ public int getSentMessages()
+ {
+ return sentMessages;
+ }
+ }
+
+ /**
+ * LoadConsumer
+ */
+ class LoadConsumer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int receivedMessages = 0;
+
+ private final int numberOfMessages;
+
+ private int receiveTimeout = 0;
+
+ LoadConsumer(final String name,
+ final Topic topic,
+ final ConnectionFactory cf,
+ final int receiveTimeout,
+ final int numberOfMessages)
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.receiveTimeout = receiveTimeout;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ Connection connection = null;
+ Session session = null;
+ stopped = false;
+ requestForStop = false;
+ System.out.println("Starting consumer for " + topic + " - " + getName());
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID(getName());
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+ int counter = 0;
+ int invalidOrderCounter = 0;
+ while (counter < numberOfMessages && !requestForStop && !error)
+ {
+ if (counter == 0)
+ {
+ System.out.println("Starting to consume for " + topic + " - " + getName());
+ }
+ BytesMessage msg = (BytesMessage)subscriber.receive(receiveTimeout);
+ if (msg == null)
+ {
+ System.out.println("Cannot get message in specified timeout: " + topic + " - " + getName());
+ error = true;
+ }
+ else
+ {
+ counter++;
+ // msg.readBytes(new byte[MSG_SIZE]);
+ if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
+ {
+ if (invalidOrderCounter < 10)
+ {
+ System.out.println("Invalid messages order! expected: " + counter +
+ ", received " +
+ msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
+ " " +
+ topic +
+ " - " +
+ getName());
+ invalidOrderCounter++;
+ }
+ }
+ }
+ if (counter % 10 == 0)
+ {
+ session.commit();
+ }
+ if (counter % 100 == 0)
+ {
+ log.info("## " + getName() + " " + topic + " received " + counter);
+ }
+ receivedMessages = counter;
+ }
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Exception in consumer " + getName() + " : " + e.getMessage());
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close session " + e.getMessage());
+ }
+ }
+ if (connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close connection " + e.getMessage());
+ }
+ }
+ }
+ stopped = true;
+ System.out.println("Stopping consumer for " + topic +
+ " - " +
+ getName() +
+ ", received " +
+ getReceivedMessages());
+ }
+
+ public int getReceivedMessages()
+ {
+ return receivedMessages;
+ }
+ }
+
+ public void testFlowControl()
+ {
+ Context context = null;
+ try
+ {
+ LoadProducer producer = new LoadProducer("producer",
+ topic,
+ cf,
+ TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+
+ LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+ for (int i = 0; i < consumers.length; i++)
+ {
+ consumers[i] = new LoadConsumer("consumer " + i,
+ topic,
+ cf,
+ receiveTimeout,
+ TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+ }
+
+ for (LoadConsumer consumer : consumers)
+ {
+ consumer.start();
+ }
+ producer.start();
+ producer.join();
+ for (LoadConsumer consumer : consumers)
+ {
+ consumer.join();
+ }
+
+ String errorMessage = null;
+ if (producer.getSentMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+ {
+ errorMessage = "Producer did not send defined count of messages";
+ }
+ else
+ {
+ for (LoadConsumer consumer : consumers)
+ {
+ if (consumer.getReceivedMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+ {
+ errorMessage = "Producer did not send defined count of messages";
+ break;
+ }
+ }
+ }
+
+ if (errorMessage != null)
+ {
+ System.err.println(" ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ");
+ System.err.println(errorMessage);
+ }
+ else
+ {
+ System.out.println(" OK ");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ finally
+ {
+ if (context != null)
+ {
+ try
+ {
+ context.close();
+ }
+ catch (NamingException ex)
+ {
+ log.warn(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
+
+}
13 years
JBoss hornetq SVN: r11916 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-12-20 15:06:45 -0500 (Tue, 20 Dec 2011)
New Revision: 11916
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
disable flow control on bridges
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-20 14:02:24 UTC (rev 11915)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-20 20:06:45 UTC (rev 11916)
@@ -471,6 +471,8 @@
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+ //disable flow control
+ serverLocator.setProducerWindowSize(-1);
// This will be set to 30s unless it's changed from embedded / testing
// there is no reason to exception the config for this timeout
13 years
JBoss hornetq SVN: r11915 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 09:02:24 -0500 (Tue, 20 Dec 2011)
New Revision: 11915
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
JBPAPP-7792 - avoiding a NPE
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-19 17:57:47 UTC (rev 11914)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-20 14:02:24 UTC (rev 11915)
@@ -562,12 +562,20 @@
public void onConnection(ClientSessionFactoryInternal sf)
{
TopologyMember localMember = getLocalMember();
- sf.sendNodeAnnounce(localMember.getUniqueEventID(),
- manager.getNodeId(),
- false,
- localMember.getConnector().getA(),
- localMember.getConnector().getB());
+ if (localMember != null)
+ {
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().getA(),
+ localMember.getConnector().getB());
+ }
+ else
+ {
+ log.warn("LocalMember is not set at on ClusterConnection " + this);
+ }
+ // TODO: shouldn't we send the current time here? and change the current topology?
// sf.sendNodeAnnounce(System.currentTimeMillis(),
// manager.getNodeId(),
// false,
13 years
JBoss hornetq SVN: r11914 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-19 12:57:47 -0500 (Mon, 19 Dec 2011)
New Revision: 11914
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - improving things
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -1276,7 +1276,7 @@
{
if (msg.getRefCount() == 0)
{
- JournalStorageManager.log.debug("Large message: " + msg.getMessageID() +
+ JournalStorageManager.log.info("Large message: " + msg.getMessageID() +
" didn't have any associated reference, file will be deleted");
msg.decrementDelayDeletionCount();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -1187,6 +1187,15 @@
return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
}
}
+
+ /**
+ * For tests only, don't use this method as it's not part of the API
+ * @param factory
+ */
+ public void replaceQueueFactory(QueueFactory factory)
+ {
+ this.queueFactory = factory;
+ }
// Package protected
// ----------------------------------------------------------------------------
@@ -1198,26 +1207,6 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
- // protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
- // final ExecutorService threadPool,
- // final ScheduledExecutorService scheduledPool)
- // {
- // return new FailoverManagerImpl((ClientSessionFactory)null,
- // backupConnector,
- // null,
- // false,
- // HornetQClient.DEFAULT_CALL_TIMEOUT,
- // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- // HornetQClient.DEFAULT_CONNECTION_TTL,
- // 0,
- // 1.0d,
- // 0,
- // 1,
- // false,
- // threadPool,
- // scheduledPool,
- // null);
- // }
protected PagingManager createPagingManager()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -2203,7 +2203,8 @@
return status;
}
- private void postAcknowledge(final MessageReference ref)
+ // Protected as testcases may change this behaviour
+ protected void postAcknowledge(final MessageReference ref)
{
QueueImpl queue = (QueueImpl)ref.getQueue();
@@ -2219,6 +2220,15 @@
boolean durableRef = message.isDurable() && queue.durable;
+ try
+ {
+ message.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
+ }
+
if (durableRef)
{
int count = message.decrementDurableRefCount();
@@ -2250,15 +2260,6 @@
}
}
}
-
- try
- {
- message.decrementRefCount();
- }
- catch (Exception e)
- {
- QueueImpl.log.warn("Unable to decrement reference counting", e);
- }
}
void postRollback(final LinkedList<MessageReference> refs)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-19 14:51:20 UTC (rev 11913)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-19 17:57:47 UTC (rev 11914)
@@ -13,7 +13,10 @@
package org.hornetq.tests.integration.client;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -29,14 +32,25 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.utils.ExecutorFactory;
/**
* A LargeMessageTest
@@ -128,9 +142,9 @@
}
server.stop(false);
-
+
forceGC();
-
+
server.start();
server.stop();
@@ -254,8 +268,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -270,7 +288,7 @@
session = sf.createSession(false, true, true);
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
+
server.getPagingManager().getPageStore(ADDRESS).startPaging();
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
@@ -282,19 +300,19 @@
producer.send(clientFile);
}
session.commit();
-
+
validateNoFilesOnLargeDir(10);
for (int h = 0; h < 5; h++)
{
session.close();
-
+
sf.close();
-
+
server.stop();
-
+
server.start();
-
+
sf = locator.createSessionFactory();
session = sf.createSession(false, false);
@@ -321,11 +339,11 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
}
-
+
server.stop(false);
server.start();
@@ -363,8 +381,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -377,7 +399,7 @@
ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(true, false, false);
-
+
Xid xid1 = newXID();
Xid xid2 = newXID();
@@ -394,13 +416,11 @@
producer.send(clientFile);
}
session.end(xid1, XAResource.TMSUCCESS);
-
+
session.prepare(xid1);
-
session.start(xid2, XAResource.TMNOFLAGS);
-
for (int i = 0; i < 10; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -409,32 +429,32 @@
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
-
+
session.prepare(xid2);
-
+
session.close();
sf.close();
-
+
server.stop(false);
server.start();
-
- for (int start = 0 ; start < 2; start++)
+
+ for (int start = 0; start < 2; start++)
{
System.out.println("Start " + start);
-
+
sf = locator.createSessionFactory();
-
+
if (start == 0)
{
session = sf.createSession(true, false, false);
session.commit(xid1, false);
session.close();
}
-
+
session = sf.createSession(false, false, false);
ClientConsumer cons1 = session.createConsumer(ADDRESS);
session.start();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
@@ -442,7 +462,7 @@
assertEquals(1, msg.getIntProperty("txid").intValue());
msg.acknowledge();
}
-
+
if (start == 1)
{
session.commit();
@@ -451,26 +471,26 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
-
+
server.stop();
server.start();
}
server.stop();
-
+
validateNoFilesOnLargeDir(10);
-
+
server.start();
sf = locator.createSessionFactory();
-
+
session = sf.createSession(true, false, false);
session.rollback(xid2);
-
+
sf.close();
-
+
server.stop();
server.start();
server.stop();
@@ -497,6 +517,296 @@
}
}
+ public void testRestartBeforeDelete() throws Exception
+ {
+
+ class NoPostACKQueue extends QueueImpl
+ {
+
+ public NoPostACKQueue(long id,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary,
+ ScheduledExecutorService scheduledExecutor,
+ PostOffice postOffice,
+ StorageManager storageManager,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ Executor executor)
+ {
+ super(id,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
+ }
+
+ protected void postAcknowledge(final MessageReference ref)
+ {
+ System.out.println("Ignoring postACK on message " + ref);
+ }
+ }
+
+ class NoPostACKQueueFactory implements QueueFactory
+ {
+
+ final StorageManager storageManager;
+
+ final PostOffice postOffice;
+
+ final ScheduledExecutorService scheduledExecutor;
+
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ final ExecutorFactory execFactory;
+
+ public NoPostACKQueueFactory(StorageManager storageManager,
+ PostOffice postOffice,
+ ScheduledExecutorService scheduledExecutor,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final ExecutorFactory execFactory)
+ {
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ this.scheduledExecutor = scheduledExecutor;
+ this.addressSettingsRepository = addressSettingsRepository;
+ this.execFactory = execFactory;
+ }
+
+ public Queue createQueue(long persistenceID,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary)
+ {
+
+ return new NoPostACKQueue(persistenceID,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ execFactory.getExecutor());
+// return new QueueImpl(persistenceID,
+// address,
+// name,
+// filter,
+// pageSubscription,
+// durable,
+// temporary,
+// scheduledExecutor,
+// postOffice,
+// storageManager,
+// addressSettingsRepository,
+// execFactory.getExecutor());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.QueueFactory#setPostOffice(org.hornetq.core.postoffice.PostOffice)
+ */
+ public void setPostOffice(PostOffice postOffice)
+ {
+ }
+
+ }
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+ server.start();
+
+ QueueFactory original = server.getQueueFactory();
+
+ ((HornetQServerImpl)server).replaceQueueFactory(new NoPostACKQueueFactory(server.getStorageManager(),
+ server.getPostOffice(),
+ server.getScheduledPool(),
+ server.getAddressSettingsRepository(),
+ server.getExecutorFactory()));
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.saveToOutputStream(new java.io.OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ((HornetQServerImpl)server).replaceQueueFactory(original);
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testConsumeAfterRestart() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+ server.start();
+
+ QueueFactory original = server.getQueueFactory();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+ sf.close();
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.saveToOutputStream(new java.io.OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ((HornetQServerImpl)server).replaceQueueFactory(original);
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor
{
13 years
JBoss hornetq SVN: r11913 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-19 09:51:20 -0500 (Mon, 19 Dec 2011)
New Revision: 11913
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7785 - protecting factories usage - simple fix
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-12-19 14:33:24 UTC (rev 11912)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-12-19 14:51:20 UTC (rev 11913)
@@ -689,7 +689,10 @@
private void removeFromConnecting(ClientSessionFactoryInternal factory)
{
- connectingFactories.remove(factory);
+ synchronized (connectingFactories)
+ {
+ connectingFactories.remove(factory);
+ }
}
private void addToConnecting(ClientSessionFactoryInternal factory)
13 years