Add support for stream_id #104

Open
wants to merge 2 commits into master

Conversation

drolando
Contributor

Two main changes:

  • allow setting an optional stream_id in the request
  • if stream_id is set, all responses with a different id will be dropped

The issue we're hitting right now is that we set a low timeout (increasing it is not an option), which makes some of our queries time out. AFAICT there's a possible race condition: the first client times out, and immediately afterwards a second client sends a different request over the same connection and ends up reading the stale response meant for client1. The way to avoid this is to set and verify the stream_id and ignore any non-matching message.


NOTES

The logic that assigns an id to a request has to be protected with a mutex to avoid two requests ending up with the same id. Since the code outside lib/resty/cassandra cannot depend on OpenResty packages, I had to generate the id in cluster.lua and propagate it to lib/cassandra/cql.lua:build_frame.
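For reference, this is roughly where the id ends up in the wire format (a sketch based on the protocol v3 spec; the function name and signature are illustrative, not the library's actual build_frame):

    -- a rough sketch of how the stream id fits into a CQL v3 frame header
    -- (layout from the protocol spec; this is not the library's actual code)
    local function build_frame_header(op_code, body_len, stream_id)
      stream_id = stream_id or 0
      return string.char(
        0x03,                              -- version: request, protocol v3
        0x00,                              -- flags
        math.floor(stream_id / 256) % 256, -- stream id, high byte
        stream_id % 256,                   -- stream id, low byte
        op_code,                           -- e.g. 0x07 for QUERY
        math.floor(body_len / 2^24) % 256, -- body length, 4 bytes big-endian
        math.floor(body_len / 2^16) % 256,
        math.floor(body_len / 2^8) % 256,
        body_len % 256
      )
    end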

The response read code in lib/cassandra/init.lua:send is now inside an infinite loop: we keep reading until we find a response with the right id, or until the timeout expires.
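In pseudo-Lua, the loop looks roughly like this (read_frame, sock and the field names are placeholders, not the library's exact internals):

    -- keep reading frames until we see our own stream id; the socket's read
    -- timeout is what eventually breaks us out of the loop
    while true do
      local frame, err = read_frame(sock)
      if not frame then
        return nil, err                             -- includes the timeout case
      end
      if frame.stream_id == expected_stream_id then
        return frame                                -- this is our response
      end
      -- otherwise: a stale response from a request that already timed out; drop it
    end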

lib/cassandra/init.lua and lib/resty/cassandra are not unit tested at all, so I haven't added any new test for my change.

Also, the integration tests fail on my MacBook: Cassandra refuses to start with "Can not set double field org.apache.cassandra.config.Config.commitlog_sync_batch_window_in_ms to null value".
I'll let Travis run them.

@@ -550,6 +551,15 @@ function _Cluster:refresh()
-- initiate the load balancing policy
self.lb_policy:init(peers)

if self.stream_ids == nil then
-- Initialize the list of available seed ids (only once)
self.stream_ids = {}


Why not just use a counter for stream_id? It's less computationally expensive and you'll end up writing much less code

Contributor Author


True, I could use a counter and reset it once we go over 2^7; however, that gives us fewer features than this implementation.

Having a list of available ids and removing/reinserting them means only ids that are not currently in use can be handed out. With a counter, there's no way to keep track of which ids are still being used. This might not be a big deal with protocol v3, where we can use 32767 ids, but it could be an issue with protocol v2 since only 127 ids are available there.

The list of ids also has the advantage that we always reuse the least recently released id, since ids are added and removed in order.

This logic is similar to what's done in the python datastax driver: https://github.com/datastax/python-driver/blob/6bec6fd7e852ae33e12cf475b030d08260b04240/cassandra/connection.py#L286
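A minimal sketch of the list-based scheme described above (using the helper names mentioned later in this thread; the actual PR code may differ):

    -- ids start out all available; get pops from the back (O(1)) and release
    -- pushes at the front (O(N)), so the id popped next is always the least
    -- recently released one
    local stream_ids = {}
    for id = 1, 127 do                      -- 32767 with protocol v3
      stream_ids[id] = id
    end

    local function get_stream_id()
      return table.remove(stream_ids)       -- nil if all ids are in use
    end

    local function release_stream_id(id)
      table.insert(stream_ids, 1, id)
    end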

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the list implementation provides more features than the counter.
I'm just concerned about the performance impact of inserting at the front.
From the official docs: "The table.insert function inserts an element in a given position of an array, moving up other elements to open space." That makes it look like an O(N) operation.

Also, one thing to note is that this doesn't perfectly solve the issue of getting old responses. It seems to me that the stream_id will be released after a timeout, allowing another request to pick up that same stream_id and connection, and end up receiving the old response.
The probability of that happening is small, but the same can be said of a counter.

If Lua can insert at the front in O(1) time, then there shouldn't be a problem. But if it can't, it might be worth considering the counter implementation, which is better than the current behaviour (stream_id = 0) but not as robust as the list version.
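For comparison, the counter alternative being discussed would amount to something like this (purely illustrative, not code from the PR):

    -- a wrapping counter: O(1), but with no notion of which ids are still in use
    local MAX_STREAM_ID = 127               -- protocol v2; 32767 for v3
    local counter = 0

    local function next_stream_id()
      counter = counter % MAX_STREAM_ID + 1 -- wraps back to 1 after MAX_STREAM_ID
      return counter
    end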

Contributor Author


No, inserting at the beginning of an array as I'm doing is O(N), but according to the docs it's implemented in C and is very fast.

I ran get_stream_id and release_stream_id in a loop 10,000 times; on average each call took:
get_stream_id: 2.4 usec
release_stream_id: 54 usec

Another possibility is to use a linked list instead of an array, which keeps the same features as the current implementation, with O(1) get and put, but uses a bit more memory.

I'm not completely against the counter implementation though: we only use the v3 protocol, so it would work fine for us, though less so for people on v2.

@thibaultcha
Owner

@drolando Interesting, thanks for the PR! I have given it a look and will have some comments, but will need a couple days to post them. Are you already using this patch in production? And if so, have you noticed any performance impact (at least just from a QPS point of view)?

@thibaultcha
Owner

@drolando Sorry for the delay! I really need some free time to be able to look at this; I hope to get to it soon, since I finally got another library out the door and now have more time to dedicate to this lib.

@drolando
Contributor Author

np. FWIW we've been running this patch in production for a month now and haven't had any issues.

@thibaultcha
Owner

thibaultcha commented Aug 29, 2017

Hi @drolando,

Here are the concerns that have been on my mind but that I haven't had time to put in writing until now.

As far as I can tell, this logic will not prevent two workers from using the same stream ids at the same time. Sure, the mutex prevents two workers from choosing a stream_id at the same time. But since each worker operates in its own Lua VM, each worker has a different self.stream_ids table. And that is where the flaw is. Example:

Consider 2 workers with the following self.stream_ids arrays:

w1: [126, 127]
w2: [126, 127]

Now, if both workers process a query at the same time, both workers run get_stream_id():

w1: lock('stream_id')
    w2: lock('stream_id') -- sleep
w1: table.remove(self.stream_ids) -- returns 126
w1: unlock('stream_id')
    w2: table.remove(self.stream_ids) -- returns 126

We now have two workers using 126 at the same time as the stream_id for their request.

Additionally, the mutex logic does not prevent more than two workers from choosing stream ids at the same time. Example:

Consider 3 workers w1, w2, and w3:

w1: lock('stream_id')
    w2: lock('stream_id') -- sleep
    w3: lock('stream_id') -- sleep
w1: table.remove(self.stream_ids)
w1: unlock('stream_id') -- unlocks both w2 and w3
    w2: table.remove(self.stream_ids) + w3: table.remove(self.stream_ids) -- concurrently executed
    w2: unlock('stream_id') + w3: unlock('stream_id') -- could unlock another worker's lock

This illustrates several issues with the mutex logic: once a worker releases the mutex, all other waiting workers execute the get_stream_id() logic concurrently. Additionally, once they are done, they release a lock that was already released (and potentially release a mutex set by a hypothetical w4, which is the second issue with this logic).

To have one source of truth for stream ids, and to ensure access to that source of truth is protected by a mutex so that no two workers can modify it at the same time, you should take a look at the double-ended queue capabilities of ngx_lua (see ngx.shared.dict:lpush() et al.). However, I believe inserting a mutex in such a hot code path (if cluster:execute() is called once for each request) will create a bottleneck. If you are doing read operations, you can use a solution like the new lua-resty-mlcache, but Cassandra being a write-oriented database, this is an issue for time-series metrics, for example. A benchmark of this would be interesting.
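What that suggestion could look like, very roughly (the shared dict name and key are made up for this example; it relies on the ngx_lua shared dict queue API, ngx_lua >= 0.10.6):

    -- one queue of free ids shared by all workers, living in a lua_shared_dict
    -- (declared in nginx.conf, e.g. `lua_shared_dict cassandra_ids 1m;`)
    local shm = ngx.shared.cassandra_ids

    local function init_stream_ids(n)
      for id = 1, n do
        shm:rpush("free_ids", id)
      end
    end

    local function get_stream_id()
      return shm:lpop("free_ids")      -- nil, err if no id is available
    end

    local function release_stream_id(id)
      shm:rpush("free_ids", id)
    end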

I haven't looked into it, but do you happen to know what happens when two frames use the same stream_id? Does Cassandra refuse to process a query that bears the same stream_id as an already-running query? I will check the CQL protocol to dig for clarifications...

It is also worth noting that connection pools are only per worker, not per Nginx instance. This could matter when deciding whether several workers can use the same stream_id or not... A possible option could be to divide the pool of available ids per worker (e.g. w1 gets 1-16383 and w2 gets 16384-32767). This is just a wild idea and might not be applicable in practice - not sure.
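Back-of-the-envelope, that split could be derived like this (illustrative only, not part of the PR):

    -- carve the protocol v3 id space into one disjoint range per worker
    local MAX_STREAM_ID = 32767
    local per_worker = math.floor(MAX_STREAM_ID / ngx.worker.count())
    local first_id = ngx.worker.id() * per_worker + 1   -- ngx.worker.id() is 0-based
    local last_id  = first_id + per_worker - 1
    -- this worker only ever hands out ids in [first_id, last_id]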

@thibaultcha
Owner

@drolando Hi; any thoughts on the above?

@drolando
Contributor Author

drolando commented Sep 27, 2017

@thibaultcha Sorry, I completely missed your comments.

protocol v3: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L130

this logic will not prevent two workers from using the same stream ids at the same time

True, but that's not important. Each worker has its own connection pool, so they can use the same id if they want.
The id is used to differentiate responses on the same connection, so it only has to be unique per connection. Since we don't control which connection from the pool we'll get, we have to make ids unique across the entire connection pool.

Does Cassandra refuse to process a query that bears the same stream_id as an already-running query?

The protocol only says Cassandra will put the same id in the response, but it won't validate it in any way.

@thibaultcha
Owner

thibaultcha commented Sep 27, 2017

@drolando No worries.

True, but that's not important. Each worker has its own connection pool, so they can use the same id if they want.

Indeed they do have separate connection pools (as highlighted in my previous comment). Then what is the goal of the mutex in this patch? Wasn't it to prevent two workers from using the same stream id?

The protocol only says Cassandra will put the same id in the response, but it won't validate it in any way.

Hmm, indeed. For some reason I wouldn't feel comfortable just making that assumption though... I'd be inclined to double-check it (although I do have a hunch this assumption will turn out to be true).

In the meantime, should we rework this logic so that it is no longer built around an shm mutex?

@drolando
Contributor Author

Then what is the goal of the mutex in this patch? Wasn't it to prevent two workers from using the same stream id?

What I wanted to prevent is two coroutines on the same worker using the same id. However, I just remembered that nginx is non-preemptive, so we don't really risk any race condition, right?

In that case I could just remove the mutex entirely.

@drolando
Contributor Author

drolando commented Oct 5, 2017

Sorry for the delay on this. I now have some spare time, so I'll work on finishing upstreaming this change.

I have already made most of the changes suggested in the comments above; I'm just stuck trying to run the tests. I'll submit a few other pull requests while I fix them.

@drolando
Contributor Author

drolando commented Oct 6, 2017

A bunch of changes here:

I removed the mutex as discussed. This meant I could move the stream_id implementation into cassandra/init.lua, since I don't need anything OpenResty-specific anymore.

To improve the performance of pushing and popping from the stream_ids list, I decided to use a deque. I copied its implementation from the "Programming in Lua" book; I've no idea why they put it there but not in the stdlib... The implementation is quite simple and reduces the time complexity to O(1) for both push and pop.
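For readers following along, the shape of such a deque is roughly this (in the spirit of the "Programming in Lua" version; the PR's exact code may differ):

    -- both ends are O(1): push at the back, pop from the front (FIFO reuse of ids)
    local function new_deque()
      return { first = 1, last = 0 }
    end

    local function push_last(dq, v)
      dq.last = dq.last + 1
      dq[dq.last] = v
    end

    local function pop_first(dq)
      if dq.first > dq.last then
        return nil                      -- empty: every id is currently in use
      end
      local v = dq[dq.first]
      dq[dq.first] = nil                -- allow the slot to be collected
      dq.first = dq.first + 1
      return v
    end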

I also added a bunch of unit tests to make sure the stream_id is released and that the code does the right thing if it receives a response with the wrong stream_id in the header.

@drolando
Contributor Author

drolando commented Oct 6, 2017

I've also rebased on master, so hopefully the tests should now pass.
