[hornetq-commits] JBoss hornetq SVN: r12288 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Mar 12 15:34:47 EDT 2012
Author: jbertram
Date: 2012-03-12 15:34:46 -0400 (Mon, 12 Mar 2012)
New Revision: 12288
Added:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java
Removed:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
Modified:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Log:
[HORNETQ-787] added transactional functionality to send every message atomically, added comments
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -14,7 +14,8 @@
package org.hornetq.core.persistence.impl.journal;
/**
- * The constants shared
+ * The constants shared by <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code> and
+ * <code>org.hornetq.core.persistence.impl.journal.XmlDataExporter</code>.
*
* @author Justin Bertram
*/
Copied: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java (from rev 12286, branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java)
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -0,0 +1,744 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.Base64;
+import org.hornetq.utils.ExecutorFactory;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
+
+/**
+ * Read the journal, page, and large-message data from a stopped instance of HornetQ and save it in an XML format to
+ * a file. It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity. Output can be
+ * read by <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code>.
+ *
+ * @author Justin Bertram
+ */
+public class XmlDataExporter
+{
+ // Constants -----------------------------------------------------
+
+ public static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+
+ // Attributes ----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(XmlDataExporter.class);
+
+ private JournalStorageManager storageManager;
+
+ private Configuration config;
+
+ private XMLStreamWriter xmlWriter;
+
+ // message refs hashed first by their record ID (the same ID as the message record); each inner map is then hashed
+ // by the ID of the queue to which the ref belongs
+ private Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
+
+ // map of all message records hashed by their record ID (which will match the record ID of the message refs)
+ private HashMap<Long, Message> messages;
+
+ private Map<Long, Set<PagePosition>> cursorRecords;
+
+ private Set<Long> pgTXs;
+
+ HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
+
+ long messagesPrinted = 0L;
+
+ long bindingsPrinted = 0L;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+  * Creates an exporter that reads the given data directories and writes the XML document to <code>out</code>.
+  * The journal is always opened as NIO.
+  *
+  * @param out              stream that receives the XML output
+  * @param bindingsDir      directory of the bindings journal
+  * @param journalDir       directory of the message journal
+  * @param pagingDir        directory of the page files
+  * @param largeMessagesDir directory of the large-message files
+  * @throws IllegalStateException if the XML stream writer cannot be created
+  */
+ public XmlDataExporter(OutputStream out, String bindingsDir, String journalDir, String pagingDir, String largeMessagesDir)
+ {
+    config = new ConfigurationImpl();
+    config.setBindingsDirectory(bindingsDir);
+    config.setJournalDirectory(journalDir);
+    config.setPagingDirectory(pagingDir);
+    config.setLargeMessagesDirectory(largeMessagesDir);
+    config.setJournalType(JournalType.NIO);
+    final ExecutorService executor = Executors.newFixedThreadPool(1);
+    ExecutorFactory executorFactory = new ExecutorFactory()
+    {
+       public Executor getExecutor()
+       {
+          return executor;
+       }
+    };
+
+    storageManager = new JournalStorageManager(config, executorFactory);
+
+    messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
+
+    messages = new HashMap<Long, Message>();
+
+    cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+    pgTXs = new HashSet<Long>();
+
+    queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
+
+    try
+    {
+       XMLOutputFactory factory = XMLOutputFactory.newInstance();
+       XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out);
+       // wrap the raw writer in a proxy so that every element is indented on its own line
+       PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
+       xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(
+          XMLStreamWriter.class.getClassLoader(),
+          new Class[]{XMLStreamWriter.class},
+          handler);
+    }
+    catch (Exception e)
+    {
+       // Without a writer every later print call would hit a null xmlWriter, so fail here and keep the cause.
+       throw new IllegalStateException("Unable to create the XML stream writer", e);
+    }
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Command-line entry point: exports the data found in the four given directories as XML on standard output.
+  *
+  * @param arg bindings directory, message journal directory, page directory and large-message directory, in that
+  *            order
+  */
+ public static void main(String arg[])
+ {
+    if (arg.length < 4)
+    {
+       // the main class has to be part of the command line, otherwise the printed usage cannot be run as shown
+       System.out.println("Use: java -cp hornetq-core.jar " + XmlDataExporter.class.getName() + " <bindings directory> <message directory> <page directory> <large-message directory>");
+       System.exit(-1);
+    }
+
+    try
+    {
+       XmlDataExporter xmlDataExporter = new XmlDataExporter(System.out, arg[0], arg[1], arg[2], arg[3]);
+       xmlDataExporter.writeXMLData();
+    }
+    catch (Exception e)
+    {
+       e.printStackTrace();
+    }
+ }
+
+ /**
+  * Runs the whole export: loads the bindings, processes the message journal, prints the XML, and finally logs the
+  * elapsed time and the number of messages and bindings printed at debug level.
+  *
+  * @throws Exception if reading the bindings or the message journal fails
+  */
+ public void writeXMLData() throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    getBindings();
+    processMessageJournal();
+    printDataAsXML();
+
+    final long elapsed = System.currentTimeMillis() - startedAt;
+    log.debug("\n\nProcessing took: " + elapsed + "ms");
+    log.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+  * Read through the message journal and stuff all the events/data we care about into local data structures:
+  * messages (regular and large), message references, cursor acks and page transactions. Reference acks are only
+  * collected while reading and are applied once the whole journal has been processed.
+  *
+  * @throws Exception will be thrown if anything goes wrong reading the journal
+  */
+ private void processMessageJournal() throws Exception
+ {
+    ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
+
+    List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+    Journal messageJournal = storageManager.getMessageJournal();
+
+    log.debug("Reading journal from " + config.getJournalDirectory());
+
+    messageJournal.start();
+
+    try
+    {
+       ((JournalImpl) messageJournal).load(records, null, null, false);
+
+       for (RecordInfo info : records)
+       {
+          HornetQBuffer buff = HornetQBuffers.wrappedBuffer(info.data);
+
+          Object o = JournalStorageManager.newObjectEncoding(info, storageManager);
+
+          if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE ||
+             info.getUserRecordType() == JournalStorageManager.ADD_LARGE_MESSAGE)
+          {
+             // regular and large messages are stored the same way, keyed by their record ID
+             messages.put(info.id, ((MessageDescribe) o).msg);
+          }
+          else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+          {
+             ReferenceDescribe ref = (ReferenceDescribe) o;
+             HashMap<Long, ReferenceDescribe> refsByQueue = messageRefs.get(info.id);
+             if (refsByQueue == null)
+             {
+                refsByQueue = new HashMap<Long, ReferenceDescribe>();
+                messageRefs.put(info.id, refsByQueue);
+             }
+             refsByQueue.put(ref.refEncoding.queueID, ref);
+          }
+          else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+          {
+             // applied by removeAcked() after the loop, once every reference has been seen
+             acks.add(info);
+          }
+          else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+          {
+             CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+             encoding.decode(buff);
+
+             Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+             if (set == null)
+             {
+                set = new HashSet<PagePosition>();
+                cursorRecords.put(encoding.queueID, set);
+             }
+
+             set.add(encoding.position);
+          }
+          else if (info.getUserRecordType() == JournalStorageManager.PAGE_TRANSACTION)
+          {
+             if (info.isUpdate)
+             {
+                PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+                pageUpdate.decode(buff);
+                pgTXs.add(pageUpdate.pageTX);
+             }
+             else
+             {
+                PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+                pageTransactionInfo.decode(buff);
+
+                pageTransactionInfo.setRecordID(info.id);
+                pgTXs.add(pageTransactionInfo.getTransactionID());
+             }
+          }
+       }
+    }
+    finally
+    {
+       // always release the journal, even when loading or decoding a record fails
+       messageJournal.stop();
+    }
+
+    removeAcked(acks);
+ }
+
+ /**
+  * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
+  * Once the last reference of a message is gone the message itself is dropped as well. An ack whose record ID has
+  * no reference in <code>messageRefs</code> is ignored.
+  *
+  * @param acks the list of ack records we got from the journal
+  */
+ private void removeAcked(ArrayList<RecordInfo> acks)
+ {
+    for (RecordInfo info : acks)
+    {
+       HashMap<Long, ReferenceDescribe> refsByQueue = messageRefs.get(info.id);
+       if (refsByQueue == null)
+       {
+          // no reference was recorded for this ID, so there is nothing to remove
+          continue;
+       }
+
+       AckDescribe ack = (AckDescribe) JournalStorageManager.newObjectEncoding(info, null);
+       refsByQueue.remove(ack.refEncoding.queueID);
+       if (refsByQueue.isEmpty())
+       {
+          messages.remove(info.id);
+          messageRefs.remove(info.id);
+       }
+    }
+ }
+
+ /**
+  * Open the bindings journal and extract all queue bindings, keyed by binding ID. Records of any other type are
+  * ignored.
+  *
+  * @throws Exception will be thrown if anything goes wrong reading the bindings journal
+  */
+ private void getBindings() throws Exception
+ {
+    List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+    Journal bindingsJournal = storageManager.getBindingsJournal();
+
+    bindingsJournal.start();
+
+    try
+    {
+       log.debug("Reading bindings journal from " + config.getBindingsDirectory());
+
+       ((JournalImpl) bindingsJournal).load(records, null, null, false);
+
+       for (RecordInfo info : records)
+       {
+          if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
+          {
+             PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) JournalStorageManager.newObjectEncoding(info, null);
+             queueBindings.put(bindingEncoding.getId(), bindingEncoding);
+          }
+       }
+    }
+    finally
+    {
+       // always release the journal, even when loading or decoding a record fails
+       bindingsJournal.stop();
+    }
+ }
+
+ /**
+  * Writes the complete XML document: the document parent element containing the bindings followed by the messages,
+  * then flushes and closes the writer.
+  * <p/>
+  * Failures are propagated instead of being printed and swallowed, so that the caller does not report a truncated
+  * document as a successful export.
+  *
+  * @throws XMLStreamException if the document cannot be written
+  */
+ private void printDataAsXML() throws XMLStreamException
+ {
+    xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
+    xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
+    printBindingsAsXML();
+    printAllMessagesAsXML();
+    xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
+    xmlWriter.writeEndDocument();
+    xmlWriter.flush();
+    xmlWriter.close();
+ }
+
+ private void printBindingsAsXML() throws XMLStreamException
+ {
+ xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
+ for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
+ {
+ PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
+ xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
+ xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
+ String filter = "";
+ if (bindingEncoding.getFilterString() != null)
+ {
+ filter = bindingEncoding.getFilterString().toString();
+ }
+ xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
+ xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
+ xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
+ bindingsPrinted++;
+ }
+ xmlWriter.writeEndElement(); // end BINDINGS_PARENT
+ }
+
+ /**
+  * Writes the messages parent element: first every message collected from the journal, then the paged messages.
+  *
+  * @throws XMLStreamException if the writer fails
+  */
+ private void printAllMessagesAsXML() throws XMLStreamException
+ {
+    xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
+
+    // Journal messages go first and paged messages second; keeping this sequence is what puts the messages in the
+    // right order in the output.
+    for (Map.Entry<Long, Message> entry : messages.entrySet())
+    {
+       ServerMessage journalMessage = (ServerMessage) entry.getValue();
+       List<String> queueNames = extractQueueNames(messageRefs.get(entry.getKey()));
+       printSingleMessageAsXML(journalMessage, queueNames);
+    }
+
+    printPagedMessagesAsXML();
+
+    xmlWriter.writeEndElement(); // end "messages"
+ }
+
+ /**
+  * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
+  * from the journal). A paged message is printed for the queues that have not acked its page position, and only
+  * when it is non-transactional (transaction ID -1) or its transaction ID was found in the journal.
+  * <p/>
+  * Any failure is printed and ends the paged export; the executors created here are shut down in every case.
+  */
+ private void printPagedMessagesAsXML()
+ {
+    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+    final ExecutorService executor = Executors.newFixedThreadPool(10);
+
+    try
+    {
+       ExecutorFactory executorFactory = new ExecutorFactory()
+       {
+          public Executor getExecutor()
+          {
+             return executor;
+          }
+       };
+       PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, executorFactory, false, null);
+       HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+       addressSettingsRepository.setDefault(new AddressSettings());
+       StorageManager sm = new NullStorageManager();
+       PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
+
+       manager.start();
+
+       for (SimpleString store : manager.getStoreNames())
+       {
+          PagingStore pageStore = manager.getPageStore(store);
+
+          if (pageStore == null)
+          {
+             // nothing can be read without a store; skip it instead of dereferencing null below
+             log.debug("Skipping page store " + store + ": no paging store available");
+             continue;
+          }
+
+          log.debug("Reading page store " + store + " folder = " + pageStore.getFolder());
+
+          int pageId = (int) pageStore.getFirstPage();
+          for (int i = 0; i < pageStore.getNumberOfPages(); i++)
+          {
+             log.debug("Reading page " + pageId);
+             Page page = pageStore.createPage(pageId);
+             List<PagedMessage> pagedMessages;
+             page.open();
+             try
+             {
+                pagedMessages = page.read(sm);
+             }
+             finally
+             {
+                // close the page file even when reading it fails
+                page.close();
+             }
+
+             int messageId = 0;
+
+             for (PagedMessage message : pagedMessages)
+             {
+                message.initMessage(sm);
+
+                // the position is the same for every queue of this message, so build it once
+                PagePosition posCheck = new PagePositionImpl(pageId, messageId);
+                List<String> queueNames = new ArrayList<String>();
+
+                for (long queueID : message.getQueueIDs())
+                {
+                   Set<PagePosition> positions = cursorRecords.get(queueID);
+                   if (positions != null && positions.contains(posCheck))
+                   {
+                      // this queue already acked the message
+                      continue;
+                   }
+
+                   PersistentQueueBindingEncoding binding = queueBindings.get(queueID);
+                   if (binding == null)
+                   {
+                      // no binding was loaded for this queue ID, so there is no queue name to export
+                      log.debug("No binding found for queue ID " + queueID + "; skipping it for page " + pageId + ", message " + messageId);
+                      continue;
+                   }
+
+                   queueNames.add(binding.getQueueName().toString());
+                }
+
+                if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID())))
+                {
+                   printSingleMessageAsXML(message.getMessage(), queueNames);
+                }
+
+                messageId++;
+             }
+
+             pageId++;
+          }
+       }
+    }
+    catch (Exception e)
+    {
+       e.printStackTrace();
+    }
+    finally
+    {
+       // the pools are only needed while the page files are being read
+       executor.shutdown();
+       scheduled.shutdown();
+    }
+ }
+
+ /**
+  * Writes one message element made of its attributes, properties, destination queues and body, in that order, and
+  * increments the printed-message counter.
+  *
+  * @param message the message to write
+  * @param queues  names of the queues the message belongs to
+  * @throws XMLStreamException if the writer fails
+  */
+ private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException
+ {
+    xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
+
+    // the attributes belong to the element just started, so they are written before any child element
+    printMessageAttributes(message);
+
+    printMessageProperties(message);
+    printMessageQueues(queues);
+    printMessageBody(message);
+
+    xmlWriter.writeEndElement(); // end MESSAGES_CHILD
+
+    messagesPrinted += 1;
+ }
+
+ private void printMessageBody(ServerMessage message) throws XMLStreamException
+ {
+ xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
+
+ if (message.isLargeMessage())
+ {
+ printLargeMessageBody((LargeServerMessage) message);
+ }
+ else
+ {
+ xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
+ }
+ xmlWriter.writeEndElement(); // end MESSAGE_BODY
+ }
+
+ private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
+ BodyEncoder encoder = null;
+
+ try
+ {
+ encoder = message.getBodyEncoder();
+ encoder.open();
+ long totalBytesWritten = 0;
+ Long bufferSize;
+ long bodySize = encoder.getLargeBodySize();
+ for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE)
+ {
+ Long remainder = bodySize - totalBytesWritten;
+ if (remainder >= LARGE_MESSAGE_CHUNK_SIZE)
+ {
+ bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
+ }
+ else
+ {
+ bufferSize = remainder;
+ }
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+ encoder.encode(buffer, bufferSize.intValue());
+ xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+ totalBytesWritten += bufferSize;
+ }
+ encoder.close();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (encoder != null)
+ {
+ try
+ {
+ encoder.close();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+  * Writes the queues parent element with one empty child element per queue name.
+  *
+  * @param queues names of the queues to list
+  * @throws XMLStreamException if the writer fails
+  */
+ private void printMessageQueues(List<String> queues) throws XMLStreamException
+ {
+    xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
+
+    for (String name : queues)
+    {
+       // the attribute is attached to the empty element written just before it
+       xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
+       xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, name);
+    }
+
+    xmlWriter.writeEndElement(); // end QUEUES_PARENT
+ }
+
+ private void printMessageProperties(ServerMessage message) throws XMLStreamException
+ {
+ xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
+ for (SimpleString key : message.getPropertyNames())
+ {
+ Object value = message.getObjectProperty(key);
+ xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
+ if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
+ }
+ else
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+ }
+
+ if (value instanceof Boolean)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
+ }
+ else if (value instanceof Byte)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
+ }
+ else if (value instanceof Short)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SHORT);
+ }
+ else if (value instanceof Integer)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_INTEGER);
+ }
+ else if (value instanceof Long)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_LONG);
+ }
+ else if (value instanceof Float)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_FLOAT);
+ }
+ else if (value instanceof Double)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_DOUBLE);
+ }
+ else if (value instanceof String)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_STRING);
+ }
+ else if (value instanceof SimpleString)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
+ }
+ else if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
+ }
+ }
+ xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
+ }
+
+ private void printMessageAttributes(ServerMessage message) throws XMLStreamException
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
+ byte rawType = message.getType();
+ String prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY;
+ if (rawType == Message.BYTES_TYPE)
+ {
+ prettyType = XmlDataConstants.BYTES_TYPE_PRETTY;
+ }
+ else if (rawType == Message.MAP_TYPE)
+ {
+ prettyType = XmlDataConstants.MAP_TYPE_PRETTY;
+ }
+ else if (rawType == Message.OBJECT_TYPE)
+ {
+ prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY;
+ }
+ else if (rawType == Message.STREAM_TYPE)
+ {
+ prettyType = XmlDataConstants.STREAM_TYPE_PRETTY;
+ }
+ else if (rawType == Message.TEXT_TYPE)
+ {
+ prettyType = XmlDataConstants.TEXT_TYPE_PRETTY;
+ }
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
+ if (message.getUserID() != null)
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
+ }
+ }
+
+ /**
+  * Resolves the names of the queues referenced by the given refs.
+  *
+  * @param refMap message refs hashed by queue ID; may be <code>null</code> when the message has no refs
+  * @return the names of the referenced queues that have a binding; empty when <code>refMap</code> is
+  *         <code>null</code> or none of the queue IDs has a binding
+  */
+ private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
+ {
+    List<String> queues = new ArrayList<String>();
+    if (refMap == null)
+    {
+       return queues;
+    }
+
+    for (ReferenceDescribe ref : refMap.values())
+    {
+       PersistentQueueBindingEncoding binding = queueBindings.get(ref.refEncoding.queueID);
+       if (binding == null)
+       {
+          // no binding was loaded for this queue ID, so there is no queue name to export
+          log.debug("No binding found for queue ID " + ref.refEncoding.queueID + "; skipping reference");
+          continue;
+       }
+       queues.add(binding.getQueueName().toString());
+    }
+    return queues;
+ }
+
+ private static String encode(final byte[] data)
+ {
+ return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /**
+ * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
+ */
+ class PrettyPrintHandler implements InvocationHandler
+ {
+ private XMLStreamWriter target;
+
+ private int depth = 0;
+
+ private final char INDENT_CHAR = ' ';
+
+ private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+
+ public PrettyPrintHandler(XMLStreamWriter target)
+ {
+ this.target = target;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ String m = method.getName();
+
+ if ("writeStartElement".equals(m))
+ {
+ target.writeCharacters(LINE_SEPARATOR);
+ target.writeCharacters(indent(depth));
+
+ depth++;
+ }
+ else if ("writeEndElement".equals(m))
+ {
+ depth--;
+
+ target.writeCharacters(LINE_SEPARATOR);
+ target.writeCharacters(indent(depth));
+ }
+ else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
+ {
+ target.writeCharacters(LINE_SEPARATOR);
+ target.writeCharacters(indent(depth));
+ }
+
+ method.invoke(target, args);
+
+ return null;
+ }
+
+ private String indent(int depth)
+ {
+ depth *= 3; // level of indentation
+ char[] output = new char[depth];
+ while (depth-- > 0)
+ {
+ output[depth] = INDENT_CHAR;
+ }
+ return new String(output);
+ }
+ }
+}
\ No newline at end of file
Copied: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java (from rev 12286, branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java)
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -0,0 +1,529 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.utils.Base64;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
+ * send the messages to a running instance of HornetQ. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
+ * for speed and simplicity.
+ *
+ * @author Justin Bertram
+ */
+public class XmlDataImporter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(XmlDataImporter.class);
+
+   XMLStreamReader reader;
+
+   // used to send the imported messages; it may or may not auto-commit sends
+   ClientSession session;
+
+   // used for the management requests which look up queue IDs; this is the same object as "session" unless a
+   // separate session was passed in or created for a transactional import
+   ClientSession managementSession;
+
+   // true if the sessions were created by one of our constructors, in which case processXml() closes them
+   boolean localSession = false;
+
+   // maps the name of every queue found in the bindings to its address
+   Map<String, String> addressMap = new HashMap<String, String>();
+
+   // name of the temporary file backing the large message currently being imported ("" if there is none)
+   String tempFileName = "";
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * This is the normal constructor for programmatic access to the <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code>
+    * if the session passed in uses auto-commit for sends. If the session needs to be transactional then use the
+    * constructor which takes 2 sessions.
+    *
+    * @param inputStream the stream from which to read the XML for import
+    * @param session     used for sending messages, must use auto-commit for sends
+    * @throws IllegalStateException if an XML reader can't be created for the stream
+    */
+   public XmlDataImporter(InputStream inputStream, ClientSession session)
+   {
+      this(inputStream, session, null);
+   }
+
+   /**
+    * This is the constructor to use if you wish to import all messages transactionally. Pass in a session which doesn't
+    * use auto-commit for sends, and one that does (for management operations necessary during import).
+    *
+    * @param inputStream       the stream from which to read the XML for import
+    * @param session           used for sending messages, doesn't need to auto-commit sends
+    * @param managementSession used for management queries, must use auto-commit for sends; if <code>null</code>
+    *                          then <code>session</code> is used for the management queries as well
+    * @throws IllegalStateException if an XML reader can't be created for the stream
+    */
+   public XmlDataImporter(InputStream inputStream, ClientSession session, ClientSession managementSession)
+   {
+      try
+      {
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+      }
+      catch (XMLStreamException e)
+      {
+         // fail here rather than with a NullPointerException once processXml() touches the missing reader
+         throw new IllegalStateException("Unable to create an XML reader for the import stream", e);
+      }
+
+      this.session = session;
+      this.managementSession = (managementSession != null) ? managementSession : session;
+   }
+
+   /**
+    * Connects to a running server through a Netty connector and creates the session(s) needed for the import. The
+    * sessions created here are closed by {@link #processXml()}.
+    *
+    * @param inputStream   the stream from which to read the XML for import
+    * @param host          the host of the server to connect to
+    * @param port          the port of the server to connect to
+    * @param transactional if <code>true</code> the messages are sent in a single transaction which is committed once
+    *                      the whole document has been processed
+    * @throws IllegalStateException if the XML reader, the connection or a session can't be created
+    */
+   public XmlDataImporter(InputStream inputStream, String host, String port, boolean transactional)
+   {
+      try
+      {
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+         HashMap<String, Object> connectionParams = new HashMap<String, Object>();
+         connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
+         connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
+         ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+         ClientSessionFactory sf = serverLocator.createSessionFactory();
+         session = sf.createSession(false, !transactional, true);
+         // the management requests must always be auto-committed; when the sending session already auto-commits
+         // it is simply re-used, otherwise a dedicated session is needed
+         managementSession = transactional ? sf.createSession(false, true, true) : session;
+         localSession = true;
+      }
+      catch (Exception e)
+      {
+         // fail here rather than with a NullPointerException once processXml() touches the missing session
+         throw new IllegalStateException("Unable to set up the import to " + host + ":" + port, e);
+      }
+   }
+
+   /**
+    * Same as {@link #XmlDataImporter(InputStream, String, String, boolean)} but reads the XML from a file.
+    *
+    * @param inputFile     the name of the file from which to read the XML for import
+    * @param host          the host of the server to connect to
+    * @param port          the port of the server to connect to
+    * @param transactional if <code>true</code> the messages are sent in a single transaction
+    * @throws FileNotFoundException if <code>inputFile</code> can't be opened for reading
+    * @throws IllegalStateException if the XML reader, the connection or a session can't be created
+    */
+   public XmlDataImporter(String inputFile, String host, String port, boolean transactional) throws FileNotFoundException
+   {
+      this(new FileInputStream(inputFile), host, port, transactional);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Command line entry point: <code>&lt;inputFile&gt; &lt;host&gt; &lt;port&gt; [&lt;transactional&gt;]</code>.
+    */
+   public static void main(String arg[])
+   {
+      if (arg.length < 3)
+      {
+         // use the class name; concatenating the Class object itself would print "class org.hornetq..."
+         System.out.println("Use: java -cp hornetq-core.jar " + XmlDataImporter.class.getName() + " <inputFile> <host> <port> [<transactional>]");
+         System.exit(-1);
+      }
+
+      try
+      {
+         XmlDataImporter xmlDataImporter = new XmlDataImporter(arg[0], arg[1], arg[2], (arg.length > 3 && Boolean.parseBoolean(arg[3])));
+         xmlDataImporter.processXml();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * Walks through the whole XML document, binding every queue and sending every message it describes. If the
+    * sending session doesn't auto-commit sends then the sends are committed once the end of the document is reached.
+    * Sessions created by this class are closed before returning, whether or not the import succeeded.
+    *
+    * @throws Exception if the XML can't be read or a queue or message can't be processed
+    */
+   public void processXml() throws Exception
+   {
+      try
+      {
+         while (reader.hasNext())
+         {
+            log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
+            {
+               String elementName = reader.getLocalName();
+               if (XmlDataConstants.BINDINGS_CHILD.equals(elementName))
+               {
+                  bindQueue();
+               }
+               else if (XmlDataConstants.MESSAGES_CHILD.equals(elementName))
+               {
+                  processMessage();
+               }
+            }
+            reader.next();
+         }
+
+         if (!session.isAutoCommitSends())
+         {
+            session.commit();
+         }
+      }
+      finally
+      {
+         // if the sessions were created in our constructor then close them (otherwise the caller will close them)
+         if (localSession)
+         {
+            closeLocalSessions();
+         }
+      }
+   }
+
+   /**
+    * Closes the sending session and, if it is a distinct object, the management session.
+    */
+   private void closeLocalSessions() throws Exception
+   {
+      try
+      {
+         session.close();
+      }
+      finally
+      {
+         if (managementSession != null && managementSession != session)
+         {
+            managementSession.close();
+         }
+      }
+   }
+
+   /**
+    * Builds a message from the message element the reader is currently positioned on (attributes, body, properties
+    * and queues) and sends it. On return the reader is positioned on the end tag of that message element.
+    */
+   private void processMessage() throws Exception
+   {
+      byte type = 0;
+      byte priority = 0;
+      long expiration = 0L;
+      long timestamp = 0L;
+      org.hornetq.utils.UUID userId = null;
+      ArrayList<String> queues = new ArrayList<String>();
+
+      // get message's attributes
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
+         {
+            type = getMessageType(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
+         {
+            priority = Byte.parseByte(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_EXPIRATION.equals(attributeName))
+         {
+            expiration = Long.parseLong(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_TIMESTAMP.equals(attributeName))
+         {
+            timestamp = Long.parseLong(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
+         {
+            // NOTE(review): the exported value is not read; a message which had a user ID simply gets a new one
+            userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
+         }
+      }
+
+      Message message = session.createMessage(type, true, expiration, timestamp, priority);
+      message.setUserID(userId);
+
+      boolean endLoop = false;
+
+      // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
+      while (reader.hasNext())
+      {
+         switch (reader.getEventType())
+         {
+            case XMLStreamConstants.START_ELEMENT:
+               if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
+               {
+                  processMessageBody(message);
+               }
+               else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessageProperties(message);
+               }
+               else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessageQueues(queues);
+               }
+               break;
+            case XMLStreamConstants.END_ELEMENT:
+               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+               {
+                  endLoop = true;
+               }
+               break;
+         }
+         if (endLoop)
+         {
+            break;
+         }
+         reader.next();
+      }
+
+      sendMessage(queues, message);
+   }
+
+   /**
+    * Translates the "pretty" message type used in the XML into the corresponding <code>Message</code> type constant.
+    *
+    * @param value the value of the type attribute
+    * @return the matching type, or <code>Message.DEFAULT_TYPE</code> if the value isn't recognized
+    */
+   private byte getMessageType(String value)
+   {
+      byte type = Message.DEFAULT_TYPE;
+      if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
+      {
+         type = Message.DEFAULT_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
+      {
+         type = Message.MAP_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
+      {
+         type = Message.OBJECT_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
+      {
+         type = Message.STREAM_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
+      {
+         type = Message.TEXT_TYPE;
+      }
+      return type;
+   }
+
+   /**
+    * Sends the message to the address of the first queue in the list, routed to exactly the queues in the list.
+    * Whether or not the send succeeds, the temporary file of a large message is removed afterwards.
+    *
+    * @param queues  the names of the queues which held the message when it was exported
+    * @param message the message to send
+    * @throws Exception if a queue ID can't be looked up or the message can't be sent
+    */
+   private void sendMessage(ArrayList<String> queues, Message message) throws Exception
+   {
+      try
+      {
+         StringBuilder logMessage = new StringBuilder();
+         String destination = addressMap.get(queues.get(0));
+
+         logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
+         ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+
+         // Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
+         // send directly to a queue, we have to send to an address instead but not all the queues related to the
+         // address may need the message
+         ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.hornetq.management");
+         try
+         {
+            managementSession.start();
+            for (String queue : queues)
+            {
+               ClientMessage managementMessage = managementSession.createMessage(false);
+               ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
+               ClientMessage reply = requestor.request(managementMessage);
+               Object result = ManagementHelper.getResult(reply);
+               if (!(result instanceof Number))
+               {
+                  throw new IllegalStateException("Unable to look up the ID of queue " + queue + ": " + result);
+               }
+               // the ID may come back as an Integer or a Long depending on its magnitude
+               long queueID = ((Number) result).longValue();
+               logMessage.append(queue).append(", ");
+               buffer.putLong(queueID);
+            }
+         }
+         finally
+         {
+            // one requestor serves every lookup for this message and is released once they are done
+            requestor.close();
+         }
+
+         logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
+         log.debug(logMessage);
+
+         message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+         ClientProducer producer = session.createProducer(destination);
+         try
+         {
+            producer.send(message);
+         }
+         finally
+         {
+            producer.close();
+         }
+      }
+      finally
+      {
+         deleteTempFile();
+      }
+   }
+
+   /**
+    * Deletes the temporary file of the large message being imported (if any) and forgets its name so that the
+    * messages which follow don't try to delete it again.
+    */
+   private void deleteTempFile()
+   {
+      if (tempFileName.length() > 0)
+      {
+         File tempFile = new File(tempFileName);
+         if (!tempFile.delete())
+         {
+            log.warn("Could not delete: " + tempFileName);
+         }
+         tempFileName = "";
+      }
+   }
+
+   /**
+    * Adds the queue name found in the attributes of the current element to the list.
+    *
+    * @param queues the list collecting the names of the queues the message belongs to
+    */
+   private void processMessageQueues(ArrayList<String> queues)
+   {
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
+         {
+            queues.add(reader.getAttributeValue(i));
+         }
+      }
+   }
+
+   /**
+    * Reads the name, value and type attributes of the current property element and sets the corresponding typed
+    * property on the message. A property whose type isn't recognized is ignored.
+    *
+    * @param message the message which receives the property
+    */
+   private void processMessageProperties(Message message)
+   {
+      String key = "";
+      String value = "";
+      String propertyType = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
+         {
+            key = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
+         {
+            value = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
+         {
+            propertyType = reader.getAttributeValue(i);
+         }
+      }
+
+      if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
+      {
+         message.putShortProperty(key, Short.parseShort(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
+      {
+         message.putBooleanProperty(key, Boolean.parseBoolean(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
+      {
+         message.putByteProperty(key, Byte.parseByte(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
+      {
+         message.putBytesProperty(key, decode(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
+      {
+         message.putDoubleProperty(key, Double.parseDouble(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
+      {
+         message.putFloatProperty(key, Float.parseFloat(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
+      {
+         message.putIntProperty(key, Integer.parseInt(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
+      {
+         message.putLongProperty(key, Long.parseLong(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
+      {
+         message.putStringProperty(new SimpleString(key), new SimpleString(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
+      {
+         message.putStringProperty(key, value);
+      }
+   }
+
+   /**
+    * Decodes the Base64 body found in the current body element and attaches it to the message. The body of a large
+    * message is decoded piece by piece into a temporary file which is then streamed into the message; any other
+    * body is written straight into the message's body buffer.
+    *
+    * @param message the message which receives the body
+    * @throws XMLStreamException if the XML can't be read
+    * @throws IOException        if the temporary file of a large message can't be written or opened
+    */
+   private void processMessageBody(Message message) throws XMLStreamException, IOException
+   {
+      boolean isLarge = false;
+
+      // NOTE(review): this relies on the "is large" flag being the first attribute of the body element
+      if (reader.getAttributeCount() > 0)
+      {
+         isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
+      }
+      reader.next();
+      if (isLarge)
+      {
+         tempFileName = UUID.randomUUID().toString() + ".tmp";
+         log.debug("Creating temp file " + tempFileName + " for large message.");
+         OutputStream out = new FileOutputStream(tempFileName);
+         try
+         {
+            while (reader.hasNext())
+            {
+               if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
+               {
+                  break;
+               }
+               else
+               {
+                  String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+                  String trimmedCharacters = characters.trim();
+                  if (trimmedCharacters.length() > 0) // this will skip "indentation" characters
+                  {
+                     byte[] data = decode(trimmedCharacters);
+                     out.write(data);
+                  }
+               }
+               reader.next();
+            }
+         }
+         finally
+         {
+            // release the file handle even if the body turns out to be unreadable
+            out.close();
+         }
+         FileInputStream fileInputStream = new FileInputStream(tempFileName);
+         BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+         ((ClientMessage) message).setBodyInputStream(bufferedInput);
+      }
+      else
+      {
+         reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
+         String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+         message.getBodyBuffer().writeBytes(decode(characters.trim()));
+      }
+   }
+
+   /**
+    * Creates the durable queue described by the current binding element unless a queue with that name already
+    * exists, and remembers the queue's address so messages can be sent to it later on.
+    *
+    * @throws Exception if the queue can't be queried or created
+    */
+   private void bindQueue() throws Exception
+   {
+      String queueName = "";
+      String address = "";
+      String filter = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.BINDING_ADDRESS.equals(attributeName))
+         {
+            address = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.BINDING_QUEUE_NAME.equals(attributeName))
+         {
+            queueName = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.BINDING_FILTER_STRING.equals(attributeName))
+         {
+            filter = reader.getAttributeValue(i);
+         }
+      }
+
+      ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
+
+      if (!queueQuery.isExists())
+      {
+         session.createQueue(address, queueName, filter, true);
+         log.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+      }
+      else
+      {
+         log.debug("Binding " + queueName + " already exists so won't re-bind.");
+      }
+
+      addressMap.put(queueName, address);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Decodes Base64 data written with the same options (no line breaks, URL safe) this method passes to the decoder.
+    */
+   private static byte[] decode(String data)
+   {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Deleted: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -1,489 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientRequestor;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.utils.Base64;
-
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataWriter</code>, create a core session, and
- * send the messages to a running instance of HornetQ. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
- * for speed and simplicity.
- *
- * @author Justin Bertram
- */
-public class XmlDataReader
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(XmlDataReader.class);
-
- XMLStreamReader reader;
-
- ClientSession session;
-
- boolean localSession = false;
-
- Map<String, String> addressMap = new HashMap<String, String>();
-
- String tempFileName = "";
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public XmlDataReader(InputStream inputStream, ClientSession session)
- {
- try
- {
- reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
- this.session = session;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- public XmlDataReader(InputStream inputStream, String host, String port)
- {
- try
- {
- reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
- HashMap<String, Object> connectionParams = new HashMap<String, Object>();
- connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
- connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
- ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
- ClientSessionFactory sf = serverLocator.createSessionFactory();
- session = sf.createSession(false, true, true);
- localSession = true;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- public XmlDataReader(String inputFile, String host, String port) throws FileNotFoundException
- {
- this(new FileInputStream(inputFile), host, port);
- }
-
- // Public --------------------------------------------------------
-
- public static void main(String arg[])
- {
- if (arg.length < 3)
- {
- System.out.println("Use: java -cp hornetq-core.jar " + XmlDataReader.class + " <inputFile> <host> <port>");
- System.exit(-1);
- }
-
- try
- {
- XmlDataReader xmlDataReader = new XmlDataReader(arg[0], arg[1], arg[2]);
- xmlDataReader.processXml();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- public void processXml() throws Exception
- {
- try
- {
- while (reader.hasNext())
- {
- log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
- if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
- {
- if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
- {
- bindQueue();
- }
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
- processMessage();
- }
- }
- reader.next();
- }
- }
- finally
- {
- // if the session was created in our constructor then close it
- if (localSession)
- {
- session.close();
- }
- }
- }
-
- private void processMessage() throws Exception
- {
- Byte type = 0;
- Byte priority = 0;
- Long expiration = 0L;
- Long timestamp = 0L;
- org.hornetq.utils.UUID userId = null;
- ArrayList<String> queues = new ArrayList<String>();
-
- // get message's attributes
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
- {
- type = getMessageType(reader.getAttributeValue(i));
- }
- else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
- {
- priority = Byte.parseByte(reader.getAttributeValue(i));
- }
- else if (XmlDataConstants.MESSAGE_EXPIRATION.equals(attributeName))
- {
- expiration = Long.parseLong(reader.getAttributeValue(i));
- }
- else if (XmlDataConstants.MESSAGE_TIMESTAMP.equals(attributeName))
- {
- timestamp = Long.parseLong(reader.getAttributeValue(i));
- }
- else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
- {
- userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
- }
- }
-
- Message message = session.createMessage(type, true, expiration, timestamp, priority);
- message.setUserID(userId);
-
- boolean endLoop = false;
-
- // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
- while (reader.hasNext())
- {
- switch (reader.getEventType())
- {
- case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
- {
- processMessageBody(message);
- }
- else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
- {
- processMessageProperties(message);
- }
- else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
- {
- processMessageQueues(queues);
- }
- break;
- case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
- endLoop = true;
- }
- break;
- }
- if (endLoop)
- {
- break;
- }
- reader.next();
- }
-
- sendMessage(queues, message);
- }
-
- private Byte getMessageType(String value)
- {
- Byte type = Message.DEFAULT_TYPE;
- if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
- {
- type = Message.DEFAULT_TYPE;
- }
- else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
- {
- type = Message.BYTES_TYPE;
- }
- else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
- {
- type = Message.MAP_TYPE;
- }
- else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
- {
- type = Message.OBJECT_TYPE;
- }
- else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
- {
- type = Message.STREAM_TYPE;
- }
- else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
- {
- type = Message.TEXT_TYPE;
- }
- return type;
- }
-
- private void sendMessage(ArrayList<String> queues, Message message) throws Exception
- {
- StringBuilder logMessage = new StringBuilder();
- String destination = addressMap.get(queues.get(0));
-
- logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to: ");
- ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
-
- for (String queue : queues)
- {
- // Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
- // send directly to a queue, we have to send to an address instead but not all the queues related to the
- // address may need the message
- ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
- ClientMessage managementMessage = session.createMessage(false);
- ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
- session.start();
- ClientMessage reply = requestor.request(managementMessage);
- long queueID = (Integer) ManagementHelper.getResult(reply);
- logMessage.append(queue).append(", ");
- buffer.putLong(queueID);
- }
-
- logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
- log.debug(logMessage);
-
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
- ClientProducer producer = session.createProducer(destination);
- producer.send(message);
- producer.close();
-
- if (tempFileName.length() > 0)
- {
- File tempFile = new File(tempFileName);
- if (!tempFile.delete())
- {
- log.warn("Could not delete: " + tempFileName);
- }
- }
- }
-
- private void processMessageQueues(ArrayList<String> queues)
- {
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
- {
- queues.add(reader.getAttributeValue(i));
- }
- }
- }
-
- private void processMessageProperties(Message message)
- {
- String key = "";
- String value = "";
- String propertyType = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
- {
- key = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
- {
- value = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
- {
- propertyType = reader.getAttributeValue(i);
- }
- }
-
- if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
- {
- message.putShortProperty(key, Short.parseShort(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
- {
- message.putBooleanProperty(key, Boolean.parseBoolean(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
- {
- message.putByteProperty(key, Byte.parseByte(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
- {
- message.putBytesProperty(key, decode(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
- {
- message.putDoubleProperty(key, Double.parseDouble(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
- {
- message.putFloatProperty(key, Float.parseFloat(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
- {
- message.putIntProperty(key, Integer.parseInt(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
- {
- message.putLongProperty(key, Long.parseLong(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
- {
- message.putStringProperty(new SimpleString(key), new SimpleString(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
- {
- message.putStringProperty(key, value);
- }
- }
-
- private void processMessageBody(Message message) throws XMLStreamException, IOException
- {
- boolean isLarge = false;
-
- if (reader.getAttributeCount() > 0)
- {
- isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
- }
- reader.next();
- if (isLarge)
- {
- tempFileName = UUID.randomUUID().toString() + ".tmp";
- log.debug("Creating temp file " + tempFileName + " for large message.");
- OutputStream out = new FileOutputStream(tempFileName);
- while (reader.hasNext())
- {
- if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
- {
- break;
- }
- else
- {
- String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
- String trimmedCharacters = characters.trim();
- if (trimmedCharacters.length() > 0) // this will skip "indentation" characters
- {
- byte[] data = decode(trimmedCharacters);
- out.write(data);
- }
- }
- reader.next();
- }
- out.close();
- FileInputStream fileInputStream = new FileInputStream(tempFileName);
- BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
- ((ClientMessage) message).setBodyInputStream(bufferedInput);
- }
- else
- {
- reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
- String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
- message.getBodyBuffer().writeBytes(decode(characters.trim()));
- }
- }
-
- private void bindQueue() throws Exception
- {
- String queueName = "";
- String address = "";
- String filter = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.BINDING_ADDRESS.equals(attributeName))
- {
- address = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.BINDING_QUEUE_NAME.equals(attributeName))
- {
- queueName = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.BINDING_FILTER_STRING.equals(attributeName))
- {
- filter = reader.getAttributeValue(i);
- }
- }
-
- ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
-
- if (!queueQuery.isExists())
- {
- session.createQueue(address, queueName, filter, true);
- log.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
- }
- else
- {
- log.debug("Binding " + queueName + " already exists so won't re-bind.");
- }
-
- addressMap.put(queueName, address);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private static byte[] decode(String data)
- {
- return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
-
- // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file
Deleted: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -1,744 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
-import org.hornetq.utils.Base64;
-import org.hornetq.utils.ExecutorFactory;
-
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
-
-/**
- * Read the journal, page, and large-message data from a stopped instance of HornetQ and save it in an XML format to
- * a file. It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity. Output can be
- * read by <code>org.hornetq.core.persistence.impl.journal.XmlDataReader</code>.
- *
- * @author Justin Bertram
- */
-public class XmlDataWriter
-{
- // Constants -----------------------------------------------------
-
- public static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
-
- // Attributes ----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(XmlDataWriter.class);
-
- private JournalStorageManager storageManager;
-
- private Configuration config;
-
- private XMLStreamWriter xmlWriter;
-
- // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
- private Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
-
- // map of all message records hashed by their record ID (which will match the record ID of the message refs)
- private HashMap<Long, Message> messages;
-
- private Map<Long, Set<PagePosition>> cursorRecords;
-
- private Set<Long> pgTXs;
-
- HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
-
- long messagesPrinted = 0L;
-
- long bindingsPrinted = 0L;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public XmlDataWriter(OutputStream out, String bindingsDir, String journalDir, String pagingDir, String largeMessagesDir)
- {
- config = new ConfigurationImpl();
- config.setBindingsDirectory(bindingsDir);
- config.setJournalDirectory(journalDir);
- config.setPagingDirectory(pagingDir);
- config.setLargeMessagesDirectory(largeMessagesDir);
- config.setJournalType(JournalType.NIO);
- final ExecutorService executor = Executors.newFixedThreadPool(1);
- ExecutorFactory executorFactory = new ExecutorFactory()
- {
- public Executor getExecutor()
- {
- return executor;
- }
- };
-
- storageManager = new JournalStorageManager(config, executorFactory);
-
- messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
-
- messages = new HashMap<Long, Message>();
-
- cursorRecords = new HashMap<Long, Set<PagePosition>>();
-
- pgTXs = new HashSet<Long>();
-
- queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
-
- try
- {
- XMLOutputFactory factory = XMLOutputFactory.newInstance();
- XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out);
- PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
- xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(
- XMLStreamWriter.class.getClassLoader(),
- new Class[]{XMLStreamWriter.class},
- handler);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- // Public --------------------------------------------------------
-
- public static void main(String arg[])
- {
- if (arg.length < 4)
- {
- System.out.println("Use: java -cp hornetq-core.jar <bindings directory> <message directory> <page directory> <large-message directory>");
- System.exit(-1);
- }
-
- try
- {
- XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
- xmlDataWriter.writeXMLData();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- public void writeXMLData() throws Exception
- {
- long start = System.currentTimeMillis();
- getBindings();
- processMessageJournal();
- printDataAsXML();
- log.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
- log.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * Read through the message journal and stuff all the events/data we care about into local data structures. We'll
- * use this data later to print all the right information.
- *
- * @throws Exception will be thrown if anything goes wrong reading the journal
- */
- private void processMessageJournal() throws Exception
- {
- ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
-
- List<RecordInfo> records = new LinkedList<RecordInfo>();
-
- Journal messageJournal = storageManager.getMessageJournal();
-
- log.debug("Reading journal from " + config.getJournalDirectory());
-
- messageJournal.start();
-
- ((JournalImpl) messageJournal).load(records, null, null, false);
-
- for (RecordInfo info : records)
- {
- byte[] data = info.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- Object o = JournalStorageManager.newObjectEncoding(info, storageManager);
- if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
- {
- messages.put(info.id, ((MessageDescribe) o).msg);
- }
- else if (info.getUserRecordType() == JournalStorageManager.ADD_LARGE_MESSAGE)
- {
- messages.put(info.id, ((MessageDescribe) o).msg);
- }
- else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
- {
- ReferenceDescribe ref = (ReferenceDescribe) o;
- HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id);
- if (map == null)
- {
- HashMap<Long, ReferenceDescribe> newMap = new HashMap<Long, ReferenceDescribe>();
- newMap.put(ref.refEncoding.queueID, ref);
- messageRefs.put(info.id, newMap);
- }
- else
- {
- map.put(ref.refEncoding.queueID, ref);
- }
- }
- else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
- {
- acks.add(info);
- }
- else if (info.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
- {
- CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
- encoding.decode(buff);
-
- Set<PagePosition> set = cursorRecords.get(encoding.queueID);
-
- if (set == null)
- {
- set = new HashSet<PagePosition>();
- cursorRecords.put(encoding.queueID, set);
- }
-
- set.add(encoding.position);
- }
- else if (info.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
- {
- if (info.isUpdate)
- {
- PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-
- pageUpdate.decode(buff);
- pgTXs.add(pageUpdate.pageTX);
- }
- else
- {
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(info.id);
- pgTXs.add(pageTransactionInfo.getTransactionID());
- }
- }
- }
-
- messageJournal.stop();
-
- removeAcked(acks);
- }
-
- /**
- * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
- *
- * @param acks the list of ack records we got from the journal
- */
- private void removeAcked(ArrayList<RecordInfo> acks)
- {
- for (RecordInfo info : acks)
- {
- AckDescribe ack = (AckDescribe) JournalStorageManager.newObjectEncoding(info, null);
- HashMap<Long, ReferenceDescribe> referenceDescribeHashMap = messageRefs.get(info.id);
- referenceDescribeHashMap.remove(ack.refEncoding.queueID);
- if (referenceDescribeHashMap.size() == 0)
- {
- messages.remove(info.id);
- messageRefs.remove(info.id);
- }
- }
- }
-
- /**
- * Open the bindings journal and extract all bindings data.
- *
- * @throws Exception will be thrown if anything goes wrong reading the bindings journal
- */
- private void getBindings() throws Exception
- {
- List<RecordInfo> records = new LinkedList<RecordInfo>();
-
- Journal bindingsJournal = storageManager.getBindingsJournal();
-
- bindingsJournal.start();
-
- log.debug("Reading bindings journal from " + config.getBindingsDirectory());
-
- ((JournalImpl) bindingsJournal).load(records, null, null, false);
-
- for (RecordInfo info : records)
- {
- if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
- {
- PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) JournalStorageManager.newObjectEncoding(info, null);
- queueBindings.put(bindingEncoding.getId(), bindingEncoding);
- }
- }
-
- bindingsJournal.stop();
- }
-
- private void printDataAsXML()
- {
- try
- {
- xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
- xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
- printBindingsAsXML();
- printAllMessagesAsXML();
- xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
- xmlWriter.writeEndDocument();
- xmlWriter.flush();
- xmlWriter.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- private void printBindingsAsXML() throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
- for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
- {
- PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
- xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
- String filter = "";
- if (bindingEncoding.getFilterString() != null)
- {
- filter = bindingEncoding.getFilterString().toString();
- }
- xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
- xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
- xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
- bindingsPrinted++;
- }
- xmlWriter.writeEndElement(); // end BINDINGS_PARENT
- }
-
- private void printAllMessagesAsXML() throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
-
- // Order here is important. We must process the messages from the journal before we process those from the page
- // files in order to get the messages in the right order.
- for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
- {
- printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
- }
-
- printPagedMessagesAsXML();
-
- xmlWriter.writeEndElement(); // end "messages"
- }
-
- /**
- * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
- * from the journal).
- */
- private void printPagedMessagesAsXML()
- {
- try
- {
- ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
- final ExecutorService executor = Executors.newFixedThreadPool(10);
- ExecutorFactory executorFactory = new ExecutorFactory()
- {
- public Executor getExecutor()
- {
- return executor;
- }
- };
- PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, executorFactory, false, null);
- HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
- addressSettingsRepository.setDefault(new AddressSettings());
- StorageManager sm = new NullStorageManager();
- PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
-
- manager.start();
-
- SimpleString stores[] = manager.getStoreNames();
-
- for (SimpleString store : stores)
- {
- PagingStore pageStore = manager.getPageStore(store);
- String folder = null;
-
- if (pageStore != null)
- {
- folder = pageStore.getFolder();
- }
- log.debug("Reading page store " + store + " folder = " + folder);
-
- int pageId = (int) pageStore.getFirstPage();
- for (int i = 0; i < pageStore.getNumberOfPages(); i++)
- {
- log.debug("Reading page " + pageId);
- Page page = pageStore.createPage(pageId);
- page.open();
- List<PagedMessage> messages = page.read(sm);
- page.close();
-
- int messageId = 0;
-
- for (PagedMessage message : messages)
- {
- message.initMessage(sm);
- long queueIDs[] = message.getQueueIDs();
- List<String> queueNames = new ArrayList<String>();
- for (long queueID : queueIDs)
- {
- PagePosition posCheck = new PagePositionImpl(pageId, messageId);
-
- boolean acked = false;
-
- Set<PagePosition> positions = cursorRecords.get(queueID);
- if (positions != null)
- {
- acked = positions.contains(posCheck);
- }
-
- if (!acked)
- {
- queueNames.add(queueBindings.get(queueID).getQueueName().toString());
- }
- }
-
- if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID())))
- {
- printSingleMessageAsXML(message.getMessage(), queueNames);
- }
-
- messageId++;
- }
-
- pageId++;
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
- printMessageAttributes(message);
- printMessageProperties(message);
- printMessageQueues(queues);
- printMessageBody(message);
- xmlWriter.writeEndElement(); // end MESSAGES_CHILD
- messagesPrinted++;
- }
-
- private void printMessageBody(ServerMessage message) throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
-
- if (message.isLargeMessage())
- {
- printLargeMessageBody((LargeServerMessage) message);
- }
- else
- {
- xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
- }
- xmlWriter.writeEndElement(); // end MESSAGE_BODY
- }
-
- private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException
- {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
- BodyEncoder encoder = null;
-
- try
- {
- encoder = message.getBodyEncoder();
- encoder.open();
- long totalBytesWritten = 0;
- Long bufferSize;
- long bodySize = encoder.getLargeBodySize();
- for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE)
- {
- Long remainder = bodySize - totalBytesWritten;
- if (remainder >= LARGE_MESSAGE_CHUNK_SIZE)
- {
- bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
- }
- else
- {
- bufferSize = remainder;
- }
- HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
- encoder.encode(buffer, bufferSize.intValue());
- xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
- totalBytesWritten += bufferSize;
- }
- encoder.close();
- }
- catch (HornetQException e)
- {
- e.printStackTrace();
- }
- finally
- {
- if (encoder != null)
- {
- try
- {
- encoder.close();
- }
- catch (HornetQException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void printMessageQueues(List<String> queues) throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
- for (String queueName : queues)
- {
- xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
- }
- xmlWriter.writeEndElement(); // end QUEUES_PARENT
- }
-
- private void printMessageProperties(ServerMessage message) throws XMLStreamException
- {
- xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
- for (SimpleString key : message.getPropertyNames())
- {
- Object value = message.getObjectProperty(key);
- xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
- if (value instanceof byte[])
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
- }
- else
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
- }
-
- if (value instanceof Boolean)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
- }
- else if (value instanceof Byte)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
- }
- else if (value instanceof Short)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SHORT);
- }
- else if (value instanceof Integer)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_INTEGER);
- }
- else if (value instanceof Long)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_LONG);
- }
- else if (value instanceof Float)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_FLOAT);
- }
- else if (value instanceof Double)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_DOUBLE);
- }
- else if (value instanceof String)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_STRING);
- }
- else if (value instanceof SimpleString)
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
- }
- else if (value instanceof byte[])
- {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
- }
- }
- xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
- }
-
- private void printMessageAttributes(ServerMessage message) throws XMLStreamException
- {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
- byte rawType = message.getType();
- String prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY;
- if (rawType == Message.BYTES_TYPE)
- {
- prettyType = XmlDataConstants.BYTES_TYPE_PRETTY;
- }
- else if (rawType == Message.MAP_TYPE)
- {
- prettyType = XmlDataConstants.MAP_TYPE_PRETTY;
- }
- else if (rawType == Message.OBJECT_TYPE)
- {
- prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY;
- }
- else if (rawType == Message.STREAM_TYPE)
- {
- prettyType = XmlDataConstants.STREAM_TYPE_PRETTY;
- }
- else if (rawType == Message.TEXT_TYPE)
- {
- prettyType = XmlDataConstants.TEXT_TYPE_PRETTY;
- }
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
- if (message.getUserID() != null)
- {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
- }
- }
-
- private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
- {
- List<String> queues = new ArrayList<String>();
- for (ReferenceDescribe ref : refMap.values())
- {
- queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
- }
- return queues;
- }
-
- private static String encode(final byte[] data)
- {
- return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
-
- // Inner classes -------------------------------------------------
-
- /**
- * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
- */
- class PrettyPrintHandler implements InvocationHandler
- {
- private XMLStreamWriter target;
-
- private int depth = 0;
-
- private final char INDENT_CHAR = ' ';
-
- private final String LINE_SEPARATOR = System.getProperty("line.separator");
-
-
- public PrettyPrintHandler(XMLStreamWriter target)
- {
- this.target = target;
- }
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
- {
- String m = method.getName();
-
- if ("writeStartElement".equals(m))
- {
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
-
- depth++;
- }
- else if ("writeEndElement".equals(m))
- {
- depth--;
-
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
- }
- else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
- {
- target.writeCharacters(LINE_SEPARATOR);
- target.writeCharacters(indent(depth));
- }
-
- method.invoke(target, args);
-
- return null;
- }
-
- private String indent(int depth)
- {
- depth *= 3; // level of indentation
- char[] output = new char[depth];
- while (depth-- > 0)
- {
- output[depth] = INDENT_CHAR;
- }
- return new String(output);
- }
- }
-}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java 2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java 2012-03-12 19:34:46 UTC (rev 12288)
@@ -25,11 +25,10 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
-import org.hornetq.core.persistence.impl.journal.XmlDataReader;
-import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
+import org.hornetq.core.persistence.impl.journal.XmlDataExporter;
+import org.hornetq.core.persistence.impl.journal.XmlDataImporter;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.paging.PagingSendTest;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.UUIDGenerator;
@@ -110,8 +109,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -121,8 +120,8 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@@ -189,8 +188,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -200,8 +199,8 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@@ -250,8 +249,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -261,8 +260,8 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@@ -293,8 +292,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -304,8 +303,8 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
@@ -331,77 +330,72 @@
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false);
- try
- {
- LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+ LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
- fileMessage.setMessageID(1005);
- fileMessage.setDurable(true);
+ fileMessage.setMessageID(1005);
+ fileMessage.setDurable(true);
- for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
- {
- fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
- }
+ for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[]{UnitTestCase.getSamplebyte(i)});
+ }
- fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+ fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-// fileMessage.releaseResources();
+ fileMessage.releaseResources();
- session.createQueue("A", "A");
+ session.createQueue("A", "A");
- ClientProducer prod = session.createProducer("A");
+ ClientProducer prod = session.createProducer("A");
- prod.send(fileMessage);
+ prod.send(fileMessage);
- fileMessage.deleteFile();
+ fileMessage.deleteFile();
- session.commit();
+ session.commit();
- session.close();
- locator.close();
- server.stop();
+ session.close();
+ locator.close();
+ server.stop();
- ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
- System.out.print(new String(xmlOutputStream.toByteArray()));
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
- clearData();
- server.start();
- locator = createFactory(false);
- factory = locator.createSessionFactory();
- session = factory.createSession(false, true, true);
+ clearData();
+ server.start();
+ locator = createFactory(false);
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
- ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
- session.close();
- session = factory.createSession(false, false);
- session.start();
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
+ session.close();
+ session = factory.createSession(false, false);
+ session.start();
- ClientConsumer cons = session.createConsumer("A");
+ ClientConsumer cons = session.createConsumer("A");
- ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
+ ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
- Assert.assertNotNull(msg);
+ Assert.assertNotNull(msg);
- Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
+ Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
- for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
- {
- Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
- }
-
- msg.acknowledge();
- session.commit();
- }
- finally
+ for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
{
- session.close();
- factory.close();
- locator.close();
- server.stop();
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
}
+
+ msg.acknowledge();
+ session.commit();
+
+ session.close();
+ factory.close();
+ locator.close();
+ server.stop();
}
public void testPartialQueue() throws Exception
@@ -432,8 +426,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -443,8 +437,8 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
consumer = session.createConsumer("myQueue1");
session.start();
msg = consumer.receive(CONSUMER_TIMEOUT);
@@ -462,6 +456,9 @@
public void testPaging() throws Exception
{
+ final String MY_ADDRESS = "myAddress";
+ final String MY_QUEUE = "myQueue";
+
HornetQServer server = createServer(true);
AddressSettings defaultSetting = new AddressSettings();
@@ -472,8 +469,7 @@
ServerLocator locator = createInVMNonHALocator();
// Making it synchronous, just because we want to stop sending messages as soon as the page-store becomes in
- // page mode
- // and we could only guarantee that by setting it to synchronous
+ // page mode and we could only guarantee that by setting it to synchronous
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
@@ -481,13 +477,11 @@
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true, true);
- session.createQueue("myAddress", "myQueue1", true);
+ session.createQueue(MY_ADDRESS, MY_QUEUE, true);
- ClientProducer producer = session.createProducer("myAddress");
+ ClientProducer producer = session.createProducer(MY_ADDRESS);
- ClientMessage message = null;
-
- message = session.createMessage(true);
+ ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
for (int i = 0; i < 200; i++)
@@ -500,8 +494,8 @@
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
- XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
- xmlDataWriter.writeXMLData();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearData();
@@ -511,10 +505,10 @@
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
- XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
- xmlDataReader.processXml();
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+ xmlDataImporter.processXml();
- ClientConsumer consumer = session.createConsumer("myQueue1");
+ ClientConsumer consumer = session.createConsumer(MY_QUEUE);
session.start();
@@ -530,6 +524,53 @@
server.stop();
}
+ public void testTransactional() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+
+ ClientMessage msg = session.createMessage(true);
+ producer.send(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataExporter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, false, true);
+ ClientSession managementSession = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
+ xmlDataImporter.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNotNull(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the hornetq-commits mailing list