Author: timfox
Date: 2010-01-20 11:51:03 -0500 (Wed, 20 Jan 2010)
New Revision: 8814
Added:
trunk/src/main/org/hornetq/core/exception/
trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
Modified:
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
added missing class
Added: trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/exception/HornetQXAException.java 2010-01-20 16:51:03
UTC (rev 8814)
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.exception;
+
+import javax.transaction.xa.XAException;
+
+/**
+ * A HornetQXAException
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class HornetQXAException extends XAException
+{
+ private static final long serialVersionUID = 6535914602965015803L;
+
+ public HornetQXAException(final int errorCode, final String message)
+ {
+ super(message);
+
+ this.errorCode = errorCode;
+ }
+
+ public HornetQXAException(final int errorCode)
+ {
+ super(errorCode);
+ }
+}
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 15:08:28 UTC (rev
8813)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 16:51:03 UTC (rev
8814)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.HornetQException;
/**
- * A channel is a way of interleaving data meant for different endpoints over the same
{@link org.hornetq.core.remoting.RemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same
{@link org.hornetq.core.remoting.CoreRemotingConnection}.
* <p/>
* Any packet sent will have its channel id set to the specific channel sending so it can
be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link
org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see
{@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
* <p/>
* Each Channel should will forward any packets received to its {@link
org.hornetq.core.remoting.ChannelHandler}.
* <p/>
@@ -81,7 +81,7 @@
*
* @param newConnection the new connection
*/
- void transferConnection(RemotingConnection newConnection);
+ void transferConnection(CoreRemotingConnection newConnection);
/**
* resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
/**
* returns the Remoting Connection being used by the channel
*/
- RemotingConnection getConnection();
+ CoreRemotingConnection getConnection();
/**
* sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
void flushConfirmations();
/**
- * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is
received.
+ * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is
received.
* <p/>
* This method should then call its {@link org.hornetq.core.remoting.ChannelHandler}
after appropriate processing of
* the packet
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 15:08:28
UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 16:51:03
UTC (rev 8814)
@@ -46,40 +46,6 @@
String getRemoteAddress();
/**
- * return the channel with the channel id specified.
- * <p/>
- * If it does not exist create it with the confirmation window size.
- *
- * @param channelID the channel id
- * @param confWindowSize the confirmation window size
- * @return the channel
- */
- Channel getChannel(long channelID, int confWindowSize);
-
- /**
- * add the channel with the specified channel id
- *
- * @param channelID the channel id
- * @param channel the channel
- */
- void putChannel(long channelID, Channel channel);
-
- /**
- * remove the channel with the specified channel id
- *
- * @param channelID the channel id
- * @return true if removed
- */
- boolean removeChannel(long channelID);
-
- /**
- * generate a unique (within this connection) channel id
- *
- * @return the id
- */
- long generateChannelID();
-
- /**
* add a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
@@ -150,20 +116,6 @@
void destroy();
/**
- * resets the id generator used to when generating id's
- *
- * @param id the first id to set it to
- */
- void syncIDGeneratorSequence(long id);
-
- /**
- * return the next id that will be chosen.
- *
- * @return the id
- */
- long getIDGeneratorSequence();
-
- /**
* return the underlying Connection.
*
* @return the connection
@@ -182,36 +134,7 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
- /**
- * return the current tomeout for blocking calls
- *
- * @return the timeout in milliseconds
- */
- long getBlockingCallTimeout();
-
- /**
- * return the transfer lock used when transferring connections.
- *
- * @return the lock
- */
- Object getTransferLock();
-
- /**
- * returns true if any data has been received since the last time this method was
called.
- *
- * @return true if data has been received.
- */
- boolean checkDataReceived();
-
- /**
- * remove all channels from the remoting connection
- */
- void removeAllChannels();
-
- /**
- * flush all outstanding confirmations onto the connection.
- */
- void flushConfirmations();
+ boolean isDestroyed();
+
+ void disconnect();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 15:08:28 UTC
(rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 16:51:03 UTC
(rev 8814)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -52,7 +53,7 @@
private volatile int lastConfirmedCommandID = -1;
- private volatile RemotingConnection connection;
+ private volatile CoreRemotingConnection connection;
private volatile boolean closed;
@@ -76,7 +77,7 @@
private volatile boolean transferring;
- public ChannelImpl(final RemotingConnection connection, final long id, final int
confWindowSize)
+ public ChannelImpl(final CoreRemotingConnection connection, final long id, final int
confWindowSize)
{
this.connection = connection;
@@ -316,7 +317,7 @@
closed = true;
}
- public void transferConnection(final RemotingConnection newConnection)
+ public void transferConnection(final CoreRemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
@@ -326,7 +327,7 @@
// And switch it
- final RemotingConnectionImpl rnewConnection =
(RemotingConnectionImpl)newConnection;
+ final CoreRemotingConnection rnewConnection =
(CoreRemotingConnection)newConnection;
rnewConnection.putChannel(id, this);
@@ -369,7 +370,7 @@
lock.unlock();
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
return connection;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20
15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20
16:51:03 UTC (rev 8814)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements
RemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements
CoreRemotingConnection
{
// Constants
//
------------------------------------------------------------------------------------
@@ -84,6 +84,8 @@
private volatile boolean dataReceived;
private final Executor executor;
+
+ private volatile boolean executing;
// Constructors
// ---------------------------------------------------------------------------------
@@ -275,7 +277,22 @@
callClosingListeners();
}
+
+ public void disconnect()
+ {
+ Channel channel0 = getChannel(0, -1);
+ // And we remove all channels from the connection, this ensures no more packets
will be processed after this
+ // method is
+ // complete
+
+ removeAllChannels();
+
+ // Now we are 100% sure that no more packets will be processed we can send the
disconnect
+
+ channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ }
+
public long generateChannelID()
{
return idGenerator.generateID();
@@ -325,16 +342,6 @@
return res;
}
- public void removeAllChannels()
- {
- // We get the transfer lock first - this ensures no packets are being processed
AND
- // it's guaranteed no more packets will be processed once this method is
complete
- synchronized (transferLock)
- {
- channels.clear();
- }
- }
-
public void flushConfirmations()
{
synchronized (transferLock)
@@ -349,8 +356,6 @@
// Buffer Handler implementation
// ----------------------------------------------------
- private volatile boolean executing;
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -434,6 +439,15 @@
// Private
//
--------------------------------------------------------------------------------------
+ private void removeAllChannels()
+ {
+ // We get the transfer lock first - this ensures no packets are being processed
AND
+ // it's guaranteed no more packets will be processed once this method is
complete
+ synchronized (transferLock)
+ {
+ channels.clear();
+ }
+ }
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20
15:08:28 UTC (rev 8813)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20
16:51:03 UTC (rev 8814)
@@ -170,6 +170,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -235,17 +236,7 @@
{
RemotingConnection conn = entry.connection;
- Channel channel0 = conn.getChannel(0, -1);
-
- // And we remove all channels from the connection, this ensures no more packets
will be processed after this
- // method is
- // complete
-
- conn.removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the
disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)