[hornetq-commits] JBoss hornetq SVN: r8078 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Oct 10 19:35:45 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-10 19:35:44 -0400 (Sat, 10 Oct 2009)
New Revision: 8078
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Replication on large message
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -99,6 +99,8 @@
LargeServerMessage createLargeMessage();
+ LargeServerMessage createLargeMessage(byte [] header);
+
void prepare(long txID, Xid xid) throws Exception;
void commit(long txID) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -17,6 +17,8 @@
import java.nio.ByteBuffer;
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -91,11 +93,9 @@
{
file.open();
}
+
+ storageManager.addBytesToLargeMessage(file, this.getMessageID(), bytes);
- file.position(file.size());
-
- file.write(ByteBuffer.wrap(bytes), false);
-
bodySize += bytes.length;
}
@@ -232,6 +232,7 @@
public synchronized void deleteFile() throws Exception
{
validateFile();
+ releaseResources();
storageManager.deleteFile(file);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -18,6 +18,7 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -376,7 +377,35 @@
{
return new JournalLargeServerMessage(this);
}
+
+ /**
+ * Appends a block of bytes at the current end of a large message file and, when
+ * isReplicated() returns true, passes the same bytes to the replicator.
+ *
+ * @param file the file backing the large message; it is positioned at file.size() before the write
+ * @param messageId id handed to the replicator together with the bytes
+ * @param bytes the bytes to append
+ * @throws Exception declared by this method; presumably raised by the file operations - TODO confirm
+ */
+ public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+ {
+ file.position(file.size());
+ file.write(ByteBuffer.wrap(bytes), false);
+
+ if (isReplicated())
+ {
+ this.replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
+
+ /**
+ * Creates a large message whose properties are decoded from an already encoded header.
+ * When isReplicated() returns true, the raw header is handed to the replicator through
+ * largeMessageBegin before the local message object is built.
+ *
+ * @param header the encoded header; it is wrapped in a buffer and passed to decodeProperties
+ * @return a new JournalLargeServerMessage created with this storage manager
+ */
+ public LargeServerMessage createLargeMessage(byte [] header)
+ {
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(header);
+ }
+
+ JournalLargeServerMessage largeMessage = new JournalLargeServerMessage(this);
+
+ HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+ largeMessage.decodeProperties(headerBuffer);
+
+ return largeMessage;
+ }
+
// Non transactional operations
public void storeMessage(final ServerMessage message) throws Exception
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -19,6 +19,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -26,6 +27,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -174,6 +176,18 @@
return new NullStorageLargeServerMessage();
}
+ /**
+ * Creates a NullStorageLargeServerMessage whose properties are decoded from an already
+ * encoded header. No replicator call is made here.
+ *
+ * @param header the encoded header; it is wrapped in a buffer and passed to decodeProperties
+ * @return the newly created large message
+ */
+ public LargeServerMessage createLargeMessage(byte [] header)
+ {
+ NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
+
+ HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+ largeMessage.decodeProperties(headerBuffer);
+
+ return largeMessage;
+ }
+
+
public long generateUniqueID()
{
long id = idSequence.getAndIncrement();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,9 @@
package org.hornetq.core.remoting.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
@@ -87,6 +90,9 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -420,6 +426,21 @@
packet = new ReplicationPageEventMessage();
break;
}
+ case REPLICATION_LARGE_MESSAGE_BEGIN:
+ {
+ packet = new ReplicationLargeMessageBeingMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_END:
+ {
+ packet = new ReplicationLargemessageEndMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_WRITE:
+ {
+ packet = new ReplicationLargeMessageWriteMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -159,7 +159,13 @@
public static final byte REPLICATION_PAGE_EVENT = 88;
+ public static final byte REPLICATION_LARGE_MESSAGE_BEGIN = 89;
+ public static final byte REPLICATION_LARGE_MESSAGE_END = 90;
+
+ public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
+
+
// Static --------------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -35,7 +35,7 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
+
private boolean isUpdate;
private byte recordType;
@@ -53,7 +53,11 @@
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(final byte journalID,
+ final boolean isUpdate,
+ final long id,
+ final byte recordType,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -65,10 +69,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
@@ -115,7 +119,7 @@
{
return journalID;
}
-
+
public boolean isUpdate()
{
return isUpdate;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,12 +32,12 @@
// Attributes ----------------------------------------------------
private long txId;
-
+
private long id;
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
+
private boolean isUpdate;
private byte recordType;
@@ -55,7 +55,12 @@
super(REPLICATION_APPEND_TX);
}
- public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddTXMessage(final byte journalID,
+ final boolean isUpdate,
+ final long txId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -68,10 +73,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
@@ -113,7 +118,7 @@
{
return id;
}
-
+
public long getTxId()
{
return txId;
@@ -126,7 +131,7 @@
{
return journalID;
}
-
+
public boolean isUpdate()
{
return isUpdate;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -46,7 +46,7 @@
super(REPLICATION_COMMIT_ROLLBACK);
}
- public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+ public ReplicationCommitMessage(final byte journalID, final boolean rollback, final long txId)
{
this();
this.journalID = journalID;
@@ -56,6 +56,7 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl.wireformat;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.DataConstants;
@@ -45,7 +44,7 @@
super(REPLICATION_DELETE);
}
- public ReplicationDeleteMessage(byte journalID, long id)
+ public ReplicationDeleteMessage(final byte journalID, final long id)
{
this();
this.journalID = journalID;
@@ -54,11 +53,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_LONG;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
@@ -92,7 +90,6 @@
return journalID;
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,7 +32,7 @@
// Attributes ----------------------------------------------------
private long txId;
-
+
private long id;
/** 0 - Bindings, 1 - MessagesJournal */
@@ -51,7 +51,10 @@
super(REPLICATION_DELETE_TX);
}
- public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+ public ReplicationDeleteTXMessage(final byte journalID,
+ final long txId,
+ final long id,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -62,10 +65,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
DataConstants.SIZE_INT +
@@ -101,7 +104,7 @@
{
return id;
}
-
+
public long getTxId()
{
return txId;
@@ -114,7 +117,7 @@
{
return journalID;
}
-
+
/**
* @return the recordData
*/
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet of type REPLICATION_LARGE_MESSAGE_BEGIN. The body is an int length
+ * followed by that many header bytes. The no-argument constructor leaves the header null
+ * until decodeBody fills it. NOTE: "Being" in the class name is a typo for "Begin"; the
+ * name is kept because PacketDecoder and ReplicationEndpointImpl refer to it.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationLargeMessageBeingMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private byte[] header;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationLargeMessageBeingMessage(final byte[] header)
+ {
+ this();
+ this.header = header;
+ }
+
+ public ReplicationLargeMessageBeingMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_BEGIN);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + header.length;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(header.length);
+ buffer.writeBytes(header);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ int size = buffer.readInt();
+ header = new byte[size];
+ buffer.readBytes(header);
+ }
+
+ /**
+ * @return the header bytes, or null if neither the constructor nor decodeBody has set them
+ */
+ public byte[] getHeader()
+ {
+ return header;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet of type REPLICATION_LARGE_MESSAGE_WRITE carrying a message id and a
+ * block of body bytes. The packet body is the long message id, an int length and then
+ * that many bytes. The no-argument constructor leaves the body null until decodeBody runs.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationLargeMessageWriteMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long messageId;
+
+ private byte body[];
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public ReplicationLargeMessageWriteMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_WRITE);
+ }
+
+ /**
+ * @param messageId id of the large message the bytes belong to
+ * @param body the bytes to transmit; the array is stored by reference, not copied
+ */
+ public ReplicationLargeMessageWriteMessage(final long messageId, final byte[] body)
+ {
+ this();
+
+ this.messageId = messageId;
+ this.body = body;
+ }
+
+ // Public --------------------------------------------------------
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + body.length;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(messageId);
+ buffer.writeInt(body.length);
+ buffer.writeBytes(body);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ messageId = buffer.readLong();
+ int size = buffer.readInt();
+ body = new byte[size];
+ buffer.readBytes(body);
+ }
+
+ /**
+ * @return the id written first in the packet body (0 until set or decoded)
+ */
+ public long getMessageId()
+ {
+ return messageId;
+ }
+
+ /**
+ * @return the body bytes, or null if neither the constructor nor decodeBody has set them
+ */
+ public byte[] getBody()
+ {
+ return body;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet of type REPLICATION_LARGE_MESSAGE_END. The body is the long message id
+ * followed by a boolean delete flag; ReplicationEndpointImpl deletes the large message file
+ * when the flag is true and calls setStored() on the message otherwise.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationLargemessageEndMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long messageId;
+
+ private boolean isDelete;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationLargemessageEndMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_END);
+ }
+
+ public ReplicationLargemessageEndMessage(final long messageId, final boolean isDelete)
+ {
+ this();
+ this.messageId = messageId;
+ this.isDelete = isDelete;
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(messageId);
+ buffer.writeBoolean(isDelete);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ messageId = buffer.readLong();
+ isDelete = buffer.readBoolean();
+ }
+
+ /**
+ * @return the id of the large message this packet refers to
+ */
+ public long getMessageId()
+ {
+ return messageId;
+ }
+
+ /**
+ * @return true when the large message file is to be deleted instead of marked as stored
+ */
+ public boolean isDelete()
+ {
+ return isDelete;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -49,7 +49,7 @@
super(REPLICATION_PREPARE);
}
- public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+ public ReplicationPrepareMessage(final byte journalID, final long txId, final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -59,10 +59,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_INT +
(encodingData != null ? encodingData.getEncodeSize() : recordData.length);
@@ -100,7 +100,7 @@
{
return journalID;
}
-
+
/**
* @return the recordData
*/
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl.wireformat;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
@@ -32,18 +31,17 @@
{
super(REPLICATION_RESPONSE);
}
-
+
// Public --------------------------------------------------------
/* (non-Javadoc)
* @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
*/
+ @Override
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE;
}
-
-
// Package protected ---------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -73,5 +73,13 @@
* @param pageNumber
*/
void pageWrite(PagedMessage message, int pageNumber);
+
+ void largeMessageBegin(byte [] header);
+
+ void largeMessageWrite(long messageId, byte [] body);
+
+ void largeMessageEnd(long messageId);
+
+ void largeMessageDelete(long messageId);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,10 @@
package org.hornetq.core.replication.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -33,12 +37,16 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -72,6 +80,8 @@
private PagingManager pageManager;
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
@@ -120,7 +130,22 @@
{
handlePageEvent((ReplicationPageEventMessage)packet);
}
-
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
+ }
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
+ }
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
+ }
}
catch (Exception e)
{
@@ -171,6 +196,31 @@
{
channel.close();
storage.stop();
+
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
+ {
+ for (Page page : map.values())
+ {
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
+ }
+ }
+
+ pageIndex.clear();
+
+
+ for (LargeServerMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+
+ largeMessages.clear();
}
/* (non-Javadoc)
@@ -194,10 +244,87 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet end-of-large-message packet: when isDelete() is set the file is deleted, otherwise setStored() is called; failures are logged, not rethrown
+ */
+ private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet)
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete()); // entry is removed from the map only on delete
+ if (message != null)
+ {
+ if (packet.isDelete())
+ {
+ try
+ {
+ message.deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
+ }
+ }
+ else
+ {
+ try
+ {
+ message.setStored();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error storing large message ID = " + packet.getMessageId(), e);
+ }
+ }
+ }
+ }
/**
* @param packet
*/
+ private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false); // plain get: the entry stays in the map
+ if (message != null) // unknown IDs were already logged by lookupLargeMessage; the body is dropped
+ {
+ message.addBytes(packet.getBody());
+ }
+ }
+
+
+ private LargeServerMessage lookupLargeMessage(long messageId, boolean isDelete) // returns null (after logging a warning) when the ID is not registered
+ {
+ LargeServerMessage message;
+
+ if (isDelete)
+ {
+ message = largeMessages.remove(messageId); // a delete lookup also unregisters the message
+ }
+ else
+ {
+ message = largeMessages.get(messageId); // entry stays in the map
+ }
+
+ if (message == null)
+ {
+ log.warn("Large MessageID " + messageId + " is not available on backup server. Ignoring replication message");
+ }
+
+ return message;
+
+ }
+
+ /**
+ * @param packet begin packet; its header is handed to storage.createLargeMessage and the result is registered in largeMessages under its message ID
+ */
+ private void handleLargeMessageBegin(ReplicationLargeMessageBeingMessage packet)
+ {
+ LargeServerMessage largeMessage = storage.createLargeMessage(packet.getHeader());
+ this.largeMessages.put(largeMessage.getMessageID(), largeMessage);
+ }
+
+
+ /**
+ * @param packet
+ */
private void handleCommitRollback(final ReplicationCommitMessage packet) throws Exception
{
Journal journalToUse = getJournal(packet.getJournalID());
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -33,6 +33,9 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -252,8 +255,54 @@
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
+ */
+ public void largeMessageBegin(byte[] header)
+ {
+ if (enabled) // nothing is sent while 'enabled' is false
+ {
+ sendReplicatePacket(new ReplicationLargeMessageBeingMessage(header)); // header is forwarded as-is in the begin packet
+ }
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageDelete(long)
+ */
+ public void largeMessageDelete(long messageId)
+ {
+ if (enabled) // nothing is sent while 'enabled' is false
+ {
+ sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, true)); // deletion travels as an End packet; 'true' is presumably the flag read back via isDelete() - confirm in the packet class
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageEnd(long)
+ */
+ public void largeMessageEnd(long messageId)
+ {
+ if (enabled) // nothing is sent while 'enabled' is false
+ {
+ sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, false)); // same packet type as largeMessageDelete; 'false' is presumably the delete flag - confirm in the packet class
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
+ */
+ public void largeMessageWrite(long messageId, byte[] body)
+ {
+ if (enabled) // nothing is sent while 'enabled' is false
+ {
+ sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body)); // one packet per call, carrying the message ID and the given bytes
+ }
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -164,7 +164,6 @@
private final SimpleString nodeID;
// The current currentLargeMessage being processed
- // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
private volatile LargeServerMessage currentLargeMessage;
private ServerSessionPacketHandler handler;
@@ -1688,13 +1687,7 @@
private LargeServerMessage createLargeMessageStorage(final byte[] header) throws Exception
{
- LargeServerMessage largeMessage = storageManager.createLargeMessage();
-
- HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
-
- largeMessage.decodeProperties(headerBuffer);
-
- return largeMessage;
+ return storageManager.createLargeMessage(header);
}
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -67,6 +67,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.SimpleString;
+import org.jboss.netty.channel.Channels;
/**
* A ReplicationTest
@@ -231,8 +232,23 @@
manager.pageDeleted(dummy, 4);
manager.pageDeleted(dummy, 5);
manager.pageDeleted(dummy, 6);
+
+ blockOnReplication(manager);
+ ServerMessageImpl serverMsg = new ServerMessageImpl();
+ serverMsg.setMessageID(500);
+ serverMsg.setDestination(new SimpleString("tttt"));
+
+
+ HornetQBuffer buffer = ChannelBuffers.dynamicBuffer(100);
+ serverMsg.encodeProperties(buffer);
+
+ manager.largeMessageBegin(buffer.array());
+ manager.largeMessageWrite(500, new byte[1024]);
+
+ manager.largeMessageEnd(500);
+
blockOnReplication(manager);
store.start();
More information about the hornetq-commits
mailing list