[hornetq-commits] JBoss hornetq SVN: r10702 - in branches/Branch_2_2_EAP_export_tool: src/main/org/hornetq/api/core/client and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu May 19 17:27:15 EDT 2011
Author: jicken
Date: 2011-05-19 17:27:14 -0400 (Thu, 19 May 2011)
New Revision: 10702
Added:
branches/Branch_2_2_EAP_export_tool/.gitignore
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
Log:
got rid of reading the target journal
queueQuery used to query if the queue exists; if not, then create the queue
the queue query response object now passes the id of the queue as this is needed to map old/new queue ids
Added: branches/Branch_2_2_EAP_export_tool/.gitignore
===================================================================
--- branches/Branch_2_2_EAP_export_tool/.gitignore (rev 0)
+++ branches/Branch_2_2_EAP_export_tool/.gitignore 2011-05-19 21:27:14 UTC (rev 10702)
@@ -0,0 +1,4 @@
+target
+thirdparty
+native
+build
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/api/core/client/ClientSession.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -57,6 +57,11 @@
public interface QueueQuery
{
/**
+ * @return the ID of the queue.
+ */
+ long getId();
+
+ /**
* Returns <code>true</code> if the queue exists, <code>false</code> else.
*/
boolean isExists();
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -363,7 +363,9 @@
SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage)channel.sendBlocking(request);
- return new QueueQueryImpl(response.isDurable(),
+ return new QueueQueryImpl(response.getId(),
+ response.getName(),
+ response.isDurable(),
response.getConsumerCount(),
response.getMessageCount(),
response.getFilterString(),
@@ -1855,9 +1857,11 @@
private static class QueueQueryImpl implements QueueQuery
{
+ private final long id;
+ private final SimpleString name;
+
private final boolean exists;
-
private final boolean durable;
private final long messageCount;
@@ -1868,14 +1872,17 @@
private final SimpleString address;
- public QueueQueryImpl(final boolean durable,
+ public QueueQueryImpl(final long id,
+ final SimpleString name,
+ final boolean durable,
final int consumerCount,
final long messageCount,
final SimpleString filterString,
final SimpleString address,
final boolean exists)
{
-
+ this.id = id;
+ this.name = name;
this.durable = durable;
this.consumerCount = consumerCount;
this.messageCount = messageCount;
@@ -1884,6 +1891,10 @@
this.exists = exists;
}
+ public long getId() {
+ return id;
+ }
+
public SimpleString getAddress()
{
return address;
@@ -1914,5 +1925,8 @@
return exists;
}
+ public SimpleString getName() {
+ return name;
+ }
}
}
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -105,10 +105,8 @@
* </ul>
* <p/>
*
- * @param exportFile full qualified name of the export file (/path/name.xml)
* @param bindingsDir directory with the bindings journal
* @param messagesDir directory with the messages journal
- * @param backup flag if a backup should be created before modifying the original journal
* @throws Exception if something goes wrong
*/
public static void exportMessages(final String bindingsDir, final String messagesDir, final OutputStream out) throws Exception
@@ -218,24 +216,6 @@
}
- /*
- *
- * private static void addDelRecordsToOriginal(JournalImpl original, Set<Long> recordsToDelete, List<MessageType> messages) throws Exception {
- List<Long> originalDeletes = new ArrayList<Long>(messages.size() / 3);
- for (MessageType message : messages) {
- if (!recordsToDelete.contains(message.getId()) &&
- !originalDeletes.contains(message.getId())) {
- original.appendDeleteRecord(message.getId(), false, DummyOperationContext.getInstance());
- originalDeletes.add(message.getId());
- }
- }
- if (log.isInfoEnabled()) {
- int size = originalDeletes.size();
- log.info("Deleted " + size + " record" + (size > 1 ? "s" : "") + " from original journal!");
- }
- }
- */
-
private static void loadData(Journal original, final List<RecordInfo> records, final Set<Long> recordsToDelete) throws Exception
{
original.start();
@@ -294,7 +274,6 @@
* Method which writes the XML export file to disk.
*
* @param hqJournalExport the root JAXB context
- * @param exportFile the export filename
* @throws java.io.FileNotFoundException if the export file could not be created
* @throws javax.xml.bind.JAXBException if an error occurs during marshalling
*/
@@ -501,18 +480,17 @@
FileInputStream input = new FileInputStream(new File(importFile));
- return importMessages(input, configuration.getBindingsDirectory(), serverLocator);
+ return importMessages(input, serverLocator);
}
public static long importMessages(final InputStream importFile,
- final String bindingsDirectory,
final ServerLocator serverLocator) throws Exception
{
final JAXBContext context = JAXBContext.newInstance(HornetQExport.class);
final Unmarshaller unmarshaller = context.createUnmarshaller();
- final Map<String, Long> queueBindings = loadBindings(bindingsDirectory);
+ final Map<Long, Long> queueMapping = new HashMap<Long, Long>();
ClientSessionFactory sf = null;
@@ -524,9 +502,22 @@
// message notification callback
final MessagesExportType.Listener listener = new MessagesExportType.Listener()
{
- public void handleMessage(MessageType message) throws Exception
- {
+ public void handleMessage(MessageType message) throws Exception {
+ final List<QueueType> originalQueues = message.getAllPreviousBindings().getQueue();
+ final List<QueueRefType> originalBindings = message.getBindings().getQueue();
+ QueueType queue;
+ ClientSession.QueueQuery queueQuery;
+ for (QueueRefType originalBinding : originalBindings) {
+ queue = originalQueues.get(originalQueues.indexOf(new QueueType(originalBinding.getId())));
+ queueQuery = coreSession.queueQuery(SimpleString.toSimpleString(queue.getName()));
+ if (!queueQuery.isExists()) {
+ coreSession.createQueue(queue.getAddress(), queue.getName(), queue.getFilterString(), message.isDurable());
+ queueQuery = coreSession.queueQuery(SimpleString.toSimpleString(queue.getName()));
+ queueMapping.put(queue.getId(), queueQuery.getId());
+ }
+ }
+
ClientProducer producer = coreSession.createProducer(message.getAddress());
ClientMessage clientMessage = generateClientMessage(message);
producer.send(clientMessage);
@@ -553,7 +544,7 @@
{
if (!ackedQueues.contains(new QueueRefType(binding.getId())))
{
- queues.add(queueBindings.get(message.getAddress()));
+ queues.add(queueMapping.get(binding.getId()));
}
}
clientMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, getByteArrayOf(queues));
@@ -615,6 +606,9 @@
if (target instanceof MessagesExportType)
{
((MessagesExportType)target).setMessageListener(listener);
+ if (((MessagesExportType) target).getBindings() == null) {
+ ((MessagesExportType)target).setOriginalBindings(((HornetQExport)parent).getQueues());
+ }
}
}
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -4,6 +4,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
import org.hornetq.api.core.SimpleString;
@@ -79,6 +80,9 @@
@XmlAttribute
protected Long id;
+ @XmlTransient
+ private BindingsJournalType allPreviousBindings;
+
public MessageType(ServerMessage msg) {
setId(msg.getMessageID());
@@ -349,4 +353,12 @@
public int hashCode() {
return id.hashCode();
}
+
+ public BindingsJournalType getAllPreviousBindings() {
+ return allPreviousBindings;
+ }
+
+ public void setAllPreviousBindings(BindingsJournalType allPreviousBindings) {
+ this.allPreviousBindings = allPreviousBindings;
+ }
}
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessagesExportType.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -6,6 +6,7 @@
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
@@ -35,6 +36,9 @@
@XmlElement(namespace = "urn:hornetq")
protected List<MessageType> message;
+ @XmlTransient
+ private BindingsJournalType bindings;
+
/**
* Gets the value of the message property.
* <p/>
@@ -70,6 +74,7 @@
message = (listener == null) ? null : new ArrayList<MessageType>() {
public boolean add(MessageType o) {
try {
+ o.setAllPreviousBindings(getBindings());
listener.handleMessage(o);
} catch (Exception e) {
e.printStackTrace();
@@ -80,6 +85,14 @@
};
}
+ public void setOriginalBindings(final BindingsJournalType bindings) {
+ this.bindings = bindings;
+ }
+
+ public BindingsJournalType getBindings() {
+ return bindings;
+ }
+
/**
* This listener is invoked every time a new message is unmarshalled.
*/
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/QueueType.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -46,6 +46,13 @@
@XmlAttribute(required = true)
protected long id;
+ public QueueType() {
+ }
+
+ public QueueType(long id) {
+ this.id = id;
+ }
+
/**
* Gets the value of the name property.
*
@@ -134,4 +141,20 @@
this.id = value;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ QueueType queueType = (QueueType) o;
+
+ if (id != queueType.id) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32));
+ }
}
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -27,6 +27,8 @@
*/
public class SessionQueueQueryResponseMessage extends PacketImpl
{
+ private long id;
+
private SimpleString name;
private boolean exists;
@@ -45,7 +47,7 @@
public SessionQueueQueryResponseMessage(final QueueQueryResult result)
{
- this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
+ this(result.getId(), result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists());
}
@@ -54,6 +56,21 @@
this(null, null, false, false, null, 0, 0, false);
}
+ private SessionQueueQueryResponseMessage(final long id,
+ final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final long messageCount,
+ final boolean exists)
+ {
+ this(name,address, durable, temporary, filterString, consumerCount, messageCount, exists);
+ this.id = id;
+
+ }
+
private SessionQueueQueryResponseMessage(final SimpleString name,
final SimpleString address,
final boolean durable,
@@ -82,6 +99,10 @@
this.exists = exists;
}
+ public long getId() {
+ return id;
+ }
+
@Override
public boolean isResponse()
{
@@ -139,6 +160,7 @@
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(address);
buffer.writeNullableSimpleString(name);
+ buffer.writeLong(id);
}
@Override
@@ -152,6 +174,7 @@
filterString = buffer.readNullableSimpleString();
address = buffer.readNullableSimpleString();
name = buffer.readNullableSimpleString();
+ id = buffer.readLong();
}
@Override
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/QueueQueryResult.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -24,6 +24,8 @@
*/
public class QueueQueryResult
{
+ private long id;
+
private SimpleString name;
private boolean exists;
@@ -50,12 +52,37 @@
{
this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
}
+ public QueueQueryResult(final long id,
+ final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final long messageCount)
+ {
+ this(id, name, address, durable, temporary, filterString, consumerCount, messageCount, true);
+ }
public QueueQueryResult()
{
this(null, null, false, false, null, 0, 0, false);
}
+ private QueueQueryResult(final long id,
+ final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final long messageCount,
+ final boolean exists)
+ {
+ this(name, address, durable, temporary, filterString, consumerCount, messageCount, exists);
+ this.id = id;
+ }
+
private QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@@ -82,6 +109,10 @@
this.exists = exists;
}
+ public long getId() {
+ return id;
+ }
+
public boolean isExists()
{
return exists;
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -13,8 +13,6 @@
package org.hornetq.core.server.impl;
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -74,6 +72,7 @@
import org.hornetq.utils.UUID;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
+import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
/*
* Session implementation
@@ -495,7 +494,8 @@
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new QueueQueryResult(name,
+ response = new QueueQueryResult(queue.getID(),
+ name,
binding.getAddress(),
queue.isDurable(),
queue.isTemporary(),
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-19 15:00:06 UTC (rev 10701)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-19 21:27:14 UTC (rev 10702)
@@ -105,7 +105,7 @@
locator = createInVMNonHALocator();
- ManageDataTool.importMessages(is, getBindingsDir(), locator);
+ ManageDataTool.importMessages(is, locator);
ClientSessionFactory csf = locator.createSessionFactory();
@@ -121,8 +121,6 @@
assertEquals(i, msg.getIntProperty("count").intValue());
}
-
-
}
finally
{
More information about the hornetq-commits
mailing list