Hi,
We can use Cassandra's compression in addition to ours. There's always some
metadata in the blocks that could benefit a little (but not much). This does not help
with string metrics, but those already benefit a lot from deflate compression (and there
aren't necessarily much better algorithms than deflate for strings - LZMA2 is _a lot_
slower). For availabilities we can benefit, as there's only the timestamp delta-of-delta
to store (which could be close to 0 bits, and the value delta is also 0 bits).
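To make that concrete, here is a minimal sketch (my illustration, not the prototype's
code) of the delta-of-delta idea for an availability stream reported every 30 seconds:
the timestamp delta-of-deltas are all zero, and an unchanged UP value would add no value
bits in a Gorilla-style encoding.

    package main

    import "fmt"

    // deltaOfDeltas returns the second-order timestamp differences; a steady
    // reporting interval makes every entry 0, which encodes to almost nothing.
    func deltaOfDeltas(ts []int64) []int64 {
        out := make([]int64, 0, len(ts))
        var prevDelta int64
        for i := 1; i < len(ts); i++ {
            delta := ts[i] - ts[i-1]
            if i > 1 {
                out = append(out, delta-prevDelta)
            }
            prevDelta = delta
        }
        return out
    }

    func main() {
        // Availability reported every 30s; timestamps in milliseconds.
        ts := []int64{0, 30000, 60000, 90000, 120000}
        fmt.Println(deltaOfDeltas(ts)) // [0 0 0]
    }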
Multiple metrics make no difference here: whether you fetch 360 rows from each of 25
metrics (9000 rows) or just 25 compressed rows, each set of 360 rows carries the same
performance hit as in the single-metric case. The only case where we could see a
performance issue is single-datapoint fetching, but how realistic is that as a read
scenario - and is it worth optimizing?
Nodes joining is certainly one aspect that must be monitored; however, a single row
isn't that big. For example, in the transferred MiQ data there are a few rows with
~2800 datapoints, and those take ~1600 bytes. The largest row I could find is ~11kB.
Let's say our worst case is around 5 bytes per stored datapoint; at a 10s frequency
that gives us 8640 datapoints per day and a total of ~43kB.
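Spelling that worst case out (a back-of-the-envelope sketch using the assumptions above):

    package main

    import "fmt"

    func main() {
        // Assumed worst case from above: ~5 bytes per stored datapoint, 10s interval.
        const secondsPerDay = 86400
        const intervalSeconds = 10
        const bytesPerPoint = 5
        points := secondsPerDay / intervalSeconds // 8640 datapoints per day
        fmt.Println(points, points*bytesPerPoint) // 8640 and 43200 bytes (~43kB)
    }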
If we stick to immutability, then we will have uncompressed data lying around for
out-of-order data that arrives later than the compression moment, which would be some
time X after the block closes. However, given that uncompressed data is slower to read
than compressed data, do we really want that immutability? We can also avoid modifying
cells and instead use new rows (which one it is depends of course on the schema) - a
rough sketch follows the list:
- Assume a blockSize of one day, so each compressed block would have a timestamp starting
at 00:00:00.
- If we query something, we must always use 00:00:00 as the startTime.
- That allows us to store late data under 01:00:00, 02:00:00, etc.; it would be found with
the same query, and we could later merge the 01:00:00, 02:00:00, etc. rows into a single
block at 00:00:00.
- At merge time we could also take all outlier timestamps and fold them into that single
block.
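Here is a rough sketch of that row-key alignment (my illustration only, with made-up
helper names): out-of-order data is written under an hour-aligned sub-block key, while
queries always start from the day-aligned 00:00:00 key, so both are found until the
merge happens.

    package main

    import (
        "fmt"
        "time"
    )

    // blockStart aligns a timestamp to the start of its one-day block (00:00:00 UTC).
    func blockStart(t time.Time) time.Time {
        return t.Truncate(24 * time.Hour)
    }

    // subBlockStart aligns an out-of-order arrival to an hourly sub-block that can
    // later be merged into the 00:00:00 block.
    func subBlockStart(t time.Time) time.Time {
        return t.Truncate(time.Hour)
    }

    func main() {
        late := time.Date(2016, 7, 27, 2, 13, 45, 0, time.UTC)
        fmt.Println(blockStart(late))    // 2016-07-27 00:00:00 +0000 UTC
        fmt.Println(subBlockStart(late)) // 2016-07-27 02:00:00 +0000 UTC
    }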
Infinispan sounds like a fine option; I haven't evaluated any technical options for
that storage yet.
- Micke
----- Original Message -----
From: "John Sanda" <jsanda(a)redhat.com>
To: "Discussions around Hawkular development"
<hawkular-dev(a)lists.jboss.org>
Sent: Wednesday, July 27, 2016 10:17:11 PM
Subject: Re: [Hawkular-dev] Metrics storage usage and compression
On Jul 27, 2016, at 6:19 AM, Michael Burman
<miburman(a)redhat.com> wrote:
Hi,
Lately there has been some discussion on the AOS scalability lists about our storage usage
when used in OpenShift. While we can scale, the issue is that some customers do not wish
to allocate large amounts of storage for storing metrics, as I assume they view metrics
and monitoring as secondary functions (now that's a whole other discussion..)
To the numbers: they're predicting that at maximum scale, Hawkular-Metrics would use
close to ~4TB of disk for one week of data. This is clearly too much, and we currently
don't deploy any compression method other than LZ4, which according to my tests is
quite bad for our data model. So I created a small prototype that reads our current data
model, compresses it and stores it to a new data model (and verifies that the returned
data equals the sent data). For testing I used a ~55MB extract from the MiQ instance that
QE was running. One caveat here, of course: the QE instance is not under heavy usage. For
the following results, I decided to remove COUNTER type data, as the values looked to be
"0" in most cases and compression would basically get rid of all of them, giving
too rosy a picture.
Would we use Cassandra’s compression in addition to this or turn it off?
Will this compression work with non-numeric data? I am wondering about availability and
string metrics. I don’t necessarily see it as a deal breaker if it only handles numeric
data because that is mostly what we’re dealing with, and in OpenShift neither availability
nor string metrics are currently used.
When storing to our current data model, the disk space taken by the "data" table
was 74MB. My prototype uses the method from Facebook's Gorilla paper (the same one that,
for example, Prometheus uses), and in this test I used a one-day block size (storing one
day of one metric's data in one row inside Cassandra). The end result was 3.1MB of
storage space used. Code can be found at
bitbucket.org/burmanm/compress_proto (Golang).
I know Prometheus advertises an estimated 1.3 bytes per timestamped value, but those
numbers require a certain sort of test data that does not represent anything I have (the
compression scheme's efficiency depends on the timestamp deltas and delta-of-deltas and
the value deltas). The prototype lacks certain features; for example, I want it to encode
the compression type in the first byte of the header for each row - so we could add more
compression types in the future for different workloads - and availabilities would
probably compress better if we changed the disk presentation to something bit-based.
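To illustrate that header idea (a sketch only - the constants and layout below are made
up for illustration, not what the prototype at bitbucket.org/burmanm/compress_proto
actually does), the first byte of a stored row names the compression type, leaving room
for new encodings later:

    package main

    import "fmt"

    // Illustrative compression-type identifiers carried in the first byte of a row.
    const (
        compressionGorilla byte = 0x01
        compressionDeflate byte = 0x02 // e.g. for string metrics
    )

    // wrapBlock prefixes the compressed payload with its compression type.
    func wrapBlock(compressionType byte, payload []byte) []byte {
        return append([]byte{compressionType}, payload...)
    }

    // readBlock splits a stored row back into compression type and payload.
    func readBlock(row []byte) (byte, []byte) {
        return row[0], row[1:]
    }

    func main() {
        row := wrapBlock(compressionGorilla, []byte{0xDE, 0xAD})
        typ, payload := readBlock(row)
        fmt.Printf("type=%#x payload=%v\n", typ, payload)
    }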
** Read performance
John brought up the first question - now that we store a large number of datapoints in a
single row, what happens to our performance when we want to read only part of the
data?
- We need to read rows we don't need and then discard those
+ We reduce the number of rows read from Cassandra (less overhead for the driver &
server)
+ Reduced disk usage means we'll store more of the data in memory caches
How does this affect the end result? I'll skip that last advantage in my testing for
now and make sure all the reads for both scenarios happen from the in-memory SSTables
or at least the disk cache (the testing machine has enough memory to keep everything in
memory). For this scenario I stored 1024 datapoints for a single metric, storing them
inside one block of data, thus trying to maximize the impact of unnecessary reads. I'm
only interested in the first 360 datapoints.
In this scenario, our current method requests 360 rows from Cassandra and then processes
them. In the compressed mode, we request 1 row (which has 1024 stored datapoints) and
then filter out those we don't need in the client. Results:
BenchmarkCompressedPartialReadSpeed-4 275371 ns/op
BenchmarkUncompressedPartialReadSpeed-4 1303088 ns/op
As we can see, filtering on the HWKMETRICS side yields quite a large speedup compared to
letting Cassandra read that many rows (all of the rows were from the same partition in
this test).
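For reference, the compressed read path in this comparison boils down to something like
the following (a simplified sketch with invented names, not the benchmark code): fetch
the single row, decompress the block, and drop the datapoints outside the requested
range on the client.

    package main

    import "fmt"

    type dataPoint struct {
        timestamp int64
        value     float64
    }

    // filterRange keeps only the datapoints inside [start, end) after the whole
    // block has been decompressed on the client side.
    func filterRange(points []dataPoint, start, end int64) []dataPoint {
        out := make([]dataPoint, 0, len(points))
        for _, p := range points {
            if p.timestamp >= start && p.timestamp < end {
                out = append(out, p)
            }
        }
        return out
    }

    func main() {
        block := []dataPoint{{10, 1.0}, {20, 1.1}, {30, 1.2}, {40, 1.3}}
        fmt.Println(filterRange(block, 10, 30)) // [{10 1} {20 1.1}]
    }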
We definitely want to do more testing with reads to understand the impact. We have several
endpoints that allow you to fetch data points from multiple metrics. It is great to see
these numbers for reading against a single metric. What about 5, 10, 20, etc. metrics? And
what about when the query spans multiple blocks?
Another thing we’ll need to test is how larger row sizes impact Cassandra streaming
operations which happen when nodes join/leave the cluster and during anti-entropy repair.
** Storing data
Next, let's address some issues we're going to face because of the distributed nature
of our solution. We have two issues compared to Prometheus, for example (I use it as an
example as it was used by one OpenShift PM) - we let data arrive out-of-order, and we
must deal with the distributed nature of our data storage. We are also stricter when it
comes to syncing to the storage, while Prometheus allows some data to be lost between
writes. I'll get back to optimization targets later.
For storing the data, to be able to apply this sort of compression to it, we would
always need to know the previously stored value. To do that, we would need a
read-then-write path to Cassandra, and that is exactly one of the weaknesses of
Cassandra's design (in performance and consistency). Clearly we need to overcome this
issue somehow, while still keeping the properties that give us our advantages.
** First phase of integration
For the first phase, I would propose that we keep our current data model for short-term
storage. We would store the data here as it arrives and then later rewrite it to the
compressed scheme in a different table. For reads we would request data from both tables
and merge the results. This should not be visible to the users at all, and it's a
simple approach to the issue. A job framework such as the one John is currently
developing is required. A rough sketch of the read path follows.
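Something like the following (hypothetical names throughout; not a proposed API):

    package sketch

    type dataPoint struct {
        timestamp int64
        value     float64
    }

    // readMerged sketches the phase-one read path: query the compressed table and
    // the uncompressed short-term table, then merge the results. fetchCompressed
    // and fetchUncompressed are hypothetical stand-ins for the actual Cassandra
    // queries; sorting and de-duplication by timestamp are omitted for brevity.
    func readMerged(metricID string, start, end int64,
        fetchCompressed, fetchUncompressed func(string, int64, int64) []dataPoint) []dataPoint {

        merged := fetchCompressed(metricID, start, end)
        return append(merged, fetchUncompressed(metricID, start, end)...)
    }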
If we know we have the uncompressed data, then we don’t need to read from the compressed
table(s). I would expect this to be the case for queries involving recent data, e.g., past
4 hours.
There are some open questions here, and I hope some of you have great ideas I
didn't think of. Please also read the optimization part, in case I happened to mention
your idea as a future path.
- How often do we process the data, and do we restrict the out-of-order capabilities to
a certain timeslice? If we were to use something like 4-hour blocks as the default, should
we start compressing rows one hour after the block closes? While we can technically reopen
the row and reindex the whole block, it does not make sense to do this too often. If we
decide to go with the reindexing scenario, we could start writing the next block before it
closes (for example, every 15 minutes we would re-encode the currently open blocks if they
have new incoming data). We have to be careful here not to overwhelm our processing power
or Cassandra's. This is a tradeoff between minimum disk space usage and minimum CPU/memory
usage. A sketch of the timing knobs follows.
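Just to make the knobs in that tradeoff explicit (numbers and names are placeholders,
not a proposal):

    package sketch

    import "time"

    // Illustrative values only - the real ones are exactly what is being debated above.
    const (
        blockSize     = 4 * time.Hour
        gracePeriod   = 1 * time.Hour    // wait for out-of-order arrivals
        reencodeEvery = 15 * time.Minute // optional re-encoding of still-open blocks
    )

    // compressAfter returns the earliest time at which the block starting at
    // blockStart becomes eligible for final compression.
    func compressAfter(blockStart time.Time) time.Time {
        return blockStart.Add(blockSize).Add(gracePeriod)
    }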
Query patterns are going to dictate ideal block sizes. For a first (and probably second)
iteration, I say we go with a reasonable default like a day.
Does reindexing the row mean updating existing cells on disk? If so, I would like to think
about ways that allow us to continue keeping data immutable as we currently do.
- Compression block size changes. The user could configure this - increasing it on the
fly is no problem for reads, but reducing it is a slightly more complex scenario. If the
user increases the block size, our query would just pick up some extra rows that are
instantly discarded, but nothing would break. However, decreasing the size would confuse
our Cassandra reads unless we know the time of the block size change and adjust queries
accordingly for times before and after that event. A sketch of that adjustment follows.
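Roughly like this (a sketch of the idea only; it assumes we record the change time
somewhere when the setting is modified):

    package sketch

    import "time"

    // blockStartFor picks the block boundary to query for a given timestamp when the
    // block size was reduced at a known changeTime.
    func blockStartFor(t, changeTime time.Time, oldSize, newSize time.Duration) time.Time {
        if t.Before(changeTime) {
            return t.Truncate(oldSize)
        }
        return t.Truncate(newSize)
    }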
** Optimizing the solution
The following optimizations would increase the Hawkular-Metrics ingestion rate a lot
and as such are probably worth investigating at some point. But they're also complex,
and I would want to refrain from implementing them in the first phase so that we can get
compression into the product quicker - so that we don't miss certain deadlines.
- Stop writing to Cassandra in the first phase. Instead, write to something more
ephemeral, such as an mmap-backed memory cache that is distributed among the Hawkular
nodes. It would also need some sort of processing locality (directing the write to the
node that owns the hash of the metricId, for example - sort of like HBase does), unless
we want to employ locks to prevent ordering issues if we already encode in memory. From
memory we would then store blocks to the permanent Cassandra store. The clients need to
be aware of the token/hash method to send data to the correct node. A minimal sketch of
that routing is below.
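Something along these lines, assuming a simple hash over the node list (the node names
and hash choice are only for illustration; a real implementation would follow the
cluster's token ring):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // ownerNode routes a metricId to the node that "owns" its hash, so all writes
    // for a metric land on the same node before being encoded and flushed to the
    // permanent Cassandra store.
    func ownerNode(metricID string, nodes []string) string {
        h := fnv.New32a()
        h.Write([]byte(metricID))
        return nodes[h.Sum32()%uint32(len(nodes))]
    }

    func main() {
        nodes := []string{"hawkular-0", "hawkular-1", "hawkular-2"}
        fmt.Println(ownerNode("tenant1/cpu_usage", nodes))
    }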
We ought to look at Infinispan for this. We are already talking about other use cases for
Infinispan so it would be nice if it works out.
Lastly, this is great stuff!
_______________________________________________
hawkular-dev mailing list
hawkular-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/hawkular-dev