Blocking request/response with Netty
Iain McGinniss
iainmcgin at gmail.com
Thu Jul 23 04:54:01 EDT 2009
I think you can avoid the race condition by adding the listener to the
queue before the request is dispatched - logically, the response
cannot arrive before the request is dispatched. Having said this, my
understanding of Netty is still somewhat limited so I don't know if
multiple threads can be handling the pipeline simultaneously, i.e. two
writes could be happening in parallel. In this case, or if you need to
do some post-write processing that must happen before processing the
response, you will need locking.
Iain
On 22 Jul 2009, at 23:38, Tomasz Blachowicz wrote:
>
> Hey Iain,
>
> Thanks a lot for your reply. Blocking while waiting for asynchronous
> response to arrive is what I precisely wanted to do. I don't want to
> wait
> until the frame is send to the wire, this is just a half of the
> story and
> honestly speaking not the more interesting half ;)
>
> If I'm reading your recommended approach right this is something
> that I
> presented in my previous post in sample code. I reckon there is
> however a
> gap in this what you are saying. Here is the thing, the setup of a
> response
> listener and writing to channel must be synchronized. Otherwise
> there is a
> potential situation when the listener may be notified in wrong
> order. This
> is why I used ReentrantLock to make sure that registering the
> listener in a
> queue and asynchronous writing to channel happens together.
>
> Anyone else, any other patterns or idioms?
>
> Cheers,
> Tom
>
>
> Iain McGinniss wrote:
>>
>> The complication in this case is that waiting on the success of the
>> ChannelFuture for write isn't enough - Tomasz also needs to wait for
>> the response corresponding to the request in the write. The situation
>> is potentially further complicated by whether the client wishes to
>> use
>> HTTP pipelining or not. If pipelining is used, it simplifies the
>> ChannelHandlers that need to be written as they just fire stuff out
>> onto the wire without waiting for responses. If pipelining is not
>> used, you need to wait for each response to come back before you can
>> send another request.
>>
>> I would recommend first implementing the asynchronous solution -
>> dispatch an HttpRequest to Netty, and store an event listener in a
>> FIFO queue corresponding to this request. As responses come back in,
>> pop a listener off the queue and notify it that the response has
>> arrived, attaching the HttpResponse object. Wrapping that in a
>> synchronous call is then quite simple - create a new CountdownLatch
>> for each request (with value 1), send the request using the
>> asynchronous interface and give it an event listener that will call
>> countDown() on the latch. Then, call await() on the latch without a
>> timeout if you like. Edge cases exist, for instance of the connection
>> dies before the response comes back you should notify all listeners.
>>
>> Iain
>>
>> On 22 Jul 2009, at 18:19, Michael McGrady wrote:
>>
>>> Just a thought. While Netty is asynchronous, synchronicity is
>>> always
>>> just virtual anyway, cf. TCP/IP. You can build a program using the
>>> ChannelFuture functionality to create your own virtual
>>> synchronicity.
>>> 'Hope this helps.
>>>
>>> How did you implement a Netty synchronous call on the client
>>> side? I
>>> assume that you mean a virtual synchronous call? Netty is
>>> inherently
>>> asynchronous.
>>>
>>> Mike
>>>
>>>
>>> On Jul 22, 2009, at 9:57 AM, Tomasz Blachowicz wrote:
>>>
>>>>
>>>> Hi Trustin,
>>>>
>>>>
>>>> Trustin Lee wrote:
>>>>>
>>>>> In HTTP, the first response corresponds to the first request and
>>>>> the
>>>>> second response corresponds to the second response and so on.
>>>>> Therefore, you could maintain a queue of sent requests, and poll
>>>>> the
>>>>> request from the queue when you receive a response. Then you can
>>>>> match
>>>>> a request and response.
>>>>>
>>>>
>>>> This is all right. However I'm not too sure how you'd implement the
>>>> matching
>>>> of requests and responses. I played a little with Netty recently
>>>> and
>>>> managed
>>>> to implement synchronous call on client side, that works pretty
>>>> well. Here
>>>> is how I have done that. Can you have a look and comment on my
>>>> approach? I
>>>> wonder if there are any other idioms/patterns to implement
>>>> synchronous call
>>>> using Netty.
>>>>
>>>> Please, bear in mind that I still want to use single instance of
>>>> client by
>>>> many threads. Effectively single channel being used by many
>>>> threads.
>>>> Do you
>>>> thing it is correct approach?
>>>>
>>>> Let's assume that we are having the interface of synchronous
>>>> service
>>>> (client):
>>>>
>>>> <code>
>>>> public interface SyncService {
>>>> Response callService(Request request);
>>>> }
>>>> </code>
>>>>
>>>> It doesn't matter what is Request and Response and how these are
>>>> serialized
>>>> and transported to the server and back.
>>>>
>>>> Here is how I implemented the interface:
>>>>
>>>> <code>
>>>> @ChannelPipelineCoverage("one")
>>>> public class SyncServiceImpl extends SimpleChannleUpstreamHAndler
>>>> implements
>>>> SyncService, Closeable {
>>>>
>>>> private final Lock lock = new ReentrantLock();
>>>> private final Queue<Callback> callbacks = new
>>>> ConcurrentLinkedQueue<Callback>();
>>>>
>>>> private Channel channel;
>>>>
>>>> public Response callService(Request request) {
>>>> Callback callback = new Callback();
>>>> lock.lock();
>>>> try {
>>>> callbacks.add(callback);
>>>> channel.write(request);
>>>> } finally {
>>>> lock.unlock();
>>>> }
>>>> return callback.get();
>>>> }
>>>>
>>>> public void messageReceived(ChannelHandlerContext ctx, MessageEvent
>>>> e)
>>>> throws Exception {
>>>> Response response = (Response) e.getMessage();
>>>> callbacks.poll().handle(response);
>>>> }
>>>>
>>>> public void channelOpen(ChannelHandlerContext ctx,
>>>> ChannelStateEvent e)
>>>> throws Exception {
>>>> channel = e.getChannel();
>>>> super.channelOpen(ctx, e);
>>>> }
>>>>
>>>> public void close() throws IOException {
>>>> channel.close().awaitUninterruptibly();
>>>> }
>>>>
>>>> static class Callback {
>>>>
>>>> private final CountDownLatch latch = new CountDownLatch(1);
>>>>
>>>> private Response response;
>>>>
>>>> Response get() {
>>>> try {
>>>> latch.await();
>>>> } catch (InterruptedException e) {
>>>> throw new RuntimeException(e);
>>>> }
>>>> return response;
>>>> }
>>>>
>>>> void handle(Response response) {
>>>> this.response = response;
>>>> latch.countDown();
>>>> }
>>>> }
>>>>
>>>> }
>>>> </code>
>>>>
>>>> How does it look to you? Any ideas how to improve this?
>>>>
>>>> Thanks a lot for any help and piece of discussion!
>>>>
>>>> Regards,
>>>> Tom
>>
>>
>
> --
> View this message in context: http://n2.nabble.com/Blocking-request-response-with-Netty-tp3254813p3306337.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
More information about the netty-users
mailing list