Hi,
Lately there has been some discussion on the AOS scalability lists about our storage usage
when used in Openshift. While we can scale, the issue is that some customers do not wish
to allocate large amounts of storage for metrics, as I assume they view metrics
and monitoring as secondary functions (now that's a whole other discussion...).
As for the numbers, they predict that at maximum scale Hawkular-Metrics would use
close to 4TB of disk for one week of data. This is clearly too much, and we currently don't
use any compression method other than LZ4, which according to my tests performs
poorly on our data model. So I created a small prototype that reads our current data
model, compresses the data and stores it in a new data model (and verifies that the returned
data equals the sent data). For testing I used a ~55MB extract from the MiQ instance that
QE was running. One caveat, of course: the QE instance is not heavily used. For the
following results, I removed the COUNTER type data, as the values were
"0" in most cases and compression would basically eliminate all of them, giving
too rosy a picture.
When stored in our current data model, the disk space taken by the "data" table was
74MB. My prototype uses the method from Facebook's Gorilla paper (the same approach
Prometheus uses, for example), and in this test I used a one-day block size (storing one
metric's data for one day in one row in Cassandra). The end result was 3.1MB of
storage space used, roughly 24 times less. The code can be found at
bitbucket.org/burmanm/compress_proto (Golang).
I know Prometheus advertises an estimated 1.3 bytes per timestamped value, but those numbers
depend on a particular kind of test data that does not resemble anything I have (the compression
scheme's efficiency depends on the timestamp deltas and delta-of-deltas and on how much
consecutive values differ).
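The timestamp side relies on delta-of-delta encoding: with a perfectly regular interval every delta-of-delta is 0, while jittered timestamps keep producing non-zero values. The sketch below is a simplified, byte-aligned variant using zig-zag varints from Go's `encoding/binary`, not Gorilla's variable-width bit buckets, and the function names are hypothetical; it only shows why the result depends so much on the input data.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeTimestamps writes the first timestamp, then the first delta, then only
// the delta-of-delta of every following point, each as a zig-zag varint.
func encodeTimestamps(ts []int64) []byte {
	var out []byte
	tmp := make([]byte, binary.MaxVarintLen64)
	var prev, prevDelta int64
	for i, t := range ts {
		v := t // first point: stored as-is
		if i > 0 {
			delta := t - prev
			v = delta - prevDelta // for i == 1 prevDelta is 0, so v is the raw delta
			prevDelta = delta
		}
		n := binary.PutVarint(tmp, v)
		out = append(out, tmp[:n]...)
		prev = t
	}
	return out
}

// decodeTimestamps reverses encodeTimestamps.
func decodeTimestamps(buf []byte) []int64 {
	var out []int64
	var prev, delta int64
	for len(buf) > 0 {
		v, n := binary.Varint(buf)
		if n <= 0 {
			break // truncated or corrupt input
		}
		buf = buf[n:]
		if len(out) == 0 {
			prev = v
		} else {
			delta += v // delta starts at 0, so the first v is the raw delta
			prev += delta
		}
		out = append(out, prev)
	}
	return out
}

func main() {
	regular := make([]int64, 100)
	jittered := make([]int64, 100)
	for i := range regular {
		regular[i] = 1_500_000_000_000 + int64(i)*10_000 // every 10 s, in ms
		jittered[i] = regular[i] + int64(i*37%200)      // up to ~0.2 s of jitter
	}
	fmt.Println(len(encodeTimestamps(regular)), len(encodeTimestamps(jittered)))
}
```

With 100 points at an exact 10-second interval this costs 6 bytes for the first timestamp, 3 for the first delta and 1 byte per remaining point; a little jitter roughly doubles the per-point cost.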
The prototype lacks certain features. For example, I want it to encode the compression type in
the first byte of each row's header, so we could add more compression types in the
future for different workloads. Availabilities would also probably compress better
if we changed their on-disk representation to something bit-based.
** Read performance
John brought up the first question: now that we store a large number of datapoints in a
single row, what happens to our performance when we want to read only part of the
data?
- We need to read datapoints we don't need and then discard them
+ We reduce the number of rows read from Cassandra (less overhead for the driver and the
server)
+ Reduced disk usage means more of the data fits in memory caches
How does this affect the end result? In my testing I'll ignore the last advantage and
make sure all the reads in both scenarios are served from the
in-memory SSTables or at least the disk cache (the test machine has enough memory to keep
everything in memory). For this scenario I stored 1024 datapoints for a single metric
inside one block of data, to maximize the impact of unnecessary
reads. I'm only interested in the first 360 datapoints.
In this scenario, our current method requests 360 rows from Cassandra and then processes
them. In compressed mode, we request 1 row (which contains 1024 stored datapoints) and then
filter out those we don't need in the client. Results:
BenchmarkCompressedPartialReadSpeed-4 275371 ns/op
BenchmarkUncompressedPartialReadSpeed-4 1303088 ns/op
As we can see, filtering on the HWKMETRICS side is roughly 4.7 times faster than
letting Cassandra read that many rows (all of the rows were in the same partition in
this test).
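The client-side filtering measured above amounts to something like this sketch. It assumes the row has already been decoded into a slice of a hypothetical `dataPoint` type (a streaming decoder could stop decoding even earlier), and the 10-second spacing of the 1024 points is an assumption made only for the example.

```go
package main

import "fmt"

// dataPoint is a hypothetical decoded sample.
type dataPoint struct {
	Timestamp int64 // milliseconds
	Value     float64
}

// filterRange keeps the points of one decoded block that fall in [start, end).
// Points inside a block are sorted by timestamp, so the scan stops at the
// first point past the range instead of touching the rest of the block.
func filterRange(block []dataPoint, start, end int64) []dataPoint {
	var out []dataPoint
	for _, p := range block {
		if p.Timestamp >= end {
			break
		}
		if p.Timestamp >= start {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	// One row holding 1024 points; the 10 s spacing is an assumption.
	block := make([]dataPoint, 1024)
	for i := range block {
		block[i] = dataPoint{Timestamp: int64(i) * 10_000, Value: float64(i)}
	}
	fmt.Println(len(filterRange(block, 0, 360*10_000))) // prints 360
}
```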
** Storing data
Next, let's address some issues we're going to face because of the distributed nature
of our solution. Compared to Prometheus, for example (I use it as a reference because an
Openshift PM used it as a comparison), we have two issues: we allow data to arrive out of
order, and we must deal with the distributed nature of our data storage. We are also stricter
when it comes to syncing to storage, whereas Prometheus allows some data to be lost between
writes. I'll get back to optimization targets later.
To apply this sort of compression when storing data, we would always need to know the
previously stored value. That means doing a read-before-write against Cassandra, which is
exactly one of the weaknesses of Cassandra's design (both in performance and consistency).
Clearly we need to overcome this issue somehow, while still keeping the properties that give
us our advantages.
** First phase of integration
For the first phase, I would propose that we keep our current data model for short-term
storage. We would store the data there as it arrives and later rewrite it to the
compressed scheme in a different table. For reads we would request data from both tables
and merge the results. This should not be visible to users at all, and it's a
simple approach to the issue. It does require a job framework, such as the one John is
currently developing.
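Merging the two tables on read could be a simple two-way merge of timestamp-sorted results. This sketch rests on assumptions not in the proposal: `dataPoint` and `mergeSeries` are hypothetical, and when both tables contain the same timestamp (for example while a block is being rewritten), the point from the uncompressed table wins because it is assumed to be the most recent write.

```go
package main

import "fmt"

// dataPoint is a hypothetical decoded sample.
type dataPoint struct {
	Timestamp int64
	Value     float64
}

// mergeSeries merges two timestamp-sorted results into one sorted result.
// Assumption: on equal timestamps the uncompressed (raw) table wins.
func mergeSeries(compressed, raw []dataPoint) []dataPoint {
	out := make([]dataPoint, 0, len(compressed)+len(raw))
	i, j := 0, 0
	for i < len(compressed) && j < len(raw) {
		c, r := compressed[i], raw[j]
		switch {
		case c.Timestamp < r.Timestamp:
			out = append(out, c)
			i++
		case c.Timestamp > r.Timestamp:
			out = append(out, r)
			j++
		default: // same timestamp in both tables
			out = append(out, r)
			i++
			j++
		}
	}
	out = append(out, compressed[i:]...)
	return append(out, raw[j:]...)
}

func main() {
	compressed := []dataPoint{{0, 1}, {10, 2}, {20, 3}}
	raw := []dataPoint{{20, 30}, {30, 4}}
	fmt.Println(mergeSeries(compressed, raw)) // prints [{0 1} {10 2} {20 30} {30 4}]
}
```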
There are some open questions here, and I hope some of you have great ideas I
didn't think of. Please also read the optimization part, in case I mentioned your
idea there as a future path.
- How often do we process the data, and do we restrict out-of-order writes to a
certain timeslice? If we used something like 4-hour blocks by default, should we
start compressing rows one hour after the block closes? While we can technically reopen
a row and reindex the whole block, it does not make sense to do this too often. If we
decide to go with the reindexing scenario, we could start writing a block in compressed form
before it closes (for example, every 15 minutes we would re-encode the currently open blocks
if they have new incoming data). We have to be careful not to overwhelm our
processing power or Cassandra's. This is a tradeoff between minimum disk space usage
and minimum CPU/memory usage.
- Compression block size changes. The user could configure this. Increasing it on the fly
is no problem for reads, but reducing it is a slightly more complex scenario. If the user
increases the block size, our query would just pick up some extra rows that are
instantly discarded, but nothing would break. However, decreasing the size would confuse
our Cassandra reads unless we know the time of the block size change and adjust queries
accordingly for times before and after this event.
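Both open questions come down to mapping a timestamp to a block. The following sketch assumes epoch-aligned blocks, millisecond timestamps and a hypothetical schedule of block-size changes in which every change time is aligned to both the old and the new size; `blockKeys` lists the rows a query must read, and `readyToCompress` expresses the "compress one hour after the block closes" idea.

```go
package main

import "fmt"

const hour int64 = 3600 * 1000 // milliseconds

// sizeChange says that blocks starting at or after from use the given size.
// Hypothetical metadata: entries are sorted by from, the first has from == 0,
// and every from is aligned to both the old and the new block size.
type sizeChange struct {
	from int64 // ms since epoch
	size int64 // block length in ms
}

// sizeAt returns the block size in effect at ts.
func sizeAt(ts int64, schedule []sizeChange) int64 {
	size := schedule[0].size
	for _, c := range schedule {
		if ts >= c.from {
			size = c.size
		}
	}
	return size
}

// blockKeys lists the start of every block overlapping [start, end), i.e. the
// row keys a reader would request. Assumes start >= 0.
func blockKeys(start, end int64, schedule []sizeChange) []int64 {
	var keys []int64
	b := start - start%sizeAt(start, schedule)
	for b < end {
		keys = append(keys, b)
		b += sizeAt(b, schedule) // alignment makes this land exactly on a change
	}
	return keys
}

// readyToCompress reports whether a block closed at least grace ago.
func readyToCompress(blockStart, size, now, grace int64) bool {
	return now >= blockStart+size+grace
}

func main() {
	// 4-hour blocks, reduced to 1-hour blocks from hour 24 onwards.
	schedule := []sizeChange{{0, 4 * hour}, {24 * hour, hour}}
	fmt.Println(blockKeys(21*hour, 26*hour, schedule))
	fmt.Println(readyToCompress(20*hour, 4*hour, 24*hour+30*60*1000, hour))
}
```

With this schedule a query for hours 21 to 26 reads the rows starting at hours 20, 24 and 25. Using only the current 1-hour size would look for rows at hours 21, 22 and 23 that were never written and miss the row at hour 20.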
** Optimizing the solution
The following optimizations would increase the Hawkular-Metrics ingestion
rate considerably and as such are probably worth investigating at some point. But they're
also complex, and I would rather not implement them in the first phase, so that
we can get compression into the product sooner and not miss certain
deadlines.
- Stop writing to Cassandra in the first phase. Instead, we write to something more
ephemeral, such as an mmap-backed memory cache that is distributed among the Hawkular nodes.
It would also need some sort of processing locality (for example, directing the write to the
node that owns the hash of the metricId, sort of like HBase does), unless we want
to employ locks to prevent ordering issues if we already encode in memory. From memory
we would then store blocks in the permanent Cassandra store. The clients need to be
token/hash-method aware to send data to the correct node.
The benefit of this solution is increased write speed, as such a backend easily reaches a
million writes per second and the only bottleneck would be our JSON parsing performance.
Reads could be served from both storages without much overhead. This optimization would be
worth it even without the compression layer, but I would say it is not our most urgent
issue (but if write ingestion speed becomes an issue, this is the best way to
increase it, and it's used in many Cassandra solutions; for time series, I think SignalFX
uses a somewhat similar approach, although they first write to Kafka).