[hornetq-commits] JBoss hornetq SVN: r10702 - in branches/Branch_2_2_EAP_export_tool: src/main/org/hornetq/api/core/client and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 19 17:27:15 EDT 2011


Author: jicken
Date: 2011-05-19 17:27:14 -0400 (Thu, 19 May 2011)
New Revision: 10702

Added:
   branches/Branch_2_2_EAP_export_tool/.gitignore
Modified:
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java
   branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
Log:
Got rid of reading the target journal when importing messages.
queueQuery is now used to check whether the queue exists; if it does not, the queue is created.
The queue query response object now carries the id of the queue, as this is needed to map old queue ids to new queue ids.

Added: branches/Branch_2_2_EAP_export_tool/.gitignore
===================================================================
--- branches/Branch_2_2_EAP_export_tool/.gitignore	                        (rev 0)
+++ branches/Branch_2_2_EAP_export_tool/.gitignore	2011-05-19 21:27:14 UTC (rev 10702)
@@ -0,0 +1,4 @@
+target
+thirdparty
+native
+build

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -57,6 +57,11 @@
    public interface QueueQuery
    {
       /**
+       * @return the ID of the queue.
+       */
+      long getId();
+
+      /**
        * Returns <code>true</code> if the queue exists, <code>false</code> else.
        */
       boolean isExists();

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -363,7 +363,9 @@
 
       SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage)channel.sendBlocking(request);
 
-      return new QueueQueryImpl(response.isDurable(),
+      return new QueueQueryImpl(response.getId(),
+                                 response.getName(),
+                                 response.isDurable(),
                                 response.getConsumerCount(),
                                 response.getMessageCount(),
                                 response.getFilterString(),
@@ -1855,9 +1857,11 @@
 
    private static class QueueQueryImpl implements QueueQuery
    {
+      private final long id;
 
+      private final SimpleString name;
+
       private final boolean exists;
-
       private final boolean durable;
 
       private final long messageCount;
@@ -1868,14 +1872,17 @@
 
       private final SimpleString address;
 
-      public QueueQueryImpl(final boolean durable,
+      public QueueQueryImpl(final long id,
+                            final SimpleString name,
+                            final boolean durable,
                             final int consumerCount,
                             final long messageCount,
                             final SimpleString filterString,
                             final SimpleString address,
                             final boolean exists)
       {
-
+         this.id = id;
+         this.name = name;
          this.durable = durable;
          this.consumerCount = consumerCount;
          this.messageCount = messageCount;
@@ -1884,6 +1891,10 @@
          this.exists = exists;
       }
 
+      public long getId() {
+         return id;
+      }
+
       public SimpleString getAddress()
       {
          return address;
@@ -1914,5 +1925,8 @@
          return exists;
       }
 
+      public SimpleString getName() {
+         return name;
+      }
    }
 }

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -105,10 +105,8 @@
     * </ul>
     * <p/>
     *
-    * @param exportFile  full qualified name of the export file (/path/name.xml)
     * @param bindingsDir directory with the bindings journal
     * @param messagesDir directory with the messages journal
-    * @param backup flag if a backup should be created before modifying the original journal
     * @throws Exception if something goes wrong
     */
    public static void exportMessages(final String bindingsDir, final String messagesDir, final OutputStream out) throws Exception
@@ -218,24 +216,6 @@
 
    }
 
-   /*
-    * 
-    * private static void addDelRecordsToOriginal(JournalImpl original, Set<Long> recordsToDelete, List<MessageType> messages) throws Exception {
-      List<Long> originalDeletes = new ArrayList<Long>(messages.size() / 3);
-      for (MessageType message : messages) {
-         if (!recordsToDelete.contains(message.getId()) &&
-               !originalDeletes.contains(message.getId())) {
-            original.appendDeleteRecord(message.getId(), false, DummyOperationContext.getInstance());
-            originalDeletes.add(message.getId());
-         }
-      }
-      if (log.isInfoEnabled()) {
-         int size = originalDeletes.size();
-         log.info("Deleted " + size + " record" + (size > 1 ? "s" : "") + " from original journal!");
-      }
-   }
-    */
-
    private static void loadData(Journal original, final List<RecordInfo> records, final Set<Long> recordsToDelete) throws Exception
    {
       original.start();
@@ -294,7 +274,6 @@
     * Method which writes the XML export file to disk.
     *
     * @param hqJournalExport the root JAXB context
-    * @param exportFile the export filename
     * @throws java.io.FileNotFoundException if the export file could not be created
     * @throws javax.xml.bind.JAXBException if an error occurs during marshalling
     */
@@ -501,18 +480,17 @@
 
       FileInputStream input = new FileInputStream(new File(importFile));
 
-      return importMessages(input, configuration.getBindingsDirectory(), serverLocator);
+      return importMessages(input, serverLocator);
    }
 
    public static long importMessages(final InputStream importFile,
-                                     final String bindingsDirectory,
                                      final ServerLocator serverLocator) throws Exception
    {
       final JAXBContext context = JAXBContext.newInstance(HornetQExport.class);
 
       final Unmarshaller unmarshaller = context.createUnmarshaller();
 
-      final Map<String, Long> queueBindings = loadBindings(bindingsDirectory);
+      final Map<Long, Long> queueMapping = new HashMap<Long, Long>();
 
       ClientSessionFactory sf = null;
       
@@ -524,9 +502,22 @@
          // message notification callback
          final MessagesExportType.Listener listener = new MessagesExportType.Listener()
          {
-            public void handleMessage(MessageType message) throws Exception
-            {
+            public void handleMessage(MessageType message) throws Exception {
+               final List<QueueType> originalQueues = message.getAllPreviousBindings().getQueue();
+               final List<QueueRefType> originalBindings = message.getBindings().getQueue();
+               QueueType queue;
+               ClientSession.QueueQuery queueQuery;
+               for (QueueRefType originalBinding : originalBindings) {
+                  queue = originalQueues.get(originalQueues.indexOf(new QueueType(originalBinding.getId())));
+                  queueQuery = coreSession.queueQuery(SimpleString.toSimpleString(queue.getName()));
 
+                  if (!queueQuery.isExists()) {
+                     coreSession.createQueue(queue.getAddress(), queue.getName(), queue.getFilterString(), message.isDurable());
+                     queueQuery = coreSession.queueQuery(SimpleString.toSimpleString(queue.getName()));
+                     queueMapping.put(queue.getId(), queueQuery.getId());
+                  }
+               }
+
                ClientProducer producer = coreSession.createProducer(message.getAddress());
                ClientMessage clientMessage = generateClientMessage(message);
                producer.send(clientMessage);
@@ -553,7 +544,7 @@
                {
                   if (!ackedQueues.contains(new QueueRefType(binding.getId())))
                   {
-                     queues.add(queueBindings.get(message.getAddress()));
+                     queues.add(queueMapping.get(binding.getId()));
                   }
                }
                clientMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, getByteArrayOf(queues));
@@ -615,6 +606,9 @@
                if (target instanceof MessagesExportType)
                {
                   ((MessagesExportType)target).setMessageListener(listener);
+                  if (((MessagesExportType) target).getBindings() == null) {
+                     ((MessagesExportType)target).setOriginalBindings(((HornetQExport)parent).getQueues());
+                  }
                }
             }
 

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -4,6 +4,7 @@
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 import org.hornetq.api.core.SimpleString;
@@ -79,6 +80,9 @@
    @XmlAttribute
    protected Long id;
 
+   @XmlTransient
+   private BindingsJournalType allPreviousBindings;
+
    public MessageType(ServerMessage msg) {
       setId(msg.getMessageID());
 
@@ -349,4 +353,12 @@
    public int hashCode() {
       return id.hashCode();
    }
+
+   public BindingsJournalType getAllPreviousBindings() {
+      return allPreviousBindings;
+   }
+
+   public void setAllPreviousBindings(BindingsJournalType allPreviousBindings) {
+      this.allPreviousBindings = allPreviousBindings;
+   }
 }

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -6,6 +6,7 @@
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 
@@ -35,6 +36,9 @@
    @XmlElement(namespace = "urn:hornetq")
    protected List<MessageType> message;
 
+   @XmlTransient
+   private BindingsJournalType bindings;
+
    /**
     * Gets the value of the message property.
     * <p/>
@@ -70,6 +74,7 @@
       message = (listener == null) ? null : new ArrayList<MessageType>() {
          public boolean add(MessageType o) {
             try {
+               o.setAllPreviousBindings(getBindings());
                listener.handleMessage(o);
             } catch (Exception e) {
                e.printStackTrace();
@@ -80,6 +85,14 @@
       };
    }
 
+   public void setOriginalBindings(final BindingsJournalType bindings) {
+      this.bindings = bindings;
+   }
+
+   public BindingsJournalType getBindings() {
+      return bindings;
+   }
+
    /**
     * This listener is invoked every time a new message is unmarshalled.
     */

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -46,6 +46,13 @@
     @XmlAttribute(required = true)
     protected long id;
 
+   public QueueType() {
+   }
+
+   public QueueType(long id) {
+      this.id = id;
+   }
+
     /**
      * Gets the value of the name property.
      * 
@@ -134,4 +141,20 @@
         this.id = value;
     }
 
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      QueueType queueType = (QueueType) o;
+
+      if (id != queueType.id) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      return (int) (id ^ (id >>> 32));
+   }
 }

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -27,6 +27,8 @@
  */
 public class SessionQueueQueryResponseMessage extends PacketImpl
 {
+   private long id;
+
    private SimpleString name;
    
    private boolean exists;
@@ -45,7 +47,7 @@
 
    public SessionQueueQueryResponseMessage(final QueueQueryResult result)
    {
-      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
+      this(result.getId(), result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
            result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists());
    }
 
@@ -54,6 +56,21 @@
       this(null, null, false, false, null, 0, 0, false);
    }
 
+   private SessionQueueQueryResponseMessage(final long id,
+                                            final SimpleString name,
+                                            final SimpleString address,
+                                            final boolean durable,
+                                            final boolean temporary,
+                                            final SimpleString filterString,
+                                            final int consumerCount,
+                                            final long messageCount,
+                                            final boolean exists)
+   {
+      this(name,address, durable, temporary, filterString, consumerCount, messageCount, exists);
+      this.id = id;
+
+   }
+
    private SessionQueueQueryResponseMessage(final SimpleString name,
                                             final SimpleString address,
                                             final boolean durable,
@@ -82,6 +99,10 @@
       this.exists = exists;
    }
 
+   public long getId() {
+      return id;
+   }
+
    @Override
    public boolean isResponse()
    {
@@ -139,6 +160,7 @@
       buffer.writeNullableSimpleString(filterString);
       buffer.writeNullableSimpleString(address);
       buffer.writeNullableSimpleString(name);
+      buffer.writeLong(id);
    }
 
    @Override
@@ -152,6 +174,7 @@
       filterString = buffer.readNullableSimpleString();
       address = buffer.readNullableSimpleString();
       name = buffer.readNullableSimpleString();
+      id = buffer.readLong();
    }
 
    @Override

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -24,6 +24,8 @@
  */
 public class QueueQueryResult
 {
+   private long id;
+
    private SimpleString name;
    
    private boolean exists;
@@ -50,12 +52,37 @@
    {
       this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
    }
+   public QueueQueryResult(final long id,
+                           final SimpleString name,
+                                           final SimpleString address,
+                                           final boolean durable,
+                                           final boolean temporary,
+                                           final SimpleString filterString,
+                                           final int consumerCount,
+                                           final long messageCount)
+   {
+      this(id, name, address, durable, temporary, filterString, consumerCount, messageCount, true);
+   }
 
    public QueueQueryResult()
    {
       this(null, null, false, false, null, 0, 0, false);
    }
 
+   private QueueQueryResult(final long id,
+                            final SimpleString name,
+                                            final SimpleString address,
+                                            final boolean durable,
+                                            final boolean temporary,
+                                            final SimpleString filterString,
+                                            final int consumerCount,
+                                            final long messageCount,
+                                            final boolean exists)
+   {
+      this(name, address, durable, temporary, filterString, consumerCount, messageCount, exists);
+      this.id = id;
+   }
+
    private QueueQueryResult(final SimpleString name,
                                             final SimpleString address,
                                             final boolean durable,
@@ -82,6 +109,10 @@
       this.exists = exists;
    }
 
+   public long getId() {
+      return id;
+   }
+
    public boolean isExists()
    {
       return exists;

Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -13,8 +13,6 @@
 
 package org.hornetq.core.server.impl;
 
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -74,6 +72,7 @@
 import org.hornetq.utils.UUID;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
+import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
 
 /*
  * Session implementation 
@@ -495,7 +494,8 @@
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(name,
+         response = new QueueQueryResult(queue.getID(),
+                                         name,
                                          binding.getAddress(),
                                          queue.isDurable(),
                                          queue.isTemporary(),

Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java	2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java	2011-05-19 21:27:14 UTC (rev 10702)
@@ -105,7 +105,7 @@
          
          locator = createInVMNonHALocator();
          
-         ManageDataTool.importMessages(is, getBindingsDir(), locator);
+         ManageDataTool.importMessages(is, locator);
          
          ClientSessionFactory csf = locator.createSessionFactory();
          
@@ -121,8 +121,6 @@
             assertEquals(i, msg.getIntProperty("count").intValue());
          }
          
-   
-         
       }
       finally
       {



More information about the hornetq-commits mailing list