[Hawkular-dev] Metrics storage usage and compression

Michael Burman miburman at redhat.com
Wed Jul 27 06:19:31 EDT 2016


Hi,

Lately there has been some discussion on the AOS scalability lists about our storage usage when used in OpenShift. While we can scale, the issue is that some customers do not wish to allocate large amounts of storage for metrics, as I assume they view metrics and monitoring as secondary functions (now that's a whole other discussion..)

To the numbers: they're predicting that at maximum scale, Hawkular-Metrics would use close to 4TB of disk for one week of data. This is clearly too much, and we currently don't deploy any compression method other than LZ4, which according to my tests works quite badly for our data model. So I created a small prototype that reads our current data model, compresses it and stores it to a new data model (and verifies that the returned data equals the sent data). For testing I used a ~55MB extract from the MiQ instance that QE was running. One caveat here, of course: the QE instance is not in heavy usage. For the following results, I decided to remove the COUNTER type of data, as the values looked to be "0" in most cases and compression would basically get rid of all of them, giving too rosy a picture.

When storing to our current data model, the disk space taken by the "data" table was 74MB. My prototype uses the method from Facebook's Gorilla paper (the same as what, for example, Prometheus uses), and in this test I used a one day block size (storing one day of one metric's data in one row inside Cassandra). The end result was 3.1MB of storage space used, roughly a 24x reduction. Code can be found at bitbucket.org/burmanm/compress_proto (Golang). I know Prometheus advertises an estimated 1.3 bytes per timestamped value, but those numbers require a certain sort of test data that does not represent anything I have (the compression scheme's efficiency depends on the timestamp deltas, the value deltas and the delta-of-deltas). The prototype lacks certain features; for example, I want it to encode the compression type in the first byte of each row's header - so we could add more compression types in the future for different workloads - and availabilities would probably compress better if we changed the disk representation to something bit based.
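To illustrate why the efficiency depends on the deltas, here is a minimal Go sketch of the two transforms the Gorilla scheme applies before bit-packing: delta-of-delta on timestamps and XOR on the float bits of consecutive values. This is not the prototype's code; the function names are made up for illustration and the actual variable-length bit encoding is left out.

```go
package main

import (
	"fmt"
	"math"
)

// deltaOfDeltas returns, for every timestamp from the third one onwards,
// how much the interval to the previous timestamp changed. Regularly
// spaced timestamps produce zeros, which a bit-packer can store in 1 bit.
func deltaOfDeltas(ts []int64) []int64 {
	if len(ts) < 3 {
		return nil
	}
	out := make([]int64, 0, len(ts)-2)
	prevDelta := ts[1] - ts[0]
	for i := 2; i < len(ts); i++ {
		delta := ts[i] - ts[i-1]
		out = append(out, delta-prevDelta)
		prevDelta = delta
	}
	return out
}

// xorValues XORs the IEEE 754 bits of each value with its predecessor.
// Identical consecutive values give 0; similar values give a result with
// many leading and trailing zero bits.
func xorValues(vs []float64) []uint64 {
	if len(vs) < 2 {
		return nil
	}
	out := make([]uint64, 0, len(vs)-1)
	prev := math.Float64bits(vs[0])
	for _, v := range vs[1:] {
		cur := math.Float64bits(v)
		out = append(out, cur^prev)
		prev = cur
	}
	return out
}

func main() {
	ts := []int64{1000, 1010, 1020, 1031, 1041}
	vs := []float64{12.5, 12.5, 12.5, 13.0}
	fmt.Println(deltaOfDeltas(ts)) // intervals 10, 10, 11, 10
	fmt.Println(xorValues(vs))
}
```

With the sample timestamps the intervals are 10, 10, 11, 10, so the delta-of-deltas are 0, 1, -1. The more jitter in the collection interval and the more the values move, the fewer zeros there are to exploit.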

** Read performance

John brought up the first question - now that we store a large number of datapoints in a single row, what happens to our performance when we want to read only some parts of the data?

 - We need to read rows we don't need and then discard those
 + We reduce the number of rows read from Cassandra (less overhead for driver & server)
 + Reduced disk usage means we'll store more of the data in memory caches

How does this affect the end result? I'll skip the last of the advantages in my testing for now and make sure all the reads for both scenarios are served from the in-memory SSTables or at least the disk cache (the testing machine has enough memory to keep everything in memory). For this scenario I stored 1024 datapoints for a single metric inside one block of data, thus trying to maximize the impact of unnecessary reads. I'm only interested in the first 360 datapoints.

In this scenario, our current method requests 360 rows from Cassandra and then processes them. In the compressed mode, we request 1 row (which holds 1024 stored datapoints) and then filter out those we don't need in the client. Results:

BenchmarkCompressedPartialReadSpeed-4     	    	    275371 ns/op
BenchmarkUncompressedPartialReadSpeed-4   	    	   1303088 ns/op

As we can see, filtering on the HWKMETRICS side yields quite a large speedup (roughly 4.7x in this test) compared to letting Cassandra read so many rows (all of the rows were from the same partition in this test).
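The client-side filtering step can be sketched roughly as below. This is only an illustration under my own assumptions: the DataPoint type and filterRange function are hypothetical names, the block is assumed to be already decompressed, and its points are assumed to be sorted by timestamp.

```go
package main

import (
	"fmt"
	"sort"
)

// DataPoint is a hypothetical decoded datapoint from a compressed block.
type DataPoint struct {
	Timestamp int64
	Value     float64
}

// filterRange returns the points with start <= Timestamp < end.
// It assumes points is sorted by ascending timestamp, so two binary
// searches are enough and no per-point scan is needed.
func filterRange(points []DataPoint, start, end int64) []DataPoint {
	lo := sort.Search(len(points), func(i int) bool {
		return points[i].Timestamp >= start
	})
	hi := sort.Search(len(points), func(i int) bool {
		return points[i].Timestamp >= end
	})
	if hi < lo {
		return nil // empty or inverted range
	}
	return points[lo:hi]
}

func main() {
	// One block with 1024 datapoints, 10 time units apart.
	block := make([]DataPoint, 1024)
	for i := range block {
		block[i] = DataPoint{Timestamp: int64(i) * 10, Value: float64(i)}
	}
	wanted := filterRange(block, 0, 3600)
	fmt.Println(len(wanted)) // the first 360 datapoints
}
```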

** Storing data

Next, let's address some issues we're going to face because of the distributed nature of our solution. We have two issues compared to Prometheus, for example (I use it as an example as it was used by one OpenShift PM) - we allow data to arrive out-of-order and we must deal with the distributed nature of our data storage. We are also stricter when it comes to syncing to the storage, while Prometheus allows some data to be lost in between the writes. I can get back to optimization targets later.

For storing the data, to be able to apply this sort of compression to it, we would always need to know the previously stored value. To do this, we would need a read-before-write path to Cassandra, and this is exactly one of the weaknesses of Cassandra's design (in performance and consistency). Clearly we need to overcome this issue somehow, while still keeping those properties that give us our advantages.

** First phase of integration

For the first phase, I would propose that we keep our current data model for short term storage. We would store the data there as it arrives and then later rewrite it to the compressed scheme in a different table. For reads we would request data from both tables and merge the results. This should not be visible to the users at all and it's a simple approach to the issue. A job framework such as the one John is currently developing is required.
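Merging the results of the two tables on the read path could look roughly like the sketch below. The types and names are hypothetical, both inputs are assumed to be sorted by timestamp, and preferring the uncompressed row when both tables hold the same timestamp is only my assumption for the sketch, not a decided rule.

```go
package main

import "fmt"

// DataPoint is a hypothetical datapoint as returned by either table.
type DataPoint struct {
	Timestamp int64
	Value     float64
}

// mergeByTimestamp merges two timestamp-sorted slices into one sorted
// slice. When both contain the same timestamp, the point from raw
// (the uncompressed short term table) wins; this is an assumption.
func mergeByTimestamp(compressed, raw []DataPoint) []DataPoint {
	out := make([]DataPoint, 0, len(compressed)+len(raw))
	i, j := 0, 0
	for i < len(compressed) && j < len(raw) {
		switch {
		case compressed[i].Timestamp < raw[j].Timestamp:
			out = append(out, compressed[i])
			i++
		case compressed[i].Timestamp > raw[j].Timestamp:
			out = append(out, raw[j])
			j++
		default: // same timestamp in both tables
			out = append(out, raw[j])
			i++
			j++
		}
	}
	out = append(out, compressed[i:]...)
	out = append(out, raw[j:]...)
	return out
}

func main() {
	compressed := []DataPoint{{1, 1.0}, {2, 2.0}, {3, 3.0}}
	raw := []DataPoint{{3, 3.5}, {4, 4.0}}
	for _, p := range mergeByTimestamp(compressed, raw) {
		fmt.Println(p.Timestamp, p.Value)
	}
}
```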

There are some open questions here, and I hope some of you have some great ideas I didn't think of. Please also read the optimization part, in case I already mention your idea there as a future path.

  - How often do we process the data, and do we restrict the out-of-order capabilities to a certain timeslice? If we used something like 4 hour blocks as the default, should we start compressing rows one hour after the block closes? While we can technically reopen the row and reindex the whole block, it does not make sense to do this too often. If we decide to go with the reindexing scenario, we could start writing the block before it closes (for example, every 15 minutes we would re-encode the currently open blocks if they have new incoming data). We have to be careful here not to overwhelm our processing power and Cassandra's. This is a tradeoff between minimum disk space usage and minimum CPU/memory usage.

  - Compression block size changes. The user could configure this (increasing it on the fly is no problem for reads, but reducing it is a slightly more complex scenario). If the user increases the size of the block, our query would just pick up some extra rows that are instantly discarded, but nothing would break. However, decreasing the size would confuse our Cassandra reads unless we know the time of the block size change and adjust the queries accordingly for times before and after this event.
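The query adjustment around a block size change can be sketched as follows. This is a simplified illustration with hypothetical function names, and it assumes non-negative timestamps, a single change, and a change time that is aligned to a block boundary of both the old and the new size.

```go
package main

import "fmt"

// blockStarts lists the start times of all blocks of the given size that
// can contain data in [start, end). Timestamps are assumed non-negative
// and size is assumed positive.
func blockStarts(start, end, size int64) []int64 {
	var out []int64
	for b := start - start%size; b < end; b += size {
		out = append(out, b)
	}
	return out
}

// blockStartsWithChange does the same when the block size changed from
// oldSize to newSize at changedAt. The part of the query before the
// change uses the old size and the part after it uses the new size.
func blockStartsWithChange(start, end, oldSize, newSize, changedAt int64) []int64 {
	if end <= changedAt {
		return blockStarts(start, end, oldSize)
	}
	if start >= changedAt {
		return blockStarts(start, end, newSize)
	}
	out := blockStarts(start, changedAt, oldSize)
	return append(out, blockStarts(changedAt, end, newSize)...)
}

func main() {
	// Times in seconds: 4 hour blocks reduced to 1 hour blocks at t = 8h.
	const hour = int64(3600)
	rows := blockStartsWithChange(20000, 33000, 4*hour, hour, 8*hour)
	fmt.Println(rows) // one 4h block before the change, two 1h blocks after
}
```

For the sample query the result is the block starting at 14400 (old size) followed by the blocks starting at 28800 and 32400 (new size). Without knowing the change time, a query using only the new size would look for row keys that do not exist for the older data.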

** Optimizing the solution

The following optimizations would increase the ingestion rate of Hawkular-Metrics a lot and as such are probably worth investigating at some point. But they're also complex, and I would want to refrain from implementing them in the first phase so that we could get compression into the product quicker and not miss certain deadlines.

  - Stop writing to Cassandra in the first phase. Instead we write to something more ephemeral, such as an mmap-backed memory cache that is distributed among the Hawkular nodes. It would also need some sort of processing locality (direct the write to the node that controls the hash of the metricId, for example - sort of like HBase does), unless we want to employ locks to prevent ordering issues if we encode already in memory. From memory we would then store the blocks to the permanent Cassandra store. The clients need to be token/hash-method aware to send data to the correct node.

The benefit of that solution is increased write speed, as such a backend easily reaches a million writes per second and the only bottleneck would be our JSON parsing performance. Reads could be served from both storages without much overhead. This optimization would be worth it even without the compression layer, but I would say this is not our most urgent issue (if write ingestion speed becomes an issue, though, this is the best way to increase it, and it's used in many Cassandra solutions; for time series I think SignalFX uses a somewhat similar approach, although they first write to Kafka).
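The processing locality mentioned above (directing a write to the node that owns the hash of the metricId) can be sketched like this. It is a deliberately naive illustration: nodeFor and the node names are hypothetical, and plain modulo hashing is my simplification. A real deployment would more likely need a token ring or consistent hashing so that adding a node does not remap most metrics.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// nodeFor picks the node responsible for a metric by hashing its ID.
// Every client that uses the same hash function and the same node list
// sends a given metric to the same node, so that node sees the metric's
// writes in one place and can encode them without cross-node locks.
// nodes must not be empty.
func nodeFor(metricID string, nodes []string) string {
	h := fnv.New32a()
	_, _ = h.Write([]byte(metricID)) // Write on an FNV hash never fails
	return nodes[int(h.Sum32()%uint32(len(nodes)))]
}

func main() {
	nodes := []string{"hawkular-0", "hawkular-1", "hawkular-2"}
	for _, id := range []string{"cpu.usage", "memory.rss", "net.rx"} {
		fmt.Printf("%s -> %s\n", id, nodeFor(id, nodes))
	}
}
```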


