[jboss-cvs] JBoss Messaging SVN: r4978 - in branches/Branch_Stable: src/main/org/jboss/jms/server and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 17 22:58:10 EDT 2008
Author: gaohoward
Date: 2008-09-17 22:58:10 -0400 (Wed, 17 Sep 2008)
New Revision: 4978
Added:
branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/impl/
branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/impl/NullPersistenceManagerTest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/NullPersistenceClusterTest.java
Modified:
branches/Branch_Stable/src/etc/server/default/deploy/null-persistence-service.xml
branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
branches/Branch_Stable/tests/etc/container.xml
Log:
JBMESSAGING-1376
Modified: branches/Branch_Stable/src/etc/server/default/deploy/null-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/null-persistence-service.xml 2008-09-18 02:49:57 UTC (rev 4977)
+++ branches/Branch_Stable/src/etc/server/default/deploy/null-persistence-service.xml 2008-09-18 02:58:10 UTC (rev 4978)
@@ -32,7 +32,7 @@
<!-- This post office is clustered. If you don't want a clustered post office then set to false -->
- <attribute name="Clustered">false</attribute>
+ <attribute name="Clustered">true</attribute>
<!-- All the remaining properties only have to be specified if the post office is clustered.
You can safely comment them out if your post office is non clustered -->
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2008-09-18 02:49:57 UTC (rev 4977)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2008-09-18 02:58:10 UTC (rev 4978)
@@ -69,6 +69,7 @@
import org.jboss.messaging.core.impl.FailoverWaiter;
import org.jboss.messaging.core.impl.IDManager;
import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.NullPersistenceManager;
import org.jboss.messaging.core.impl.RotatingID;
import org.jboss.messaging.core.impl.clusterconnection.ClusterConnectionManager;
import org.jboss.messaging.core.impl.memory.SimpleMemoryManager;
@@ -165,6 +166,9 @@
//From a system property - this overrides
private boolean strictTckProperty;
+ // For generating unique Channel ID for cluster without a shared DB
+ private long serverStartTime;
+
// wired components
private DestinationJNDIMapper destinationJNDIMapper;
@@ -247,6 +251,10 @@
{
((JDBCPersistenceManager)persistenceManager).injectNodeID(serverPeerID);
}
+ else if (persistenceManager instanceof NullPersistenceManager)
+ {
+ ((NullPersistenceManager)persistenceManager).initCounter(serverPeerID, serverStartTime);
+ }
jmsUserManager = (JMSUserManager)JMXAccessor.getJMXAttributeOverSecurity(mbeanServer, jmsUserManagerObjectName, "Instance");
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2008-09-18 02:49:57 UTC (rev 4977)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2008-09-18 02:58:10 UTC (rev 4978)
@@ -1,3 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
package org.jboss.messaging.core.impl;
import java.util.Collections;
@@ -4,151 +25,270 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.impl.tx.Transaction;
-/**
+/*
*
* A NullPersistenceManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public class NullPersistenceManager implements PersistenceManager
{
- private ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<String, AtomicLong>();
+ private static final int MAX_PEER_ID = 255;
- public void addReference(long channelID, MessageReference ref, Transaction tx)
- throws Exception
- {
- //NOOP
- }
+ private ConcurrentMap<String, IDCounter> counters = new ConcurrentHashMap<String, IDCounter>();
- public List getMessageChannelPairAcksForTx(long transactionId)
- throws Exception
- {
- return Collections.EMPTY_LIST;
- }
+ private int peerID; // 0 - 255
- public List getMessageChannelPairRefsForTx(long transactionId)
- throws Exception
- {
- return Collections.EMPTY_LIST;
- }
+ private long timeMark;
- public List getMessages(List messageIds) throws Exception
- {
- return Collections.EMPTY_LIST;
- }
+ public void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception
+ {
+ // NOOP
+ }
- public List getPagedReferenceInfos(long channelID, long orderStart,
- int number) throws Exception
- {
- return Collections.EMPTY_LIST;
- }
+ public List getMessageChannelPairAcksForTx(long transactionId) throws Exception
+ {
+ return Collections.EMPTY_LIST;
+ }
- public boolean idExists(String messageID) throws Exception
- {
- return false;
- }
+ public List getMessageChannelPairRefsForTx(long transactionId) throws Exception
+ {
+ return Collections.EMPTY_LIST;
+ }
- public InitialLoadInfo loadFromStart(long channelID, int fullSize)
- throws Exception
- {
- return new InitialLoadInfo(null, null, Collections.EMPTY_LIST);
- }
+ public List getMessages(List messageIds) throws Exception
+ {
+ return Collections.EMPTY_LIST;
+ }
- public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID,
- int numberToLoad, long firstPagingOrder, long nextPagingOrder)
- throws Exception
- {
- return new InitialLoadInfo(null, null, Collections.EMPTY_LIST);
- }
+ public List getPagedReferenceInfos(long channelID, long orderStart, int number) throws Exception
+ {
+ return Collections.EMPTY_LIST;
+ }
- public void mergeTransactions(int fromNodeID, int toNodeID) throws Exception
- {
- //NOOP
- }
+ public boolean idExists(String messageID) throws Exception
+ {
+ return false;
+ }
- public void moveReference(long sourceChannelID, long destChannelID,
- MessageReference ref) throws Exception
- {
- //NOOP
- }
+ public InitialLoadInfo loadFromStart(long channelID, int fullSize) throws Exception
+ {
+ return new InitialLoadInfo(null, null, Collections.EMPTY_LIST);
+ }
- public void pageReferences(long channelID, List references, boolean paged)
- throws Exception
- {
- //NOOP
- }
+ public InitialLoadInfo mergeAndLoad(long fromChannelID,
+ long toChannelID,
+ int numberToLoad,
+ long firstPagingOrder,
+ long nextPagingOrder) throws Exception
+ {
+ return new InitialLoadInfo(null, null, Collections.EMPTY_LIST);
+ }
- public void removeDepagedReferences(long channelID, List refs)
- throws Exception
- {
- //NOOP
- }
+ public void mergeTransactions(int fromNodeID, int toNodeID) throws Exception
+ {
+ // NOOP
+ }
- public void removeReference(long channelID, MessageReference ref,
- Transaction tx) throws Exception
- {
- //NOOP
- }
-
- public long reserveIDBlock(String counterName, int size) throws Exception
- {
- AtomicLong counter = counters.get(counterName);
-
- if (counter == null)
- {
- counter = new AtomicLong(0);
-
- AtomicLong oldCounter = counters.putIfAbsent(counterName, counter);
-
- if (oldCounter != null)
- {
- counter = oldCounter;
- }
- }
-
- long idStart = counter.addAndGet(size) - size;
-
- return idStart;
- }
+ public void moveReference(long sourceChannelID, long destChannelID, MessageReference ref) throws Exception
+ {
+ // NOOP
+ }
- public List retrievePreparedTransactions() throws Exception
- {
- return Collections.EMPTY_LIST;
- }
+ public void pageReferences(long channelID, List references, boolean paged) throws Exception
+ {
+ // NOOP
+ }
- public void updateDeliveryCount(long channelID, MessageReference ref)
- throws Exception
- {
- //NOOP
- }
+ public void removeDepagedReferences(long channelID, List refs) throws Exception
+ {
+ // NOOP
+ }
- public void updatePageOrder(long channelID, List references)
- throws Exception
- {
- ///NOOP
- }
+ public void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception
+ {
+ // NOOP
+ }
- public void updateReferencesNotPagedInRange(long channelID, long orderStart,
- long orderEnd, long num) throws Exception
- {
- //NOOP
- }
+ public long reserveIDBlock(String counterName, int size) throws Exception
+ {
+ checkServerID();
+ IDCounter counter = counters.get(counterName);
- public void start() throws Exception
- {
- //NOOP
- }
+ if (counter == null)
+ {
+ synchronized (counters)
+ {
+ counter = counters.get(counterName);
+ if (counter == null)
+ {
+ counter = new IDCounter(this);
+ counters.put(counterName, counter);
+ }
+ }
+ }
+ long idStart = counter.reserveAndGetNextId(size);
- public void stop() throws Exception
- {
- //NOOP
- }
+ return idStart;
+ }
+ private void checkServerID() throws Exception
+ {
+ if (peerID > MAX_PEER_ID)
+ {
+ throw new Exception("ServerPeerID " + peerID + " exceeding 255");
+ }
+ if (peerID < 0)
+ {
+ throw new Exception("ServerPeerID cannot have negative values");
+ }
+ }
+
+ public List retrievePreparedTransactions() throws Exception
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ public void updateDeliveryCount(long channelID, MessageReference ref) throws Exception
+ {
+ // NOOP
+ }
+
+ public void updatePageOrder(long channelID, List references) throws Exception
+ {
+ // /NOOP
+ }
+
+ public void updateReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
+ {
+ // NOOP
+ }
+
+ public void start() throws Exception
+ {
+ // NOOP
+ }
+
+ public void stop() throws Exception
+ {
+ // NOOP
+ }
+
+ public void initCounter(int serverPeerID, long serverStartTime)
+ {
+ setPeerID(serverPeerID);
+ setTimeMark(serverStartTime);
+ }
+
+ public void setPeerID(int peerID)
+ {
+ this.peerID = peerID;
+ }
+
+ public int getPeerID()
+ {
+ return peerID;
+ }
+
+ public void setTimeMark(long timeMark)
+ {
+ this.timeMark = timeMark;
+ }
+
+ public long getTimeMark()
+ {
+ return timeMark;
+ }
+
}
+
+class IDCounter
+{
+
+ private NullPersistenceManager manager;
+
+ private short counter;
+
+ private long tmMark;
+
+ private long peerIDBit;
+
+ public IDCounter(NullPersistenceManager pManager)
+ {
+ manager = pManager;
+ counter = 0;
+ tmMark = manager.getTimeMark() & MASK_TIME;
+ peerIDBit = (((long)manager.getPeerID()) & MASK_SERVER_PEER_ID) << 56;
+ recalculate();// avoid quick restart conflict
+ }
+
+ /**
+ * for each named counter, we generate it using the following algorithm:
+ * <8-bit ServerPeerID> + <40-bit time bit> + <16-bit counter>
+ * the 16-bit counter starts from zero and increases by 1. If the counter
+ * wraps to zero, we re-calculate the time using current time. Check will be
+ * performed when the calculated current time value is the same as the old
+ * value. If so, sleep for a while and get the current time value again.
+ * That will make sure the generated ID will always be unique even
+ * if server peer gets restarted from a previous shutting down.
+ *
+ * Note: the block size is limited by the counter (which is a short).
+ * if the block size is greater than Short.MAX_VALUE - counter, we
+ * will discard the counter and do a recalculate op because we cannot
+ * return a consecutive block of long values.
+ *
+ * @param size : size of the block to be reserved.
+ * @return
+ */
+ private static final long MASK_SERVER_PEER_ID = 0x00000000000000FFL;
+
+ private static final long MASK_TIME = 0x00000FFFFFFFFFF0L;
+
+ public synchronized long reserveAndGetNextId(int size) throws Exception
+ {
+ if (size > Short.MAX_VALUE)
+ {
+ throw new Exception("The block size exceeds " + Short.MAX_VALUE);
+ }
+ if (size > (Short.MAX_VALUE - counter))
+ {
+ recalculate();
+ }
+ long nextID = assembleID();
+ counter += size;
+
+ return nextID;
+ }
+
+ private long assembleID()
+ {
+ long id = peerIDBit;
+ id += tmMark << 12;
+ id += counter;
+ return id;
+ }
+
+ private void recalculate()
+ {
+ counter = 0;
+ long newTm = System.currentTimeMillis() & MASK_TIME;
+
+ while (newTm == tmMark)
+ {
+ try
+ {
+ Thread.sleep(20);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ newTm = System.currentTimeMillis() & MASK_TIME;
+ }
+ tmMark = newTm;
+ }
+}
Modified: branches/Branch_Stable/tests/etc/container.xml
===================================================================
--- branches/Branch_Stable/tests/etc/container.xml 2008-09-18 02:49:57 UTC (rev 4977)
+++ branches/Branch_Stable/tests/etc/container.xml 2008-09-18 02:58:10 UTC (rev 4978)
@@ -12,6 +12,12 @@
<database>mysql</database>
<database-configurations>
+
+ <database-configuration name="null">
+ <url>jdbc:hsqldb:mem:test</url>
+ <driver>org.hsqldb.jdbcDriver</driver>
+ <username>sa</username>
+ </database-configuration>
<database-configuration name="hsqldb">
<url>jdbc:hsqldb:mem:test</url>
Added: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/impl/NullPersistenceManagerTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/impl/NullPersistenceManagerTest.java (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/impl/NullPersistenceManagerTest.java 2008-09-18 02:58:10 UTC (rev 4978)
@@ -0,0 +1,157 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.impl;
+
+import java.util.Hashtable;
+import java.util.Random;
+
+import org.jboss.messaging.core.impl.NullPersistenceManager;
+import org.jboss.test.messaging.MessagingTestCase;
+
+public class NullPersistenceManagerTest extends MessagingTestCase
+{
+
+ public NullPersistenceManagerTest(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * To make sure that
+ * 1. the server peer ID must be between 0 and 255
+ * 2. the size should not exceed Short.MAX_VALUE
+ * 3. the block generated should be unique even
+ * after the manager was reset due to failures.
+ */
+ public void testReserveIDBlock()
+ {
+ NullPersistenceManager manager = new NullPersistenceManager();
+ manager.setPeerID(256);
+ manager.setTimeMark(System.currentTimeMillis());
+
+ try
+ {
+ manager.reserveIDBlock("testCounter", 255);
+ fail("Failed to throw exception when server peer ID exceeding 255.");
+ }
+ catch (Exception e)
+ {
+ log.debug(e.getMessage());
+ }
+
+ manager.setPeerID(-1);
+ try
+ {
+ manager.reserveIDBlock("testCounter", 255);
+ fail("Failed to throw exception when server peer ID negative.");
+ }
+ catch (Exception e)
+ {
+ log.debug(e.getMessage());
+ }
+
+ manager.setPeerID(255);
+ try
+ {
+ manager.reserveIDBlock("testCounter", Short.MAX_VALUE + 1);
+ fail("Failed to throw exception when block size exceeds Short.MAX_VALUE");
+ }
+ catch (Exception e)
+ {
+ log.debug(e.getMessage());
+ }
+
+ long nextID = 0;
+ int ftimes = 0;
+ Hashtable<String, Integer> data = new Hashtable<String, Integer>();
+ try
+ {
+ for (int i = 0; i < 500; ++i)
+ {
+ int szblock = getRandomBlocksize();
+ nextID = manager.reserveIDBlock("testCounter", szblock);
+ // simulate failure
+ if (getFailure(i))
+ {
+ manager = new NullPersistenceManager();
+ manager.setPeerID(255);
+ manager.setTimeMark(System.currentTimeMillis());
+ ftimes++;
+ }
+ checkData(data, nextID, szblock);
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Exception calling reserveIDBlock()");
+ log.error(e);
+ }
+ log.debug("failure times: " + ftimes);
+ System.out.println("data in set: " + data.size());
+ System.out.println("failure times: " + ftimes);
+ System.gc();
+
+ }
+
+ private void checkData(Hashtable<String, Integer> data, long nID, int szblock)
+ {
+ long id = nID;
+ for (int i = 0; i < szblock; ++i)
+ {
+ // every ID should be unique
+ String key = Long.toHexString(id);
+ assertNull(data.get(key));
+ data.put(key, szblock);
+ id++;
+ }
+
+ }
+
+ // generate a number big enough to cause a wrap to happen.
+ private int getRandomBlocksize()
+ {
+ int base = 1024;
+ Random var = new Random();
+ base += var.nextInt(50);
+ return base;
+ }
+
+ // check if i can be divided by 53 plus a random
+ private boolean getFailure(int i)
+ {
+ Random var = new Random();
+ int num = 43 + var.nextInt(9);
+ boolean failure = ((i + 1) % num == 0);
+ if (var.nextBoolean())
+ {
+ try
+ {
+ Thread.sleep(var.nextInt(9) * 5);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return failure;
+ }
+
+}
Added: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/NullPersistenceClusterTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/NullPersistenceClusterTest.java (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/NullPersistenceClusterTest.java 2008-09-18 02:58:10 UTC (rev 4978)
@@ -0,0 +1,106 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class NullPersistenceClusterTest extends ClusteringTestBase
+{
+
+ public NullPersistenceClusterTest(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ System.setProperty("test.database", "null");
+ System.setProperty("test.clustered", "true");
+ System.setProperty("remote", "true");
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testSimpleMessaging() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(queue[0]);
+
+ conn.start();
+
+ MessageProducer prod = sess.createProducer(queue[0]);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message-" + i);
+
+ prod.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message-" + i, tm.getText());
+ }
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ cons.close();
+ sess.close();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+}
More information about the jboss-cvs-commits
mailing list