Author: jmesnil
Date: 2010-01-21 09:40:58 -0500 (Thu, 21 Jan 2010)
New Revision: 8825
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* sync with the trunk: svn merge -r 8806:8821
https://svn.jboss.org/repos/hornetq/trunk
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
(from rev 8819, trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * A ConnectionEntry
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConnectionEntry
+{
+ public final RemotingConnection connection;
+
+ public volatile long lastCheck;
+
+ public volatile long ttl;
+
+ public ConnectionEntry(final RemotingConnection connection, final long lastCheck,
final long ttl)
+ {
+ this.connection = connection;
+
+ this.lastCheck = lastCheck;
+
+ this.ttl = ttl;
+ }
+}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
(from rev 8819, trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A ProtocolManager
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ProtocolManager
+{
+ ConnectionEntry createConnectionEntry(Connection connection);
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-21
14:39:29 UTC (rev 8824)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -26,20 +26,16 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.protocol.core.CoreProtocolManager;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
@@ -90,7 +86,10 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
+ private Map<ProtocolType, ProtocolManager> protocolMap =
+ new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -123,6 +122,8 @@
this.managementService = managementService;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
+
+ this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManager(server,
interceptors));
}
// RemotingService implementation -------------------------------
@@ -169,13 +170,14 @@
return server;
}
- public RemotingConnection getRemotingConnection(int connectionID)
+ public CoreRemotingConnection getRemotingConnection(int connectionID)
{
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
{
- return conn.connection;
+ // FIXME ....
+ return (CoreRemotingConnection)conn.connection;
}
else
{
@@ -192,6 +194,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -257,17 +260,7 @@
{
RemotingConnection conn = entry.connection;
- Channel channel0 = conn.getChannel(0, -1);
-
- // And we remove all channels from the connection, this ensures no more packets
will be processed after this
- // method is
- // complete
-
- conn.removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the
disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)
@@ -326,61 +319,32 @@
// ConnectionLifeCycleListener implementation -----------------------------------
- public void connectionCreated(final Connection connection)
+ private ProtocolManager getProtocolManager(ProtocolType protocol)
{
+ return protocolMap.get(protocol);
+ }
+
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ {
if (server == null)
{
throw new IllegalStateException("Unable to create connection, server
hasn't finished starting up");
}
-
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
-
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-
.getExecutor()
-
: null);
-
- Channel channel1 = rc.getChannel(1, -1);
-
- ChannelHandler handler = createHandler(rc, channel1);
-
- channel1.setHandler(handler);
-
- long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- if (config.getConnectionTTLOverride() != -1)
+
+ ProtocolManager pmgr = this.getProtocolManager(protocol);
+
+ if (pmgr == null)
{
- ttl = config.getConnectionTTLOverride();
+ throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(),
ttl);
-
+ ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+
connections.put(connection.getID(), entry);
- final Channel channel0 = rc.getChannel(0, -1);
-
- channel0.setHandler(new ChannelHandler()
- {
- public void handlePacket(final Packet packet)
- {
- if (packet.getType() == PacketImpl.PING)
- {
- Ping ping = (Ping)packet;
-
- if (config.getConnectionTTLOverride() == -1)
- {
- // Allow clients to specify connection ttl
- entry.ttl = ping.getConnectionTTL();
- }
-
- // Just send a ping back
- channel0.send(packet);
- }
- }
- });
-
if (config.isBackup())
{
- serverSideReplicatingConnection = rc;
+ serverSideReplicatingConnection = entry.connection;
}
}
@@ -431,49 +395,23 @@
// Protected -----------------------------------------------------
- /**
- * Subclasses (on tests) may use this to create a different channel.
- */
- protected ChannelHandler createHandler(final RemotingConnection rc, final Channel
channel)
- {
- return new HornetQPacketHandler(server, channel, rc);
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
private final class DelegatingBufferHandler implements BufferHandler
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
{
- conn.connection.bufferReceived(connectionID, buffer, decoder);
+ conn.connection.bufferReceived(connectionID, buffer);
}
}
}
- private static final class ConnectionEntry
- {
- final RemotingConnection connection;
-
- volatile long lastCheck;
-
- volatile long ttl;
-
- ConnectionEntry(final RemotingConnection connection, final long lastCheck, final
long ttl)
- {
- this.connection = connection;
-
- this.lastCheck = lastCheck;
-
- this.ttl = ttl;
- }
- }
-
private final class FailureCheckAndFlushThread extends Thread
{
private final long pauseInterval;
@@ -538,11 +476,8 @@
}
if (flush)
- {
- // We flush any confirmations on the connection - this prevents idle
bridges for example
- // sitting there with many unacked messages
-
- conn.flushConfirmations();
+ {
+ conn.flush();
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2010-01-21
14:39:29 UTC (rev 8824)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -15,8 +15,8 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
/**
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-01-21
14:39:29 UTC (rev 8824)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -28,24 +28,24 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.PacketImpl;
+import org.hornetq.core.protocol.core.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.wireformat.NullResponseMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-21
14:39:29 UTC (rev 8824)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-21
14:40:58 UTC (rev 8825)
@@ -29,24 +29,24 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.PacketImpl;
+import org.hornetq.core.protocol.core.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;
@@ -69,7 +69,7 @@
private final FailoverManager failoverManager;
- private RemotingConnection replicatingConnection;
+ private CoreRemotingConnection replicatingConnection;
private Channel replicatingChannel;