[hornetq-commits] JBoss hornetq SVN: r8965 - in branches/Clebert_TMP/src/main/org/hornetq/jms: persistence/impl/journal and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 25 11:04:27 EDT 2010
Author: ataylor
Date: 2010-03-25 11:04:26 -0400 (Thu, 25 Mar 2010)
New Revision: 8965
Modified:
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
added JMS destination persistence
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -37,7 +37,9 @@
// Public --------------------------------------------------------
- void storeDestination(PersistedDestination destination);
+ void storeDestination(PersistedDestination destination) throws Exception;
+
+ void deleteDestination(String name) throws Exception;
List<PersistedDestination> recoverDestinations();
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -13,24 +13,71 @@
package org.hornetq.jms.persistence;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
+
/**
* A PersistedDestination
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
*/
-public class PersistedDestination
+public class PersistedDestination implements EncodingSupport
{
+
// Constants -----------------------------------------------------
+ /**
+  * Kind of JMS destination, together with the integer code used for it in the encoded record.
+  */
+ public enum Type
+ {
+    QUEUE,
+    TOPIC;
+
+    /**
+     * @return the integer code for this type: 1 for {@code QUEUE}, 2 for {@code TOPIC}
+     */
+    public int getType()
+    {
+       return this == QUEUE ? 1 : 2;
+    }
+
+    /**
+     * Maps an integer code back to its type.
+     *
+     * @param type the code, as produced by {@link #getType()}
+     * @return the matching type
+     * @throws IllegalArgumentException if the code is neither 1 nor 2; an unknown code is
+     *         rejected rather than being silently treated as a topic
+     */
+    static Type getType(int type)
+    {
+       switch (type)
+       {
+          case 1:
+             return QUEUE;
+          case 2:
+             return TOPIC;
+          default:
+             throw new IllegalArgumentException("Invalid destination type code " + type);
+       }
+    }
+ }
// Attributes ----------------------------------------------------
+ private long id;
+
+ private Type type;
+
+ private String name;
+
+ private String jndiBinding;
+
+ private String selector;
+
+ private boolean durable;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
+ /**
+  * Creates an empty instance. All fields keep their default values until they are set,
+  * for example by {@link #decode(HornetQBuffer)}.
+  */
+ public PersistedDestination()
+ {
+ }
+
+ /**
+  * Creates a destination with no selector that is not durable.
+  *
+  * @param type        whether this is a queue or a topic
+  * @param name        the destination name
+  * @param jndiBinding the JNDI binding recorded for the destination
+  */
+ public PersistedDestination(final Type type, final String name, final String jndiBinding)
+ {
+    this(type, name, jndiBinding, null, false);
+ }
+
+ /**
+  * Creates a fully specified destination.
+  *
+  * @param type        whether this is a queue or a topic
+  * @param name        the destination name
+  * @param jndiBinding the JNDI binding recorded for the destination
+  * @param selector    the selector string, may be {@code null}
+  * @param durable     the durable flag
+  */
+ public PersistedDestination(final Type type,
+                             final String name,
+                             final String jndiBinding,
+                             final String selector,
+                             final boolean durable)
+ {
+    this.type = type;
+    this.name = name;
+    this.jndiBinding = jndiBinding;
+    this.selector = selector;
+    this.durable = durable;
+ }
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
@@ -41,4 +88,66 @@
// Inner classes -------------------------------------------------
+
+ /**
+  * @return the id last passed to {@link #setId(long)}; it is not part of the encoded form
+  */
+ public long getId()
+ {
+    return this.id;
+ }
+
+ /**
+  * @param id the id to associate with this destination
+  */
+ public void setId(final long id)
+ {
+    this.id = id;
+ }
+
+ /**
+  * @return the destination name
+  */
+ public String getName()
+ {
+    return this.name;
+ }
+
+ /**
+  * @return the JNDI binding recorded for the destination
+  */
+ public String getJndiBinding()
+ {
+    return this.jndiBinding;
+ }
+
+ /**
+  * @return whether this destination is a queue or a topic
+  */
+ public Type getType()
+ {
+    return this.type;
+ }
+
+ /**
+  * @return the selector string, or {@code null} when none was given
+  */
+ public String getSelector()
+ {
+    return this.selector;
+ }
+
+ /**
+  * @return the durable flag
+  */
+ public boolean isDurable()
+ {
+    return this.durable;
+ }
+
+ /**
+  * Computes the size reported for the encoded form: one int for the type code, the name,
+  * the JNDI binding, the nullable selector and one boolean for the durable flag.
+  * <p>
+  * NOTE(review): the string sizes come from the BufferHelper sizeOf*SimpleString helpers,
+  * whereas encode writes the strings with writeString / writeNullableString. Confirm that
+  * the two agree for every string length, otherwise the reported size can differ from the
+  * number of bytes actually written.
+  *
+  * @return the encoded size in bytes
+  */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_INT +
+ BufferHelper.sizeOfSimpleString(name) +
+ BufferHelper.sizeOfSimpleString(jndiBinding) +
+ BufferHelper.sizeOfNullableSimpleString(selector) +
+ DataConstants.SIZE_BOOLEAN;
+ }
+
+ /**
+  * Writes this destination to the buffer as: type code, name, JNDI binding, selector,
+  * durable flag. The id is not written. decode reads the fields back in the same order.
+  * <p>
+  * The type must be set before calling this method, since it is dereferenced here.
+  * Only the selector is written with the nullable variant.
+  * NOTE(review): presumably name and jndiBinding must be non-null for writeString; confirm
+  * against callers that may pass a null binding.
+  *
+  * @param buffer the buffer to write to
+  */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(type.getType());
+ buffer.writeString(name);
+ buffer.writeString(jndiBinding);
+ buffer.writeNullableString(selector);
+ buffer.writeBoolean(durable);
+ }
+
+ /**
+  * Populates this destination from the buffer, reading the fields in the order encode
+  * writes them: type code, name, JNDI binding, selector, durable flag. The id is not read
+  * here and has to be supplied separately through setId.
+  *
+  * @param buffer the buffer to read from
+  */
+ public void decode(final HornetQBuffer buffer)
+ {
+ type = Type.getType(buffer.readInt());
+ name = buffer.readString();
+ jndiBinding = buffer.readString();
+ selector = buffer.readNullableString();
+ durable = buffer.readBoolean();
+ }
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -51,6 +51,8 @@
// Constants -----------------------------------------------------
private final byte CF_RECORD = 1;
+
+ private final byte DESTINATION_RECORD = 2;
// Attributes ----------------------------------------------------
@@ -66,6 +68,8 @@
private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
+ private Map<String, PersistedDestination> destinations = new ConcurrentHashMap<String, PersistedDestination>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -125,7 +129,7 @@
/* (non-Javadoc)
* @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
*/
- public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ public void storeConnectionFactory(final PersistedConnectionFactory connectionFactory) throws Exception
{
deleteConnectionFactory(connectionFactory.getName());
long id = idGenerator.generateID();
@@ -134,7 +138,7 @@
mapFactories.put(connectionFactory.getName(), connectionFactory);
}
- public void deleteConnectionFactory(String cfName) throws Exception
+ public void deleteConnectionFactory(final String cfName) throws Exception
{
PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
if (oldCF != null)
@@ -148,18 +152,31 @@
*/
public List<PersistedDestination> recoverDestinations()
{
- return null;
+ List<PersistedDestination> destinations = new ArrayList<PersistedDestination>(this.destinations.size());
+ destinations.addAll(this.destinations.values());
+ return destinations;
}
/* (non-Javadoc)
* @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
*/
- public void storeDestination(PersistedDestination destination)
+ public void storeDestination(final PersistedDestination destination) throws Exception
{
- // TODO Auto-generated method stub
-
+ deleteDestination(destination.getName());
+ long id = idGenerator.generateID();
+ destination.setId(id);
+ jmsJournal.appendAddRecord(id, DESTINATION_RECORD, destination, true);
+ destinations.put(destination.getName(), destination);
}
+ /**
+  * Deletes the destination stored under the given name, if there is one.
+  * <p>
+  * The entry is removed from the in-memory map as well as from the journal, in the same way
+  * deleteConnectionFactory handles connection factories. If the entry were only looked up,
+  * a deleted destination would still be returned by recoverDestinations(), and a later
+  * store or delete for the same name would try to delete the same journal record again.
+  *
+  * @param name the name the destination was stored under
+  * @throws Exception if appending the delete record to the journal fails
+  */
+ public void deleteDestination(final String name) throws Exception
+ {
+    // remove rather than get, so the map and the journal stay in step
+    final PersistedDestination destination = destinations.remove(name);
+    if (destination != null)
+    {
+       jmsJournal.appendDeleteRecord(destination.getId(), false);
+    }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
@@ -225,6 +242,13 @@
cf.setId(id);
mapFactories.put(cf.getName(), cf);
}
+ else if(rec == DESTINATION_RECORD)
+ {
+ PersistedDestination destination = new PersistedDestination();
+ destination.decode(buffer);
+ destination.setId(id);
+ destinations.put(destination.getName(), destination);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -68,6 +68,14 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteDestination(java.lang.String)
+ */
+ public void deleteDestination(final String name) throws Exception
+ {
+    // Intentionally a no-op in this implementation.
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -44,6 +44,7 @@
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
@@ -55,6 +56,9 @@
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
import org.hornetq.utils.TimeAndCounterIDGenerator;
+import static org.hornetq.jms.persistence.PersistedDestination.Type.QUEUE;
+import static org.hornetq.jms.persistence.PersistedDestination.Type.TOPIC;
+
/**
* A Deployer used to create and add to JNDI queues, topics and connection
* factories. Typically this would only be used in an app server env.
@@ -295,50 +299,22 @@
final boolean durable) throws Exception
{
checkInitialised();
- HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
- // Convert from JMS selector to core filter
- String coreFilterString = null;
+ boolean added = internalCreateQueue(queueName, jndiBinding, selectorString, durable);
- if (selectorString != null)
- {
- coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
- }
+ storage.storeDestination(new PersistedDestination(QUEUE, queueName, jndiBinding, selectorString, durable));
- server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
- jBossQueue.getAddress(),
- coreFilterString,
- durable);
-
- boolean added = bindToJndi(jndiBinding, jBossQueue);
-
- if (added)
- {
- addToDestinationBindings(queueName, jndiBinding);
- }
-
- jmsManagementService.registerQueue(jBossQueue, jndiBinding);
return added;
}
public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
checkInitialised();
- HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
- // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
- // checks when routing messages to a topic that
- // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
- // subscriptions - core has no notion of a topic
- server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
- jBossTopic.getAddress(),
- JMSServerManagerImpl.REJECT_FILTER,
- true);
- boolean added = bindToJndi(jndiBinding, jBossTopic);
- if (added)
- {
- addToDestinationBindings(topicName, jndiBinding);
- }
- jmsManagementService.registerTopic(jBossTopic, jndiBinding);
+
+ boolean added = internalCreateTopic(topicName, jndiBinding);
+
+ storage.storeDestination(new PersistedDestination(TOPIC, topicName, jndiBinding));
+
return added;
}
@@ -371,7 +347,7 @@
destinations.remove(name);
jmsManagementService.unregisterQueue(name);
server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
-
+ storage.deleteDestination(name);
return true;
}
@@ -401,6 +377,7 @@
}
}
}
+ storage.deleteDestination(name);
return true;
}
@@ -616,6 +593,59 @@
storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
}
+ /**
+  * Deploys a JMS queue on the server, binds it in JNDI and registers it with the JMS
+  * management service. Nothing is written to storage here.
+  *
+  * @param queueName      the JMS queue name
+  * @param jndiBinding    the JNDI name to bind the queue under
+  * @param selectorString an optional JMS selector, may be {@code null}
+  * @param durable        passed through to the core queue deployment
+  * @return the result of the JNDI bind attempt
+  * @throws Exception if any of the deployment, binding or registration calls fails
+  */
+ private boolean internalCreateQueue(final String queueName,
+                                     final String jndiBinding,
+                                     final String selectorString,
+                                     final boolean durable) throws Exception
+ {
+    final HornetQDestination queue = HornetQDestination.createQueue(queueName);
+
+    // Translate the JMS selector, when one is given, into a core filter string
+    final String coreFilter = selectorString == null
+                                 ? null
+                                 : SelectorTranslator.convertToHornetQFilterString(selectorString);
+
+    server.getHornetQServerControl().deployQueue(queue.getAddress(), queue.getAddress(), coreFilter, durable);
+
+    final boolean bound = bindToJndi(jndiBinding, queue);
+    if (bound)
+    {
+       addToDestinationBindings(queueName, jndiBinding);
+    }
+
+    jmsManagementService.registerQueue(queue, jndiBinding);
+
+    return bound;
+ }
+
+ /**
+  * Deploys a JMS topic on the server, binds it in JNDI and registers it with the JMS
+  * management service. Nothing is written to storage here.
+  *
+  * @param topicName   the JMS topic name
+  * @param jndiBinding the JNDI name to bind the topic under
+  * @return the result of the JNDI bind attempt
+  * @throws Exception if any of the deployment, binding or registration calls fails
+  */
+ private boolean internalCreateTopic(final String topicName, final String jndiBinding) throws Exception
+ {
+    final HornetQDestination topic = HornetQDestination.createTopic(topicName);
+
+    // Core has no notion of a topic, so a dummy subscription that never receives messages is
+    // deployed on the topic address. This lets the JMS checks distinguish a topic that does
+    // not exist from one that has no subscriptions when messages are routed to it.
+    server.getHornetQServerControl().deployQueue(topic.getAddress(),
+                                                 topic.getAddress(),
+                                                 JMSServerManagerImpl.REJECT_FILTER,
+                                                 true);
+
+    final boolean bound = bindToJndi(jndiBinding, topic);
+    if (bound)
+    {
+       addToDestinationBindings(topicName, jndiBinding);
+    }
+
+    jmsManagementService.registerTopic(topic, jndiBinding);
+
+    return bound;
+ }
+
/**
* @param cfConfig
* @throws HornetQException
@@ -995,6 +1025,20 @@
{
internalCreateCF(cf.getConfig());
}
+
+ List<PersistedDestination> destinations = storage.recoverDestinations();
+
+ for (PersistedDestination destination : destinations)
+ {
+ if(destination.getType() == QUEUE)
+ {
+ internalCreateQueue(destination.getName(), destination.getJndiBinding(), destination.getSelector(), destination.isDurable());
+ }
+ else if(destination.getType() == TOPIC)
+ {
+ internalCreateTopic(destination.getName(), destination.getJndiBinding());
+ }
+ }
}
More information about the hornetq-commits
mailing list