JBoss hornetq SVN: r10145 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/jms/server/management and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 16:58:43 -0500 (Tue, 25 Jan 2011)
New Revision: 10145
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
Log:
HORNETQ-629 - fixing dropTopicSubscription management operation
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2011-01-25 18:00:45 UTC (rev 10144)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2011-01-25 21:58:43 UTC (rev 10145)
@@ -267,7 +267,12 @@
String[] queues = addressControl.getQueueNames();
for (String queue : queues)
{
- serverControl.destroyQueue(queue);
+ // Dropping all subscriptions must not delete the dummy queue used to identify whether the topic exists on the core queues.
+ // We simply skip that queue.
+ if (!queue.equals(managedTopic.getAddress()))
+ {
+ serverControl.destroyQueue(queue);
+ }
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-01-25 18:00:45 UTC (rev 10144)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-01-25 21:58:43 UTC (rev 10145)
@@ -18,7 +18,9 @@
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import junit.framework.Assert;
@@ -28,7 +30,6 @@
import org.hornetq.api.jms.management.SubscriptionInfo;
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
@@ -298,29 +299,44 @@
public void testDropAllSubscriptions() throws Exception
{
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber durableSubscriber_1 = JMSUtil.createDurableSubscriber(connection_1,
- topic,
- clientID,
- subscriptionName);
+ connection_1.setClientID(clientID);
+ Session sess1 = connection_1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durableSubscriber_1 = sess1.createDurableSubscriber(topic, subscriptionName);
+
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber durableSubscriber_2 = JMSUtil.createDurableSubscriber(connection_2,
- topic,
- clientID,
- subscriptionName + "2");
+ connection_2.setClientID(clientID + "2");
+ Session sess2 = connection_1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durableSubscriber_2 = sess2.createDurableSubscriber(topic, subscriptionName + "2");
+
+ connection_1.start();
+ connection_2.start();
+ Session sess = connection_1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sess.createProducer(topic);
+
+ TextMessage msg1 = sess.createTextMessage("tst1");
+ prod.send(msg1);
+
+ assertNotNull(durableSubscriber_1.receive(5000));
+ assertNotNull(durableSubscriber_2.receive(5000));
+
+ connection_1.close();
+ connection_2.close();
+
TopicControl topicControl = createManagementControl();
- Assert.assertEquals(2, topicControl.getSubscriptionCount());
- durableSubscriber_1.close();
- durableSubscriber_2.close();
-
Assert.assertEquals(2, topicControl.getSubscriptionCount());
topicControl.dropAllSubscriptions();
Assert.assertEquals(0, topicControl.getSubscriptionCount());
- connection_1.close();
- connection_2.close();
+ connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ connection_1.setClientID(clientID);
+ sess = connection_1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ prod = sess.createProducer(topic);
+ TextMessage msg2 = sess.createTextMessage("tst2");
+ prod.send(msg2);
+
}
public void testRemoveAllMessages() throws Exception
13 years, 4 months
JBoss hornetq SVN: r10144 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 13:00:45 -0500 (Tue, 25 Jan 2011)
New Revision: 10144
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix on paging counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 18:00:45 UTC (rev 10144)
@@ -100,6 +100,9 @@
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
+
+ // The number of paged references currently held in the messageReferences priority list
+ private final AtomicInteger pagedReferences = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
@@ -316,16 +319,17 @@
return;
}
- messageReferences.addHead(ref, ref.getMessage().getPriority());
+ internalAddHead(ref);
directDeliver = false;
}
+
public synchronized void reload(final MessageReference ref)
{
if (!scheduledDeliveryHandler.checkAndSchedule(ref))
{
- messageReferences.addTail(ref, ref.getMessage().getPriority());
+ internalAddTail(ref);
}
directDeliver = false;
@@ -621,6 +625,7 @@
if (ref.getMessage().getMessageID() == id)
{
iterator.remove();
+ refRemoved(ref);
removed = ref;
@@ -662,7 +667,8 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+ // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
+ return messageReferences.size() - pagedReferences.get() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
}
else
{
@@ -778,7 +784,7 @@
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
{
- messageReferences.addHead(reference, reference.getMessage().getPriority());
+ internalAddHead(reference);
}
resetAllIterators();
@@ -839,6 +845,7 @@
deliveringCount.incrementAndGet();
acknowledge(tx, ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -872,6 +879,7 @@
deliveringCount.incrementAndGet();
acknowledge(tx, ref);
iter.remove();
+ refRemoved(ref);
deleted = true;
break;
}
@@ -894,6 +902,7 @@
deliveringCount.incrementAndGet();
expire(ref);
iter.remove();
+ refRemoved(ref);
return true;
}
}
@@ -915,6 +924,7 @@
deliveringCount.incrementAndGet();
expire(tx, ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -936,6 +946,7 @@
deliveringCount.incrementAndGet();
expire(ref);
iter.remove();
+ refRemoved(ref);
}
}
}
@@ -952,6 +963,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
+ refRemoved(ref);
return true;
}
}
@@ -971,6 +983,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
+ refRemoved(ref);
count++;
}
}
@@ -992,6 +1005,7 @@
if (ref.getMessage().getMessageID() == messageID)
{
iter.remove();
+ refRemoved(ref);
deliveringCount.incrementAndGet();
try
{
@@ -1097,6 +1111,7 @@
if (ref.getMessage().getMessageID() == messageID)
{
iter.remove();
+ refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
return true;
@@ -1118,6 +1133,7 @@
{
count++;
iter.remove();
+ refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
}
@@ -1186,13 +1202,38 @@
// Private
// ------------------------------------------------------------------------------
+ /**
+ * @param ref
+ */
+ private void internalAddTail(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ messageReferences.addTail(ref, ref.getMessage().getPriority());
+ }
+
+ /**
+ * @param ref
+ */
+ private void internalAddHead(final MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.incrementAndGet();
+ }
+ messageReferences.addHead(ref, ref.getMessage().getPriority());
+ }
+
+
private synchronized void doPoll()
{
MessageReference ref = concurrentQueue.poll();
if (ref != null)
{
- messageReferences.addTail(ref, ref.getMessage().getPriority());
+ internalAddTail(ref);
messagesAdded++;
@@ -1264,6 +1305,8 @@
if (checkExpired(ref))
{
holder.iter.remove();
+
+ refRemoved(ref);
continue;
}
@@ -1289,6 +1332,8 @@
if (status == HandleStatus.HANDLED)
{
holder.iter.remove();
+
+ refRemoved(ref);
if (groupID != null && groupConsumer == null)
{
@@ -1333,6 +1378,18 @@
scheduleDepage();
}
}
+
+
+ /**
+ * @param ref
+ */
+ private void refRemoved(MessageReference ref)
+ {
+ if (ref.isPaged())
+ {
+ pagedReferences.decrementAndGet();
+ }
+ }
private void scheduleDepage()
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 09:07:51 UTC (rev 10143)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 18:00:45 UTC (rev 10144)
@@ -473,7 +473,7 @@
ClientSession sess = sf.createSession(true, true, 0);
sess.start();
ClientConsumer cons = sess.createConsumer(ADDRESS);
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < 100; i++)
{
ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
@@ -510,14 +510,22 @@
}
session.commit();
+
+ q1.getMessageCount();
t1.start();
t1.join();
assertEquals(0, errors.get());
+ long timeout = System.currentTimeMillis() + 10000;
+ while (numberOfMessages -100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
+ {
+ Thread.sleep(500);
+
+ }
assertEquals(numberOfMessages, q2.getMessageCount());
- assertEquals(numberOfMessages - 10, q1.getMessageCount());
+ assertEquals(numberOfMessages - 100, q1.getMessageCount());
}
catch (Throwable e)
13 years, 4 months
JBoss hornetq SVN: r10143 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-25 04:07:51 -0500 (Tue, 25 Jan 2011)
New Revision: 10143
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5807 - fixed, handle exception on send
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-01-25 03:17:50 UTC (rev 10142)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-01-25 09:07:51 UTC (rev 10143)
@@ -378,9 +378,23 @@
// Preserve the original address
dest = message.getAddress();
}
+ //if we fail over during send then there is a chance that
+ //this will throw a disconnect; we need to remove the message
+ //from the acks so it will get resent. Duplicate detection will cope
+ //with any messages resent
+ try
+ {
+ producer.send(dest, message);
+ }
+ catch (HornetQException e)
+ {
+ log.warn("Unable to send message, will try again once bridge reconnects");
- producer.send(dest, message);
+ refs.remove(ref);
+ return HandleStatus.BUSY;
+ }
+
return HandleStatus.HANDLED;
}
}
13 years, 4 months
JBoss hornetq SVN: r10142 - in branches/stomp_1_1: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: gurkapa
Date: 2011-01-24 22:17:50 -0500 (Mon, 24 Jan 2011)
New Revision: 10142
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-553 Implement STOMP 1.1 Specification
Implementation of the version negotiation introduced in 1.1 spec
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -28,9 +28,18 @@
String NEWLINE = "\n";
+ public static interface Versions
+ {
+ String V10 = "1.0";
+
+ String V11 = "1.1";
+ }
+
public static interface Commands
{
String CONNECT = "CONNECT";
+
+ String STOMP = "STOMP";
String SEND = "SEND";
@@ -75,6 +84,8 @@
String TRANSACTION = "transaction";
String CONTENT_LENGTH = "content-length";
+
+ String CONTENT_TYPE = "content-type";
public interface Response
{
@@ -159,11 +170,17 @@
String CLIENT_ID = "client-id";
String REQUEST_ID = "request-id";
+
+ String ACCEPT_VERSION = "accept-version";
+
+ String HOST = "host";
}
public interface Error
{
String MESSAGE = "message";
+
+ String VERSION = "version";
}
public interface Connected
@@ -171,6 +188,8 @@
String SESSION = "session";
String RESPONSE_ID = "response-id";
+
+ String VERSION = "version";
}
public interface Ack
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -48,6 +48,8 @@
private String passcode;
private String clientID;
+
+ private String version;
private boolean valid;
@@ -309,6 +311,16 @@
this.valid = valid;
}
+ public void setVersion(String version)
+ {
+ this.version = version;
+ }
+
+ public String getVersion()
+ {
+ return version;
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -59,6 +59,10 @@
private static final String COMMAND_SEND = "SEND";
private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+
+ private static final String COMMAND_STOMP = "STOMP";
+
+ private static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
@@ -82,6 +86,8 @@
private static final byte S = (byte)'S';
+ private static final byte T = (byte)'T';
+
private static final byte U = (byte)'U';
private static final byte HEADER_SEPARATOR = (byte)':';
@@ -267,6 +273,16 @@
// SEND
command = COMMAND_SEND;
+ }
+ else if (workingBuffer[offset + 1] == T)
+ {
+ if (!tryIncrement(offset + COMMAND_STOMP_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // STOMP
+ command = COMMAND_STOMP;
}
else
{
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -50,12 +50,12 @@
class StompProtocolManager implements ProtocolManager
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
// TODO use same value than HornetQConnection
private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -178,7 +178,7 @@
StompFrame response = null;
- if (Stomp.Commands.CONNECT.equals(command))
+ if (Stomp.Commands.CONNECT.equals(command) || Stomp.Commands.STOMP.equals(command))
{
response = onConnect(request, conn);
}
@@ -577,6 +577,7 @@
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+ String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
HornetQSecurityManager sm = server.getSecurityManager();
@@ -585,11 +586,18 @@
{
sm.validateUser(login, passcode);
}
+
+ String version = negotiateVersion(acceptVersion);
connection.setLogin(login);
connection.setPasscode(passcode);
connection.setClientID(clientID);
connection.setValid(true);
+ if (version == null){
+ // Client and server do not have any version in common. Return an ERROR frame.
+ return createNegotiationFailedFrame();
+ }
+ connection.setVersion(version);
HashMap<String, Object> h = new HashMap<String, Object>();
h.put(Stomp.Headers.Connected.SESSION, connection.getID());
@@ -597,9 +605,45 @@
{
h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
+ if (acceptVersion != null)
+ {
+ // Only put this in the header if we got an accept-version header.
+ h.put(Stomp.Headers.Connected.VERSION, version);
+ }
return new StompFrame(Stomp.Responses.CONNECTED, h);
}
+ private String negotiateVersion(String acceptVersion)
+ {
+ if (acceptVersion != null)
+ {
+ String bestVersion = null;
+ for(String v : acceptVersion.split(","))
+ {
+ if(Stomp.Versions.V11.equals(v.trim()))
+ {
+ bestVersion = Stomp.Versions.V11;
+ }
+ else if (Stomp.Versions.V10.equals(v.trim()) && bestVersion == null)
+ {
+ bestVersion = Stomp.Versions.V10;
+ }
+ }
+ return bestVersion;
+ }
+ return Stomp.Versions.V10;
+ }
+
+ private StompFrame createNegotiationFailedFrame() throws Exception
+ {
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Error.VERSION, Stomp.Versions.V10 + "," + Stomp.Versions.V11);
+ h.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
+
+ StringBuffer eMess = new StringBuffer("Supported protocol versions are " + Stomp.Versions.V10 + " " + Stomp.Versions.V11);
+ return new StompFrame(Stomp.Responses.ERROR, h, eMess.toString().getBytes("UTF-8"));
+ }
+
public void cleanup(StompConnection connection)
{
connection.setValid(false);
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -130,6 +130,7 @@
headers.put(name.toString(), message.getObjectProperty(name));
}
}
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -88,7 +88,56 @@
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+
+ public void testV11Connect() throws Exception
+ {
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "accept-version: 1.0,1.1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ Assert.assertTrue(f.indexOf("version:1.1") >= 0);
+ }
+
+ public void testConnectWithStomp() throws Exception
+ {
+ String connect_frame = "STOMP\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "accept-version: 1.0,1.1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ Assert.assertTrue(f.indexOf("version:1.1") >= 0);
+ }
+
+ public void testProtocolNegotiationFail() throws Exception
+ {
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "accept-version: 1.2\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+ Assert.assertTrue(f.indexOf("version:1.0,1.1") >= 0);
+ Assert.assertTrue(f.indexOf("content-type:text/plain") >= 0);
+ Assert.assertTrue(f.indexOf("Supported protocol versions are") >= 0);
+ }
+
public void testDisconnectAndError() throws Exception
{
13 years, 4 months
JBoss hornetq SVN: r10141 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-24 21:55:02 -0500 (Mon, 24 Jan 2011)
New Revision: 10141
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
upload a new EAP release
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-01-25 02:38:16 UTC (rev 10140)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-01-25 02:55:02 UTC (rev 10141)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.QA-10117"/>
+ <property name="hornetq.version" value="2.2.0.EAP-QA-10136"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-25 02:38:16 UTC (rev 10140)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-25 02:55:02 UTC (rev 10141)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.QA-10111</hornetq.version>
+ <hornetq.version>2.2.0.EAP-QA-10136</hornetq.version>
</properties>
<licenses>
13 years, 4 months
JBoss hornetq SVN: r10139 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-24 21:19:47 -0500 (Mon, 24 Jan 2011)
New Revision: 10139
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
HORNETQ-628 - fix for paging counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 02:03:08 UTC (rev 10138)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 02:19:47 UTC (rev 10139)
@@ -662,11 +662,11 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + pageSubscription.getMessageCount();
+ return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
}
else
{
- return messageReferences.size() + getScheduledCount();
+ return messageReferences.size() + getScheduledCount() + deliveringCount.get();
}
}
}
@@ -691,6 +691,7 @@
if (ref.isPaged())
{
pageSubscription.ack((PagedReference)ref);
+ postAcknowledge(ref);
}
else
{
@@ -1678,7 +1679,7 @@
boolean durableRef = message.isDurable() && queue.durable;
- if (durableRef)
+ if (durableRef && ! ref.isPaged())
{
int count = message.decrementDurableRefCount();
13 years, 4 months
JBoss hornetq SVN: r10137 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-24 20:09:51 -0500 (Mon, 24 Jan 2011)
New Revision: 10137
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix for paging counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -102,8 +102,6 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
-
- private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -178,7 +176,7 @@
public long getMessageCount()
{
- return counter.getValue() - deliveredCount.get();
+ return counter.getValue();
}
public PageSubscriptionCounter getCounter()
@@ -969,7 +967,6 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
- cursor.deliveredCount.decrementAndGet();
}
}
@@ -1206,7 +1203,6 @@
{
if (!isredelivery)
{
- deliveredCount.incrementAndGet();
PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -662,11 +662,11 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+ return messageReferences.size() + getScheduledCount() + pageSubscription.getMessageCount();
}
else
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ return messageReferences.size() + getScheduledCount();
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
@@ -28,14 +29,20 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
- * A PagingOrderTest
+ * A PagingOrderTest.
+ *
+ * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
*
* @author clebertsuconic
*
@@ -177,12 +184,10 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
if (i < 100)
{
- System.out.println("Acking " + i);
// Do not consume the last one so we could restart
message.acknowledge();
}
@@ -211,7 +216,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
@@ -235,6 +239,305 @@
}
+ public void testPageCounter() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ t1.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.join();
+
+ assertEquals(0, errors.get());
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(0, q1.getMessageCount());
+
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+
+ server.start();
+
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+ q1 = null;
+ q2 = null;
+
+ for (Binding bind : bindings.getBindings())
+ {
+ if (bind instanceof LocalQueueBinding)
+ {
+ LocalQueueBinding qb = (LocalQueueBinding)bind;
+ if (qb.getQueue().getName().equals(ADDRESS))
+ {
+ q1 = qb.getQueue();
+ }
+
+ if (qb.getQueue().getName().equals(new SimpleString("inactive")))
+ {
+ q2 = qb.getQueue();
+ }
+ }
+ }
+
+ assertNotNull(q1);
+
+ assertNotNull(q2);
+
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(0, q1.getMessageCount());
+
+
+
+
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testPageCounter2() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.start();
+ t1.join();
+
+ assertEquals(0, errors.get());
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(numberOfMessages - 10, q1.getMessageCount());
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testOrderOverRollback() throws Throwable
{
boolean persistentMessages = true;
@@ -302,8 +605,6 @@
session.close();
- System.out.println("number of refs " + queue.getNumberOfReferences());
-
session = sf.createSession(false, false, 0);
session.start();
@@ -314,7 +615,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
@@ -333,7 +633,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
13 years, 4 months
JBoss hornetq SVN: r10136 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-24 15:40:55 -0500 (Mon, 24 Jan 2011)
New Revision: 10136
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-5816 / HORNETQ-630 - Paging Ordering after rollback and cancelations
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-23 01:38:06 UTC (rev 10135)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -712,6 +712,8 @@
if (ref.isPaged())
{
pageSubscription.ackTx(tx, (PagedReference)ref);
+
+ getRefsOperation(tx).addAck(ref);
}
else
{
@@ -1423,6 +1425,12 @@
return true;
}
}
+
+ /** Used on testing only **/
+ public int getNumberOfReferences()
+ {
+ return messageReferences.size();
+ }
private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-01-23 01:38:06 UTC (rev 10135)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -563,11 +563,11 @@
if (autoCommitAcks || tx == null)
{
- ref.acknowledge();
+ ref.getQueue().acknowledge(ref);
}
else
{
- ref.acknowledge(tx);
+ ref.getQueue().acknowledge(tx, ref);
}
}
while (ref.getMessage().getMessageID() != messageID);
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -0,0 +1,539 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingOrderTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingOrderTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private ServerLocator locator;
+
+ public PagingOrderTest(final String name)
+ {
+ super(name);
+ }
+
+ public PagingOrderTest()
+ {
+ super();
+ }
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingTest.class);
+
+ private static final int RECEIVE_TIMEOUT = 30000;
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = createInVMNonHALocator();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+
+ super.tearDown();
+ }
+
+ public void testOrder1() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+
+ if (i < 100)
+ {
+ System.out.println("Acking " + i);
+ // Do not consume the last one so we could restart
+ message.acknowledge();
+ }
+ }
+
+ session.commit();
+
+ for (ServerSession sessionServer : server.getSessions())
+ {
+ sessionServer.close(true);
+ }
+
+ OperationContextImpl.getContext().waitCompletion();
+
+ ((ClientSessionFactoryImpl)sf).stopPingingAfterOne();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 100; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderOverRollback() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 3000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ QueueImpl queue = (QueueImpl)server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ System.out.println("number of refs " + queue.getNumberOfReferences());
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderOverRollback2() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 200;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(0);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ QueueImpl queue = (QueueImpl)server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ // number of references without paging
+ int numberOfRefs = queue.getNumberOfReferences();
+
+ // consume all non-paged references
+ for (int ref = 0; ref < numberOfRefs; ref++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ int msgIDRolledBack = msg.getIntProperty("id").intValue();
+ msg.acknowledge();
+
+ session.rollback();
+
+ msg = consumer.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msgIDRolledBack, msg.getIntProperty("id").intValue());
+
+ session.rollback();
+
+ session.close();
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(0);
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = msgIDRolledBack; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+
+ locator.close();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 4 months
JBoss hornetq SVN: r10135 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-22 20:38:06 -0500 (Sat, 22 Jan 2011)
New Revision: 10135
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
Log:
HORNETQ-624 / JBPAPP-5801 - avoid duplicates on JNDI through addJNDI
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2011-01-23 01:07:57 UTC (rev 10134)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2011-01-23 01:38:06 UTC (rev 10135)
@@ -55,7 +55,7 @@
}
catch (NamingException e)
{
- throw new RuntimeException(e);
+ return null;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-23 01:07:57 UTC (rev 10134)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-23 01:38:06 UTC (rev 10135)
@@ -526,6 +526,10 @@
{
throw new IllegalArgumentException("Factory does not exist");
}
+ if (registry.lookup(jndiBinding) != null)
+ {
+ throw new HornetQException(HornetQException.ADDRESS_EXISTS, "JNDI " + name + " is already being used by another connection factory");
+ }
boolean added = bindToJndi(jndiBinding, factory);
if (added)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2011-01-23 01:07:57 UTC (rev 10134)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2011-01-23 01:38:06 UTC (rev 10135)
@@ -25,6 +25,7 @@
import javax.jms.Session;
import javax.naming.NamingException;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.tests.util.JMSTestBase;
@@ -71,7 +72,18 @@
jmsServer.createConnectionFactory(false, nonPersisted, "/nonPersisted" );
+
+
+ try
+ {
+ jmsServer.addConnectionFactoryToJNDI("np", "/someCF");
+ fail("Failure expected and the API let duplicates");
+ }
+ catch (HornetQException expected)
+ {
+ }
+
openCon("/someCF");
openCon("/someCF2");
openCon("/nonPersisted");
@@ -89,6 +101,14 @@
jmsServer.start();
jmsServer.addConnectionFactoryToJNDI("tst", "/newJNDI");
+ try
+ {
+ jmsServer.addConnectionFactoryToJNDI("tst", "/newJNDI");
+ fail("Failure expected and the API let duplicates");
+ }
+ catch (HornetQException expected)
+ {
+ }
openCon("/someCF");
openCon("/someCF2");
openCon("/newJNDI");
13 years, 4 months