Author: ataylor
Date: 2010-06-29 04:46:10 -0400 (Tue, 29 Jun 2010)
New Revision: 9368
Added:
trunk/examples/core/twitter-connector/
trunk/examples/core/twitter-connector/build.bat
trunk/examples/core/twitter-connector/build.sh
trunk/examples/core/twitter-connector/build.xml
trunk/examples/core/twitter-connector/readme.html
trunk/examples/core/twitter-connector/server0/
trunk/examples/core/twitter-connector/server0/client-jndi.properties
trunk/examples/core/twitter-connector/server0/hornetq-beans.xml
trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
trunk/examples/core/twitter-connector/server0/hornetq-jms.xml
trunk/examples/core/twitter-connector/server0/hornetq-users.xml
trunk/examples/core/twitter-connector/src/
trunk/examples/core/twitter-connector/src/org/
trunk/examples/core/twitter-connector/src/org/hornetq/
trunk/examples/core/twitter-connector/src/org/hornetq/core/
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
trunk/licenses/LICENSE_twitter4j.txt
trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java
trunk/src/main/org/hornetq/core/twitter/
trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java
trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java
trunk/src/main/org/hornetq/core/twitter/impl/
trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java
trunk/tests/src/org/hornetq/tests/integration/twitter/
trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
Modified:
trunk/.classpath
trunk/build-hornetq.xml
trunk/examples/common/build.xml
trunk/pom.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/config/jboss-as-4/build.xml
trunk/src/config/jboss-as-5/build.xml
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-189 - Create twitter bridge
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/.classpath 2010-06-29 08:46:10 UTC (rev 9368)
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/**/*" kind="src"
path="src/main"/>
+ <classpathentry kind="src"
path="examples/core/twitter-connector/src"/>
<classpathentry kind="src" path="src/config/common"/>
<classpathentry kind="src" path="build/src"/>
<classpathentry kind="src" path="tests/jms-tests/config"/>
@@ -124,5 +125,6 @@
<classpathentry kind="lib"
path="thirdparty/org/jboss/security/lib/jboss-security-spi.jar"/>
<classpathentry kind="lib"
path="thirdparty/wutka-dtdparser/lib/dtdparser121.jar"/>
<classpathentry kind="lib"
path="thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar"/>
+ <classpathentry kind="lib"
path="thirdparty/org/twitter4j/lib/twitter4j-core.jar"/>
<classpathentry kind="output" path="eclipse-output"/>
</classpath>
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/build-hornetq.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -78,6 +78,7 @@
<property name="service.sources.sar.name"
value="hornetq-service-sources.sar"/>
<property name="resources.jar.name"
value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name"
value="hornetq-resources-sources.jar"/>
+ <property name="twitter4j.jar.name"
value="twitter4j-core.jar"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -185,6 +186,7 @@
<path id="core.compilation.classpath">
<path refid="org.jboss.netty.classpath"/>
+ <path refid="org.twitter4j.classpath"/>
</path>
<path id="jms.compilation.classpath">
@@ -231,6 +233,7 @@
<path refid="jboss.integration.compilation.classpath"/>
<path refid="bootstrap.compilation.classpath"/>
<path refid="junit.junit.classpath"/>
+ <path refid="org.twitter4j.classpath"/>
<path location="${build.jars.dir}/${ra.jar.name}"/>
<path location="${build.jars.dir}/${jms.jar.name}"/>
<path location="${build.jars.dir}/${jboss.integration.jar.name}"/>
@@ -288,6 +291,7 @@
<!-- we must include Apache commons logging -->
<!-- as a transitive dependency from JBoss TM -->
<path refid="apache.logging.classpath"/>
+ <path refid="org.twitter4j.classpath"/>
</path>
<path id="emma.unit.test.execution.classpath">
@@ -950,6 +954,7 @@
</fileset>
</copy>
<copy file="${org.jboss.netty.lib}/${netty.jar.name}"
tofile="${build.distro.lib.dir}/netty.jar"/>
+ <copy file="${org.twitter4j.lib}/${twitter4j.jar.name}"
tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/>
<copy todir="${build.distro.config.dir}">
<fileset dir="${src.config.dir}">
<include name="*.xml"/>
Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/examples/common/build.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -101,6 +101,7 @@
<fileset dir="${jars.dir}">
<include name="org/jboss/naming/lib/jnpserver.jar"/>
<include name="org/jboss/netty/lib/netty*.jar"/>
+ <include name="org/twitter4j/lib/twitter4j*.jar"/>
</fileset>
</path>
Added: trunk/examples/core/twitter-connector/build.bat
===================================================================
--- trunk/examples/core/twitter-connector/build.bat (rev 0)
+++ trunk/examples/core/twitter-connector/build.bat 2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/core/twitter-connector/build.sh
===================================================================
--- trunk/examples/core/twitter-connector/build.sh (rev 0)
+++ trunk/examples/core/twitter-connector/build.sh 2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,15 @@
+#!/bin/sh -x
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Added: trunk/examples/core/twitter-connector/build.xml
===================================================================
--- trunk/examples/core/twitter-connector/build.xml (rev 0)
+++ trunk/examples/core/twitter-connector/build.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ Twitter Connector Service
Example">
+
+ <import file="../../common/build.xml"/>
+ <property environment='env'/>
+
+ <target name="check" unless="env.TWITTER_USERNAME">
+
<echo>**************************************************************************</echo>
+ <echo>* Please set the twitter account:
*</echo>
+ <echo>* ./build.sh -Denv.TWITTER_USERNAME=user
-Denv.TWITTER_PASSWORD=password *</echo>
+
<echo>**************************************************************************</echo>
+ <fail message="run example failed"/>
+ </target>
+
+ <target name="run" depends="check">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.core.example.TwitterConnectorExample"/>
+ <!-- HTTP proxy settings
+ <param name="server.args"
value="-Dtwitter4j.http.proxyHost=your.proxy.server
-Dtwitter4j.http.proxyPort=your.proxy.port"/>
+ -->
+ <param name="server.args"
value="-Dtwitter.username=${env.TWITTER_USERNAME}
-Dtwitter.password=${env.TWITTER_PASSWORD}"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.core.example.TwitterConnectorExample"/>
+ <param name="hornetq.example.runServer"
value="false"/>
+ </antcall>
+ </target>
+
+</project>
Added: trunk/examples/core/twitter-connector/readme.html
===================================================================
--- trunk/examples/core/twitter-connector/readme.html (rev 0)
+++ trunk/examples/core/twitter-connector/readme.html 2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,136 @@
+<html>
+ <head>
+ <title>HornetQ Twitter Connector Service Example</title>
+ <link rel="stylesheet" type="text/css"
href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css"
href="../../common/prettify.css" />
+ <script type="text/javascript"
src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>Twitter Connector Service Example</h1>
+
+ <p>This example shows you how to configure HornetQ to use the Twitter
Connector Service.</p>
+
+ <p>HornetQ supports two types of Twitter connector: incoming and outgoing.
+ An incoming connector consumes from Twitter and forwards to a configurable address.
+ An outgoing connector consumes from a configurable address and forwards to Twitter.
+ </p>
+
+ <p>In this example, the incoming connector and the outgoing connector both use the same Twitter account.
+ So if you send a message to the outgoing address, the outgoing connector forwards it to Twitter,
+ and then the incoming connector consumes it from Twitter and forwards it to the incoming address.</p>
+ <p>All you need to do is edit the server0/hornetq-configuration.xml to use
twitter connector.</p>
+
+
+ <pre class="prettyprint">
+ <code>
+ <twitter-connectors>
+ <!-- consumes from twitter and forwards to queue.incomingQueue
-->
+ <incoming-twitter-connector
name="my-incoming-tweets">
+ <queue-name>queue.incomingQueue</queue-name>
+ <twitter-account>
+ <username>${twitter.username}</username>
+ <password>${twitter.password}</password>
+ </twitter-account>
+ <interval-seconds>60</interval-seconds>
+ </incoming-twitter-connector>
+
+ <!-- consumes from queue.outgoingQueue and forwards to twitter
-->
+ <outgoing-twitter-connector
name="my-outgoing-tweets">
+ <queue-name>queue.outgoingQueue</queue-name>
+ <twitter-account>
+ <username>${twitter.username}</username>
+ <password>${twitter.password}</password>
+ </twitter-account>
+ </outgoing-twitter-connector>
+ </twitter-connectors>
+ </code>
+ </pre>
+
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on Windows) from this directory.
+ Remember to set your Twitter username and password in the TWITTER_USERNAME and TWITTER_PASSWORD environment variables before you run it.</i></p>
+
+ on Linux:
+ <pre class="prettyprint">
+ <code>export TWITTER_USERNAME=your_twitter_username
+export TWITTER_PASSWORD=your_twitter_password
+./build.sh</code>
+ </pre>
+
+ on Windows:
+ <pre class="prettyprint">
+ <code>set TWITTER_USERNAME=your_twitter_username
+set TWITTER_PASSWORD=your_twitter_password
+build.bat</code>
+ </pre>
+ </p>
+
+ <ol>
+ <li>First we need to create a ClientSessionFactory with Netty transport
configuration</li>
+ <pre class="prettyprint">
+ <code>csf = HornetQClient.createClientSessionFactory(new
TransportConfiguration(NettyConnectorFactory.class.getName()));</code>
+ </pre>
+
+ <li>We create a core session with auto-commit mode</li>
+ <pre class="prettyprint">
+ <code>session = csf.createSession(true,true);</code>
+ </pre>
+
+ <li>We create a core producer for queue.outgoingQueue</li>
+ <pre class="prettyprint">
+ <code>ClientProducer cp =
session.createProducer(OUTGOING_QUEUE);</code>
+ </pre>
+
+ <li>We create a core consumer for queue.incomingQueue</li>
+ <pre class="prettyprint">
+ <code>ClientConsumer cc =
session.createConsumer(INCOMING_QUEUE);</code>
+ </pre>
+
+ <li>We create a core message that we are going to send</li>
+ <pre class="prettyprint">
+ <code>ClientMessage cm =
session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
+String testMessage = System.currentTimeMillis() + ": twitter connector test
example";
+cm.getBodyBuffer().writeString(testMessage);</code>
+ </pre>
+
+ <li>We send the message to queue.outgoingQueue</li>
+ <pre class="prettyprint">
+ <code>cp.send(cm);</code>
+ </pre>
+
+ <li>We start the session</li>
+ <pre class="prettyprint">
+ <code>session.start();</code>
+ </pre>
+
+ <li>We receive a message from queue.incomingQueue.
+ The outgoing connector forwards the message we sent earlier to Twitter immediately.
+ Since the incoming connector consumes from Twitter and forwards to queue.incomingQueue
+ only every 60 seconds, the message can take up to 60 seconds, plus some processing time, to arrive.</li>
+ <pre class="prettyprint">
+ <code>ClientMessage received = cc.receive(70 * 1000);
+received.acknowledge();
+String receivedText = received.getBodyBuffer().readString();</code>
+ </pre>
+
+ <li>And finally, remember to close the core session and the ClientSessionFactory in a <code>finally</code> block.</li>
+
+ <pre class="prettyprint">
+ <code>finally
+{
+ if(session != null)
+ {
+ session.close();
+ }
+ if(csf != null)
+ {
+ csf.close();
+ }
+}</code>
+ </pre>
+
+
+
+ </ol>
+ </body>
+</html>
Added: trunk/examples/core/twitter-connector/server0/client-jndi.properties
===================================================================
--- trunk/examples/core/twitter-connector/server0/client-jndi.properties
(rev 0)
+++ trunk/examples/core/twitter-connector/server0/client-jndi.properties 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/core/twitter-connector/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-beans.xml
(rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-beans.xml 2010-06-29 08:46:10
UTC (rev 9368)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
(rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,64 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="queue.incomingQueue">
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ <security-setting match="queue.outgoingQueue">
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <queues>
+ <queue name="queue.incomingQueue">
+ <address>queue.incomingQueue</address>
+ </queue>
+ <queue name="queue.outgoingQueue">
+ <address>queue.outgoingQueue</address>
+ </queue>
+ </queues>
+
+ <twitter-connectors>
+ <!-- consumes from twitter and forwards to queue.incomingQueue -->
+ <incoming-twitter-connector name="my-incoming-tweets">
+ <queue-name>queue.incomingQueue</queue-name>
+ <twitter-account>
+ <username>${twitter.username}</username>
+ <password>${twitter.password}</password>
+ </twitter-account>
+ <interval-seconds>60</interval-seconds>
+ </incoming-twitter-connector>
+
+ <!-- consumes from queue.outgoingQueue and forwards to twitter -->
+ <outgoing-twitter-connector name="my-outgoing-tweets">
+ <queue-name>queue.outgoingQueue</queue-name>
+ <twitter-account>
+ <username>${twitter.username}</username>
+ <password>${twitter.password}</password>
+ </twitter-account>
+ </outgoing-twitter-connector>
+ </twitter-connectors>
+
+</configuration>
Added: trunk/examples/core/twitter-connector/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-jms.xml
(rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-jms.xml 2010-06-29 08:46:10 UTC
(rev 9368)
@@ -0,0 +1,19 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: trunk/examples/core/twitter-connector/server0/hornetq-users.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-users.xml
(rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-users.xml 2010-06-29 08:46:10
UTC (rev 9368)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added:
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
===================================================================
---
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
(rev 0)
+++
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.example;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+
+/**
+ * A simple example of using the twitter connector service.
+ * <p>
+ * The example sends a text message to {@code queue.outgoingQueue}, from which the outgoing
+ * twitter connector forwards it to twitter, and then waits for the incoming twitter connector
+ * to deliver the same text to {@code queue.incomingQueue}.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class TwitterConnectorExample extends HornetQExample
+{
+   private static final String INCOMING_QUEUE = "queue.incomingQueue";
+
+   private static final String OUTGOING_QUEUE = "queue.outgoingQueue";
+
+   /** How long to wait for the tweet to come back: a little longer than the 60 second interval of the incoming connector. */
+   private static final long RECEIVE_TIMEOUT_MILLIS = 70 * 1000;
+
+   public static void main(final String[] args)
+   {
+      new TwitterConnectorExample().run(args);
+   }
+
+   /**
+    * Sends one message through the outgoing connector and waits for it to come back through the incoming connector.
+    *
+    * @return {@code true} if the text received from the incoming queue equals the text that was sent;
+    *         {@code false} if it differs or if nothing arrived before the receive timeout expired
+    * @throws Exception if any of the client operations fails
+    */
+   @Override
+   public boolean runExample() throws Exception
+   {
+      ClientSessionFactory csf = null;
+      ClientSession session = null;
+      try
+      {
+         // Step 1. Create a ClientSessionFactory.
+         csf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+         // Step 2. Create a core session.
+         session = csf.createSession(true, true);
+
+         // Step 3. Create a core producer for queue.outgoingQueue.
+         ClientProducer cp = session.createProducer(OUTGOING_QUEUE);
+
+         // Step 4. Create a core consumer for queue.incomingQueue.
+         ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);
+
+         // Step 5. Create a core message.
+         // The timestamp prefix makes the text different on every run.
+         ClientMessage cm = session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE, true);
+         String testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
+         cm.getBodyBuffer().writeString(testMessage);
+
+         // Step 6. Send a message to queue.outgoingQueue.
+         cp.send(cm);
+         System.out.println("#### Sent a message to " + OUTGOING_QUEUE + ": " + testMessage);
+
+         // Step 7. Start the session.
+         session.start();
+
+         // Step 8. Receive a message from queue.incomingQueue.
+         // The outgoing connector forwards the message (sent at Step 6.) to twitter immediately.
+         // Since the incoming connector consumes from twitter and forwards to queue.incomingQueue
+         // only every 60 seconds, the message can take up to 60 seconds plus processing time to arrive.
+         System.out.println("#### A message will be received in 60 seconds. Please wait...");
+         ClientMessage received = cc.receive(RECEIVE_TIMEOUT_MILLIS);
+         if (received == null)
+         {
+            // receive(timeout) gives back null when nothing arrived in time; report failure instead of hitting an NPE.
+            System.out.println("#### No message was received from " + INCOMING_QUEUE + " within " + (RECEIVE_TIMEOUT_MILLIS / 1000) + " seconds.");
+            return false;
+         }
+         received.acknowledge();
+         String receivedText = received.getBodyBuffer().readString();
+         System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
+
+         return receivedText.equals(testMessage);
+      }
+      finally
+      {
+         // Step 9. Be sure to close some resources.
+         // The nested finally guarantees the factory is closed even if closing the session throws.
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+         }
+         finally
+         {
+            if (csf != null)
+            {
+               csf.close();
+            }
+         }
+      }
+   }
+
+}
Added: trunk/licenses/LICENSE_twitter4j.txt
===================================================================
--- trunk/licenses/LICENSE_twitter4j.txt (rev 0)
+++ trunk/licenses/LICENSE_twitter4j.txt 2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,26 @@
+Twitter4J includes software from
JSON.org to parse JSON response from the Twitter API.
You can see the license term at
http://www.JSON.org/license.html
+
+Copyright (c) 2007-2010, Yusuke Yamamoto
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of the Yusuke Yamamoto nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY Yusuke Yamamoto ``AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL Yusuke Yamamoto BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/pom.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -235,6 +235,12 @@
<artifactId>jboss-logging-spi</artifactId>
<version>2.1.0.GA</version>
</dependency>
+ <!--needed to compile twitter support-->
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>2.1.2</version>
+ </dependency>
<!-- needed to compile the tests-->
<dependency>
<groupId>junit</groupId>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2010-06-29 08:37:55 UTC (rev
9367)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2010-06-29 08:46:10 UTC (rev
9368)
@@ -180,6 +180,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="address-settings">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="twitter-connectors">
+ </xsd:element>
</xsd:all>
</xsd:complexType>
</xsd:element>
@@ -505,5 +507,59 @@
</xsd:restriction>
</xsd:simpleType>
-
+ <xsd:element name="twitter-connectors">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element maxOccurs="unbounded" minOccurs="0"
name="incoming-twitter-connector">
+ <xsd:complexType>
+ <xsd:all>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="queue-name" type="xsd:string"/>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="interval-seconds" type="xsd:int"/>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="use-streaming">
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="yes"/>
+ <xsd:enumeration value="no"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="twitter-account">
+ <xsd:complexType>
+ <xsd:all>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="username" type="xsd:string"/>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="password" type="xsd:string"/>
+ </xsd:all>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:all>
+ <xsd:attribute name="name" type="xsd:string"
use="required"/>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element maxOccurs="unbounded" minOccurs="0"
name="outgoing-twitter-connector">
+ <xsd:complexType>
+ <xsd:all>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="queue-name" type="xsd:string" />
+ <xsd:element maxOccurs="1" minOccurs="0"
name="use-streaming">
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="yes" />
+ <xsd:enumeration value="no" />
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="twitter-account">
+ <xsd:complexType>
+ <xsd:all>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="username" type="xsd:string"/>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="password" type="xsd:string"/>
+ </xsd:all>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:all>
+ <xsd:attribute name="name" type="xsd:string"
use="required"/>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
</xsd:schema>
Modified: trunk/src/config/jboss-as-4/build.xml
===================================================================
--- trunk/src/config/jboss-as-4/build.xml 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/config/jboss-as-4/build.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -70,6 +70,7 @@
<fileset dir="${lib.dir}">
<include name="hornetq-*.jar"/>
<include name="netty*.jar"/>
+ <include name="twitter4j*.jar"/>
</fileset>
</copy>
Modified: trunk/src/config/jboss-as-5/build.xml
===================================================================
--- trunk/src/config/jboss-as-5/build.xml 2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/config/jboss-as-5/build.xml 2010-06-29 08:46:10 UTC (rev 9368)
@@ -69,6 +69,7 @@
<fileset dir="${lib.dir}">
<include name="hornetq-*.jar"/>
<include name="netty*.jar"/>
+ <include name="twitter4j*.jar"/>
</fileset>
</copy>
<property name="hornetq.sar.dir"
value="${dest.dir}/deploy/hornetq.sar"/>
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2010-06-29 08:37:55 UTC (rev
9367)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2010-06-29 08:46:10 UTC (rev
9368)
@@ -822,4 +822,14 @@
*/
Map<String, Set<Role>> getSecurityRoles();
+ /**
+ * Sets the Twitter connector configurations.
+ * @param configs the Twitter connector configurations to set
+ */
+ void setTwitterConnectorConfigurations(List&lt;TwitterConnectorConfiguration&gt;
configs);
+ /**
+ * Returns the Twitter connector configurations.
+ * @return the Twitter connector configurations
+ */
+ List&lt;TwitterConnectorConfiguration&gt; getTwitterConnectorConfigurations();
}
Added: trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.config;
+
+import java.io.Serializable;
+
+/**
+ * Settings for one Twitter connector: the Twitter account to use, the
+ * HornetQ queue it is attached to, and the direction in which tweets flow.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class TwitterConnectorConfiguration implements Serializable
+{
+   private static final long serialVersionUID = -641207073030767325L;
+
+   // All fields start at the Java defaults (null / false / 0) until a setter is called.
+
+   private String connectorName;
+
+   private boolean isIncoming;
+
+   private String userName;
+
+   private String password;
+
+   private String queueName;
+
+   private int intervalSeconds;
+
+   /**
+    * @return the name of this connector
+    */
+   public String getConnectorName()
+   {
+      return connectorName;
+   }
+
+   /**
+    * @param connectorName the connectorName to set
+    */
+   public void setConnectorName(String connectorName)
+   {
+      this.connectorName = connectorName;
+   }
+
+   /**
+    * @return the direction flag of this connector
+    */
+   public boolean isIncoming()
+   {
+      return isIncoming;
+   }
+
+   /**
+    * @param isIncoming the isIncoming to set
+    */
+   public void setIncoming(boolean isIncoming)
+   {
+      this.isIncoming = isIncoming;
+   }
+
+   /**
+    * @return the Twitter account name
+    */
+   public String getUserName()
+   {
+      return userName;
+   }
+
+   /**
+    * @param userName the userName to set
+    */
+   public void setUserName(String userName)
+   {
+      this.userName = userName;
+   }
+
+   /**
+    * @return the Twitter account password
+    */
+   public String getPassword()
+   {
+      return password;
+   }
+
+   /**
+    * @param password the password to set
+    */
+   public void setPassword(String password)
+   {
+      this.password = password;
+   }
+
+   /**
+    * @return the name of the queue this connector is attached to
+    */
+   public String getQueueName()
+   {
+      return queueName;
+   }
+
+   /**
+    * @param queueName the queueName to set
+    */
+   public void setQueueName(String queueName)
+   {
+      this.queueName = queueName;
+   }
+
+   /**
+    * @return the configured interval, in seconds
+    */
+   public int getIntervalSeconds()
+   {
+      return intervalSeconds;
+   }
+
+   /**
+    * @param intervalSeconds the intervalSeconds to set
+    */
+   public void setIntervalSeconds(int intervalSeconds)
+   {
+      this.intervalSeconds = intervalSeconds;
+   }
+}
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-06-29 08:37:55
UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-06-29 08:46:10
UTC (rev 9368)
@@ -30,6 +30,7 @@
import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
import org.hornetq.core.logging.impl.JULLogDelegateFactory;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.JournalType;
@@ -322,6 +323,8 @@
private Map<String, Set<Role>> securitySettings = new HashMap<String,
Set<Role>>();
+ protected List<TwitterConnectorConfiguration> twitterConnectorConfigurations =
new ArrayList<TwitterConnectorConfiguration>();
+
// Public -------------------------------------------------------------------------
public boolean isClustered()
@@ -1316,4 +1319,14 @@
this.securitySettings = securitySettings;
}
+ /**
+ * Returns the list of Twitter connector configurations. The backing list
+ * itself is returned (no copy), so elements added by the caller become
+ * part of this configuration.
+ *
+ * @return the Twitter connector configurations
+ */
+ public List<TwitterConnectorConfiguration> getTwitterConnectorConfigurations()
+ {
+    return twitterConnectorConfigurations;
+ }
+
+ /**
+ * Replaces the list of Twitter connector configurations. The given list
+ * is stored as-is, without copying.
+ *
+ * @param configs the Twitter connector configurations to use
+ */
+ public void setTwitterConnectorConfigurations(final List<TwitterConnectorConfiguration> configs)
+ {
+    twitterConnectorConfigurations = configs;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-06-29
08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.config.impl.Validators;
@@ -546,7 +547,25 @@
parseQueues(e, config);
parseSecurity(e, config);
+
+ NodeList incomingTwitterConnectors =
e.getElementsByTagName("incoming-twitter-connector");
+
+ for (int i = 0; i < incomingTwitterConnectors.getLength(); i++)
+ {
+ Element twitterConnectorNode = (Element)incomingTwitterConnectors.item(i);
+ parseTwitterConnector(twitterConnectorNode, config, true);
+ }
+
+ NodeList outgoingTwitterConnectors =
e.getElementsByTagName("outgoing-twitter-connector");
+
+ for (int i = 0; i < outgoingTwitterConnectors.getLength(); i++)
+ {
+ Element twitterConnectorNode = (Element)outgoingTwitterConnectors.item(i);
+
+ parseTwitterConnector(twitterConnectorNode, config, false);
+ }
+
}
/**
@@ -1223,7 +1242,47 @@
mainConfig.getDivertConfigurations().add(config);
}
+
+ /**
+ * Parses one {@code incoming-twitter-connector} or
+ * {@code outgoing-twitter-connector} element and appends the resulting
+ * {@link TwitterConnectorConfiguration} to {@code mainConfig}.
+ *
+ * @param connector the connector element to read
+ * @param mainConfig the configuration that receives the parsed connector
+ * @param isIncoming {@code true} for an incoming connector, {@code false} for an outgoing one
+ * @throws IllegalArgumentException if the element has no {@code twitter-account} child
+ */
+ private void parseTwitterConnector(final Element connector,
+                                    final Configuration mainConfig,
+                                    final boolean isIncoming)
+ {
+    TwitterConnectorConfiguration conf = new TwitterConnectorConfiguration();
+    conf.setIncoming(isIncoming);
+
+    String connectorName = connector.getAttribute("name");
+    conf.setConnectorName(connectorName);
+
+    String queueName = XMLConfigurationUtil.getString(connector, "queue-name", null, Validators.NOT_NULL_OR_EMPTY);
+    conf.setQueueName(queueName);
+
+    if (isIncoming)
+    {
+       int intervalMinutes = XMLConfigurationUtil.getInteger(connector, "interval-minutes", 10, Validators.NO_CHECK);
+       // The XML value is expressed in minutes while the configuration stores
+       // seconds, so convert instead of passing the raw number through.
+       conf.setIntervalSeconds(intervalMinutes * 60);
+    }
+
+    // item(0) yields null when the element is absent; fail with a clear message
+    // rather than a bare NullPointerException.
+    Node account = connector.getElementsByTagName("twitter-account").item(0);
+    if (account == null)
+    {
+       throw new IllegalArgumentException("twitter connector '" + connectorName +
+                                          "' has no twitter-account element");
+    }
+
+    String username = null;
+    String password = null;
+    NodeList accountInfo = account.getChildNodes();
+    for (int i = 0; i < accountInfo.getLength(); i++)
+    {
+       Node val = accountInfo.item(i);
+       String nodeName = val.getNodeName();
+       if ("username".equals(nodeName))
+       {
+          username = val.getTextContent();
+       }
+       else if ("password".equals(nodeName))
+       {
+          password = val.getTextContent();
+       }
+    }
+    conf.setUserName(username);
+    conf.setPassword(password);
+
+    mainConfig.getTwitterConnectorConfigurations().add(conf);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-06-29 08:37:55 UTC (rev
9367)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-06-29 08:46:10 UTC (rev
9368)
@@ -35,6 +35,7 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.twitter.TwitterConnectorService;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
@@ -144,7 +145,9 @@
ReplicationManager getReplicationManager();
boolean checkActivate() throws Exception;
-
+
+ TwitterConnectorService getTwitterConnectorService();
+
void deployDivert(DivertConfiguration config) throws Exception;
void destroyDivert(SimpleString name) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-06-29 08:37:55
UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-06-29 08:46:10
UTC (rev 9368)
@@ -108,6 +108,8 @@
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.twitter.impl.TwitterConnectorServiceImpl;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.logging.LogDelegateFactory;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -184,6 +186,8 @@
private volatile RemotingService remotingService;
private volatile ManagementService managementService;
+
+ private volatile TwitterConnectorService twitterService;
private MemoryManager memoryManager;
@@ -314,6 +318,13 @@
// so it can be initialised by the live node
remotingService.start();
+ // start twitter connector service
+ twitterService = new TwitterConnectorServiceImpl(configuration,
+ scheduledPool,
+ storageManager,
+ postOffice);
+ twitterService.start();
+
started = true;
HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " started");
@@ -341,6 +352,8 @@
return;
}
+ twitterService.stop();
+
if (clusterManager != null)
{
clusterManager.stop();
@@ -766,6 +779,12 @@
return replicationManager;
}
+ /**
+ * Returns the Twitter connector service held by this server.
+ *
+ * @return the Twitter connector service, or {@code null} if the field has not been assigned yet
+ */
+ public TwitterConnectorService getTwitterConnectorService()
+ {
+    return this.twitterService;
+ }
+
+
// Public
//
---------------------------------------------------------------------------------------
Added: trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter;
+
+import org.hornetq.core.server.HornetQComponent;
+/**
+ * A TwitterConnectorService is the server component that owns the
+ * incoming and outgoing Twitter connectors.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public interface TwitterConnectorService extends HornetQComponent
+{
+   /**
+    * @return the number of incoming connectors
+    */
+   int getIncomingConnectorCount();
+
+   /**
+    * @return the number of outgoing connectors
+    */
+   int getOutgoingConnectorCount();
+
+   /**
+    * @return whether this service is started
+    */
+   boolean isStarted();
+}
Added: trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java 2010-06-29 08:46:10 UTC
(rev 9368)
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter;
+
+/**
+ * Constants shared by the Twitter connector: the message property keys
+ * under which tweet attributes are stored, and paging / polling defaults.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class TwitterConstants
+{
+   // ---- message property keys: tweet identity and origin ----
+
+   public static final String KEY_ID = "id";
+
+   public static final String KEY_SOURCE = "source";
+
+   public static final String KEY_CREATED_AT = "createdAt";
+
+   // ---- message property keys: tweet flags ----
+
+   public static final String KEY_IS_TRUNCATED = "isTruncated";
+
+   public static final String KEY_IS_FAVORITED = "isFavorited";
+
+   public static final String KEY_IS_RETWEET = "isRetweet";
+
+   // ---- message property keys: reply information ----
+
+   public static final String KEY_IN_REPLY_TO_STATUS_ID = "inReplyToStatusId";
+
+   public static final String KEY_IN_REPLY_TO_USER_ID = "inReplyToUserId";
+
+   public static final String KEY_IN_REPLY_TO_SCREEN_NAME = "inReplyToScreenName";
+
+   // ---- message property keys: contributors and location ----
+
+   public static final String KEY_CONTRIBUTORS = "contributors";
+
+   public static final String KEY_GEO_LOCATION_LATITUDE = "geoLocation.latitude";
+
+   public static final String KEY_GEO_LOCATION_LONGITUDE = "geoLocation.longitude";
+
+   public static final String KEY_PLACE_ID = "place.id";
+
+   // The spelling of both the name and the value is part of the public key; keep it as is.
+   public static final String KEY_DISPLAY_COODINATES = "displayCoodinates";
+
+   // ---- paging and polling defaults ----
+
+   /** Polling interval, in seconds, used when no positive interval is configured. */
+   public static final int DEFAULT_POLLING_INTERVAL_SECS = 10;
+
+   /** Number of tweets requested per regular poll. */
+   public static final int DEFAULT_PAGE_SIZE = 100;
+
+   /** Number of tweets requested by the very first request. */
+   public static final int FIRST_ATTEMPT_PAGE_SIZE = 1;
+
+   /** Initial since-id value. */
+   public static final int START_SINCE_ID = 1;
+
+   /** Initial buffer size for messages created from tweets. */
+   public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
+}
Added: trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,524 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter.impl;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import twitter4j.*;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.Consumer;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.twitter.TwitterConstants;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
+import org.hornetq.core.filter.Filter;
+
+/**
+ * A TwitterConnectorServiceImpl
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ *
+ */
+public class TwitterConnectorServiceImpl implements TwitterConnectorService
+{
+ private static final Logger log =
Logger.getLogger(TwitterConnectorServiceImpl.class);
+
+ private volatile boolean isStarted = false;
+
+ private final Configuration config;
+
+ private final ScheduledExecutorService scheduledPool;
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private final List<IncomingTweetsHandler> incomingHandlers = new
ArrayList<IncomingTweetsHandler>();
+
+ private final HashMap<IncomingTweetsHandler, ScheduledFuture<?>>
futureList = new HashMap<IncomingTweetsHandler,ScheduledFuture<?>>();
+
+ private final List<OutgoingTweetsHandler> outgoingHandlers = new
ArrayList<OutgoingTweetsHandler>();
+
+ /**
+ * Creates the service. Nothing is started here; the collaborators are
+ * only stored for later use.
+ *
+ * @param config the configuration providing the Twitter connector configurations
+ * @param pool the scheduler used to run incoming connectors periodically
+ * @param storageManager handed to incoming connectors
+ * @param postOffice handed to incoming and outgoing connectors
+ */
+ public TwitterConnectorServiceImpl(final Configuration config,
+                                    final ScheduledExecutorService pool,
+                                    final StorageManager storageManager,
+                                    final PostOffice postOffice)
+ {
+    this.postOffice = postOffice;
+    this.storageManager = storageManager;
+    scheduledPool = pool;
+    this.config = config;
+ }
+
+ /**
+ * @return {@code true} between a completed {@link #start()} and the next {@link #stop()}
+ */
+ public boolean isStarted()
+ {
+    return isStarted;
+ }
+
+ /**
+ * Starts one handler per configured Twitter connector. A connector that
+ * fails to initialise is logged and skipped; the remaining connectors and
+ * the service itself are still started. Calling this on an already started
+ * service does nothing.
+ */
+ public synchronized void start() throws Exception
+ {
+    if (isStarted)
+    {
+       return;
+    }
+
+    for (TwitterConnectorConfiguration twitterConf : config.getTwitterConnectorConfigurations())
+    {
+       final String connectorName = twitterConf.getConnectorName();
+
+       try
+       {
+          if (twitterConf.isIncoming())
+          {
+             startIncoming(connectorName, twitterConf);
+          }
+          else
+          {
+             startOutgoing(connectorName, twitterConf);
+          }
+       }
+       catch (Exception e)
+       {
+          log.warn(connectorName + ": failed to initialize", e);
+          continue;
+       }
+
+       log.debug("Initialize twitter connector: [connector-name=" + connectorName +
+                 ", username=" + twitterConf.getUserName() +
+                 ", queue-name=" + twitterConf.getQueueName() +
+                 ", interval-seconds=" + twitterConf.getIntervalSeconds() +
+                 "]");
+    }
+
+    isStarted = true;
+    log.debug(getClass().getSimpleName() + " started");
+ }
+
+ /**
+ * Creates, initialises and schedules the handler of one incoming connector,
+ * then registers it together with its scheduled future.
+ */
+ private void startIncoming(final String connectorName,
+                            final TwitterConnectorConfiguration twitterConf) throws Exception
+ {
+    IncomingTweetsHandler handler = new IncomingTweetsHandler(connectorName,
+                                                              twitterConf.getUserName(),
+                                                              twitterConf.getPassword(),
+                                                              twitterConf.getQueueName(),
+                                                              twitterConf.getIntervalSeconds(),
+                                                              storageManager,
+                                                              postOffice);
+    handler.initialize();
+
+    // The handler's own interval is used: it replaces non-positive values with a default.
+    int interval = handler.getIntervalSeconds();
+    ScheduledFuture<?> future = scheduledPool.scheduleWithFixedDelay(handler,
+                                                                     interval,
+                                                                     interval,
+                                                                     TimeUnit.SECONDS);
+    futureList.put(handler, future);
+    incomingHandlers.add(handler);
+ }
+
+ /**
+ * Creates and starts the handler of one outgoing connector, then registers it.
+ */
+ private void startOutgoing(final String connectorName,
+                            final TwitterConnectorConfiguration twitterConf) throws Exception
+ {
+    OutgoingTweetsHandler handler = new OutgoingTweetsHandler(connectorName,
+                                                              twitterConf.getUserName(),
+                                                              twitterConf.getPassword(),
+                                                              twitterConf.getQueueName(),
+                                                              postOffice);
+    handler.start();
+    outgoingHandlers.add(handler);
+ }
+
+ /**
+ * Stops all connectors: cancels the scheduled task of every incoming
+ * connector and shuts down every outgoing connector. Failures are logged
+ * and do not prevent the remaining connectors from being stopped. Calling
+ * this on a service that is not started does nothing.
+ */
+ public synchronized void stop() throws Exception
+ {
+    if (!this.isStarted)
+    {
+       return;
+    }
+
+    for (IncomingTweetsHandler in : this.incomingHandlers)
+    {
+       // remove() rather than get(): the handler list is cleared below, so an
+       // entry left in the map after a failed cancel could never be reached again.
+       ScheduledFuture<?> future = this.futureList.remove(in);
+
+       // A handler without a registered future is reported as a failed stop
+       // instead of aborting the whole shutdown with a NullPointerException.
+       if (future != null && future.cancel(true))
+       {
+          log.debug(in.getConnectorName() + ": stopped");
+       }
+       else
+       {
+          log.warn(in.getConnectorName() + ": stop failed");
+       }
+    }
+    this.incomingHandlers.clear();
+
+    for (OutgoingTweetsHandler out : this.outgoingHandlers)
+    {
+       try
+       {
+          out.shutdown();
+       }
+       catch (Exception e)
+       {
+          log.warn(e);
+       }
+    }
+    this.outgoingHandlers.clear();
+
+    this.isStarted = false;
+    log.debug(this.getClass().getSimpleName() + " stopped");
+ }
+
+ /**
+ * @return the number of incoming connectors currently registered
+ */
+ public int getIncomingConnectorCount()
+ {
+    return this.incomingHandlers.size();
+ }
+
+ /**
+ * @return the number of outgoing connectors currently registered
+ */
+ public int getOutgoingConnectorCount()
+ {
+    return this.outgoingHandlers.size();
+ }
+
+ /**
+ * IncomingTweetsHandler consumes from twitter and forwards to the
+ * configured HornetQ address.
+ */
+ private class IncomingTweetsHandler extends Thread
+ {
+ private final String connectorName;
+
+ private final String userName;
+
+ private final String password;
+
+ private final String queueName;
+
+ private final int intervalSeconds;
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private final Paging paging = new Paging();
+
+ private Twitter twitter;
+
+ public IncomingTweetsHandler(final String connectorName,
+ final String userName,
+ final String password,
+ final String queueName,
+ final int intervalSeconds,
+ final StorageManager storageManager,
+ final PostOffice postOffice) throws Exception
+ {
+ this.connectorName = connectorName;
+ this.userName = userName;
+ this.password = password;
+ this.queueName = queueName;
+ if (intervalSeconds > 0)
+ {
+ this.intervalSeconds = intervalSeconds;
+ }
+ else
+ {
+ this.intervalSeconds = TwitterConstants.DEFAULT_POLLING_INTERVAL_SECS;
+ }
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ }
+
+ public void initialize() throws Exception
+ {
+ Binding b = postOffice.getBinding(new SimpleString(queueName));
+ if(b == null)
+ {
+ throw new Exception(connectorName + ": queue " + queueName + "
not found");
+ }
+
+ TwitterFactory tf = new TwitterFactory();
+ this.twitter = tf.getInstance(userName, password);
+ this.twitter.verifyCredentials();
+
+ // getting latest ID
+ this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
+ ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+ this.paging.setSinceId(res.get(0).getId());
+ log.debug(connectorName + " initialise(): got latest ID:
"+this.paging.getSinceId());
+
+ // TODO make page size configurable
+ this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE);
+ }
+
+ /**
+ * TODO streaming API support
+ * TODO rate limit support
+ */
+ public void run()
+ {
+ // Avoid cancelling the task with RuntimeException
+ try
+ {
+ poll();
+ }
+ catch(Throwable t)
+ {
+ log.warn(connectorName, t);
+ }
+ }
+
+ public int getIntervalSeconds()
+ {
+ return this.intervalSeconds;
+ }
+
+ public String getConnectorName()
+ {
+ return this.connectorName;
+ }
+
+ private void poll() throws Exception
+ {
+ // get new tweets
+ ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+
+ if(res == null || res.size() == 0)
+ {
+ return;
+ }
+
+ for (int i = res.size() - 1; i >= 0; i--)
+ {
+ Status status = res.get(i);
+
+ ServerMessage msg = new
ServerMessageImpl(this.storageManager.generateUniqueID(),
+
TwitterConstants.INITIAL_MESSAGE_BUFFER_SIZE);
+ msg.setAddress(new SimpleString(this.queueName));
+ msg.setDurable(true);
+ msg.encodeMessageIDToBuffer();
+
+ putTweetIntoMessage(status, msg);
+
+ this.postOffice.route(msg,false);
+ log.debug(connectorName + ": routed: " + status.toString());
+ }
+
+ this.paging.setSinceId(res.get(0).getId());
+ log.debug(connectorName + ": update latest ID: " +
this.paging.getSinceId());
+ }
+
+ private void putTweetIntoMessage(final Status status, final ServerMessage msg)
+ {
+ msg.getBodyBuffer().writeString(status.getText());
+ msg.putLongProperty(TwitterConstants.KEY_ID, status.getId());
+ msg.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource());
+
+ msg.putLongProperty(TwitterConstants.KEY_CREATED_AT,
status.getCreatedAt().getTime());
+ msg.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED,
status.isTruncated());
+ msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID,
status.getInReplyToStatusId());
+ msg.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID,
status.getInReplyToUserId());
+ msg.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED,
status.isFavorited());
+ msg.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet());
+ msg.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS,
status.getContributors());
+ GeoLocation gl;
+ if ((gl = status.getGeoLocation()) != null)
+ {
+ msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE,
gl.getLatitude());
+ msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE,
gl.getLongitude());
+ }
+ Place place;
+ if ((place = status.getPlace()) != null)
+ {
+ msg.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId());
+ }
+ }
+ }
+
+ /**
+ * OutgoingTweetsHandler consumes from configured HornetQ address
+ * and forwards to the twitter. It registers itself as a consumer on the
+ * queue and posts the body of every message it receives as a status update.
+ */
+ private class OutgoingTweetsHandler implements Consumer
+ {
+    private final String connectorName;
+
+    private final String userName;
+
+    private final String password;
+
+    private final String queueName;
+
+    private final PostOffice postOffice;
+
+    private Twitter twitter = null;
+
+    private Queue queue = null;
+
+    private Filter filter = null;
+
+    private boolean enabled = false;
+
+    /**
+     * @param connectorName name used as prefix in log and error messages
+     * @param userName Twitter account name
+     * @param password Twitter account password
+     * @param queueName name of the queue to consume from
+     * @param postOffice used to look up the queue binding
+     */
+    public OutgoingTweetsHandler(final String connectorName,
+                                 final String userName,
+                                 final String password,
+                                 final String queueName,
+                                 final PostOffice postOffice) throws Exception
+    {
+       this.connectorName = connectorName;
+       this.userName = userName;
+       this.password = password;
+       this.queueName = queueName;
+       this.postOffice = postOffice;
+    }
+
+    /**
+     * Validates the settings, verifies the Twitter credentials and attaches
+     * this handler to the queue as a consumer. Does nothing when already
+     * enabled.
+     *
+     * TODO streaming API support
+     * TODO rate limit support
+     *
+     * @throws Exception if a name is blank, the queue is not bound or a
+     *         Twitter call fails
+     */
+    public synchronized void start() throws Exception
+    {
+       if (this.enabled)
+       {
+          return;
+       }
+
+       if (this.connectorName == null || this.connectorName.trim().equals(""))
+       {
+          throw new Exception("invalid connector name: " + this.connectorName);
+       }
+
+       if (this.queueName == null || this.queueName.trim().equals(""))
+       {
+          throw new Exception("invalid queue name: " + queueName);
+       }
+
+       SimpleString name = new SimpleString(this.queueName);
+       Binding b = this.postOffice.getBinding(name);
+       if (b == null)
+       {
+          throw new Exception(connectorName + ": queue " + queueName + " not found");
+       }
+       this.queue = (Queue)b.getBindable();
+
+       TwitterFactory tf = new TwitterFactory();
+       this.twitter = tf.getInstance(userName, password);
+       this.twitter.verifyCredentials();
+       // TODO make filter-string configurable
+       // this.filter = FilterImpl.createFilter(filterString);
+       this.filter = null;
+
+       this.queue.addConsumer(this);
+
+       this.queue.deliverAsync();
+       this.enabled = true;
+       log.debug(connectorName + ": started");
+    }
+
+    /**
+     * Detaches this handler from the queue. Does nothing when not enabled.
+     */
+    public synchronized void shutdown() throws Exception
+    {
+       if (!this.enabled)
+       {
+          return;
+       }
+
+       log.debug(connectorName + ": receive shutdown request");
+
+       this.queue.removeConsumer(this);
+
+       this.enabled = false;
+       log.debug(connectorName + ": shutdown");
+    }
+
+    public Filter getFilter()
+    {
+       return filter;
+    }
+
+    /**
+     * Posts the referenced message to Twitter and acknowledges it. An HTTP
+     * 403 answer is treated as a duplicate: the message is acknowledged and
+     * dropped. Any other Twitter failure is rethrown.
+     *
+     * @param ref the message reference to deliver
+     * @return {@link HandleStatus#NO_MATCH} when the filter rejects the
+     *         message, {@link HandleStatus#HANDLED} otherwise
+     */
+    public HandleStatus handle(final MessageReference ref) throws Exception
+    {
+       if (filter != null && !filter.match(ref.getMessage()))
+       {
+          return HandleStatus.NO_MATCH;
+       }
+
+       synchronized (this)
+       {
+          ref.handled();
+
+          ServerMessage message = ref.getMessage();
+
+          StatusUpdate status = toStatusUpdate(message);
+
+          // send to Twitter
+          try
+          {
+             this.twitter.updateStatus(status);
+          }
+          catch (TwitterException e)
+          {
+             if (e.getStatusCode() == 403)
+             {
+                // duplicated message
+                log.warn(connectorName + ": HTTP status code = 403: Ignore duplicated message");
+                queue.acknowledge(ref);
+
+                return HandleStatus.HANDLED;
+             }
+             else
+             {
+                throw e;
+             }
+          }
+
+          queue.acknowledge(ref);
+          log.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
+          return HandleStatus.HANDLED;
+       }
+    }
+
+    /**
+     * Builds the status update for a message: the body is the tweet text,
+     * the optional properties are copied when present.
+     */
+    private StatusUpdate toStatusUpdate(final ServerMessage message)
+    {
+       StatusUpdate status = new StatusUpdate(message.getBodyBuffer().readString());
+
+       // set optional property
+
+       if (message.containsProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID))
+       {
+          status.setInReplyToStatusId(message.getLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID));
+       }
+
+       // A location needs both coordinates; a message carrying only the
+       // latitude must not fail on the missing longitude.
+       if (message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE) &&
+           message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE))
+       {
+          double geolat = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE);
+          double geolong = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE);
+          status.setLocation(new GeoLocation(geolat, geolong));
+       }
+
+       if (message.containsProperty(TwitterConstants.KEY_PLACE_ID))
+       {
+          status.setPlaceId(message.getStringProperty(TwitterConstants.KEY_PLACE_ID));
+       }
+
+       if (message.containsProperty(TwitterConstants.KEY_DISPLAY_COODINATES))
+       {
+          status.setDisplayCoordinates(message.getBooleanProperty(TwitterConstants.KEY_DISPLAY_COODINATES));
+       }
+
+       return status;
+    }
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java 2010-06-29
08:46:10 UTC (rev 9368)
@@ -0,0 +1,498 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.twitter;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.twitter.TwitterConstants;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import twitter4j.*;
+
+/**
+ * Integration tests for the Twitter connector service.
+ * <p>
+ * The tests use a real Twitter account: they post status updates and read the home
+ * timeline through twitter4j. The account is taken from the <code>twitter.username</code>
+ * and <code>twitter.password</code> system properties; {@link #setUp()} fails when either
+ * of them is missing.
+ *
+ * @author tm.igarashi(a)gmail.com
+ */
+public class TwitterTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(TwitterTest.class);
+
+   // keys accepted in the parameter maps of the internalTest*FailedToInitialize methods
+   private static final String KEY_CONNECTOR_NAME = "connector.name";
+   private static final String KEY_USERNAME = "username";
+   private static final String KEY_PASSWORD = "password";
+   private static final String KEY_QUEUE_NAME = "queue.name";
+
+   private static final String TWITTER_USERNAME = System.getProperty("twitter.username");
+   private static final String TWITTER_PASSWORD = System.getProperty("twitter.password");
+
+   private static final String TEST_QUEUE = "TwitterTestQueue";
+   private static final String INVALID_USERNAME = "invalidUsername";
+   private static final String INVALID_PASSWORD = "invalidPassword";
+
+   /** Value passed to setIntervalSeconds on incoming connector configurations. */
+   private static final int INCOMING_INTERVAL_SECONDS = 5;
+
+   /** Maximum time to wait for a tweet to arrive on the test queue. */
+   private static final long RECEIVE_TIMEOUT_MILLIS = 60 * 1000;
+
+   /** Pause between sending a message and reading the home timeline. */
+   private static final long FORWARD_WAIT_MILLIS = 3000;
+
+   /**
+    * Refuses to run without Twitter credentials.
+    *
+    * @throws Exception an IllegalStateException when a credential system property is
+    *         missing, or whatever the superclass set-up throws
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      if (TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+      {
+         throw new IllegalStateException("* * * Please set twitter.username and twitter.password in system property * * *");
+      }
+      super.setUp();
+   }
+
+   // incoming
+
+   public void testSimpleIncoming() throws Exception
+   {
+      internalTestIncoming(true, false);
+   }
+
+   public void testIncomingNoQueue() throws Exception
+   {
+      internalTestIncoming(false, false);
+   }
+
+   public void testIncomingWithRestart() throws Exception
+   {
+      internalTestIncoming(true, true);
+   }
+
+   public void testIncomingWithEmptyConnectorName() throws Exception
+   {
+      internalTestIncomingFailedToInitialize(singleParam(KEY_CONNECTOR_NAME, ""));
+   }
+
+   public void testIncomingWithEmptyQueueName() throws Exception
+   {
+      internalTestIncomingFailedToInitialize(singleParam(KEY_QUEUE_NAME, ""));
+   }
+
+   public void testIncomingWithInvalidCredentials() throws Exception
+   {
+      internalTestIncomingFailedToInitialize(invalidCredentials());
+   }
+
+   // outgoing
+
+   public void testSimpleOutgoing() throws Exception
+   {
+      internalTestOutgoing(true, false);
+   }
+
+   public void testOutgoingNoQueue() throws Exception
+   {
+      internalTestOutgoing(false, false);
+   }
+
+   public void testOutgoingWithRestart() throws Exception
+   {
+      internalTestOutgoing(true, true);
+   }
+
+   public void testOutgoingWithEmptyConnectorName() throws Exception
+   {
+      internalTestOutgoingFailedToInitialize(singleParam(KEY_CONNECTOR_NAME, ""));
+   }
+
+   public void testOutgoingWithEmptyQueueName() throws Exception
+   {
+      internalTestOutgoingFailedToInitialize(singleParam(KEY_QUEUE_NAME, ""));
+   }
+
+   public void testOutgoingWithInvalidCredentials() throws Exception
+   {
+      internalTestOutgoingFailedToInitialize(invalidCredentials());
+   }
+
+   /**
+    * This will fail until TFJ-347 is fixed, which is why the method name does not start
+    * with "test".
+    *
+    * http://twitter4j.org/jira/browse/TFJ-347
+    *
+    * @throws Exception
+    */
+   public void _testOutgoingWithInReplyTo() throws Exception
+   {
+      internalTestOutgoingWithInReplyTo();
+   }
+
+   /**
+    * Posts a status through twitter4j and expects the incoming connector to deliver it to
+    * the test queue.
+    *
+    * @param createQueue whether the target queue is configured on the server; without it
+    *        no incoming connector is expected and the test stops after checking the counts
+    * @param restart whether the connector service is stopped and started again before the
+    *        connector counts are checked
+    */
+   protected void internalTestIncoming(boolean createQueue, boolean restart) throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME, TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
+      log.debug("test incoming: " + testMessage);
+
+      try
+      {
+         TwitterConnectorConfiguration inconf = newConnectorConfig("test-incoming-connector",
+                                                                   true,
+                                                                   TEST_QUEUE,
+                                                                   TWITTER_USERNAME,
+                                                                   TWITTER_PASSWORD);
+         server0 = createServerWith(inconf, TEST_QUEUE, createQueue, true);
+         server0.start();
+
+         TwitterConnectorService service = connectorService(server0, restart);
+
+         Assert.assertEquals(0, service.getOutgoingConnectorCount());
+         if (!createQueue)
+         {
+            Assert.assertEquals(0, service.getIncomingConnectorCount());
+            return;
+         }
+         Assert.assertEquals(1, service.getIncomingConnectorCount());
+
+         twitter.updateStatus(testMessage);
+
+         session = createInVMSession();
+         ClientConsumer consumer = session.createConsumer(TEST_QUEUE);
+         session.start();
+         ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
+
+         Assert.assertNotNull(msg);
+         Assert.assertEquals(testMessage, msg.getBodyBuffer().readString());
+
+         msg.acknowledge();
+      }
+      finally
+      {
+         closeQuietly(session);
+         stopQuietly(server0);
+      }
+   }
+
+   /**
+    * Starts a server with an incoming connector that is expected not to initialize and
+    * checks that no connector is counted by the service.
+    *
+    * @param params overrides for connector name, user name, password and queue name,
+    *        keyed by the KEY_* constants of this class
+    */
+   protected void internalTestIncomingFailedToInitialize(HashMap<String,String> params) throws Exception
+   {
+      // NOTE(review): unlike the outgoing variant, the credential defaults here are the
+      // invalid ones, so the empty-name cases cannot tell a rejected name from a failed
+      // login - confirm whether that is intended.
+      String connectorName = valueOrDefault(params, KEY_CONNECTOR_NAME, "test-incoming-connector");
+      String userName = valueOrDefault(params, KEY_USERNAME, INVALID_USERNAME);
+      String password = valueOrDefault(params, KEY_PASSWORD, INVALID_PASSWORD);
+      String queue = valueOrDefault(params, KEY_QUEUE_NAME, TEST_QUEUE);
+
+      assertNoConnectorCreated(newConnectorConfig(connectorName, true, queue, userName, password), queue, true);
+   }
+
+   /**
+    * Sends a message to the test queue and expects the outgoing connector to post its
+    * body as the newest status on the home timeline.
+    *
+    * @param createQueue whether the source queue is configured on the server; without it
+    *        no outgoing connector is expected and the test stops after checking the counts
+    * @param restart whether the connector service is stopped and started again before the
+    *        connector counts are checked
+    */
+   protected void internalTestOutgoing(boolean createQueue, boolean restart) throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME, TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
+      log.debug("test outgoing: " + testMessage);
+
+      try
+      {
+         TwitterConnectorConfiguration outconf = newConnectorConfig("test-outgoing-connector",
+                                                                    false,
+                                                                    TEST_QUEUE,
+                                                                    TWITTER_USERNAME,
+                                                                    TWITTER_PASSWORD);
+         server0 = createServerWith(outconf, TEST_QUEUE, createQueue, false);
+         server0.start();
+
+         TwitterConnectorService service = connectorService(server0, restart);
+
+         Assert.assertEquals(0, service.getIncomingConnectorCount());
+         if (!createQueue)
+         {
+            Assert.assertEquals(0, service.getOutgoingConnectorCount());
+            return;
+         }
+         Assert.assertEquals(1, service.getOutgoingConnectorCount());
+
+         session = createInVMSession();
+         ClientProducer producer = session.createProducer(TEST_QUEUE);
+         ClientMessage msg = session.createMessage(false);
+         msg.getBodyBuffer().writeString(testMessage);
+         session.start();
+         producer.send(msg);
+
+         Thread.sleep(FORWARD_WAIT_MILLIS);
+
+         ResponseList<Status> res = latestStatuses(twitter, 1);
+
+         Assert.assertEquals(testMessage, res.get(0).getText());
+      }
+      finally
+      {
+         closeQuietly(session);
+         stopQuietly(server0);
+      }
+   }
+
+   /**
+    * Starts a server with an outgoing connector that is expected not to initialize and
+    * checks that no connector is counted by the service.
+    *
+    * @param params overrides for connector name, user name, password and queue name,
+    *        keyed by the KEY_* constants of this class
+    */
+   protected void internalTestOutgoingFailedToInitialize(HashMap<String,String> params) throws Exception
+   {
+      String connectorName = valueOrDefault(params, KEY_CONNECTOR_NAME, "test-outgoing-connector");
+      String userName = valueOrDefault(params, KEY_USERNAME, TWITTER_USERNAME);
+      String password = valueOrDefault(params, KEY_PASSWORD, TWITTER_PASSWORD);
+      String queue = valueOrDefault(params, KEY_QUEUE_NAME, TEST_QUEUE);
+
+      assertNoConnectorCreated(newConnectorConfig(connectorName, false, queue, userName, password), queue, false);
+   }
+
+   /**
+    * Posts a status directly, then sends a reply through the outgoing connector with the
+    * in-reply-to property set, and checks both entries on the home timeline.
+    */
+   protected void internalTestOutgoingWithInReplyTo() throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME, TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
+      String replyMessage = "@" + TWITTER_USERNAME + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
+
+      try
+      {
+         // the status being replied to is posted before the server exists
+         Status s = twitter.updateStatus(testMessage);
+
+         TwitterConnectorConfiguration outconf = newConnectorConfig("test-outgoing-with-in-reply-to",
+                                                                    false,
+                                                                    TEST_QUEUE,
+                                                                    TWITTER_USERNAME,
+                                                                    TWITTER_PASSWORD);
+         server0 = createServerWith(outconf, TEST_QUEUE, true, false);
+         server0.start();
+
+         session = createInVMSession();
+         ClientProducer producer = session.createProducer(TEST_QUEUE);
+         ClientMessage msg = session.createMessage(false);
+         msg.getBodyBuffer().writeString(replyMessage);
+         msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, s.getId());
+         session.start();
+         producer.send(msg);
+
+         Thread.sleep(FORWARD_WAIT_MILLIS);
+
+         ResponseList<Status> res = latestStatuses(twitter, 2);
+
+         // index 0 is asserted to be the reply, index 1 the original status
+         Assert.assertEquals(testMessage, res.get(1).getText());
+         Assert.assertEquals(-1, res.get(1).getInReplyToStatusId());
+         Assert.assertEquals(replyMessage, res.get(0).getText());
+         Assert.assertEquals(s.getId(), res.get(0).getInReplyToStatusId());
+      }
+      finally
+      {
+         closeQuietly(session);
+         stopQuietly(server0);
+      }
+   }
+
+   // helpers
+
+   /** Builds a parameter map holding a single override. */
+   private static HashMap<String,String> singleParam(String key, String value)
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(key, value);
+      return params;
+   }
+
+   /** Builds a parameter map that overrides user name and password with invalid values. */
+   private static HashMap<String,String> invalidCredentials()
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_USERNAME, INVALID_USERNAME);
+      params.put(KEY_PASSWORD, INVALID_PASSWORD);
+      return params;
+   }
+
+   /** Returns the mapped value when the key is present (even if null), else the fallback. */
+   private static String valueOrDefault(HashMap<String,String> params, String key, String fallback)
+   {
+      return params.containsKey(key) ? params.get(key) : fallback;
+   }
+
+   /** Creates a connector configuration; the interval is only set for incoming connectors. */
+   private static TwitterConnectorConfiguration newConnectorConfig(String connectorName,
+                                                                   boolean incoming,
+                                                                   String queue,
+                                                                   String userName,
+                                                                   String password)
+   {
+      TwitterConnectorConfiguration conf = new TwitterConnectorConfiguration();
+      conf.setConnectorName(connectorName);
+      conf.setIncoming(incoming);
+      if (incoming)
+      {
+         conf.setIntervalSeconds(INCOMING_INTERVAL_SECONDS);
+      }
+      conf.setQueueName(queue);
+      conf.setUserName(userName);
+      conf.setPassword(password);
+      return conf;
+   }
+
+   /**
+    * Creates, but does not start, a server carrying the given connector and optionally
+    * the queue, so that the caller holds the reference before start() can fail.
+    */
+   private HornetQServer createServerWith(TwitterConnectorConfiguration connector,
+                                          String queue,
+                                          boolean createQueue,
+                                          boolean durable) throws Exception
+   {
+      Configuration configuration = createDefaultConfig(false);
+      configuration.getTwitterConnectorConfigurations().add(connector);
+      if (createQueue)
+      {
+         configuration.getQueueConfigurations().add(new CoreQueueConfiguration(queue, queue, null, durable));
+      }
+      return createServer(false, configuration);
+   }
+
+   /** Returns the server's connector service, stopping and starting it first on request. */
+   private static TwitterConnectorService connectorService(HornetQServer server, boolean restart) throws Exception
+   {
+      TwitterConnectorService service = server.getTwitterConnectorService();
+      if (restart)
+      {
+         service.stop();
+         service.start();
+      }
+      return service;
+   }
+
+   /** Opens a session over the in-VM connector with the same flags in every test. */
+   private static ClientSession createInVMSession() throws Exception
+   {
+      TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+      ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+      return sf.createSession(false, true, true);
+   }
+
+   /** Reads the first <code>count</code> entries of the home timeline. */
+   private static ResponseList<Status> latestStatuses(Twitter twitter, int count) throws TwitterException
+   {
+      Paging page = new Paging();
+      page.setCount(count);
+      return twitter.getHomeTimeline(page);
+   }
+
+   /** Starts a server with the connector and its queue and expects zero connectors. */
+   private void assertNoConnectorCreated(TwitterConnectorConfiguration connector,
+                                         String queue,
+                                         boolean durable) throws Exception
+   {
+      HornetQServer server0 = null;
+      try
+      {
+         server0 = createServerWith(connector, queue, true, durable);
+         server0.start();
+
+         TwitterConnectorService service = server0.getTwitterConnectorService();
+         Assert.assertEquals(0, service.getIncomingConnectorCount());
+         Assert.assertEquals(0, service.getOutgoingConnectorCount());
+      }
+      finally
+      {
+         stopQuietly(server0);
+      }
+   }
+
+   /** Best-effort close: a missing session is skipped, a failure is logged, never thrown. */
+   private static void closeQuietly(ClientSession session)
+   {
+      if (session == null)
+      {
+         return;
+      }
+      try
+      {
+         session.close();
+      }
+      catch (Exception e)
+      {
+         log.warn("failed to close session", e);
+      }
+   }
+
+   /** Best-effort stop: a missing server is skipped, a failure is logged, never thrown. */
+   private static void stopQuietly(HornetQServer server)
+   {
+      if (server == null)
+      {
+         return;
+      }
+      try
+      {
+         server.stop();
+      }
+      catch (Exception e)
+      {
+         log.warn("failed to stop server", e);
+      }
+   }
+}