JBoss hornetq SVN: r11932 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:55:33 -0500 (Thu, 22 Dec 2011)
New Revision: 11932
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
Version changes
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-12-22 13:52:54 UTC (rev 11931)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-12-22 13:55:33 UTC (rev 11932)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.8.EAP.GA"/>
+ <property name="hornetq.version" value="2.2.10.EAP.GA"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 13:52:54 UTC (rev 11931)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 13:55:33 UTC (rev 11932)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.8.EAP.GA</hornetq.version>
+ <hornetq.version>2.2.10.EAP.GA</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-12-22 13:52:54 UTC (rev 11931)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-12-22 13:55:33 UTC (rev 11932)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_8_EAP_GA
+hornetq.version.versionName=HQ_2_2_10_EAP_GA
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=8
13 years
JBoss hornetq SVN: r11931 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:52:54 -0500 (Thu, 22 Dec 2011)
New Revision: 11931
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties
Log:
version name change
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties 2011-12-22 13:41:59 UTC (rev 11930)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties 2011-12-22 13:52:54 UTC (rev 11931)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7710
+hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7710_Build3
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
13 years
JBoss hornetq SVN: r11930 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:41:59 -0500 (Thu, 22 Dec 2011)
New Revision: 11930
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
Log:
I see dead code
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-12-22 13:37:04 UTC (rev 11929)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-12-22 13:41:59 UTC (rev 11930)
@@ -89,21 +89,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#getLinkedMessage()
- */
- public LargeServerMessage getLinkedMessage()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
- {
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#isComplete()
*/
public boolean isComplete()
13 years
JBoss hornetq SVN: r11929 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/server and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:37:04 -0500 (Thu, 22 Dec 2011)
New Revision: 11929
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
backporting JBPAPP-7809 into JBPAPP-7710
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 13:37:04 UTC (rev 11929)
@@ -1680,23 +1680,25 @@
if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
{
+ // for compatibility: cope with old behaviour, copying the old file to avoid message loss
long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
+
+ SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
+
+ if (!currentFile.exists())
{
- // this could happen if the message was deleted but the file still exists as the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
+ SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
+ if (linkedFile.exists())
+ {
+ linkedFile.copyTo(currentFile);
+ linkedFile.close();
+ }
}
+
+ currentFile.close();
+ }
- originalMessage.incrementDelayDeletionCount();
- largeMessage.setLinkedMessage(originalMessage);
- }
return largeMessage;
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22 13:37:04 UTC (rev 11929)
@@ -48,8 +48,6 @@
// Attributes ----------------------------------------------------
private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
private boolean paged;
@@ -78,7 +76,6 @@
private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
super(copy, properties);
- linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
@@ -155,18 +152,28 @@
public synchronized void incrementDelayDeletionCount()
{
delayDeletionCount.incrementAndGet();
+ try
+ {
+ incrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
public synchronized void decrementDelayDeletionCount() throws Exception
{
int count = delayDeletionCount.decrementAndGet();
+
+ decrementRefCount();
if (count == 0)
{
checkDelete();
}
}
-
+
@Override
public BodyEncoder getBodyEncoder() throws HornetQException
{
@@ -178,27 +185,19 @@
{
if (getRefCount() <= 0)
{
- if (linkMessage != null)
+ if (LargeServerMessageImpl.isTrace)
{
- // This file is linked to another message, deleting the reference where it belongs on this case
- linkMessage.decrementDelayDeletionCount();
+ LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
}
- else
- {
- if (LargeServerMessageImpl.isTrace)
- {
- LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
- }
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- LargeServerMessageImpl.log.error(e.getMessage(), e);
- }
+ try
+ {
+ deleteFile();
}
+ catch (Exception e)
+ {
+ LargeServerMessageImpl.log.error(e.getMessage(), e);
+ }
}
}
@@ -289,15 +288,9 @@
{
long idToUse = messageID;
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
+ ServerMessage newMessage = new LargeServerMessageImpl(this,
properties,
newfile,
messageID);
@@ -308,65 +301,43 @@
@Override
public synchronized ServerMessage copy(final long newID)
{
- if (!paged)
+ try
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
- properties,
- newfile,
- newID);
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
+
return newMessage;
}
- else
+ catch (Exception e)
{
- try
- {
- validateFile();
-
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
- }
- catch (Exception e)
- {
- log.warn("Error on copying large message this for DLA or Expiry", e);
- return null;
- }
+ log.warn("Error on copying large message " + this + " for DLA or Expiry", e);
+ return null;
}
+ finally
+ {
+ releaseResources();
+ }
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws Exception
{
+ validateFile();
return file;
}
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
+ return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
@@ -396,7 +367,7 @@
file = storageManager.createFileForLargeMessage(getMessageID(), durable);
- file.open();
+ openFile();
bodySize = file.size();
}
@@ -407,32 +378,27 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
}
}
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
+
+ protected void openFile() throws Exception
{
- if (file != null)
+ if (file == null)
+ {
+ validateFile();
+ }
+ else
+ if (!file.isOpen())
{
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
file.open();
- bodySize = file.size();
- file.close();
}
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
}
+
+ protected void closeFile() throws Exception
+ {
+ if (file != null && file.isOpen())
+ {
+ file.close();
+ }
+ }
// Inner classes -------------------------------------------------
@@ -444,6 +410,10 @@
{
try
{
+ if (cFile != null && cFile.isOpen())
+ {
+ cFile.close();
+ }
cFile = file.copy();
cFile.open();
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22 13:37:04 UTC (rev 11929)
@@ -13,6 +13,7 @@
package org.hornetq.core.server;
+
/**
* A LargeMessage
*
@@ -26,9 +27,6 @@
{
void addBytes(byte[] bytes) throws Exception;
- /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
- void setLinkedMessage(LargeServerMessage message);
-
boolean isFileExists() throws Exception;
/**
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22 13:37:04 UTC (rev 11929)
@@ -930,7 +930,7 @@
int localChunkLen = 0;
localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
+
HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22 13:37:04 UTC (rev 11929)
@@ -0,0 +1,415 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test sends large messages while paging, moves them through the expiry address and then the DLQ, and checks that they can still be received correctly
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+ final SimpleString DLQ = new SimpleString("my-DLQ");
+
+ final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+ final int messageSize = 10 * 1024;
+
+ // it has to be an even number
+ final int numberOfMessages = 50;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryMessagesThenDLQ() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Alternate message types: even counts write the body to the buffer (tst-large=false), odd counts set it through a body input stream (tst-large=true)
+ if (i % 2 == 0)
+ {
+ message.putBooleanProperty("tst-large", false);
+ message.getBodyBuffer().writeBytes(bufferSample);
+ }
+ else
+ {
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ // Consume half of the messages to make sure all the messages are paging (on the second try)
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ cons.close();
+
+ for (int rep = 0; rep < 6; rep++)
+ {
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ System.out.println("Trying " + rep);
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ if (rep == 0)
+ {
+ // restart the server at the first try
+ server.stop();
+ server.start();
+ }
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ session.start();
+ }
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ for (int rep = 0; rep < 2; rep++)
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(DLQ);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+ if (rep == 0)
+ {
+ session.rollback();
+ session.close();
+ sf.close();
+ server.stop();
+ server.start();
+ }
+ }
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ /**
+ * Tests if the system would still cope with old data where the LargeMessage was linked to its previous copy
+ * @throws Exception
+ */
+ public void testCompatilityWithLinks() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Everything is going to be a large message
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+
+ server.stop();
+
+ // rename the file, simulating old behaviour
+ long messageID = msg.getMessageID();
+ long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+ File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+ File currentFile = new File(largeMessagesFileDir, messageID + ".msg");
+ currentFile.renameTo(oldFile);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ cons = session.createConsumer(EXPIRY);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years
JBoss hornetq SVN: r11928 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:32:53 -0500 (Thu, 22 Dec 2011)
New Revision: 11928
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Log:
JBPAPP-7809 - adding compatibility with old format
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 11:22:01 UTC (rev 11927)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 13:32:53 UTC (rev 11928)
@@ -1746,6 +1746,26 @@
messageEncoding.decode(buff);
+ if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
+ {
+ // for compatibility: cope with old behaviour, copying the old file to avoid message loss
+ long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
+
+ if (!currentFile.exists())
+ {
+ SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
+ if (linkedFile.exists())
+ {
+ linkedFile.copyTo(currentFile);
+ linkedFile.close();
+ }
+ }
+
+ currentFile.close();
+ }
+
return largeMessage;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22 11:22:01 UTC (rev 11927)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22 13:32:53 UTC (rev 11928)
@@ -13,6 +13,9 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
+
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
@@ -136,6 +139,7 @@
Thread.sleep(1500);
+ // just to try expiring
ClientConsumer cons = session.createConsumer(MY_QUEUE);
assertNull(cons.receive(1000));
@@ -145,18 +149,17 @@
cons = session.createConsumer(EXPIRY);
session.start();
-
-
+
// Consume half of the messages to make sure all the messages are paging (on the second try)
- for (int i = 0 ; i < numberOfMessages / 2; i++)
+ for (int i = 0; i < numberOfMessages / 2; i++)
{
ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
msg.acknowledge();
}
-
+
session.commit();
-
+
cons.close();
for (int rep = 0; rep < 6; rep++)
@@ -174,7 +177,7 @@
{
System.out.println("Received " + i);
}
-
+
for (int location = 0; location < messageSize; location++)
{
assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
@@ -200,7 +203,7 @@
session = sf.createSession(false, false);
session.start();
}
-
+
cons = session.createConsumer(EXPIRY);
session.start();
assertNull(cons.receiveImmediate());
@@ -262,6 +265,148 @@
}
}
+ /**
+ * Tests if the system would still cope with old data where the LargeMessage was linked to its previous copy
+ * @throws Exception
+ */
+ public void testCompatilityWithLinks() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Everything is going to be a large message
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+
+ server.stop();
+
+ // rename the file, simulating old behaviour
+ long messageID = msg.getMessageID();
+ long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+ File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+ File currentFile = new File(largeMessagesFileDir, messageID + ".msg");
+ currentFile.renameTo(oldFile);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ cons = session.createConsumer(EXPIRY);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
13 years
JBoss hornetq SVN: r11927 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 06:22:01 -0500 (Thu, 22 Dec 2011)
New Revision: 11927
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7785 - changing factories protection as I saw a test hanging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-12-22 04:23:07 UTC (rev 11926)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-12-22 11:22:01 UTC (rev 11927)
@@ -1289,23 +1289,24 @@
connectingFactories.clear();
}
+ Set<ClientSessionFactoryInternal> clonedFactory;
synchronized (factories)
{
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+ clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- for (ClientSessionFactory factory : clonedFactory)
+ factories.clear();
+ }
+
+ for (ClientSessionFactory factory : clonedFactory)
+ {
+ if (sendClose)
{
- if (sendClose)
- {
- factory.close();
- }
- else
- {
- factory.cleanup();
- }
+ factory.close();
}
-
- factories.clear();
+ else
+ {
+ factory.cleanup();
+ }
}
if (shutdownPool)
@@ -1420,14 +1421,17 @@
if (actMember != null && actMember.getConnector().getA() != null && actMember.getConnector().getB() != null)
{
+ HashSet<ClientSessionFactory> clonedFactories = new HashSet<ClientSessionFactory>();
synchronized (factories)
{
- for (ClientSessionFactory factory : factories)
- {
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().getA(),
- actMember.getConnector().getB());
- }
+ clonedFactories.addAll(factories);
}
+
+ for (ClientSessionFactory factory : clonedFactories)
+ {
+ ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().getA(),
+ actMember.getConnector().getB());
+ }
}
updateArraysAndPairs();
@@ -1571,22 +1575,23 @@
return;
}
- synchronized (factories)
+ if (isClosed())
{
- if (isClosed())
- {
- factory.close();
- return;
- }
+ factory.close();
+ return;
+ }
- TransportConfiguration backup = null;
+ TransportConfiguration backup = null;
- if (ha)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
+ if (ha)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ }
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+
+ synchronized (factories)
+ {
factories.add(factory);
}
}
13 years
JBoss hornetq SVN: r11926 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 23:23:07 -0500 (Wed, 21 Dec 2011)
New Revision: 11926
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-7809 - fixing large message copy on DLQ / Expiry Queue
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -1746,25 +1746,6 @@
messageEncoding.decode(buff);
- if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
- {
- long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
- {
- // this could happen if the message was deleted but the file still exists as the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
- }
-
- originalMessage.incrementDelayDeletionCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- }
return largeMessage;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -48,8 +48,6 @@
// Attributes ----------------------------------------------------
private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
private long pendingRecordID = -1;
@@ -80,7 +78,6 @@
private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
super(copy, properties);
- linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
@@ -191,7 +188,7 @@
checkDelete();
}
}
-
+
@Override
public BodyEncoder getBodyEncoder() throws HornetQException
{
@@ -203,27 +200,19 @@
{
if (getRefCount() <= 0)
{
- if (linkMessage != null)
+ if (LargeServerMessageImpl.isTrace)
{
- // This file is linked to another message, deleting the reference where it belongs on this case
- linkMessage.decrementDelayDeletionCount();
+ LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
}
- else
- {
- if (LargeServerMessageImpl.isTrace)
- {
- LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
- }
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- LargeServerMessageImpl.log.error(e.getMessage(), e);
- }
+ try
+ {
+ deleteFile();
}
+ catch (Exception e)
+ {
+ LargeServerMessageImpl.log.error(e.getMessage(), e);
+ }
}
}
@@ -319,15 +308,9 @@
{
long idToUse = messageID;
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
+ ServerMessage newMessage = new LargeServerMessageImpl(this,
properties,
newfile,
messageID);
@@ -338,60 +321,34 @@
@Override
public synchronized ServerMessage copy(final long newID)
{
- if (!paged)
+ try
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
- properties,
- newfile,
- newID);
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
+
return newMessage;
}
- else
+ catch (Exception e)
{
- try
- {
- validateFile();
-
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
- }
- catch (Exception e)
- {
- log.warn("Error on copying large message this for DLA or Expiry", e);
- return null;
- }
- finally
- {
- releaseResources();
- }
+ log.warn("Error on copying large message " + this + " for DLA or Expiry", e);
+ return null;
}
+ finally
+ {
+ releaseResources();
+ }
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws Exception
{
+ validateFile();
return file;
}
@@ -463,32 +420,6 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
- {
- if (file != null)
- {
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
- openFile();
- bodySize = file.size();
- closeFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
- }
-
// Inner classes -------------------------------------------------
class DecodingContext implements BodyEncoder
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -13,6 +13,7 @@
package org.hornetq.core.server;
+
/**
* A LargeMessage
*
@@ -26,9 +27,6 @@
{
void addBytes(byte[] bytes) throws Exception;
- /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
- void setLinkedMessage(LargeServerMessage message);
-
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -691,6 +691,9 @@
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
serverLocator.setCallTimeout(callTimeout);
+
+ // No producer flow control on the bridges, as we don't want to lock the queues
+ serverLocator.setProducerWindowSize(-1);
if (retryInterval > 0)
{
@@ -930,6 +933,9 @@
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
targetLocator.setMinLargeMessageSize(minLargeMessageSize);
+ // No producer flow control on the bridges, as we don't want to lock the queues
+ targetLocator.setProducerWindowSize(-1);
+
targetLocator.setAfterConnectionInternalListener(this);
targetLocator.setNodeID(nodeId);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -930,7 +930,7 @@
int localChunkLen = 0;
localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
+
HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test sends regular and large messages to an address configured for paging, lets them expire to the expiry address, then forces them on to the DLQ, and checks that they are still received correctly
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+ final SimpleString DLQ = new SimpleString("my-DLQ");
+
+ final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+ final int messageSize = 10 * 1024;
+
+ // it has to be an even number
+ final int numberOfMessages = 50;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryMessagesThenDLQ() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Alternate between regular messages (even count) and large messages (odd count)
+ if (i % 2 == 0)
+ {
+ message.putBooleanProperty("tst-large", false);
+ message.getBodyBuffer().writeBytes(bufferSample);
+ }
+ else
+ {
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+
+ // Consume half of the messages to make sure all the messages are paging (on the second try)
+ for (int i = 0 ; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ cons.close();
+
+ for (int rep = 0; rep < 6; rep++)
+ {
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ System.out.println("Trying " + rep);
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ if (rep == 0)
+ {
+ // restart the server at the first try
+ server.stop();
+ server.start();
+ }
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ session.start();
+ }
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ for (int rep = 0; rep < 2; rep++)
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(DLQ);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+ if (rep == 0)
+ {
+ session.rollback();
+ session.close();
+ sf.close();
+ server.stop();
+ server.start();
+ }
+ }
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-12-21 15:53:48 UTC (rev 11925)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-12-22 04:23:07 UTC (rev 11926)
@@ -96,9 +96,6 @@
}
locators.clear();
super.tearDown();
-// checkFreePort(5445);
-// checkFreePort(5446);
-// checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
fail("InVMREgistry size > 0");
13 years
JBoss hornetq SVN: r11925 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 10:53:48 -0500 (Wed, 21 Dec 2011)
New Revision: 11925
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7756 - Disabled flow control shouldn't be looking for credits
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-12-21 15:53:48 UTC (rev 11925)
@@ -50,44 +50,51 @@
public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
{
- boolean needInit = false;
- ClientProducerCredits credits;
-
- synchronized(this)
+ if (windowSize == -1)
{
- credits = producerCredits.get(address);
-
- if (credits == null)
+ return ClientProducerCreditsNoFlowControl.instance;
+ }
+ else
+ {
+ boolean needInit = false;
+ ClientProducerCredits credits;
+
+ synchronized(this)
{
- // Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
- needInit = true;
-
- producerCredits.put(address, credits);
+ credits = producerCredits.get(address);
+
+ if (credits == null)
+ {
+ // Doesn't need to be fair since session is single threaded
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ needInit = true;
+
+ producerCredits.put(address, credits);
+ }
+
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
+ }
+ else
+ {
+ addToUnReferencedCache(address, credits);
+ }
}
-
- if (!anon)
+
+ // The init is done outside of the lock
+ // otherwise packages may arrive with flow control
+ // while this is still sending requests causing a dead lock
+ if (needInit)
{
- credits.incrementRefCount();
+ credits.init();
+ }
- // Remove from anon credits (if there)
- unReferencedCredits.remove(address);
- }
- else
- {
- addToUnReferencedCache(address, credits);
- }
+ return credits;
}
-
- // The init is done outside of the lock
- // otherwise packages may arrive with flow control
- // while this is still sending requests causing a dead lock
- if (needInit)
- {
- credits.init();
- }
-
- return credits;
}
public synchronized void returnCredits(final SimpleString address)
@@ -166,5 +173,50 @@
credits.close();
}
+
+
+ static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
+ {
+ static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
+ public void acquireCredits(int credits) throws InterruptedException
+ {
+ }
+
+ public void receiveCredits(int credits)
+ {
+ }
+
+ public boolean isBlocked()
+ {
+ return false;
+ }
+
+ public void init()
+ {
+ }
+
+ public void reset()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public void incrementRefCount()
+ {
+ }
+
+ public int decrementRefCount()
+ {
+ return 1;
+ }
+
+ public void releaseOutstanding()
+ {
+ }
+
+ }
+
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-21 15:53:48 UTC (rev 11925)
@@ -360,6 +360,108 @@
}
+ public void testSendOverBlockingNoFlowControl() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ AddressFullMessagePolicy.BLOCK,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 10 * 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setProducerWindowSize(-1);
+ locator.setMinLargeMessageSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testReceiveImmediate() throws Exception
{
clearData();
13 years
JBoss hornetq SVN: r11924 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/jms/server/management and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-12-21 08:42:08 -0500 (Wed, 21 Dec 2011)
New Revision: 11924
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
Log:
fix for ra jms meta data and tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-12-21 12:29:50 UTC (rev 11923)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-12-21 13:42:08 UTC (rev 11924)
@@ -370,9 +370,10 @@
result.addMetaData("resource-adapter", "inbound");
result.addMetaData("jms-session", "");
- if (spec.getClientID() != null)
+ String clientID = ra.getClientID() == null?spec.getClientID():ra.getClientID();
+ if (clientID != null)
{
- result.addMetaData("jms-client-id", spec.getClientID());
+ result.addMetaData("jms-client-id", clientID);
}
HornetQActivation.log.debug("Using queue connection " + result);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2011-12-21 12:29:50 UTC (rev 11923)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2011-12-21 13:42:08 UTC (rev 11924)
@@ -33,13 +33,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSConnectionInfo;
import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.JMSSessionInfo;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -47,11 +49,17 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.jms.server.management.JMSManagementService;
+import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivation;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
+import org.hornetq.tests.unit.ra.MessageEndpointFactory;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
@@ -469,6 +477,173 @@
}
}
}
+
+
+ /**
+  * Starts an inbound activation whose spec carries no user name or client ID of its own
+  * and checks what the management control reports for it.
+  * <p>
+  * The user name and client ID are set on the resource adapter after it has been started;
+  * the test expects exactly one listed connection, carrying those two values.
+  *
+  * @throws Exception if the server, resource adapter or activation fails to start or stop
+  */
+ public void testStartActivationListConnections() throws Exception
+ {
+    try
+    {
+       startHornetQServer(InVMAcceptorFactory.class.getName());
+       serverManager.createQueue(false, "test", null, true, "test");
+
+       JMSServerControl control = createManagementControl();
+
+       HornetQResourceAdapter ra = new HornetQResourceAdapter();
+
+       ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+       ra.setUserName("userGlobal");
+       ra.setPassword("passwordGlobal");
+       ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
+
+       try
+       {
+          ra.setClientID("my-client-id");
+          ra.setUserName("user");
+
+          // Opened and closed straight away, so it must not show up in the listing below
+          Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
+
+          conn.close();
+
+          HornetQActivationSpec spec = new HornetQActivationSpec();
+
+          spec.setResourceAdapter(ra);
+
+          spec.setUseJNDI(false);
+
+          spec.setPassword("password");
+
+          spec.setDestinationType("Topic");
+          spec.setDestination("test");
+
+          spec.setMinSession(1);
+          spec.setMaxSession(1);
+
+          HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
+
+          activation.start();
+
+          try
+          {
+             String cons = control.listConnectionsAsJSON();
+
+             JMSConnectionInfo[] jmsConnectionInfos = JMSConnectionInfo.from(cons);
+
+             assertEquals(1, jmsConnectionInfos.length);
+
+             assertEquals("user", jmsConnectionInfos[0].getUsername());
+
+             assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
+          }
+          finally
+          {
+             // Stop the activation even when an assertion fails, so it cannot leak into later tests
+             activation.stop();
+          }
+       }
+       finally
+       {
+          // The adapter was started above; always stop it, whatever happened in between
+          ra.stop();
+       }
+    }
+    finally
+    {
+       try
+       {
+          if (serverManager != null)
+          {
+             serverManager.stop();
+          }
+       }
+       catch (Throwable ignored)
+       {
+          // Best effort: a failing JMS manager shutdown must not prevent the server from being stopped
+          ignored.printStackTrace();
+       }
+
+       if (server != null)
+       {
+          server.stop();
+       }
+    }
+ }
+
+ /**
+  * Starts an inbound activation whose spec overrides the adapter-wide credentials
+  * and checks what the management control reports for it.
+  * <p>
+  * The resource adapter is configured with "userGlobal", while the activation spec sets its
+  * own user and client ID; the test expects exactly one listed connection, carrying the
+  * values from the spec.
+  *
+  * @throws Exception if the server, resource adapter or activation fails to start or stop
+  */
+ public void testStartActivationOverrideListConnections() throws Exception
+ {
+    try
+    {
+       startHornetQServer(InVMAcceptorFactory.class.getName());
+       serverManager.createQueue(false, "test", null, true, "test");
+
+       JMSServerControl control = createManagementControl();
+
+       HornetQResourceAdapter ra = new HornetQResourceAdapter();
+
+       ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+       ra.setUserName("userGlobal");
+       ra.setPassword("passwordGlobal");
+       ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
+
+       try
+       {
+          // Opened and closed straight away, so it must not show up in the listing below
+          Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
+
+          conn.close();
+
+          HornetQActivationSpec spec = new HornetQActivationSpec();
+
+          spec.setResourceAdapter(ra);
+
+          spec.setUseJNDI(false);
+
+          spec.setClientId("my-client-id");
+
+          spec.setUser("user");
+          spec.setPassword("password");
+
+          spec.setDestinationType("Topic");
+          spec.setDestination("test");
+
+          spec.setMinSession(1);
+          spec.setMaxSession(1);
+
+          HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
+
+          activation.start();
+
+          try
+          {
+             String cons = control.listConnectionsAsJSON();
+
+             JMSConnectionInfo[] jmsConnectionInfos = JMSConnectionInfo.from(cons);
+
+             assertEquals(1, jmsConnectionInfos.length);
+
+             assertEquals("user", jmsConnectionInfos[0].getUsername());
+
+             assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
+          }
+          finally
+          {
+             // Stop the activation even when an assertion fails, so it cannot leak into later tests
+             activation.stop();
+          }
+       }
+       finally
+       {
+          // The adapter was started above; always stop it, whatever happened in between
+          ra.stop();
+       }
+    }
+    finally
+    {
+       try
+       {
+          if (serverManager != null)
+          {
+             serverManager.stop();
+          }
+       }
+       catch (Throwable ignored)
+       {
+          // Best effort: a failing JMS manager shutdown must not prevent the server from being stopped
+          ignored.printStackTrace();
+       }
+
+       if (server != null)
+       {
+          server.stop();
+       }
+    }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java 2011-12-21 13:42:08 UTC (rev 11924)
@@ -0,0 +1,65 @@
+package org.hornetq.tests.unit.ra;
+
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import java.util.Timer;
+
+/**
+ * Minimal {@link javax.resource.spi.BootstrapContext} stub for resource adapter tests.
+ * <p>
+ * It provides neither a timer nor an XA terminator (both accessors return {@code null});
+ * the only functional piece is a work manager that runs plainly scheduled work inline.
+ */
+public class BootstrapContext implements javax.resource.spi.BootstrapContext
+{
+   /**
+    * @return a fresh work manager stub on every call
+    */
+   public WorkManager getWorkManager()
+   {
+      return new InlineWorkManager();
+   }
+
+   /**
+    * @return always {@code null}
+    */
+   public Timer createTimer() throws UnavailableException
+   {
+      return null;
+   }
+
+   /**
+    * @return always {@code null}
+    */
+   public XATerminator getXATerminator()
+   {
+      return null;
+   }
+
+   /**
+    * Work manager stub: {@link #scheduleWork(Work)} calls {@code work.run()} directly on the
+    * calling thread, every other method ignores the submitted work.
+    */
+   private static final class InlineWorkManager implements WorkManager
+   {
+      public void scheduleWork(final Work work) throws WorkException
+      {
+         work.run();
+      }
+
+      public void scheduleWork(final Work work,
+                               final long l,
+                               final ExecutionContext executionContext,
+                               final WorkListener workListener) throws WorkException
+      {
+         // intentionally does nothing
+      }
+
+      public void doWork(final Work work) throws WorkException
+      {
+         // intentionally does nothing
+      }
+
+      public void doWork(final Work work,
+                         final long l,
+                         final ExecutionContext executionContext,
+                         final WorkListener workListener) throws WorkException
+      {
+         // intentionally does nothing
+      }
+
+      public long startWork(final Work work) throws WorkException
+      {
+         return 0;
+      }
+
+      public long startWork(final Work work,
+                            final long l,
+                            final ExecutionContext executionContext,
+                            final WorkListener workListener) throws WorkException
+      {
+         return 0;
+      }
+   }
+}
\ No newline at end of file
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java 2011-12-21 13:42:08 UTC (rev 11924)
@@ -0,0 +1,27 @@
+package org.hornetq.tests.unit.ra;
+
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+
+/**
+ * Stub {@link javax.resource.spi.endpoint.MessageEndpointFactory} for resource adapter tests:
+ * it never produces an endpoint and reports every delivery as non-transacted.
+ */
+public class MessageEndpointFactory implements javax.resource.spi.endpoint.MessageEndpointFactory
+{
+   /**
+    * @return always {@code false}, whatever method is passed in
+    * @see javax.resource.spi.endpoint.MessageEndpointFactory#isDeliveryTransacted(java.lang.reflect.Method)
+    */
+   public boolean isDeliveryTransacted(final Method arg0) throws NoSuchMethodException
+   {
+      return false;
+   }
+
+   /**
+    * @return always {@code null}; the XA resource is ignored
+    * @see javax.resource.spi.endpoint.MessageEndpointFactory#createEndpoint(javax.transaction.xa.XAResource)
+    */
+   public MessageEndpoint createEndpoint(final XAResource arg0) throws UnavailableException
+   {
+      return null;
+   }
+}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2011-12-21 12:29:50 UTC (rev 11923)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2011-12-21 13:42:08 UTC (rev 11924)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.ra;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -23,14 +22,11 @@
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.XATerminator;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
-import javax.transaction.xa.XAResource;
import junit.framework.Assert;
@@ -465,7 +461,7 @@
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
- ra.start(fakeCTX);
+ ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
@@ -486,7 +482,7 @@
spec.setMinSession(1);
spec.setMaxSession(1);
- HornetQActivation activation = new HornetQActivation(ra, new FakeMessageEndpointFactory(), spec);
+ HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
activation.start();
activation.stop();
@@ -526,81 +522,4 @@
}*/
}
- BootstrapContext fakeCTX = new BootstrapContext()
- {
-
- public Timer createTimer() throws UnavailableException
- {
- return null;
- }
-
- public WorkManager getWorkManager()
- {
- return new WorkManager()
- {
- public void doWork(final Work work) throws WorkException
- {
- }
-
- public void doWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws WorkException
- {
- }
-
- public long startWork(final Work work) throws WorkException
- {
- return 0;
- }
-
- public long startWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws WorkException
- {
- return 0;
- }
-
- public void scheduleWork(final Work work) throws WorkException
- {
- work.run();
- }
-
- public void scheduleWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws WorkException
- {
- }
- };
- }
-
- public XATerminator getXATerminator()
- {
- return null;
- }
-
- };
-
- class FakeMessageEndpointFactory implements MessageEndpointFactory
- {
-
- /* (non-Javadoc)
- * @see javax.resource.spi.endpoint.MessageEndpointFactory#createEndpoint(javax.transaction.xa.XAResource)
- */
- public MessageEndpoint createEndpoint(final XAResource arg0) throws UnavailableException
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see javax.resource.spi.endpoint.MessageEndpointFactory#isDeliveryTransacted(java.lang.reflect.Method)
- */
- public boolean isDeliveryTransacted(final Method arg0) throws NoSuchMethodException
- {
- return false;
- }
-
- }
}
13 years
JBoss hornetq SVN: r11923 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 07:29:50 -0500 (Wed, 21 Dec 2011)
New Revision: 11923
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - oops - I created a dumb regression here.. fixing it
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 12:16:06 UTC (rev 11922)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 12:29:50 UTC (rev 11923)
@@ -1198,8 +1198,6 @@
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
-
- message.decrementRefCount();
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 12:16:06 UTC (rev 11922)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 12:29:50 UTC (rev 11923)
@@ -1029,6 +1029,11 @@
ClientMessage msg = consumer.receive(10000);
assertNotNull(msg);
+
+ // Read the body only after msg is known to be non-null, then compare it byte by byte
+ for (int i = 0 ; i < messageSize; i++)
+ {
+    assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+ }
msg.acknowledge();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 12:16:06 UTC (rev 11922)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 12:29:50 UTC (rev 11923)
@@ -392,6 +392,7 @@
}
assertFalse(error);
+ assertNull(errorMessage);
}
catch (Exception e)
{
13 years