[hornetq-commits] JBoss hornetq SVN: r8965 - in branches/Clebert_TMP/src/main/org/hornetq/jms: persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 25 11:04:27 EDT 2010


Author: ataylor
Date: 2010-03-25 11:04:26 -0400 (Thu, 25 Mar 2010)
New Revision: 8965

Modified:
   branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
   branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
   branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
   branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
   branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
added JMS destination persistence

Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java	2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java	2010-03-25 15:04:26 UTC (rev 8965)
@@ -37,7 +37,9 @@
 
    // Public --------------------------------------------------------
 
-   void storeDestination(PersistedDestination destination);
+   void storeDestination(PersistedDestination destination) throws Exception;
+
+   void deleteDestination(String name) throws Exception;
    
    List<PersistedDestination> recoverDestinations();
    

Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java	2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java	2010-03-25 15:04:26 UTC (rev 8965)
@@ -13,24 +13,71 @@
 
 package org.hornetq.jms.persistence;
 
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
+
 /**
  * A PersistedDestination
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
  */
-public class PersistedDestination
+public class PersistedDestination implements EncodingSupport
 {
 
+
    // Constants -----------------------------------------------------
 
+   public enum Type
+   {
+      QUEUE,
+      TOPIC;
+
+      public int getType()
+      {
+         return this == QUEUE ? 1 : 2;
+      }
+
+      static Type getType(int type)
+      {
+         return type == 1 ? QUEUE : TOPIC;
+      }
+   }
    // Attributes ----------------------------------------------------
 
+   private long id;
+
+   private Type type;
+
+   private String name;
+
+   private String jndiBinding;
+
+   private String selector;
+
+   private boolean durable;
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
+   public PersistedDestination()
+   {
+   }
+
+   public PersistedDestination(final Type type, final String name, final String jndiBinding)
+   {
+      this(type, name, jndiBinding, null, false);
+   }
+
+   public PersistedDestination(final Type type, final String name, final String jndiBinding, final String selector, final boolean durable)
+   {
+      this.type = type;
+      this.name = name;
+      this.jndiBinding = jndiBinding;
+      this.selector = selector;
+      this.durable = durable;
+   }
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------
@@ -41,4 +88,66 @@
 
    // Inner classes -------------------------------------------------
 
+
+   public long getId()
+   {
+      return id;
+   }
+
+   public void setId(final long id)
+   {
+      this.id = id;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public String getJndiBinding()
+   {
+      return jndiBinding;
+   }
+
+   public Type getType()
+   {
+      return type;
+   }
+
+   public String getSelector()
+   {
+      return selector;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public int getEncodeSize()
+   {
+      return DataConstants.SIZE_INT +
+            BufferHelper.sizeOfSimpleString(name) +
+            BufferHelper.sizeOfSimpleString(jndiBinding) +
+            BufferHelper.sizeOfNullableSimpleString(selector) +
+            DataConstants.SIZE_BOOLEAN;
+   }
+
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(type.getType());
+      buffer.writeString(name);
+      buffer.writeString(jndiBinding);
+      buffer.writeNullableString(selector);
+      buffer.writeBoolean(durable);
+   }
+
+   public void decode(final HornetQBuffer buffer)
+   {
+      type = Type.getType(buffer.readInt());
+      name = buffer.readString();
+      jndiBinding = buffer.readString();
+      selector = buffer.readNullableString();
+      durable = buffer.readBoolean();
+   }
 }

Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java	2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java	2010-03-25 15:04:26 UTC (rev 8965)
@@ -51,6 +51,8 @@
    // Constants -----------------------------------------------------
 
    private final byte CF_RECORD = 1;
+
+   private final byte DESTINATION_RECORD = 2;
    
    // Attributes ----------------------------------------------------
 
@@ -66,6 +68,8 @@
    
    private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
 
+   private Map<String, PersistedDestination> destinations = new ConcurrentHashMap<String, PersistedDestination>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -125,7 +129,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
     */
-   public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+   public void storeConnectionFactory(final PersistedConnectionFactory connectionFactory) throws Exception
    {
       deleteConnectionFactory(connectionFactory.getName());
       long id = idGenerator.generateID();
@@ -134,7 +138,7 @@
       mapFactories.put(connectionFactory.getName(), connectionFactory);
    }
    
-   public void deleteConnectionFactory(String cfName) throws Exception
+   public void deleteConnectionFactory(final String cfName) throws Exception
    {
       PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
       if (oldCF != null)
@@ -148,18 +152,31 @@
     */
    public List<PersistedDestination> recoverDestinations()
    {
-      return null;
+      List<PersistedDestination> destinations = new ArrayList<PersistedDestination>(this.destinations.size());
+      destinations.addAll(this.destinations.values());
+      return destinations;
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
     */
-   public void storeDestination(PersistedDestination destination)
+   public void storeDestination(final PersistedDestination destination) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      deleteDestination(destination.getName());
+      long id = idGenerator.generateID();
+      destination.setId(id);
+      jmsJournal.appendAddRecord(id, DESTINATION_RECORD, destination, true);
+      destinations.put(destination.getName(), destination);
    }
 
+   public void deleteDestination(final String name) throws Exception
+   {
+      PersistedDestination destination = destinations.get(name);
+      if(destination != null)
+      {
+         jmsJournal.appendDeleteRecord(destination.getId(), false);
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
@@ -225,6 +242,13 @@
             cf.setId(id);
             mapFactories.put(cf.getName(), cf);
          }
+         else if(rec == DESTINATION_RECORD)
+         {
+            PersistedDestination destination = new PersistedDestination();
+            destination.decode(buffer);
+            destination.setId(id);
+            destinations.put(destination.getName(), destination);
+         }
          else
          {
             throw new IllegalStateException("Invalid record type " + rec);

Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java	2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java	2010-03-25 15:04:26 UTC (rev 8965)
@@ -68,6 +68,14 @@
    {
    }
 
+    /* (non-Javadoc)
+    * @see org.hornetq.jms.persistence.JMSStorageManager#deleteDestination (org.hornetq.jms.persistence.PersistedDestination)
+    */
+   public void deleteDestination(String name) throws Exception
+   {
+
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
     */

Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-03-25 15:04:26 UTC (rev 8965)
@@ -44,6 +44,7 @@
 import org.hornetq.jms.client.SelectorTranslator;
 import org.hornetq.jms.persistence.JMSStorageManager;
 import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
 import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
 import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
 import org.hornetq.jms.server.JMSServerManager;
@@ -55,6 +56,9 @@
 import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
 import org.hornetq.utils.TimeAndCounterIDGenerator;
 
+import static org.hornetq.jms.persistence.PersistedDestination.Type.QUEUE;
+import static org.hornetq.jms.persistence.PersistedDestination.Type.TOPIC;
+
 /**
  * A Deployer used to create and add to JNDI queues, topics and connection
  * factories. Typically this would only be used in an app server env.
@@ -295,50 +299,22 @@
                                            final boolean durable) throws Exception
    {
       checkInitialised();
-      HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
 
-      // Convert from JMS selector to core filter
-      String coreFilterString = null;
+      boolean added = internalCreateQueue(queueName, jndiBinding, selectorString, durable);
 
-      if (selectorString != null)
-      {
-         coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
-      }
+      storage.storeDestination(new PersistedDestination(QUEUE, queueName, jndiBinding, selectorString, durable));
 
-      server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
-                                                   jBossQueue.getAddress(),
-                                                   coreFilterString,
-                                                   durable);
-
-      boolean added = bindToJndi(jndiBinding, jBossQueue);
-
-      if (added)
-      {
-         addToDestinationBindings(queueName, jndiBinding);
-      }
-
-      jmsManagementService.registerQueue(jBossQueue, jndiBinding);
       return added;
    }
 
    public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
    {
       checkInitialised();
-      HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
-      // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
-      // checks when routing messages to a topic that
-      // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
-      // subscriptions - core has no notion of a topic
-      server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
-                                                   jBossTopic.getAddress(),
-                                                   JMSServerManagerImpl.REJECT_FILTER,
-                                                   true);
-      boolean added = bindToJndi(jndiBinding, jBossTopic);
-      if (added)
-      {
-         addToDestinationBindings(topicName, jndiBinding);
-      }
-      jmsManagementService.registerTopic(jBossTopic, jndiBinding);
+
+      boolean added = internalCreateTopic(topicName, jndiBinding);
+
+      storage.storeDestination(new PersistedDestination(TOPIC, topicName, jndiBinding));
+
       return added;
    }
 
@@ -371,7 +347,7 @@
       destinations.remove(name);
       jmsManagementService.unregisterQueue(name);
       server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
-
+      storage.deleteDestination(name);
       return true;
    }
 
@@ -401,6 +377,7 @@
             }
          }
       }
+      storage.deleteDestination(name);
       return true;
    }
 
@@ -616,6 +593,59 @@
       storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
    }
 
+   private boolean internalCreateQueue(final String queueName,
+                                           final String jndiBinding,
+                                           final String selectorString,
+                                           final boolean durable) throws Exception
+   {
+      HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
+
+      // Convert from JMS selector to core filter
+      String coreFilterString = null;
+
+      if (selectorString != null)
+      {
+         coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
+      }
+
+      server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
+                                                   jBossQueue.getAddress(),
+                                                   coreFilterString,
+                                                   durable);
+
+      boolean added = bindToJndi(jndiBinding, jBossQueue);
+
+      if (added)
+      {
+         addToDestinationBindings(queueName, jndiBinding);
+      }
+
+      jmsManagementService.registerQueue(jBossQueue, jndiBinding);
+
+      return added;
+   }
+
+   private boolean internalCreateTopic(final String topicName, final String jndiBinding) throws Exception
+   {
+      HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
+      // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
+      // checks when routing messages to a topic that
+      // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
+      // subscriptions - core has no notion of a topic
+      server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
+                                                   jBossTopic.getAddress(),
+                                                   JMSServerManagerImpl.REJECT_FILTER,
+                                                   true);
+      boolean added = bindToJndi(jndiBinding, jBossTopic);
+      if (added)
+      {
+         addToDestinationBindings(topicName, jndiBinding);
+      }
+      jmsManagementService.registerTopic(jBossTopic, jndiBinding);
+
+      return added;
+   }
+
    /**
     * @param cfConfig
     * @throws HornetQException
@@ -995,6 +1025,20 @@
       {
          internalCreateCF(cf.getConfig());
       }
+
+      List<PersistedDestination> destinations = storage.recoverDestinations();
+
+      for (PersistedDestination destination : destinations)
+      {
+         if(destination.getType() == QUEUE)
+         {
+            internalCreateQueue(destination.getName(), destination.getJndiBinding(), destination.getSelector(), destination.isDurable());
+         }
+         else if(destination.getType() == TOPIC)
+         {
+            internalCreateTopic(destination.getName(), destination.getJndiBinding());   
+         }
+      }
    }
 
 



More information about the hornetq-commits mailing list