[Hawkular-dev] Metrics storage usage and compression

Michael Burman miburman at redhat.com
Thu Jul 28 10:29:30 EDT 2016


Hi,

We can use Cassandra's compression in addition to ours. There's always some metadata in the blocks that could benefit a little bit (but not much). This does not help with string metrics, but they benefit a lot from deflate compression already (and there isn't necessarily much better algorithms than deflate for strings - LZMA2 is _a lot_ slower). For availabilities, we can benefit as there's only the timestamp delta-delta to store (which could be close to 0 bits and value delta is also 0 bits).

Multiple metrics cause no differences here, if you fetch 360 rows from 25 metrics (9000 rows) or 25 rows. Each 360 rows equals same performance hit. The only case where we could see some performance issue is comparing single datapoint fetching, but how realistic reading scenario is that - and is it worth optimizing?

Nodes joining is certainly one aspect that must be monitored, however a single row isn't that big. For example, when transferring the MiQ data, there are few rows which have ~2800 datapoints. Those take ~1600 bytes. Largest row I could find is ~11kB. Lets say our worst cases are around 5 bytes per stored timestamp, that gives us 8640 datapoints if frequency is 10s and a total of ~43kB.

If we stick to the immutability, then we will have uncompressed data lying around for out-of-order data that arrives later than the compression moment. Which would be X time after the blockSize closes. However, given that uncompressed data is slower to read than compressed, do we really want to have that immutability? We can also avoid cells modification and instead use new rows (but which one it is depends of course on the schema):

  - Assume blockSize of one day, so each compressed block would have a timestamp starting at 00:00:00. 
  - If we query something, we must always use as startTime the 00:00:00
  - Now, that allows us to store data in 01:00:00, 02:00:00, etc. they would be found with the same query and we could just merge the 01:00:00, 02:00:00 etc later to a single block of 00:00:00. 
  - At the merge time we could also take all outlier timestamps and merge them to the single block

Infinispan sounds like a fine option, I didn't evaluate any technical options yet for that storage.

  - Micke

----- Original Message -----
From: "John Sanda" <jsanda at redhat.com>
To: "Discussions around Hawkular development" <hawkular-dev at lists.jboss.org>
Sent: Wednesday, July 27, 2016 10:17:11 PM
Subject: Re: [Hawkular-dev] Metrics storage usage and compression


> On Jul 27, 2016, at 6:19 AM, Michael Burman <miburman at redhat.com> wrote:
> 
> Hi,
> 
> Lately there has been some discussion on the AOS scalability lists for our storage usage when used in Openshift. While we can scale, the issue is that some customers do not wish to allocate large amounts of storage for storing metrics, as I assume they view metrics and monitoring as secondary functions (now that's whole another discussion..)
> 
> To the numbers, they're predicting that at maximum scale, Hawkular-Metrics would use close to ~4TB of disk for one week of data. This is clearly too much, and we don't deploy any other compression methods currently than LZ4, which according to my tests is quite bad for our data model. So I created a small prototype that reads our current data model, compresses it and stores it to a new data model (and verifies that the returned data equals to sent data). For testing I used a ~55MB extract from the MiQ instance that QE was running. One caveat of course here, the QE instance is not in heavy usage. For following results, I decided to remove COUNTER type of data, as they looked to be "0" in most cases and compression would basically get rid of all of them, giving too rosy picture.

Would we use Cassandra’s compression in addition to this or turn it off?

Will this compression work with non-numeric data? I am wondering about availability and string metrics. I don’t necessarily see it as a deal breaker if it only handles numeric data because that is mostly what we’re dealing with, and in OpenShift neither availability nor string metrics are currently used.

> 
> When storing to our current data model, the disk space taken by "data" table was 74MB. My prototype uses the method of Facebook's Gorilla paper (same as what for example Prometheus uses), and in this test I used a one day block size (storing one metric's one day data to one row inside Cassandra). The end result was 3,1MB of storage space used. Code can be found from bitbucket.org/burmanm/compress_proto (Golang). I know Prometheus advertises estimated 1.3 bytes per timestamped value, but those numbers require certain sort of test data that does not represent anything I have (the compression scheme's efficiency depends on the timestamp delta and value deltas and delta-deltas). The prototype lacks certain features, for example I want it to encode compression type to the first 1 byte of the header for each row - so we could add more compression types in the future for different workloads - and availabilities would probably have better compression if we changed the disk presentat!
> ion to something bit based.
> 
> ** Read performance
> 
> John brought up the first question - now that we store large amount of datapoints in a single row, what happens to our performance when we want to read only some parts of the data?
> 
> - We need to read rows we don't need and then discard those
> + We reduce the amount of rows read from the Cassandra (less overhead for driver & server)
> + Reduced disk usage means we'll store more of the data in memory caches
> 
> How does this affect the end result? I'll skip the last part of the advantage in my testing now and make sure all the reads for both scenarios are happening from the in-memory SSTables or at least disk cache (the testing machine has enough memory to keep everything in memory). For this scenario I stored 1024 datapoints for a single metric, storing them inside one block of data, thus trying to maximize the impact of unnecessary reads. I'm only interested in the first 360 datapoints.
> 
> In the scenario, our current method requests 360 rows from Cassandra and then processes them. In the compressed mode, we request 1 row (which has 1024 stored metrics) and then filter out those we don't need in the client. Results:
> 
> BenchmarkCompressedPartialReadSpeed-4     	    	    275371 ns/op
> BenchmarkUncompressedPartialReadSpeed-4   	    	   1303088 ns/op
> 
> As we can see, filtering on the HWKMETRICS side yields quite a large speedup instead of letting Cassandra to read so many rows (all of the rows were from the same partition in this test).  

We definitely want to do more testing with reads to understand the impact. We have several endpoints that allow you to fetch data points from multiple metrics. It is great to see these numbers for reading against a single metric. What about 5, 10, 20, etc. metrics? And what about when the query spans multiple blocks?

Another thing we’ll need to test is how larger row sizes impact Cassandra streaming operations which happen when nodes join/leave the cluster and during anti-entropy repair.

> 
> ** Storing data
> 
> Next, lets address some issues we're going to face because of the distributed nature of our solution. We have two issues compared to Prometheus for example (I use it as an example as it was used by one Openshift PM) - we let data to arrive out-of-order and we must deal with distributed nature of our data storage. We are also stricter when it comes to syncing to the storage, while Prometheus allows some data to be lost in between the writes. I can get back to optimization targets later.
> 
> For storing the data, to be able to apply this sort of compression to it, we would need to always know the previous stored value. To be able to do this, we would need to do read-write path to the Cassandra and this is exactly one of the weaknesses of Cassandra's design (in performance and consistency). Clearly we need to overcome this issue somehow, while still keeping those properties that let us have our advantages.
> 
> ** First phase of integration
> 
> For the first phase, I would propose that we keep our current data model for short term storage. We would store the data here as it arrives and then later rewrite it to the compressed scheme in different table. For reads we would request data from the both tables and merge the results. This should not be visible to the users at all and it's a simple approach to the issue. A job framework such as the one John develops currently is required.

If we know we have the uncompressed data, then we don’t need to read from the compressed table(s). I would expect this to be the case for queries involving recent data, e.g., past 4 hours.

> 
> There are some open questions to this, and I hope some of you have some great ideas I didn't think. Please read the optimization part also if I happened to mention your idea as some future path.
> 
>  - How often do we process the data and do we restrict the out-of-order capabilities to certain timeslice? If we would use something like 4 hour blocks as default, should we start compressing rows after one hour of block closing? While we can technically reopen the row and reindex the whole block, it does not make sense to do this too often. If we decide to go with the reindexing scenario, in that case we could start writing the next block before it closes (like every 15 minutes we would re-encode the currently open blocks if they have new incoming data). We have to be careful here as to not overwhelm our processing power and Cassandra's. This is a tradeoff between minimum disk space usage or minimum CPU/memory usage.

Query patterns are going to dictate idea block sizes. For a first (and probably second) iteration, I say we go with a reasonable default like a day.

Does reindexing the row mean updating existing cells on disk? If so, I would like to think about ways that allow us to continue keeping data immutable as we currently do.
> 
>  - Compression block size changes. User could configure this - increasing it on the fly is no problem for reads, but reducing is slightly more complex scenario). If user increases the size of the block, our query would just pick some extra rows that are instantly discarded, but nothing would break. However, decreasing the size would confuse our Cassandra reads unless we know the time of the block size change and adjust queries accordingly for times before this event and after.
> 
> ** Optimizing the solution
> 
> The following optimizations would increase the performance of Hawkular-Metrics ingestion rate a lot and as such are probably worth investigation at some point. But they're also complex and I would want to refrain from implementing them in the first phase so that we could get compression quicker to the product - so thta we would not miss certain deadlines.
> 
>  - Stop writing to the Cassandra in the first phase. Instead we write to something more ephemeral, such as mmap backed memory cache that is distributed among the Hawkular nodes. It would also need some sort of processing locality (direct the write to the node that controls the hash of the metricId for example - sort of like HBase does), unless we want to employ locks to prevent ordering issues if we encode already in the memory. From memory we would then store blocks to the permanent Cassandra store. The clients need to be token/hash-method aware to send data to the correct node.

We ought to look at Infinispan for this. We are already talking about other use cases for Infinispan so it would be nice if it works out.

Lastly, this is great stuff!
_______________________________________________
hawkular-dev mailing list
hawkular-dev at lists.jboss.org
https://lists.jboss.org/mailman/listinfo/hawkular-dev



More information about the hawkular-dev mailing list