JBoss hornetq SVN: r11310 - branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-09 08:43:32 -0400 (Fri, 09 Sep 2011)
New Revision: 11310
Removed:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompException.java
Log:
del
Deleted: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompException.java 2011-09-09 12:42:03 UTC (rev 11309)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompException.java 2011-09-09 12:43:32 UTC (rev 11310)
@@ -1,57 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.IOException;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-class StompException extends IOException
-{
- private static final long serialVersionUID = -2869735532997332242L;
-
- private final boolean fatal;
-
- public StompException()
- {
- this(null);
- }
-
- public StompException(String s)
- {
- this(s, false);
- }
-
- public StompException(String s, boolean fatal)
- {
- this(s, fatal, null);
- }
-
- public StompException(String s, boolean fatal, Throwable cause)
- {
- super(s);
- this.fatal = fatal;
- initCause(cause);
- }
-
- public boolean isFatal()
- {
- return fatal;
- }
-}
13 years, 3 months
JBoss hornetq SVN: r11309 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-09 08:42:03 -0400 (Fri, 09 Sep 2011)
New Revision: 11309
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
Log:
more code
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -122,6 +122,8 @@
public static final byte TAB = (byte)'\t';
+ public static final String CONTENT_TYPE_HEADER_NAME = "content-type";
+
public static String CONTENT_LENGTH_HEADER_NAME = "content-length";
public byte[] workingBuffer = new byte[1024];
@@ -147,6 +149,8 @@
public boolean whiteSpaceOnly;
public int contentLength;
+
+ public String contentType;
public int bodyStart;
@@ -599,6 +603,8 @@
contentLength = -1;
+ contentType = null;
+
bodyStart = -1;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -37,28 +37,25 @@
*/
public class StompFrame
{
- private static final Logger log = Logger.getLogger(StompFrame.class);
+ protected static final Logger log = Logger.getLogger(StompFrame.class);
- public static final byte[] NO_DATA = new byte[] {};
+ protected static final byte[] NO_DATA = new byte[] {};
- private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
+ protected static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- private String command;
+ protected String command;
- private Map<String, String> headers;
-
- //stomp 1.1 talks about repetitive headers.
- private List<Header> allHeaders = new ArrayList<Header>();
+ protected Map<String, String> headers;
- private String body;
+ protected String body;
- private byte[] bytesBody;
+ protected byte[] bytesBody;
- private HornetQBuffer buffer = null;
+ protected HornetQBuffer buffer = null;
- private int size;
+ protected int size;
- private boolean disconnect;
+ protected boolean disconnect;
public StompFrame(String command)
{
@@ -111,7 +108,6 @@
out += body;
return out;
}
-
public HornetQBuffer toHornetQBuffer() throws Exception
{
@@ -149,11 +145,7 @@
public void addHeader(String key, String val)
{
- if (!headers.containsKey(key))
- {
- headers.put(key, val);
- }
- allHeaders.add(new Header(key, val));
+ headers.put(key, val);
}
public Map<String, String> getHeadersMap()
@@ -171,11 +163,31 @@
this.key = key;
this.val = val;
}
+
+ public String getEscapedKey()
+ {
+ return escape(key);
+ }
+
+ public String getEscapedValue()
+ {
+ return escape(val);
+ }
+
+ private String escape(String str)
+ {
+ str = str.replaceAll("\n", "\\n");
+ str = str.replaceAll("\\", "\\\\");
+ str = str.replaceAll(":", "\\:");
+
+ return str;
+ }
}
- public void setBody(String body)
+ public void setBody(String body) throws UnsupportedEncodingException
{
this.body = body;
+ this.bytesBody = body.getBytes("UTF-8");
}
public boolean hasHeader(String key)
@@ -191,11 +203,7 @@
//Since 1.1, there is a content-type header that needs to be taken care of
public byte[] getBodyAsBytes() throws UnsupportedEncodingException
{
- if (body != null)
- {
- return body.getBytes("UTF-8");
- }
- return new byte[0];
+ return bytesBody;
}
public boolean needsDisconnect()
@@ -203,11 +211,6 @@
return disconnect;
}
- public List<Header> getHeaders()
- {
- return this.allHeaders;
- }
-
public void setByteBody(byte[] content)
{
this.bytesBody = content;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -46,6 +46,8 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
{
+ Map<String, String> headers = new HashMap<String, String>(frame.getHeadersMap());
+
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
@@ -80,10 +82,10 @@
}
// now the general headers
- for (Iterator<Map.Entry<String, Object>> iter = headers.entrySet().iterator(); iter.hasNext();)
+ for (Iterator<Map.Entry<String, String>> iter = headers.entrySet().iterator(); iter.hasNext();)
{
- Map.Entry<String, Object> entry = iter.next();
- String name = (String)entry.getKey();
+ Map.Entry<String, String> entry = iter.next();
+ String name = entry.getKey();
Object value = entry.getValue();
msg.putObjectProperty(name, value);
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -122,14 +122,7 @@
public StompFrame handleReceipt(String receiptID)
{
StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
- try
- {
- receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
- }
- catch (HornetQStompException e)
- {
- return e.getFrame();
- }
+ receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
return receipt;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -75,9 +75,17 @@
else
{
//not valid
- response = new StompFrame(Stomp.Responses.ERROR);
+ response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
- response.setBody("The login account is not valid.");
+ try
+ {
+ response.setBody("The login account is not valid.");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ log.error("Encoding problem", e);
+ //then we will send a null body message.
+ }
connection.sendFrame(response);
connection.destroy();
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -1,22 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp.v10;
-import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.StompFrame;
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class StompFrameV10 extends StompFrame
{
public StompFrameV10(String command)
{
super(command);
}
-
- @Override
- public void addHeader(String key, String val) throws HornetQStompException
- {
- //trimming
- String newKey = key.trim();
- String newVal = val.trim();
- super.addHeader(newKey, newVal);
- }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -17,12 +17,14 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.SimpleBytes;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompDecoder;
@@ -69,7 +71,7 @@
connection.setClientID(clientID);
connection.setValid(true);
- response = new StompFrame(Stomp.Responses.CONNECTED);
+ response = new StompFrameV11(Stomp.Responses.CONNECTED);
// version
response.addHeader(Stomp.Headers.Connected.VERSION,
@@ -113,6 +115,10 @@
{
response = e.getFrame();
}
+ catch (UnsupportedEncodingException e)
+ {
+ response = new HornetQStompException("Encoding error.", e).getFrame();
+ }
return response;
}
@@ -437,7 +443,7 @@
}
}
- public StompFrame createPingFrame()
+ public StompFrame createPingFrame() throws UnsupportedEncodingException
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
frame.setBody("\n");
@@ -480,7 +486,14 @@
public void run()
{
lastAccepted.set(System.currentTimeMillis());
- pingFrame = createPingFrame();
+ try
+ {
+ pingFrame = createPingFrame();
+ }
+ catch (UnsupportedEncodingException e1)
+ {
+ log.error("Cannot create ping frame due to encoding problem.", e1);
+ }
synchronized (this)
{
@@ -880,6 +893,11 @@
{
decoder.contentLength = Integer.parseInt(headerValue);
}
+
+ if (decoder.headerName.equals(StompDecoder.CONTENT_TYPE_HEADER_NAME))
+ {
+ decoder.contentType = headerValue;
+ }
decoder.whiteSpaceOnly = true;
@@ -974,4 +992,5 @@
return null;
}
}
+
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-08 16:13:30 UTC (rev 11308)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-09 12:42:03 UTC (rev 11309)
@@ -1,14 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp.v11;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.SimpleBytes;
+import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompFrame.Header;
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class StompFrameV11 extends StompFrame
{
- public static final char ESC_CHAR = '\\';
- public static final char COLON = ':';
+ //stomp 1.1 talks about repetitive headers.
+ private List<Header> allHeaders = new ArrayList<Header>();
+ private String contentType;
public StompFrameV11(String command, Map<String, String> headers, byte[] content)
{
@@ -19,76 +43,46 @@
{
super(command);
}
-
- public static String escaping(String rawString) throws HornetQStompException
+
+ @Override
+ public HornetQBuffer toHornetQBuffer() throws Exception
{
- int len = rawString.length();
-
- SimpleBytes sb = new SimpleBytes(1024);
-
- boolean beginEsc = false;
- for (int i = 0; i < len; i++)
+ if (buffer == null)
{
- char k = rawString.charAt(i);
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
- if (k == ESC_CHAR)
+ StringBuffer head = new StringBuffer();
+ head.append(command);
+ head.append(Stomp.NEWLINE);
+ // Output the headers.
+ for (Header h : allHeaders)
{
- if (beginEsc)
- {
- //it is a backslash
- sb.append('\\');
- beginEsc = false;
- }
- else
- {
- beginEsc = true;
- }
+ head.append(h.getEscapedKey());
+ head.append(Stomp.Headers.SEPARATOR);
+ head.append(h.getEscapedValue());
+ head.append(Stomp.NEWLINE);
}
- else if (k == 'n')
- {
- if (beginEsc)
- {
- //it is a newline
- sb.append('\n');
- beginEsc = false;
- }
- else
- {
- sb.append(k);
- }
- }
- else if (k == ':')
- {
- if (beginEsc)
- {
- sb.append(k);
- beginEsc = false;
- }
- else
- {
- //error
- throw new HornetQStompException("Colon not escaped!");
- }
- }
- else
- {
- if (beginEsc)
- {
- //error, no other escape defined.
- throw new HornetQStompException("Bad escape char found: " + k);
- }
- else
- {
- sb.append(k);
- }
- }
+ // Add a newline to separate the headers from the content.
+ head.append(Stomp.NEWLINE);
+
+ buffer.writeBytes(head.toString().getBytes("UTF-8"));
+ buffer.writeBytes(bytesBody);
+ buffer.writeBytes(END_OF_FRAME);
+
+ size = buffer.writerIndex();
}
- return sb.toString();
+ return buffer;
}
-
- public static void main(String[] args)
+
+ @Override
+ public void addHeader(String key, String val)
{
- String rawStr = "hello world\\n\\:"
+ if (!headers.containsKey(key))
+ {
+ headers.put(key, val);
+ }
+ allHeaders.add(new Header(key, val));
}
+
}
13 years, 3 months
JBoss hornetq SVN: r11308 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: persistence/impl/journal and 4 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-08 12:13:30 -0400 (Thu, 08 Sep 2011)
New Revision: 11308
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 (not final) Assume that a problem during the start of replication was on the
backup side, as an actual IO_ERROR would also cause all sorts of trouble on the live itself
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -34,6 +34,7 @@
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -239,4 +240,11 @@
*/
Journal getMessageJournal();
+ /**
+ * @param replicationManager
+ * @param pagingManager
+ * @throws Exception
+ */
+ void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception;
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -351,6 +351,7 @@
* @param pagingManager
* @throws HornetQException
*/
+ @Override
public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
{
if (!started)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -588,4 +588,10 @@
return null;
}
+ @Override
+ public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
+ {
+ // no-op
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -30,8 +30,8 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
-import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -149,17 +149,11 @@
else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
- try
+ if (server.startReplication(rc))
{
- server.addHaBackup(rc);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), true), true,
+ true);
}
- catch (Exception e)
- {
- // XXX HORNETQ-720 This is not what we want
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), true), true, true);
}
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -172,5 +172,9 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- void addHaBackup(CoreRemotingConnection rc) throws Exception;
+ /**
+ * @param rc
+ * @return {@code true} if replication started successfully, {@code false} otherwise
+ */
+ boolean startReplication(CoreRemotingConnection rc);
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08 16:12:41 UTC (rev 11307)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08 16:13:30 UTC (rev 11308)
@@ -1992,18 +1992,27 @@
}
@Override
- public void addHaBackup(CoreRemotingConnection rc) throws Exception
+ public boolean startReplication(CoreRemotingConnection rc)
{
- if (!(storageManager instanceof JournalStorageManager))
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
+ try
{
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "unknown implementation of JournalStorageManager!");
+ replicationManager.start();
+ storageManager.startReplication(replicationManager, pagingManager);
+ return true;
}
-
- JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
- replicationManager = new ReplicationManagerImpl(rc, executorFactory);
- replicationManager.start();
-
- journalStorageManager.startReplication(replicationManager, pagingManager);
+ catch (Exception e)
+ {
+ /*
+ * The reasoning here is that the exception was caused either by (1) the backup (or the
+ * interaction with it), or (2) an IO error at the storage. If (1), we can swallow the
+ * exception and ignore the replication request. If (2), the live will crash shortly.
+ */
+ // HORNETQ-720 Need to verify whether swallowing the exception here is acceptable
+ log.warn("Exception when trying to start replication", e);
+ replicationManager = null;
+ return false;
+ }
}
/**
13 years, 3 months
JBoss hornetq SVN: r11307 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-08 12:12:41 -0400 (Thu, 08 Sep 2011)
New Revision: 11307
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Change some markers from HORNETQ-720 to HORNETQ-768
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08 16:12:12 UTC (rev 11306)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08 16:12:41 UTC (rev 11307)
@@ -563,7 +563,7 @@
if (liveServerSessionFactory == null)
{
- // XXX HORNETQ-720
+ // XXX HORNETQ-768
throw new RuntimeException("Need to retry...");
}
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
@@ -597,7 +597,7 @@
if (!isRemoteBackupUpToDate())
{
/*
- * XXX HORNETQ-720 Live is down, and this server was not in sync. Perhaps we should
+ * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps we should
* first try to wait a little longer to see if the 'live' comes back?
*/
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
13 years, 3 months
JBoss hornetq SVN: r11306 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-08 12:12:12 -0400 (Thu, 08 Sep 2011)
New Revision: 11306
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Add new methods to Journal implementations.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-08 15:35:08 UTC (rev 11305)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-08 16:12:12 UTC (rev 11306)
@@ -14,6 +14,7 @@
package org.hornetq.core.replication.impl;
import java.util.List;
+import java.util.Map;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
@@ -23,6 +24,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
@@ -576,6 +578,12 @@
return localJournal.loadSyncOnly();
}
+ @Override
+ public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ {
+ throw new UnsupportedOperationException("This method should only be called at a replicating backup");
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-08 15:35:08 UTC (rev 11305)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-08 16:12:12 UTC (rev 11306)
@@ -1,6 +1,7 @@
package org.hornetq.core.journal.impl;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -257,4 +258,10 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-08 15:35:08 UTC (rev 11305)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-08 16:12:12 UTC (rev 11306)
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -51,6 +52,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -594,7 +596,7 @@
};
- static class FakeJournal implements Journal
+ static final class FakeJournal implements Journal
{
public
@@ -846,10 +848,6 @@
}
- /*
- * (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#loadSyncOnly()
- */
@Override
public JournalLoadInformation loadSyncOnly() throws Exception
{
@@ -857,5 +855,12 @@
return null;
}
+ /**
+  * Stub implementation for the fake journal: ignores both arguments and returns {@code null}.
+  */
+ @Override
+ public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+ {
+ // Intentionally unimplemented: no file is created and mapToFill is left untouched.
+ return null;
+ }
+
}
}
13 years, 3 months
JBoss hornetq SVN: r11305 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-08 11:35:08 -0400 (Thu, 08 Sep 2011)
New Revision: 11305
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
char escaping
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,44 @@
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * A minimal growable byte accumulator whose content can be read back as a UTF-8 string.
+ * <p>
+ * Bytes are added one at a time with {@link #append(byte)}; {@link #reset()} discards the
+ * accumulated bytes while keeping the backing array for reuse. Instances perform no
+ * synchronization.
+ */
+public class SimpleBytes
+{
+   /** Smallest amount by which the backing array grows; never less than one. */
+   private final int step;
+
+   /** Backing storage; only the first {@code index} bytes are meaningful. */
+   private byte[] contents;
+
+   /** Number of bytes appended since construction or the last {@link #reset()}. */
+   private int index;
+
+   /**
+    * Creates an empty accumulator.
+    *
+    * @param initCapacity initial size of the backing array; zero is allowed
+    * @throws NegativeArraySizeException if {@code initCapacity} is negative
+    */
+   public SimpleBytes(int initCapacity)
+   {
+      // A zero step would make append() "grow" the array by nothing and then overflow it.
+      this.step = Math.max(initCapacity, 1);
+      this.contents = new byte[initCapacity];
+      this.index = 0;
+   }
+
+   /**
+    * Decodes the accumulated bytes as UTF-8.
+    *
+    * @return the decoded text, or the empty string when nothing has been appended
+    * @throws UnsupportedEncodingException if the runtime does not know the UTF-8 charset
+    */
+   public String getString() throws UnsupportedEncodingException
+   {
+      // Decode straight from the backing array; no intermediate copy is needed.
+      return new String(contents, 0, index, "UTF-8");
+   }
+
+   /**
+    * Forgets all accumulated bytes. The backing array is kept so it can be reused.
+    */
+   public void reset()
+   {
+      index = 0;
+   }
+
+   /**
+    * Appends one byte, enlarging the backing array when it is full.
+    *
+    * @param b the byte to add
+    */
+   public void append(byte b)
+   {
+      if (index >= contents.length)
+      {
+         // Grow by at least 'step' and at least the current size, so that repeated
+         // appends do not copy the whole array on every 'step' bytes.
+         byte[] newBuffer = new byte[contents.length + Math.max(step, contents.length)];
+         System.arraycopy(contents, 0, newBuffer, 0, index);
+         contents = newBuffer;
+      }
+      contents[index++] = b;
+   }
+}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -61,7 +61,7 @@
private final long creationTime;
- private StompDecoder decoder = new StompDecoder();
+ private StompDecoder decoder;
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
@@ -90,6 +90,8 @@
this.manager = manager;
+ this.decoder = new StompDecoder(this);
+
this.creationTime = System.currentTimeMillis();
}
@@ -697,4 +699,9 @@
}
}
+
+ /**
+  * Exposes the versioned frame handler currently held by this connection.
+  *
+  * @return the value of the {@code frameHandler} field
+  */
+ public VersionedStompFrameHandler getFrameHandler()
+ {
+    return frameHandler;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -30,118 +30,131 @@
{
private static final Logger log = Logger.getLogger(StompDecoder.class);
- private static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
+ public static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
- private static final String COMMAND_ABORT = "ABORT";
+ public static final String COMMAND_ABORT = "ABORT";
- private static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
+ public static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
- private static final String COMMAND_ACK = "ACK";
+ public static final String COMMAND_ACK = "ACK";
- private static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
+ public static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
- private static final String COMMAND_BEGIN = "BEGIN";
+ public static final String COMMAND_NACK = "NACK";
- private static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
+ public static final int COMMAND_NACK_LENGTH = COMMAND_NACK.length();
- private static final String COMMAND_COMMIT = "COMMIT";
+ public static final String COMMAND_BEGIN = "BEGIN";
- private static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
+ public static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
- private static final String COMMAND_CONNECT = "CONNECT";
+ public static final String COMMAND_COMMIT = "COMMIT";
- private static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
+ public static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
- private static final String COMMAND_DISCONNECT = "DISCONNECT";
+ public static final String COMMAND_CONNECT = "CONNECT";
- private static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
+ public static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
- private static final String COMMAND_SEND = "SEND";
+ public static final String COMMAND_DISCONNECT = "DISCONNECT";
- private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+ public static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
- private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+ public static final String COMMAND_SEND = "SEND";
- private static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+ public static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
- private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+ public static final String COMMAND_STOMP = "STOMP";
- private static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+ public static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
+ public static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+
+ public static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+
+ public static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+
+ public static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+
/**** added by meddy, 27 april 2011, handle header parser for reply to websocket protocol ****/
- private static final String COMMAND_CONNECTED = "CONNECTED";
+ public static final String COMMAND_CONNECTED = "CONNECTED";
- private static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
+ public static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
- private static final String COMMAND_MESSAGE = "MESSAGE";
+ public static final String COMMAND_MESSAGE = "MESSAGE";
- private static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
+ public static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
- private static final String COMMAND_ERROR = "ERROR";
+ public static final String COMMAND_ERROR = "ERROR";
- private static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
+ public static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
- private static final String COMMAND_RECEIPT = "RECEIPT";
+ public static final String COMMAND_RECEIPT = "RECEIPT";
- private static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
+ public static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
/**** end ****/
- private static final byte A = (byte)'A';
+ public static final byte A = (byte)'A';
- private static final byte B = (byte)'B';
+ public static final byte B = (byte)'B';
- private static final byte C = (byte)'C';
+ public static final byte C = (byte)'C';
- private static final byte D = (byte)'D';
+ public static final byte D = (byte)'D';
- private static final byte E = (byte)'E';
+ public static final byte E = (byte)'E';
- private static final byte M = (byte)'M';
+ public static final byte M = (byte)'M';
- private static final byte S = (byte)'S';
+ public static final byte S = (byte)'S';
- private static final byte R = (byte)'R';
+ public static final byte R = (byte)'R';
- private static final byte U = (byte)'U';
+ public static final byte U = (byte)'U';
- private static final byte HEADER_SEPARATOR = (byte)':';
+ public static final byte N = (byte)'N';
- private static final byte NEW_LINE = (byte)'\n';
+ public static final byte HEADER_SEPARATOR = (byte)':';
- private static final byte SPACE = (byte)' ';
+ public static final byte NEW_LINE = (byte)'\n';
- private static final byte TAB = (byte)'\t';
+ public static final byte SPACE = (byte)' ';
- private static String CONTENT_LENGTH_HEADER_NAME = "content-length";
+ public static final byte TAB = (byte)'\t';
- private byte[] workingBuffer = new byte[1024];
+ public static String CONTENT_LENGTH_HEADER_NAME = "content-length";
- private int pos;
+ public byte[] workingBuffer = new byte[1024];
- private int data;
+ public int pos;
- private String command;
+ public int data;
- private Map<String, Object> headers;
+ public String command;
- private int headerBytesCopyStart;
+ public Map<String, String> headers;
- private boolean readingHeaders;
+ public int headerBytesCopyStart;
- private boolean headerValueWhitespace;
+ public boolean readingHeaders;
- private boolean inHeaderName;
+ public boolean headerValueWhitespace;
- private String headerName;
+ public boolean inHeaderName;
- private boolean whiteSpaceOnly;
+ public String headerName;
- private int contentLength;
+ public boolean whiteSpaceOnly;
- private int bodyStart;
+ public int contentLength;
- public StompDecoder()
+ public int bodyStart;
+
+ public StompConnection connection;
+
+ public StompDecoder(StompConnection stompConnection)
{
+ this.connection = stompConnection;
init();
}
@@ -156,13 +169,25 @@
* followed by an empty line
* followed by an optional message body
* terminated with a null character
+ *
+ * Note: to support both 1.0 and 1.1, we just assemble a
+ * standard StompFrame and let the versioned handler to do more
+ * spec specific job (like trimming, escaping etc).
*/
public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
{
- //log.info("got buff " + buffer.readableBytes());
+ if (connection.isValid())
+ {
+ VersionedStompFrameHandler handler = connection.getFrameHandler();
+ return handler.decode(this, buffer);
+ }
- long start = System.nanoTime();
-
+ return defaultDecode(buffer);
+ }
+
+ public StompFrame defaultDecode(final HornetQBuffer buffer) throws HornetQStompException
+ {
+
int readable = buffer.readableBytes();
if (data + readable >= workingBuffer.length)
@@ -375,8 +400,6 @@
throwInvalid();
}
}
-
- long commandTime = System.nanoTime() - start;
if (readingHeaders)
{
@@ -482,8 +505,6 @@
}
}
}
-
- long headersTime = System.nanoTime() - start - commandTime;
// Now the body
@@ -526,8 +547,6 @@
}
}
-
-
if (content != null)
{
if (data > pos)
@@ -546,34 +565,26 @@
StompFrame ret = new StompFrame(command, headers, content);
init();
-
- // log.info("decoded");
-
- long bodyTime = System.nanoTime() - start - headersTime - commandTime;
-
- // log.info("command: "+ commandTime + " headers: " + headersTime + " body: " + bodyTime);
return ret;
}
else
{
return null;
- }
+ }
}
- private void throwInvalid() throws StompException
+ public void throwInvalid() throws HornetQStompException
{
- throw new StompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
+ throw new HornetQStompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
}
- private void init()
+ public void init()
{
pos = 0;
command = null;
- headers = new HashMap<String, Object>();
-
this.headerBytesCopyStart = -1;
readingHeaders = true;
@@ -591,7 +602,7 @@
bodyStart = -1;
}
- private void resizeWorking(final int newSize)
+ public void resizeWorking(final int newSize)
{
byte[] oldBuffer = workingBuffer;
@@ -600,7 +611,7 @@
System.arraycopy(oldBuffer, 0, workingBuffer, 0, oldBuffer.length);
}
- private boolean tryIncrement(final int length)
+ public boolean tryIncrement(final int length)
{
if (pos + length >= data)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -51,6 +51,8 @@
private List<Header> allHeaders = new ArrayList<Header>();
private String body;
+
+ private byte[] bytesBody;
private HornetQBuffer buffer = null;
@@ -70,6 +72,14 @@
this.disconnect = disconnect;
}
+ /**
+  * Builds a frame from an already parsed command, header map and raw body.
+  * <p>
+  * The map and the array are stored by reference, not copied, and no other field is
+  * assigned by this constructor.
+  *
+  * @param command the STOMP command of the frame
+  * @param headers the header map to attach to the frame
+  * @param content the raw body bytes
+  */
+ public StompFrame(String command, Map<String, String> headers, byte[] content)
+ {
+    this.bytesBody = content;
+    this.headers = headers;
+    this.command = command;
+ }
+
public String getCommand()
{
return command;
@@ -107,7 +117,7 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(content.length + 512);
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
StringBuffer head = new StringBuffer();
head.append(command);
@@ -124,7 +134,7 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(content);
+ buffer.writeBytes(bytesBody);
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
@@ -151,7 +161,7 @@
return headers;
}
- private class Header
+ public static class Header
{
public String key;
public String val;
@@ -192,4 +202,14 @@
{
return disconnect;
}
+
+ /**
+  * Gives access to the header list of this frame.
+  *
+  * @return the {@code allHeaders} list itself, not a copy
+  */
+ public List<Header> getHeaders()
+ {
+    return allHeaders;
+ }
+
+ /**
+  * Replaces the raw body of this frame.
+  *
+  * @param content the new body bytes; the array is stored by reference
+  */
+ public void setByteBody(byte[] content)
+ {
+    bytesBody = content;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -14,6 +14,7 @@
import java.io.UnsupportedEncodingException;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
@@ -121,11 +122,23 @@
public StompFrame handleReceipt(String receiptID)
{
StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
- receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+ try
+ {
+ receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+ }
+ catch (HornetQStompException e)
+ {
+ return e.getFrame();
+ }
return receipt;
}
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
+
+ /**
+  * Creates a frame for the given command using this protocol version's
+  * {@link StompFrame} subclass.
+  *
+  * @param command the STOMP command of the new frame
+  * @return the newly created frame
+  */
+ public abstract StompFrame createStompFrame(String command);
+
+ /**
+  * Decodes the next frame from {@code buffer} on behalf of {@code decoder}, applying the
+  * rules of this protocol version. {@code StompDecoder.decode} delegates here once the
+  * connection reports itself valid.
+  *
+  * @param decoder the decoder whose working buffer and parse state are used
+  * @param buffer  the newly received bytes
+  * @return the decoded frame; the implementations added with this method return
+  *         {@code null} while a complete frame is not yet available
+  * @throws HornetQStompException if the bytes cannot be decoded as a frame
+  */
+ public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException;
+
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.stomp.v10;
import java.io.UnsupportedEncodingException;
+import java.util.List;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -23,7 +24,9 @@
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompFrame.Header;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -345,4 +348,15 @@
}
+ /**
+  * Creates a STOMP 1.0 frame for the given command.
+  *
+  * @param command the STOMP command of the new frame
+  * @return a new {@link StompFrameV10}
+  */
+ @Override
+ public StompFrame createStompFrame(String command)
+ {
+    StompFrame frame = new StompFrameV10(command);
+    return frame;
+ }
+
+ /**
+  * Decodes a STOMP 1.0 frame by delegating to the decoder's version-neutral
+  * {@code defaultDecode}; no 1.0-specific processing is applied here.
+  *
+  * @param decoder the decoder holding the working buffer and parse state
+  * @param buffer  the newly received bytes
+  * @return whatever {@code decoder.defaultDecode(buffer)} returns
+  * @throws HornetQStompException if the decoder rejects the frame
+  */
+ @Override
+ public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException
+ {
+    return decoder.defaultDecode(buffer);
+ }
+
}
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,22 @@
+package org.hornetq.core.protocol.stomp.v10;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+/**
+ * STOMP 1.0 flavour of {@link StompFrame}: header names and values are trimmed of
+ * surrounding whitespace before being stored.
+ */
+public class StompFrameV10 extends StompFrame
+{
+   /**
+    * @param command the STOMP command of this frame
+    */
+   public StompFrameV10(String command)
+   {
+      super(command);
+   }
+
+   /**
+    * Adds a header after trimming both its name and its value.
+    *
+    * @param key the header name; must not be {@code null}
+    * @param val the header value; must not be {@code null}
+    * @throws HornetQStompException if the superclass rejects the header
+    */
+   @Override
+   public void addHeader(String key, String val) throws HornetQStompException
+   {
+      super.addHeader(key.trim(), val.trim());
+   }
+}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.protocol.stomp.v11;
+import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,6 +25,7 @@
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
@@ -40,6 +42,8 @@
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+ private static final char ESC_CHAR = '\\';
+
private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection)
@@ -538,4 +542,436 @@
}
}
+ /**
+  * Creates a STOMP 1.1 frame for the given command.
+  *
+  * @param command the STOMP command of the new frame
+  * @return a new {@link StompFrameV11}
+  */
+ @Override
+ public StompFrame createStompFrame(String command)
+ {
+    StompFrame frame = new StompFrameV11(command);
+    return frame;
+ }
+
+ /**
+  * Decodes one STOMP 1.1 frame, keeping partial-frame state in {@code decoder} between calls.
+  * All frames except the initial CONNECT are expected to be decoded here.
+  * <p>
+  * Every readable byte of {@code buffer} is first appended to the decoder's working buffer;
+  * the command line, the headers and the body are then parsed in turn. Header names and
+  * values are un-escaped while they are read: backslash-backslash yields a backslash,
+  * backslash-n a newline and backslash-c a colon. The backslash-colon form understood by the
+  * first revision of this method is still accepted.
+  *
+  * @param decoder holder of the working buffer and of the parse state kept between calls
+  * @param buffer  newly received bytes; everything readable is consumed
+  * @return the decoded frame, or {@code null} when more bytes are needed to complete it
+  * @throws HornetQStompException if the bytes do not form a valid frame
+  */
+ @Override
+ public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException
+ {
+    int readable = buffer.readableBytes();
+
+    if (decoder.data + readable >= decoder.workingBuffer.length)
+    {
+       decoder.resizeWorking(decoder.data + readable);
+    }
+
+    buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
+
+    decoder.data += readable;
+
+    if (decoder.command == null && !readCommand(decoder))
+    {
+       return null;
+    }
+
+    if (decoder.readingHeaders && !readHeaders(decoder))
+    {
+       return null;
+    }
+
+    byte[] content = readBody(decoder);
+
+    if (content == null)
+    {
+       return null;
+    }
+
+    if (decoder.data > decoder.pos)
+    {
+       // Some clients send a newline after the terminating NUL; swallow it.
+       if (decoder.workingBuffer[decoder.pos] == StompDecoder.NEW_LINE)
+       {
+          decoder.pos++;
+       }
+
+       if (decoder.data > decoder.pos)
+       {
+          // More data still in the buffer from the next packet: move it to the front.
+          System.arraycopy(decoder.workingBuffer, decoder.pos, decoder.workingBuffer, 0, decoder.data - decoder.pos);
+       }
+    }
+
+    decoder.data = decoder.data - decoder.pos;
+
+    // NOTE(review): decoder.headers is handed to the frame as-is; confirm that the decoder
+    // supplies a fresh map for every frame, since init() no longer appears to create one.
+    StompFrame ret = new StompFrameV11(decoder.command, decoder.headers, content);
+
+    decoder.init();
+
+    return ret;
+ }
+
+ /**
+  * Identifies the command at the start of the working buffer and advances past its line.
+  *
+  * @return {@code true} once {@code decoder.command} is set, {@code false} if more bytes are needed
+  */
+ private boolean readCommand(StompDecoder decoder) throws HornetQStompException
+ {
+    if (decoder.data < 4)
+    {
+       // Need at least four bytes to identify the command
+       // - up to 3 bytes for the command name + potentially another byte for a leading \n
+       return false;
+    }
+
+    byte[] bytes = decoder.workingBuffer;
+
+    // Some badly behaved clients add a \n *after* the terminating NUL of a frame; it then
+    // shows up as an extra \n in front of the next frame and has to be skipped.
+    int offset = bytes[0] == StompDecoder.NEW_LINE ? 1 : 0;
+
+    String command;
+
+    // Only as many bytes as needed to tell the commands apart are inspected.
+    switch (bytes[offset])
+    {
+       case StompDecoder.A:
+       {
+          command = bytes[offset + 1] == StompDecoder.B ? StompDecoder.COMMAND_ABORT : StompDecoder.COMMAND_ACK;
+          break;
+       }
+       case StompDecoder.B:
+       {
+          command = StompDecoder.COMMAND_BEGIN;
+          break;
+       }
+       case StompDecoder.C:
+       {
+          if (bytes[offset + 2] == StompDecoder.M)
+          {
+             command = StompDecoder.COMMAND_COMMIT;
+          }
+          else if (bytes[offset + 7] == StompDecoder.E)
+          {
+             // reply frame, parsed for the websocket protocol
+             command = StompDecoder.COMMAND_CONNECTED;
+          }
+          else
+          {
+             command = StompDecoder.COMMAND_CONNECT;
+          }
+          break;
+       }
+       case StompDecoder.D:
+       {
+          command = StompDecoder.COMMAND_DISCONNECT;
+          break;
+       }
+       case StompDecoder.R:
+       {
+          command = StompDecoder.COMMAND_RECEIPT;
+          break;
+       }
+       case StompDecoder.E:
+       {
+          command = StompDecoder.COMMAND_ERROR;
+          break;
+       }
+       case StompDecoder.M:
+       {
+          command = StompDecoder.COMMAND_MESSAGE;
+          break;
+       }
+       case StompDecoder.S:
+       {
+          if (bytes[offset + 1] == StompDecoder.E)
+          {
+             command = StompDecoder.COMMAND_SEND;
+          }
+          else if (bytes[offset + 1] == StompDecoder.U)
+          {
+             command = StompDecoder.COMMAND_SUBSCRIBE;
+          }
+          else
+          {
+             command = StompDecoder.COMMAND_STOMP;
+          }
+          break;
+       }
+       case StompDecoder.U:
+       {
+          command = StompDecoder.COMMAND_UNSUBSCRIBE;
+          break;
+       }
+       case StompDecoder.N:
+       {
+          command = StompDecoder.COMMAND_NACK;
+          break;
+       }
+       default:
+       {
+          decoder.throwInvalid();
+          return false; // not reached: throwInvalid() always throws
+       }
+    }
+
+    // Command name plus its terminating newline must be fully available.
+    if (!decoder.tryIncrement(offset + command.length() + 1))
+    {
+       return false;
+    }
+
+    decoder.command = command;
+
+    // Sanity check: the command line must end with a newline.
+    if (decoder.workingBuffer[decoder.pos - 1] != StompDecoder.NEW_LINE)
+    {
+       decoder.throwInvalid();
+    }
+
+    return true;
+ }
+
+ /**
+  * Parses header lines, un-escaping names and values, until the blank line that ends them.
+  *
+  * @return {@code true} when the blank line was consumed, {@code false} if more bytes are needed
+  */
+ private boolean readHeaders(StompDecoder decoder) throws HornetQStompException
+ {
+    if (decoder.headerBytesCopyStart == -1)
+    {
+       decoder.headerBytesCopyStart = decoder.pos;
+    }
+
+    boolean isEscaping = false;
+
+    // NOTE(review): SimpleBytes is declared in org.hornetq.core.protocol.stomp and no import
+    // for it is visible in this change - confirm the file imports it.
+    SimpleBytes holder = new SimpleBytes(1024);
+
+    while (decoder.pos < decoder.data)
+    {
+       byte b = decoder.workingBuffer[decoder.pos++];
+
+       switch (b)
+       {
+          case ESC_CHAR:
+          {
+             if (isEscaping)
+             {
+                // an escaped backslash
+                holder.append(b);
+             }
+             isEscaping = !isEscaping;
+
+             decoder.whiteSpaceOnly = false;
+
+             break;
+          }
+          case StompDecoder.HEADER_SEPARATOR:
+          {
+             if (isEscaping || !decoder.inHeaderName)
+             {
+                // Either an escaped colon, or a bare colon inside a value: keep it as
+                // data instead of silently dropping it.
+                holder.append(b);
+                isEscaping = false;
+             }
+             else
+             {
+                decoder.headerName = takeString(holder, "Encoding exception");
+
+                decoder.inHeaderName = false;
+
+                decoder.headerBytesCopyStart = decoder.pos;
+
+                decoder.headerValueWhitespace = true;
+             }
+
+             decoder.whiteSpaceOnly = false;
+
+             break;
+          }
+          case StompDecoder.NEW_LINE:
+          {
+             if (decoder.whiteSpaceOnly)
+             {
+                // Headers are terminated by a blank line
+                decoder.readingHeaders = false;
+
+                return true;
+             }
+
+             if (isEscaping || decoder.inHeaderName)
+             {
+                // dangling backslash, or a header line without any separator
+                decoder.throwInvalid();
+             }
+
+             String headerValue = takeString(holder, "Encoding exception.");
+
+             decoder.headers.put(decoder.headerName, headerValue);
+
+             if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
+             {
+                decoder.contentLength = parseContentLength(decoder, headerValue);
+             }
+
+             decoder.whiteSpaceOnly = true;
+
+             decoder.headerBytesCopyStart = decoder.pos;
+
+             decoder.inHeaderName = true;
+
+             decoder.headerValueWhitespace = false;
+
+             break;
+          }
+          default:
+          {
+             if (isEscaping)
+             {
+                if (b == (byte)'n')
+                {
+                   holder.append(StompDecoder.NEW_LINE);
+                }
+                else if (b == (byte)'c')
+                {
+                   holder.append(StompDecoder.HEADER_SEPARATOR);
+                }
+                else
+                {
+                   // no other escape sequence is defined
+                   decoder.throwInvalid();
+                }
+                isEscaping = false;
+             }
+             else
+             {
+                holder.append(b);
+             }
+
+             decoder.whiteSpaceOnly = false;
+
+             decoder.headerValueWhitespace = false;
+          }
+       }
+    }
+
+    // Run out of data. The bytes gathered so far live only in the local holder, so rewind
+    // to the start of the unfinished name or value and parse it again on the next call.
+    // Assumes a header name always starts with whiteSpaceOnly == true (as the newline case
+    // above sets it) - TODO confirm that StompDecoder.init() does the same.
+    decoder.pos = decoder.headerBytesCopyStart;
+    decoder.whiteSpaceOnly = decoder.inHeaderName;
+    decoder.headerValueWhitespace = !decoder.inHeaderName;
+
+    return false;
+ }
+
+ /**
+  * Extracts the body, either by content-length or by scanning for the terminating NUL.
+  *
+  * @return the body bytes, or {@code null} if the body is not complete yet
+  */
+ private byte[] readBody(StompDecoder decoder)
+ {
+    if (decoder.contentLength != -1)
+    {
+       if (decoder.pos + decoder.contentLength + 1 > decoder.data)
+       {
+          // Need more bytes
+          return null;
+       }
+
+       byte[] content = new byte[decoder.contentLength];
+
+       System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
+
+       // skip the body and its terminating NUL
+       decoder.pos += decoder.contentLength + 1;
+
+       return content;
+    }
+
+    // Need to scan for terminating NUL
+
+    if (decoder.bodyStart == -1)
+    {
+       decoder.bodyStart = decoder.pos;
+    }
+
+    while (decoder.pos < decoder.data)
+    {
+       if (decoder.workingBuffer[decoder.pos++] == 0)
+       {
+          byte[] content = new byte[decoder.pos - decoder.bodyStart - 1];
+
+          System.arraycopy(decoder.workingBuffer, decoder.bodyStart, content, 0, content.length);
+
+          return content;
+       }
+    }
+
+    return null;
+ }
+
+ /**
+  * Returns the holder's content as a string and empties the holder.
+  */
+ private static String takeString(SimpleBytes holder, String errorMessage) throws HornetQStompException
+ {
+    try
+    {
+       String text = holder.getString();
+       holder.reset();
+       return text;
+    }
+    catch (UnsupportedEncodingException e)
+    {
+       throw new HornetQStompException(errorMessage, e);
+    }
+ }
+
+ /**
+  * Parses a content-length header value, rejecting anything that is not a non-negative integer.
+  */
+ private static int parseContentLength(StompDecoder decoder, String value) throws HornetQStompException
+ {
+    int length;
+    try
+    {
+       length = Integer.parseInt(value.trim());
+    }
+    catch (NumberFormatException e)
+    {
+       throw new HornetQStompException("Invalid content-length: " + value, e);
+    }
+
+    if (length < 0)
+    {
+       // would otherwise surface later as a NegativeArraySizeException
+       decoder.throwInvalid();
+    }
+
+    return length;
+ }
}
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,94 @@
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.util.Map;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+/**
+ * STOMP 1.1 flavour of {@link StompFrame}, together with the helper that decodes the
+ * backslash escapes allowed in 1.1 header text.
+ */
+public class StompFrameV11 extends StompFrame
+{
+   public static final char ESC_CHAR = '\\';
+   public static final char COLON = ':';
+
+   /**
+    * @param command the STOMP command of this frame
+    * @param headers the header map to attach to the frame
+    * @param content the raw body bytes
+    */
+   public StompFrameV11(String command, Map<String, String> headers, byte[] content)
+   {
+      super(command, headers, content);
+   }
+
+   /**
+    * @param command the STOMP command of this frame
+    */
+   public StompFrameV11(String command)
+   {
+      super(command);
+   }
+
+   /**
+    * Decodes the escape sequences of a raw header string. Despite its name this method
+    * <em>removes</em> escaping: backslash-backslash becomes a backslash, backslash-n a
+    * newline, and both backslash-c and backslash-colon become a colon.
+    * <p>
+    * The text is built as characters rather than bytes, so non-ASCII input is preserved.
+    *
+    * @param rawString the escaped text; must not be {@code null}
+    * @return the text with every escape sequence replaced by the character it stands for
+    * @throws HornetQStompException if the text contains a colon that is not escaped, an
+    *         undefined escape sequence, or a backslash as its last character
+    */
+   public static String escaping(String rawString) throws HornetQStompException
+   {
+      int len = rawString.length();
+
+      StringBuilder sb = new StringBuilder(len);
+
+      boolean beginEsc = false;
+      for (int i = 0; i < len; i++)
+      {
+         char k = rawString.charAt(i);
+
+         if (beginEsc)
+         {
+            switch (k)
+            {
+               case ESC_CHAR:
+                  //it is a backslash
+                  sb.append(ESC_CHAR);
+                  break;
+               case 'n':
+                  //it is a newline
+                  sb.append('\n');
+                  break;
+               case 'c':
+               case COLON:
+                  //it is a colon
+                  sb.append(COLON);
+                  break;
+               default:
+                  //error, no other escape defined.
+                  throw new HornetQStompException("Bad escape char found: " + k);
+            }
+            beginEsc = false;
+         }
+         else if (k == ESC_CHAR)
+         {
+            beginEsc = true;
+         }
+         else if (k == COLON)
+         {
+            //error
+            throw new HornetQStompException("Colon not escaped!");
+         }
+         else
+         {
+            sb.append(k);
+         }
+      }
+
+      if (beginEsc)
+      {
+         // a trailing backslash starts an escape that never completes
+         throw new HornetQStompException("Bad escape: nothing follows the final backslash");
+      }
+
+      return sb.toString();
+   }
+
+   /**
+    * Manual smoke check: prints the decoded form of a sample escaped string.
+    */
+   public static void main(String[] args) throws HornetQStompException
+   {
+      String rawStr = "hello world\\n\\:";
+      System.out.println(escaping(rawStr));
+   }
+
+}
13 years, 3 months
JBoss hornetq SVN: r11304 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal and 8 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-08 11:06:11 -0400 (Thu, 08 Sep 2011)
New Revision: 11304
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Clean up of code related to the case (see "HORNETQ-720 XXX" problem markers)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -76,7 +76,7 @@
private final StaticConnector staticConnector = new StaticConnector();
- private Topology topology = new Topology();
+ private final Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -1270,21 +1270,6 @@
}
}
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topology = null;
-
- }
- }
-
public Topology getTopology()
{
return topology;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -31,8 +31,6 @@
{
void start(Executor executor) throws Exception;
- void factoryClosed(final ClientSessionFactory factory);
-
void setNodeID(String nodeID);
String getNodeID();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -347,7 +347,6 @@
}
/**
- * XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
* @param replicationManager
* @param pagingManager
* @throws HornetQException
@@ -703,8 +702,6 @@
try
{
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
-
- // XXX HORNETQ-720
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -31,7 +31,7 @@
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -146,9 +146,9 @@
server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), backup), backup,
true);
}
- else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
- HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
try
{
server.addHaBackup(rc);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -90,7 +90,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -521,9 +521,9 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
- case PacketImpl.HA_BACKUP_REGISTRATION:
+ case PacketImpl.BACKUP_REGISTRATION:
{
- packet = new HaBackupRegistrationMessage();
+ packet = new BackupRegistrationMessage();
break;
}
case PacketImpl.REPLICATION_START_STOP_SYNC:
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -192,8 +192,7 @@
public static final byte SUBSCRIBE_TOPOLOGY = 112;
- /** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
- public static final byte HA_BACKUP_REGISTRATION = 113;
+ public static final byte BACKUP_REGISTRATION = 113;
public static final byte REPLICATION_START_STOP_SYNC = 120;
Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java (from rev 11303, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the new backup
+ * node.
+ */
+public class BackupRegistrationMessage extends PacketImpl
+{
+
+ private TransportConfiguration connector;
+
+ private String nodeID;
+
+ public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ {
+ this();
+ connector = tc;
+ nodeID = nodeId;
+ }
+
+ public BackupRegistrationMessage()
+ {
+ super(BACKUP_REGISTRATION);
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readString();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+}
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -1,60 +0,0 @@
-/**
- *
- */
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Registers a backup node with its live server.
- * <p>
- * After registration the live server will initiate synchronization of its state with the new backup
- * node.
- */
-public class HaBackupRegistrationMessage extends PacketImpl
-{
-
- private TransportConfiguration connector;
-
- private String nodeID;
-
- public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
- {
- this();
- connector = tc;
- nodeID = nodeId;
- }
-
- public HaBackupRegistrationMessage()
- {
- super(HA_BACKUP_REGISTRATION);
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public TransportConfiguration getConnector()
- {
- return connector;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeString(nodeID);
- connector.encode(buffer);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- nodeID = buffer.readString();
- connector = new TransportConfiguration();
- connector.decode(buffer);
- }
-
-}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -523,26 +523,14 @@
return;
}
- final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
+ final Journal journal = journalsHolder.get(packet.getJournalContentType());
- JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
- JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+ JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
registerJournal(packet.getJournalContentType().typeByte,
new FileWrapperJournal(current, storage.hasCallbackSupport()));
}
- // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
- private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
- {
- if (!(journalIf instanceof JournalImpl))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "Journals of backup server are expected to be JournalImpl");
- }
- return (JournalImpl)journalIf;
- }
-
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -577,7 +577,6 @@
@Override
public void sendSynchronizationDone()
{
- ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
- sendReplicatePacket(msg);
+ sendReplicatePacket(new ReplicationStartSyncMessage(null, null));
}
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -45,7 +45,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -480,7 +480,7 @@
"'. backup cannot be announced.");
return;
}
- liveChannel.send(new HaBackupRegistrationMessage(nodeUUID.toString(), connector));
+ liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector));
}
else
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -570,7 +570,6 @@
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- replicationChannel.setHandler(replicationEndpoint);
connectToReplicationEndpoint(replicationChannel);
replicationEndpoint.start();
@@ -1064,10 +1063,6 @@
return session;
}
- /**
- * XXX FIXME to be made private, and method removed from Server interface once HORNETQ-720 is
- * finished.
- */
private synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
@@ -1076,8 +1071,7 @@
getIdentity());
}
- if (replicationEndpoint == null)
- System.err.println("endpoint is null!");
+ channel.setHandler(replicationEndpoint);
if (replicationEndpoint.getChannel() != null)
{
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -14,7 +14,9 @@
package org.hornetq.core.journal;
import java.util.List;
+import java.util.Map;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -139,4 +141,19 @@
void runDirectJournalBlast() throws Exception;
+ /**
+ * Reserves journal file IDs, creates the necessary files for synchronization, and places
+ * references to these (reserved for sync) files in the map.
+ * <p>
+ * During the synchronization between a live server and backup, we reserve in the backup the
+ * journal file IDs used in the live server. This call also makes sure the files are created
+ * empty without any kind of headers added.
+ * @param fileIds ids to reserve for synchronization
+ * @param mapToFill map to be filled with id and journal file pairs for <b>synchronization</b>.
+ * @return a new {@link JournalFile} to be used for regular <b>replication</b> during
+ * synchronization
+ * @throws Exception
+ */
+ JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception;
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -3106,7 +3106,8 @@
* @return
* @throws Exception
*/
- public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+ @Override
+ public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
{
writeLock();
try
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-08 15:06:11 UTC (rev 11304)
@@ -58,7 +58,7 @@
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
- if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
try
{
13 years, 3 months
JBoss hornetq SVN: r11303 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-07 12:10:18 -0400 (Wed, 07 Sep 2011)
New Revision: 11303
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Remove unnecessary replication packet.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -467,7 +467,6 @@
// HORNETQ-720 XXX perhaps before? unnecessary?
store.forceAnotherPage();
}
- replicator.sendPagingInfo(info);
return info;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -102,7 +102,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -537,11 +536,6 @@
packet = new ReplicationSyncFileMessage();
break;
}
- case PacketImpl.REPLICATION_CURRENT_PAGES_INFO:
- {
- packet = new ReplicationCurrentPagesMessage();
- break;
- }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -196,7 +196,6 @@
public static final byte HA_BACKUP_REGISTRATION = 113;
public static final byte REPLICATION_START_STOP_SYNC = 120;
- public static final byte REPLICATION_CURRENT_PAGES_INFO = 121;
// Static --------------------------------------------------------
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -1,77 +0,0 @@
-/**
- *
- */
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-public final class ReplicationCurrentPagesMessage extends PacketImpl
-{
-
- private Map<SimpleString, Collection<Integer>> info;
-
- /**
- * @param type
- */
- public ReplicationCurrentPagesMessage()
- {
- super(REPLICATION_CURRENT_PAGES_INFO);
- }
-
- /**
- * @param info
- */
- public ReplicationCurrentPagesMessage(Map<SimpleString, Collection<Integer>> info)
- {
- this();
- this.info = info;
- }
-
- @Override
- public void decodeRest(HornetQBuffer buffer)
- {
- info = new HashMap<SimpleString, Collection<Integer>>();
- int entries = buffer.readInt();
- for (int i = 0; i < entries; i++)
- {
- SimpleString name = buffer.readSimpleString();
- int nPages = buffer.readInt();
- List<Integer> ids = new ArrayList<Integer>(nPages);
- for (int j = 0; j < nPages; j++)
- {
- ids.add(Integer.valueOf(buffer.readInt()));
- }
- info.put(name, ids);
- }
- }
-
- @Override
- public void encodeRest(HornetQBuffer buffer)
- {
- buffer.writeInt(info.size());
- for (Entry<SimpleString, Collection<Integer>> entry : info.entrySet())
- {
- buffer.writeSimpleString(entry.getKey());
- Collection<Integer> value = entry.getValue();
- buffer.writeInt(value.size());
- for (Integer id : value)
- {
- buffer.writeInt(id);
- }
- }
- }
-
- public Map<SimpleString, Collection<Integer>> getInfo()
- {
- return info;
- }
-}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -13,8 +13,6 @@
package org.hornetq.core.replication;
-import java.util.Collection;
-import java.util.Map;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -117,8 +115,6 @@
*/
void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
- void sendPagingInfo(Map<SimpleString, Collection<Integer>> info);
-
/**
* @param file
* @param id
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -14,7 +14,6 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -48,7 +47,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -207,10 +205,6 @@
{
handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
}
- else if (type == PacketImpl.REPLICATION_CURRENT_PAGES_INFO)
- {
- handleCurrentPagesInfo((ReplicationCurrentPagesMessage)packet);
- }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -230,17 +224,6 @@
channel.send(response);
}
- /**
- * @param packet
- */
- private void handleCurrentPagesInfo(ReplicationCurrentPagesMessage packet)
- {
- for (Entry<SimpleString, Collection<Integer>> entry : packet.getInfo().entrySet())
- {
- // ignore the actual file list for the moment...
- }
- }
-
public boolean isStarted()
{
return started;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07 16:09:20 UTC (rev 11302)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07 16:10:18 UTC (rev 11303)
@@ -14,9 +14,7 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.LinkedHashSet;
-import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -44,7 +42,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -583,10 +580,4 @@
ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
sendReplicatePacket(msg);
}
-
- @Override
- public void sendPagingInfo(Map<SimpleString, Collection<Integer>> info)
- {
- sendReplicatePacket(new ReplicationCurrentPagesMessage(info));
- }
}
13 years, 3 months
JBoss hornetq SVN: r11302 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-07 12:09:20 -0400 (Wed, 07 Sep 2011)
New Revision: 11302
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
Log:
HORNETQ-720 Replicate the deletion of large messages.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 16:08:41 UTC (rev 11301)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 16:09:20 UTC (rev 11302)
@@ -464,7 +464,7 @@
PagingStore store = pagingManager.getPageStore(storeName);
List<Integer> ids = new ArrayList<Integer>();
info.put(storeName, store.getCurrentIds());
- // XXX perhaps before? unnecessary?
+ // HORNETQ-720 XXX perhaps before? unnecessary?
store.forceAnotherPage();
}
replicator.sendPagingInfo(info);
@@ -2118,15 +2118,30 @@
// Package protected ---------------------------------------------
// This should be accessed from this package only
- void deleteFile(final SequentialFile file)
+ void deleteLargeMessageFile(final LargeServerMessage largeServerMessageImpl) throws HornetQException
{
+ final SequentialFile file = largeServerMessageImpl.getFile();
+ if (file == null)
+ return;
Runnable deleteAction = new Runnable()
{
public void run()
{
try
{
- file.delete();
+ readLock();
+ try
+ {
+ if (replicator != null)
+ {
+ replicator.largeMessageDelete(largeServerMessageImpl.getMessageID());
+ }
+ file.delete();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
catch (Exception e)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-07 16:08:41 UTC (rev 11301)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-07 16:09:20 UTC (rev 11302)
@@ -226,7 +226,7 @@
{
validateFile();
releaseResources();
- storageManager.deleteFile(file);
+ storageManager.deleteLargeMessageFile(this);
}
public boolean isFileExists() throws Exception
13 years, 3 months
JBoss hornetq SVN: r11301 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/paging/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-07 12:08:41 -0400 (Wed, 07 Sep 2011)
New Revision: 11301
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PageTransactionInfo.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PageTransactionInfo.java 2011-09-07 14:16:34 UTC (rev 11300)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PageTransactionInfo.java 2011-09-07 16:08:41 UTC (rev 11301)
@@ -14,8 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
@@ -26,10 +26,6 @@
*/
public interface PageTransactionInfo extends EncodingSupport
{
- boolean isCommit();
-
- boolean isRollback();
-
void setCommitted(boolean committed);
void commit();
@@ -48,13 +44,9 @@
void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception;
- void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
-
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
- void increment();
-
void increment(int size);
int getNumberOfMessages();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-09-07 14:16:34 UTC (rev 11300)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-09-07 16:08:41 UTC (rev 11301)
@@ -21,14 +21,12 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Pair;
-import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -57,7 +55,7 @@
private volatile boolean rolledback = false;
- private AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
@@ -109,11 +107,6 @@
pagingManager.removeTransaction(this.transactionID);
}
}
-
- public void increment()
- {
- numberOfMessages.incrementAndGet();
- }
public void increment(final int size)
{
@@ -207,44 +200,11 @@
return pgtxUpdate;
}
- public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
- {
- storageManager.updatePageTransaction(this, 1);
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- PageTransactionInfoImpl.this.onUpdate(1, storageManager, pagingManager);
- }
-
- public List<MessageReference> getRelatedMessageReferences()
- {
- return null;
- }
- });
- }
-
-
-
- public boolean isCommit()
- {
- return committed;
- }
-
public void setCommitted(final boolean committed)
{
this.committed = committed;
}
- public boolean isRollback()
- {
- return rolledback;
- }
-
public synchronized void rollback()
{
rolledback = true;
@@ -260,6 +220,7 @@
}
}
+ @Override
public String toString()
{
return "PageTransactionInfoImpl(transactionID=" + transactionID +
@@ -316,7 +277,7 @@
static class UpdatePageTXOperation extends TransactionOperationAbstract
{
- private HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
+ private final HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
private boolean stored = false;
@@ -348,16 +309,19 @@
counter.addAndGet(increment);
}
+ @Override
public void beforePrepare(Transaction tx) throws Exception
{
storeUpdates(tx);
}
+ @Override
public void beforeCommit(Transaction tx) throws Exception
{
storeUpdates(tx);
}
+ @Override
public void afterCommit(Transaction tx)
{
for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : countsToUpdate.entrySet())
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07 14:16:34 UTC (rev 11300)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07 16:08:41 UTC (rev 11301)
@@ -38,8 +38,6 @@
createProducerSendSomeMessages();
startBackupFinishSyncing();
File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
- System.out.println("Dir " + dir.getAbsolutePath() + " " + dir.exists());
- // Set<Long> idsOnBkp = getAllMessageFileIds(dir);
receiveMsgsInRange(0, n_msgs / 2);
assertEquals("we really ought to delete these after delivery", n_msgs / 2, getAllMessageFileIds(dir).size());
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-09-07 14:16:34 UTC (rev 11300)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-09-07 16:08:41 UTC (rev 11301)
@@ -13,9 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
* A ReplicatedPagingFailoverTest
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 14:16:34 UTC (rev 11300)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 16:08:41 UTC (rev 11301)
@@ -91,7 +91,7 @@
for (int i = 0; i < nr1; i++)
{
- trans.increment();
+ trans.increment(1);
}
Assert.assertEquals(nr1, trans.getNumberOfMessages());
13 years, 3 months