[hornetq-commits] JBoss hornetq SVN: r12286 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 9 22:40:52 EST 2012


Author: jbertram
Date: 2012-03-09 22:40:51 -0500 (Fri, 09 Mar 2012)
New Revision: 12286

Modified:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
   branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Log:
[HORNETQ-787] added comments, logging, and paging test

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-09 22:40:15 UTC (rev 12285)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-10 03:40:51 UTC (rev 12286)
@@ -14,7 +14,9 @@
 package org.hornetq.core.persistence.impl.journal;
 
 /**
- * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
+ * The constants shared
+ *
+ * @author Justin Bertram
  */
 public class XmlDataConstants
 {
@@ -60,5 +62,4 @@
    public static final String PROPERTY_TYPE_DOUBLE = "double";
    public static final String PROPERTY_TYPE_STRING = "string";
    public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
-   public static final Long CHUNK = 1000L;
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-09 22:40:15 UTC (rev 12285)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-10 03:40:51 UTC (rev 12286)
@@ -50,7 +50,11 @@
 import java.util.UUID;
 
 /**
- * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
+ * Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataWriter</code>, create a core session, and
+ * send the messages to a running instance of HornetQ.  It uses the StAX <code>javax.xml.stream.XMLStreamReader</code> 
+ * for speed and simplicity.
+ *
+ * @author Justin Bertram
  */
 public class XmlDataReader
 {
@@ -67,7 +71,7 @@
    boolean localSession = false;
 
    Map<String, String> addressMap = new HashMap<String, String>();
-   
+
    String tempFileName = "";
 
    // Static --------------------------------------------------------
@@ -117,7 +121,7 @@
    {
       if (arg.length < 3)
       {
-         System.out.println("Use: java -cp hornetq-core.jar <inputFile> <host> <port>");
+         System.out.println("Use: java -cp hornetq-core.jar " + XmlDataReader.class + " <inputFile> <host> <port>");
          System.exit(-1);
       }
 
@@ -152,8 +156,10 @@
             }
             reader.next();
          }
-      } finally
+      }
+      finally
       {
+         // if the session was created in our constructor then close it
          if (localSession)
          {
             session.close();
@@ -170,6 +176,7 @@
       org.hornetq.utils.UUID userId = null;
       ArrayList<String> queues = new ArrayList<String>();
 
+      // get message's attributes
       for (int i = 0; i < reader.getAttributeCount(); i++)
       {
          String attributeName = reader.getAttributeLocalName(i);
@@ -200,6 +207,7 @@
 
       boolean endLoop = false;
 
+      // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
       while (reader.hasNext())
       {
          switch (reader.getEventType())
@@ -267,30 +275,42 @@
 
    private void sendMessage(ArrayList<String> queues, Message message) throws Exception
    {
-//      System.out.print("To " + addressMap.get(queues.get(0)) + ": " + message + " (routed to: ");
+      StringBuilder logMessage = new StringBuilder();
+      String destination = addressMap.get(queues.get(0));
+
+      logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to: ");
       ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+
       for (String queue : queues)
       {
+         // Get the ID of the queues involved so the message can be routed properly.  This is done because we cannot
+         // send directly to a queue, we have to send to an address instead but not all the queues related to the
+         // address may need the message
          ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
          ClientMessage managementMessage = session.createMessage(false);
          ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
          session.start();
          ClientMessage reply = requestor.request(managementMessage);
          long queueID = (Integer) ManagementHelper.getResult(reply);
-//         System.out.print(queue + ", ");
-         session.queueQuery(new SimpleString(queue));
+         logMessage.append(queue).append(", ");
          buffer.putLong(queueID);
       }
-//      System.out.println(")");
+
+      logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
+      log.debug(logMessage);
+
       message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
-      ClientProducer producer = session.createProducer(addressMap.get(queues.get(0)));
+      ClientProducer producer = session.createProducer(destination);
       producer.send(message);
       producer.close();
 
-      File tempFile = new File(tempFileName);
-      if (!tempFile.delete())
+      if (tempFileName.length() > 0)
       {
-         System.err.println("Couldn't delete " + tempFileName);
+         File tempFile = new File(tempFileName);
+         if (!tempFile.delete())
+         {
+            log.warn("Could not delete: " + tempFileName);
+         }
       }
    }
 
@@ -382,6 +402,7 @@
       if (isLarge)
       {
          tempFileName = UUID.randomUUID().toString() + ".tmp";
+         log.debug("Creating temp file " + tempFileName + " for large message.");
          OutputStream out = new FileOutputStream(tempFileName);
          while (reader.hasNext())
          {
@@ -391,14 +412,11 @@
             }
             else
             {
-//               System.out.println("Reading " + reader.getTextLength() + " characters.");
                String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
                String trimmedCharacters = characters.trim();
-               if (trimmedCharacters.length() > 0)
+               if (trimmedCharacters.length() > 0)  // this will skip "indentation" characters
                {
                   byte[] data = decode(trimmedCharacters);
-//                  System.out.println(new String(data));
-//                  System.out.println("Writing " + data.length + " bytes.");
                   out.write(data);
                }
             }
@@ -411,7 +429,7 @@
       }
       else
       {
-         reader.next(); // step past the "indentation" characters to get to the CDATA
+         reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
          String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
          message.getBodyBuffer().writeBytes(decode(characters.trim()));
       }
@@ -445,11 +463,14 @@
       if (!queueQuery.isExists())
       {
          session.createQueue(address, queueName, filter, true);
+         log.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
       }
+      else
+      {
+         log.debug("Binding " + queueName + " already exists so won't re-bind.");
+      }
 
       addressMap.put(queueName, address);
-
-//      System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-09 22:40:15 UTC (rev 12285)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-10 03:40:51 UTC (rev 12286)
@@ -23,6 +23,7 @@
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
@@ -67,14 +68,22 @@
 import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
 
 /**
- * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
+ * Read the journal, page, and large-message data from a stopped instance of HornetQ and save it in an XML format to
+ * a file.  It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity.  Output can be
+ * read by <code>org.hornetq.core.persistence.impl.journal.XmlDataReader</code>.
+ *
+ * @author Justin Bertram
  */
 public class XmlDataWriter
 {
    // Constants -----------------------------------------------------
 
+   public static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+
    // Attributes ----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(XmlDataWriter.class);
+
    private JournalStorageManager storageManager;
 
    private Configuration config;
@@ -93,6 +102,10 @@
 
    HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
 
+   long messagesPrinted = 0L;
+
+   long bindingsPrinted = 0L;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -154,12 +167,8 @@
 
       try
       {
-//         long start = System.currentTimeMillis();
          XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
          xmlDataWriter.writeXMLData();
-//         System.out.println();
-//         System.out.println();
-//         System.out.println("Processing took: " + (System.currentTimeMillis() - start) + "ms");
       }
       catch (Exception e)
       {
@@ -169,9 +178,12 @@
 
    public void writeXMLData() throws Exception
    {
+      long start = System.currentTimeMillis();
       getBindings();
       processMessageJournal();
       printDataAsXML();
+      log.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
+      log.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
    }
 
    // Package protected ---------------------------------------------
@@ -180,6 +192,12 @@
 
    // Private -------------------------------------------------------
 
+   /**
+    * Read through the message journal and stuff all the events/data we care about into local data structures.  We'll
+    * use this data later to print all the right information.
+    *
+    * @throws Exception will be thrown if anything goes wrong reading the journal
+    */
    private void processMessageJournal() throws Exception
    {
       ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
@@ -188,6 +206,8 @@
 
       Journal messageJournal = storageManager.getMessageJournal();
 
+      log.debug("Reading journal from " + config.getJournalDirectory());
+
       messageJournal.start();
 
       ((JournalImpl) messageJournal).load(records, null, null, false);
@@ -267,6 +287,11 @@
       removeAcked(acks);
    }
 
+   /**
+    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
+    *
+    * @param acks the list of ack records we got from the journal
+    */
    private void removeAcked(ArrayList<RecordInfo> acks)
    {
       for (RecordInfo info : acks)
@@ -282,6 +307,11 @@
       }
    }
 
+   /**
+    * Open the bindings journal and extract all bindings data.
+    *
+    * @throws Exception will be thrown if anything goes wrong reading the bindings journal
+    */
    private void getBindings() throws Exception
    {
       List<RecordInfo> records = new LinkedList<RecordInfo>();
@@ -290,6 +320,8 @@
 
       bindingsJournal.start();
 
+      log.debug("Reading bindings journal from " + config.getBindingsDirectory());
+
       ((JournalImpl) bindingsJournal).load(records, null, null, false);
 
       for (RecordInfo info : records)
@@ -339,6 +371,7 @@
          xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
          xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
          xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
+         bindingsPrinted++;
       }
       xmlWriter.writeEndElement(); // end BINDINGS_PARENT
    }
@@ -347,6 +380,8 @@
    {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
 
+      // Order here is important.  We must process the messages from the journal before we process those from the page
+      // files in order to get the messages in the right order.
       for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
       {
          printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
@@ -357,20 +392,24 @@
       xmlWriter.writeEndElement(); // end "messages"
    }
 
+   /**
+    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
+    * from the journal).
+    */
    private void printPagedMessagesAsXML()
    {
       try
       {
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
          final ExecutorService executor = Executors.newFixedThreadPool(10);
-         ExecutorFactory execfactory = new ExecutorFactory()
+         ExecutorFactory executorFactory = new ExecutorFactory()
          {
             public Executor getExecutor()
             {
                return executor;
             }
          };
-         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, execfactory, false, null);
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, executorFactory, false, null);
          HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
          addressSettingsRepository.setDefault(new AddressSettings());
          StorageManager sm = new NullStorageManager();
@@ -383,10 +422,18 @@
          for (SimpleString store : stores)
          {
             PagingStore pageStore = manager.getPageStore(store);
+            String folder = null;
 
+            if (pageStore != null)
+            {
+               folder = pageStore.getFolder();
+            }
+            log.debug("Reading page store " + store + " folder = " + folder);
+
             int pageId = (int) pageStore.getFirstPage();
             for (int i = 0; i < pageStore.getNumberOfPages(); i++)
             {
+               log.debug("Reading page " + pageId);
                Page page = pageStore.createPage(pageId);
                page.open();
                List<PagedMessage> messages = page.read(sm);
@@ -443,66 +490,72 @@
       printMessageQueues(queues);
       printMessageBody(message);
       xmlWriter.writeEndElement(); // end MESSAGES_CHILD
+      messagesPrinted++;
    }
 
    private void printMessageBody(ServerMessage message) throws XMLStreamException
    {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
+
       if (message.isLargeMessage())
       {
-         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
-         LargeServerMessage largeMessage = (LargeServerMessage) message;
-         BodyEncoder encoder = null;
+         printLargeMessageBody((LargeServerMessage) message);
+      }
+      else
+      {
+         xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
+      }
+      xmlWriter.writeEndElement(); // end MESSAGE_BODY
+   }
 
-         try
+   private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException
+   {
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
+      BodyEncoder encoder = null;
+
+      try
+      {
+         encoder = message.getBodyEncoder();
+         encoder.open();
+         long totalBytesWritten = 0;
+         Long bufferSize;
+         long bodySize = encoder.getLargeBodySize();
+         for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE)
          {
-            encoder = largeMessage.getBodyEncoder();
-            encoder.open();
-            long totalBytesWritten = 0;
-            Long bufferSize;
-            long bodySize = encoder.getLargeBodySize();
-            for (long i = 0; i < bodySize; i += XmlDataConstants.CHUNK)
+            Long remainder = bodySize - totalBytesWritten;
+            if (remainder >= LARGE_MESSAGE_CHUNK_SIZE)
             {
-               Long remainder = bodySize - totalBytesWritten;
-               if (remainder >= XmlDataConstants.CHUNK)
-               {
-                  bufferSize = XmlDataConstants.CHUNK;
-               }
-               else
-               {
-                  bufferSize = remainder;
-               }
-               HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
-               encoder.encode(buffer, bufferSize.intValue());
-               xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
-               totalBytesWritten += bufferSize;
+               bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
             }
-            encoder.close();
+            else
+            {
+               bufferSize = remainder;
+            }
+            HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+            encoder.encode(buffer, bufferSize.intValue());
+            xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+            totalBytesWritten += bufferSize;
          }
-         catch (HornetQException e)
+         encoder.close();
+      }
+      catch (HornetQException e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (encoder != null)
          {
-            e.printStackTrace();
-         }
-         finally
-         {
-            if (encoder != null)
+            try
             {
-               try
-               {
-                  encoder.close();
-               }
-               catch (HornetQException e)
-               {
-                  e.printStackTrace();
-               }
+               encoder.close();
             }
+            catch (HornetQException e)
+            {
+               e.printStackTrace();
+            }
          }
       }
-      else
-      {
-         xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
-      }
-      xmlWriter.writeEndElement(); // end MESSAGE_BODY
    }
 
    private void printMessageQueues(List<String> queues) throws XMLStreamException
@@ -612,11 +665,6 @@
       }
    }
 
-   private static String encode(final byte[] data)
-   {
-      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
-   }
-
    private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
    {
       List<String> queues = new ArrayList<String>();
@@ -627,8 +675,16 @@
       return queues;
    }
 
+   private static String encode(final byte[] data)
+   {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
    // Inner classes -------------------------------------------------
 
+   /**
+    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
+    */
    class PrettyPrintHandler implements InvocationHandler
    {
       private XMLStreamWriter target;

Modified: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	2012-03-09 22:40:15 UTC (rev 12285)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	2012-03-10 03:40:51 UTC (rev 12286)
@@ -28,6 +28,8 @@
 import org.hornetq.core.persistence.impl.journal.XmlDataReader;
 import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.integration.paging.PagingSendTest;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.UUIDGenerator;
@@ -36,13 +38,14 @@
 import java.io.ByteArrayOutputStream;
 
 /**
- * A ExportFormatTest
+ * A test of the XML export/import functionality
  *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author Justin Bertram
  */
 public class XmlImportExportTest extends ServiceTestBase
 {
    public static final int CONSUMER_TIMEOUT = 5000;
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -416,7 +419,7 @@
 
       ClientMessage msg = session.createMessage(true);
       producer.send(msg);
-      
+
       ClientConsumer consumer = session.createConsumer("myQueue1");
       session.start();
       msg = consumer.receive(CONSUMER_TIMEOUT);
@@ -457,6 +460,76 @@
       server.stop();
    }
 
+   public void testPaging() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(10 * 1024);
+      defaultSetting.setMaxSizeBytes(20 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+      // Making it synchronous, just because we want to stop sending messages as soon as the page-store becomes in
+      // page mode
+      // and we could only guarantee that by setting it to synchronous
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue("myAddress", "myQueue1", true);
+
+      ClientProducer producer = session.createProducer("myAddress");
+
+      ClientMessage message = null;
+
+      message = session.createMessage(true);
+      message.getBodyBuffer().writeBytes(new byte[1024]);
+
+      for (int i = 0; i < 200; i++)
+      {
+         producer.send(message);
+      }
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+
+      ClientConsumer consumer = session.createConsumer("myQueue1");
+
+      session.start();
+
+      for (int i = 0; i < 200; i++)
+      {
+         message = consumer.receive(CONSUMER_TIMEOUT);
+
+         Assert.assertNotNull(message);
+      }
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list