Our team at Scality has been working for years on QuadIron, an open source library that implements Fast Fourier Transform (FFT) based erasure codes over binary, prime, and extension fields, along with a set of optimizations that make them practical for very large numbers of symbols. These FFT-based erasure codes could be useful for applications like decentralized storage over the internet. We’ll be presenting QuadIron at the SNIA Storage Developer Conference in Santa Clara (CA) on Tuesday, Sep 25.
As drive density continues to rise in keeping with Moore’s Law, the price of storage continues to fall. Extra data copies therefore become cheaper, and data can be stored reliably on relatively unreliable Internet servers simply by adding redundancy. For example, generating and spreading hundreds of fragments of a file makes it possible to reconstruct the data even when only a fraction of the fragments is available. The QuadIron library provides a framework to generate such erasure codes efficiently.
Erasure codes are a form of error correction code designed for erasures (lost pieces rather than corrupted bits): they transform a message into a longer message made of pieces such that the original message can be recovered from a subset of those pieces.
A C(n,k) erasure code is defined by n=k+m, where k is the number of data fragments and m the number of desired erasure fragments. An application stores or transmits the n fragments. A Maximum Distance Separable (MDS) code guarantees that any k fragments are enough to decode the file. Erasure codes can be either systematic or non-systematic. Systematic codes generate n-k erasure fragments and keep the k data fragments as-is. Non-systematic codes generate n erasure fragments. With systematic codes, we try to retrieve the k data fragments first, because then there is nothing to decode; decoding is only necessary if one or more data fragments are missing. With non-systematic codes, we always need to decode from k fragments. Erasure codes can also be compared by their sensitivity to the rate r=k/n, which may or may not impact encoding and decoding speed. Another criterion is support for an adaptive rate: does the erasure code allow changing k and m dynamically, without having to regenerate the whole set of erasure fragments? Another important property is ‘confidentiality’: whether an attacker who obtains fewer than k fragments can partially decode the data. Finally, we can also compare erasure codes by their repair bandwidth, i.e. the number of fragments required to repair a single fragment. To sum up, here is the list of code properties that are of interest to us (a short worked example follows the list):
MDS/non-MDS
Systematic/Non-systematic
Encoding/Decoding speed according to various n
Encoding/Decoding speed predictability and stability according to n
Rate sensitivity
Adaptive vs non-adaptive rate
Confidentiality
Repair bandwidth
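As a quick worked example (the numbers are chosen purely for illustration): with k=10 data fragments and m=6 erasure fragments, n=k+m=16 and the rate is r=k/n=10/16=0.625. An MDS C(16,10) code can then rebuild the data from any 10 of the 16 fragments, i.e. it tolerates the loss of any 6 fragments.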
Reed-Solomon (RS) codes are MDS codes constructed from Vandermonde or Cauchy matrices, and they support both the systematic and adaptive-rate properties. The RS encoding process is traditionally performed in ways that lead to high complexity. Optimizing those codes has been widely discussed, but mostly in terms of hardware optimizations.
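To make the matrix view concrete, here is a minimal, non-systematic sketch in Scala (an illustration only, not QuadIron code): the k data symbols are treated as the coefficients of a polynomial over the small prime field GF(257), and each of the n output fragments is an evaluation of that polynomial at a distinct point, which is exactly one row of a Vandermonde matrix applied to the data vector. The field, evaluation points and values are assumptions chosen for the example; decoding from any k fragments amounts to Lagrange interpolation and is omitted.

// Toy non-systematic Reed-Solomon encoder over GF(257) (illustration only).
// Fragment j is the data polynomial evaluated at x = j, i.e. one Vandermonde
// matrix row applied to the data vector.
object RsVandermondeToy {
  val p = 257L // small prime field modulus, chosen for the example

  // Horner evaluation of the data polynomial at x, modulo p.
  def evalAt(data: Seq[Long], x: Long): Long =
    data.reverse.foldLeft(0L)((acc, d) => (acc * x + d) % p)

  // Encode k data symbols into n = k + m fragments.
  def encode(data: Seq[Long], m: Int): Seq[Long] =
    (1 to data.length + m).map(x => evalAt(data, x.toLong))

  def main(args: Array[String]): Unit = {
    val data = Seq(12L, 34L, 56L, 78L) // k = 4 data symbols
    val fragments = encode(data, 2)    // C(6,4): any 4 of the 6 fragments suffice
    println(s"fragments = $fragments")
  }
}

Real implementations work over fields such as GF(2^8) and use far more efficient arithmetic; the sketch only shows why plain matrix-based encoding costs on the order of n*k field operations.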
Low-Density Parity-Check (LDPC) codes are another important class of erasure codes, constructed from sparse parity-check matrices. Although initially used in networking applications, researchers have recently shown that they can also be used in distributed storage scenarios. These codes still store n=k+m fragments (like MDS codes), but they need to retrieve k*f fragments to recover the data (instead of only k for MDS codes), where f is called the overhead or inefficiency. In general f oscillates between 1.10 and 1.30 for n < 100, and when n > 1000, f approaches 1.0. Because of the extra fragments they must retrieve, these codes are more sensitive to network latency, so in latency-sensitive cases RS codes seem more interesting than LDPC codes. More recently, hybrid-LDPC schemes have reduced the overhead to k+f fragments with a very small f. It is also possible to design LDPC codes that beat RS codes on repair bandwidth, because RS codes always need to retrieve k fragments to repair the data, while some LDPC codes require fewer than k fragments for the repair process. However:
LDPC codes are not MDS: there are always failure patterns (e.g. stopping sets) that cannot be decoded even when k fragments out of n are available.
You can always find or design an LDPC code optimized for a few properties (i.e. tailored for a specific use case) that beats other codes on those properties, but there is no silver bullet: it will be sub-optimal for the other properties. It is a trade-off (e.g. good for large n and with optimal repair bandwidth, but poor for small n and without adaptive-rate support), so such codes cannot be used in a generic library.
Designing a good LDPC code is something of a black art that requires a lot of fine-tuning and experimentation. An LDPC code that is optimal for all the properties of interest for a given use case might ultimately exist, but it would be very complex and/or only available in a commercial library.
Recently, other types of codes called Locally Repairable Codes (LRC) have tackled the repair-bandwidth issue of RS codes. They combine multiple layers of RS codes: local codes and global codes. However, these codes are not MDS and they require a higher storage overhead than MDS codes.
Fast Fourier transform (FFT) based RS codes remain relatively simple and can perform encoding over finite fields with clearer and lower stated complexities, which gives them a good set of desirable properties. We focus our research on optimizing their encoding and decoding speed.
Since the chosen finite field dominates the computational complexity of the FFT operations, we investigate two types of finite fields: prime fields and binary extension fields. For each type, there are different approaches to accelerating the FFT.
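To give some intuition, here is a minimal number-theoretic transform (NTT) sketch in Scala over the prime field GF(257), again an illustration only and not QuadIron code. An FFT over a finite field evaluates the data polynomial at all n-th roots of unity at once, so the n encoded symbols are produced in O(n log n) operations instead of the O(n*k) of a plain matrix product. The modulus, primitive root and input values are assumptions chosen for the example.

// Minimal radix-2 NTT over GF(257); assumes n is a power of two dividing p - 1.
object NttToy {
  val p = 257L // 257 is prime and p - 1 = 256, so 2^k-th roots of unity exist

  def powMod(a: Long, e: Long): Long =
    if (e == 0) 1L
    else {
      val h = powMod(a * a % p, e / 2)
      if (e % 2 == 0) h else h * a % p
    }

  // Cooley-Tukey NTT: evaluates the polynomial with coefficients `a`
  // at the n-th roots of unity w^0, w^1, ..., w^(n-1).
  def ntt(a: Vector[Long], w: Long): Vector[Long] = {
    val n = a.length
    if (n == 1) a
    else {
      val even = ntt(a.zipWithIndex.collect { case (v, i) if i % 2 == 0 => v }, w * w % p)
      val odd  = ntt(a.zipWithIndex.collect { case (v, i) if i % 2 == 1 => v }, w * w % p)
      val out = Array.fill(n)(0L)
      var wk = 1L
      for (k <- 0 until n / 2) {
        val t = wk * odd(k) % p
        out(k) = (even(k) + t) % p
        out(k + n / 2) = (even(k) - t + p) % p
        wk = wk * w % p
      }
      out.toVector
    }
  }

  def main(args: Array[String]): Unit = {
    val n = 8
    val w = powMod(3, (p - 1) / n)                     // 3 is a primitive root mod 257
    val data = Vector(1L, 2L, 3L, 4L, 0L, 0L, 0L, 0L)  // k = 4 symbols, zero-padded to n
    println(ntt(data, w))                              // the n encoded symbols
  }
}

In QuadIron, the same idea is applied at much larger sizes and over both prime and binary extension fields, where the field choice drives which FFT variant is fastest.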
These FFT-based codes offer superior encoding performance compared to matrix-based RS erasure codes for applications requiring more than 24 symbols, and they are simpler than LDPC codes, while supporting all the desirable properties: fast, MDS, systematic or non-systematic, confidential when non-systematic, predictable and rate-insensitive. Systematicity is not critical for a decentralized storage application because, as k grows, the chance of losing a data fragment also grows. Optimizing repair bandwidth is not critical for a decentralized storage archive application either, since people download files in their entirety. The most important property for us remains MDS, as a rock-solid contract: being sure that if k fragments are available, then the data is recoverable.
All of these properties make us think that the QuadIron library and NTT-based erasure codes are suitable for a decentralized storage archival application over the Internet.
The library is open source (BSD 3-clause license), available on GitHub. Ask questions on the forum.
Today Scality announces the first stable release of MetalK8s, an open source Kubernetes (K8s) distribution focused on bare-metal deployments, long-term maintenance and ease of operation. To bolster these efforts, Scality has joined the Linux Foundation and the Cloud Native Computing Foundation (CNCF) as a Silver member.
As the number of organizations using multi-cloud environments continues to increase, Kubernetes is becoming a standard way to manage applications. Scality has invested time and resources over the past 18 months to find the best way to deploy and manage its next-generation product line, and Kubernetes emerged as a clear winner.
It’s exciting to deepen our open source strategy by joining the Linux Foundation and becoming active in the CNCF. With our flagship open source project Zenko we’re continuously building a strong community, and with MetalK8s reaching v1.0 we’re hoping to get more people excited about all of our cloud projects.
The team was looking for the best way to manage Zenko on-premises for large customers. Early versions of Zenko employed Docker Swarm, but the limits of that approach quickly became apparent. Among other K8s deployment options, none emerged as a clear winner, so the team decided to build a new solution based on other open source projects.
MetalK8s has a strong foundation in the open source installer Kubespray and other tools like Prometheus and Grafana. Because the Scality team has already parsed the often baffling options, MetalK8s deployments require only a few key decisions from operators. The result is a simplified deployment path for a new Kubernetes cluster on bare metal, with easier long-term maintenance.
People will have strong feelings about the choices made to simplify Kubernetes deployments. That’s why we like to call MetalK8s the opinionated Kubernetes distribution.
MetalK8s version 1.0 comes with default dashboards that help operators keep control of the cluster based on the following services:
Grafana: monitoring dashboards for cluster services
Cerebro: an administration and monitoring console for Elasticsearch clusters
Kibana: a search console for logs indexed in Elasticsearch
Deployments using release 1.0 can be upgraded to later MetalK8s 1.x versions. The upcoming Zenko version 1 is deployed on a Kubernetes cluster by default: the enterprise edition will initially support only MetalK8s clusters. The open source code has no such limitations and can run on existing or managed clusters like GKE, simply using the same Helm packages.
As part of Zenko’s commitment to enabling multi-cloud data control, we evaluated Apache Spark as the tool to perform metadata search across multiple clouds. Spark promises ad-hoc queries on arbitrary data without prior indexation, which sounded perfect for the needs of petabyte-scale multi-cloud storage. However, we came to realize that Spark expects semi-structured data, which is not exactly arbitrary data and gets in the way of truly ad-hoc queries. In our tests, Spark could not handle our raw metadata directly, forcing us to post-process it into Parquet files, and those Parquet files then had to be regenerated whenever the metadata changed. We realized that we were using Spark the wrong way and decided to redesign the Zenko metadata system around MongoDB.
Evaluating Spark Performance for Zenko Metadata Search
The Zenko architecture team set up a series of tests to evaluate Spark’s performance for Zenko’s metadata search capabilities. The lab was set up on our internal OpenStack-powered Scality Cloud, with five 8-core instances named swarm1 through swarm5 running CentOS 7.2. Each instance has 50 GB of memory, except swarm1, which has 64 GB. The machines run the Scality S3 Connector, and Docker Swarm is used to launch the Spark 2.1.0 master and workers. For the tests we used Hadoop 2.7.3.
Process:
Put a given number of objects on a separate S3 instance using bucketfile.
Use a command-line tool to convert the LevelDB SST and log files (we use LevelDB in our metadata engine) to line-delimited JSON files.
Put the converted files on the S3 Connector:
For metadata of 1 million objects: s3a://onemillion/stuff (about 100 files totalling 823 MB)
For metadata of 10 million objects: s3a://gobigger/stuff (about 1,000 files totalling 8.5 GB)
For metadata of 100 million objects: s3a://gobiggest/stuff (about 8,043 files totalling 79 GB)
Specify the executor memory for each worker: 40 GB with 1 worker, 14 GB with 5 workers (except 40 GB for the 100-million workload), and 25 GB with 10 workers (2 workers per server, not 10 independent servers).
Call spark.read.json with the proper s3a path, provide the schema, and call cache().
Run one SQL query and call show() so that there is an action that triggers the actual read.
Note that some action is needed for the data to actually be read. In a quick test, using count() as the action took roughly the same time as running a query and calling show(), so the tests below use a query followed by show().
Command:
// Read the line-delimited JSON metadata with an explicit schema and cache it.
val t0 = System.currentTimeMillis()
val translatedDF = spark.read.schema(sstSchema).json("s3a://BUCKETNAME/stuff").cache()
translatedDF.createOrReplaceTempView("translatedMD")
// The first query is the action that triggers the actual read and materializes the cache.
val getcertain = spark.sql("SELECT * FROM translatedMD WHERE userMd['x-amz-meta-someotherkey']='someOtherMetadatapartof100millionput977927'")
getcertain.show(10, false)
val t1 = System.currentTimeMillis()
println("Elapsed time: " + (t1 - t0) + "ms")
translatedDF.unpersist()
To test query time:
Run the SQL query and call show().
Command:
// The DataFrame is already cached, so this loop measures pure query time.
for (x <- 1 to 100) {
  val t2 = System.currentTimeMillis()
  val getcertain2 = spark.sql("SELECT * FROM translatedMD WHERE userMd['x-amz-meta-someotherkey']='someOtherMetadatapartof100millionput977926'")
  getcertain2.show(10, true)
  val t3 = System.currentTimeMillis()
  println("Elapsed time: " + (t3 - t2) + "ms")
}
Caveats:
The SST and log files are pre-translated to line-delimited JSON, so the time to do that work is not accounted for.
We are not filtering out duplicates based on the incrementer, so the time to do that work (which would likely involve a shuffle) is not accounted for either; a hypothetical sketch of such a step follows this list.
If workers are lost while loading data, subsequent queries take longer. For instance, with 100 million objects we lost 2 workers during the read, and the subsequent query then took around 60 seconds.
The numbers reflect accessing data through a single S3 instance.
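For reference, a deduplication step might look roughly like the following sketch, which keeps only the most recent record per object key. The column names key and incrementer are assumptions made for illustration (they are not the actual schema), and the window function does force a shuffle, which is exactly the cost the caveat above refers to.

// Hypothetical deduplication step (not part of the tests above): keep only the
// latest metadata record per object key, ordered by the incrementer column.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val latestDF = translatedDF
  .withColumn("rn", row_number().over(
    Window.partitionBy(col("key")).orderBy(col("incrementer").desc)))
  .filter(col("rn") === 1)
  .drop("rn")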
Results
Metadata for 1 million objects (all numbers are average of 10 runs)
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time (sec) | 14.24 | 15.59 | 12.66 | 17.63 |
| Query time (ms) | 260 | 197 | 196 | 258 |
| Memory used on worker for read | 4.82 GB | 1.66 GB | 1.23 GB | 1.15 GB |
| Memory used on master for read | < 1 GB | 1.14 GB | 1.11 GB | 1.21 GB |
Metadata for 10 million objects (all numbers are average of 10 runs)
Note that the read times here may be artificially low, because the very first read after starting the Spark shell takes longer: 44 seconds (1 worker), 41 seconds (5 workers) and 52 seconds (10 workers). For the 1-million and 100-million tests, the numbers all reflect a read made right after starting the Spark shell.
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time (sec) | 34.1 | 30.42 | 18.507 | 37.76 |
| Query time (ms) | 575 | 376 | 342 | 360 |
| Memory used on worker for read | 20.09 GB | 6.43 GB | 5.11 GB | 5.97 GB |
| Memory used on master for read | < 1 GB | 1.21 GB | 1.47 GB | 1.3 GB |
Metadata for 100 million objects (all numbers are average of 10 runs)
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time (sec) | 348.6 (5.81 min) | 184.02 (3.07 min) | 73.842 (1.23 min) | 281.4 (4.69 min) |
| Query time (ms) | 72,110 (72.110 sec) | 1,745 (1.745 sec) | 1,784 (1.784 sec) | 2,165 (2.165 sec) |
| Memory used on worker for read | 36.76 GB | 24.37 GB | 23.86 GB | 14.03 GB |
| Memory used on master for read | < 1 GB | 1.18 GB | 1.13 GB | 1.28 GB |
Conclusions
These numbers convinced us that we were on the wrong path and that the team needed to go back to the drawing board. The tests showed that Spark needed a lot of resources to work properly in our scenario, and it imposed very high latency because the Parquet files had to be regenerated and reloaded every time the metadata changed. We realized that we were trying to force Spark into an unnatural role, so we switched Zenko to a new design based on MongoDB.
Today, we are breaking convention to make it easier to run Kubernetes (K8s) on bare metal servers to support stateful applications that need stable, persistent storage. To do this, we built MetalK8s, an open source, opinionated K8s distribution, to simplify on-premises deployments. It is available now on GitHub, under the Apache Software License v2.
We made the early design choice to trust K8s as the infrastructure foundation for Scality’s Zenko Multi-Cloud Data Controller. Since Zenko is meant to run in any cloud environment and also on-premises for the most demanding enterprise customers, K8s is the perfect tool to run Zenko the same way whether it’s on Amazon, Azure, Google Cloud, in a private cloud or on bare metal.
Because the on-premises bare metal K8s experience is not prevalent in the community, we decided to take the time to find the best way to deploy Kubernetes on bare metal servers for Zenko.
Why are we choosing to go Bare Metal?
Kubernetes itself grew up in virtualized environments, which is natural given its purpose to orchestrate distributed container environments. We realized that very few people dare to run K8s on bare metal, and actually most have no choice but to run it on virtual infrastructure. In the course of our development, though, we discovered that there are several huge benefits to be gained from deploying on bare metal. But, this is only true when developers and operators find all of the tools they need for smooth, long-term operations.
While developing Zenko on Kubernetes, we required efficient access to stateful local storage for both metadata and data. Moreover, since Zenko is a distributed system, we really wanted compute to run on the same machine that holds the local storage. For applications that require this type of storage access efficiency, the K8s environment has never been optimal: by default, K8s tends to force the use of an expensive SAN or cloud block-storage volumes. With MetalK8s, we resolve this problem and enable fast local storage access for container-based applications.
Why an Opinionated Distribution?
We chose to go the ‘opinionated’ route because we have made some specific choices in the course of our development: MetalK8s’ goal is to provide great functionality while reducing complexity for other users and delivering the stateful storage efficiencies described earlier.
Our team specifically chose to leverage an existing project rather than reinvent the wheel, so we based MetalK8s on the excellent open source Kubespray ‘playbook’ project. Kubespray lets us install a base Kubernetes cluster reliably using the Ansible provisioning tool, along with its dependencies (e.g. the etcd distributed database). This allowed us to quickly iterate and implement the features we need to run Kubernetes at the scale required by Scality customers. This is where our own Scality DevOps team excels, so it stayed in line with our focus on ease of operations. Contrary to Kubespray’s general-purpose approach, we made hard choices such as using Calico as the only Container Network Interface (CNI) implementation. Further, an “ingress controller” based on Nginx is deployed by default. And for simplicity, all of these components are managed as Helm packages.
The installation is further augmented with a set of powerful operational tools for monitoring and metering, including Prometheus, Grafana, ElasticSearch and Kibana.
Unlike hosted Kubernetes solutions, where network-attached storage is available and managed by the provider, MetalK8s assumes no such system to be available in environments where MetalK8s itself is deployed. This means its focus is on managing node-local storage and exposing local volumes to containers managed in the cluster.
Contributing Back
The team plans to work with upstream projects including Kubespray, Kubernetes, Helm Charts and others to release back all useful contributions and eventually implement new features.
Maz from our team at Scality has been working on a simple guide explaining how to use Zenko and Orbit to replicate data between Microsoft Azure Blob storage and Amazon S3.
This is a very important milestone for us as it shows how easy it is to just create an account, log into the Zenko management portal, create a Zenko sandbox and start replicating data between two completely different public clouds with the replication wizard, no command line required. – Giorgio Regni
Why is this news worthy?
It is all about data durability and availability!
Replicating your data across different providers is a great way to increase its protection and guarantee that your data will always be available, even in the case of a catastrophic failure:
In terms of durability, we now have two independent services, each of which has a durability of eleven 9’s. By storing data across both clouds, we can increase our data durability to “22 9’s”, which makes a data loss event statistically negligible.
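As a back-of-the-envelope calculation, and assuming the two services fail independently: eleven 9’s of durability corresponds to a loss probability of about 10^-11 per object, so losing the same object from both clouds has probability roughly 10^-11 x 10^-11 = 10^-22, i.e. twenty-two 9’s of combined durability.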
We can also take advantage of immutability through object versioning in one or more of the cloud services, for even greater protection. We have also gained disaster recovery (D/R) protection, meaning the data is protected in the event of a total site disaster or loss.
In terms of data availability, what are the chances that two cloud regions in one service (for example, AWS US East and AWS US West) are unavailable at the same time? Stretching this further, what are the chances that two INDEPENDENT cloud services such as AWS S3 and Azure Blob Storage are unavailable at the same time?
Customers and apps are embracing cloud storage; we see this across many different businesses and industries. Since managing multiple cloud services can be complex because of all their differences, we set out to simplify this. We have also heard that customers really don’t want to bank everything on a single vendor’s cloud, especially their business data!
We launched our Zenko Multi-Cloud Data Controller earlier this year to provide a platform that simplifies the lives of developers who are building applications requiring multi-cloud storage. The basic idea is that if you know the AWS S3 API, you can use it to access any of the public cloud storage services supported by Zenko. This (of course) includes AWS S3 itself, but also cloud storage services that don’t natively support the S3 API, such as Azure Blob Storage (supported now) and Google Cloud Storage (coming soon). Any conversation about multi-cloud should also consider private clouds, so we can include on-premises object storage as another “cloud” to be managed through Zenko.
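As a minimal sketch of what “one API” means in practice, any S3-compatible client can simply be pointed at a Zenko endpoint. The snippet below uses the standard AWS SDK for Java (v1) from Scala; the endpoint URL, region, credentials and bucket name are placeholders for illustration, not Zenko documentation.

// Hypothetical example: talking to a Zenko endpoint with the AWS SDK for Java.
// Endpoint, region, credentials and bucket name are placeholders.
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder

val s3 = AmazonS3ClientBuilder.standard()
  .withEndpointConfiguration(new EndpointConfiguration("http://zenko.example.com:8000", "us-east-1"))
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
  .withPathStyleAccessEnabled(true)
  .build()

// The same PUT call works whether the bucket is backed by AWS S3, Azure Blob
// Storage or an on-premises object store behind Zenko.
s3.putObject("my-multicloud-bucket", "hello.txt", "Hello from Zenko")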
So how can we take advantage of multiple clouds from an application or business perspective? There are a number of things that surface as potentially valuable, such as:
Reversibility: Can I take my data back on-premises or move it to another cloud?
Data proximity: To take advantage of compute in the cloud, can I easily move my data close to the service that needs it?
Cost: How do I control the cost of cloud storage, given the differences across vendors, and make sure I don’t get trapped or locked in?
Where is my data: How do I search over everything in this cross-cloud content depot?
Durability: Is it enough to store data in one cloud, or can I take advantage of storing it across two clouds? (More on this one later!)
As introduced in our other blogs, Zenko has four basic capabilities to help simplify cloud storage:
A single API that works on all clouds, including AWS, Azure & Google
Data is stored natively in each cloud, so it can be accessed directly
Data workflows based on policies, across clouds
Finding data simply across multiple clouds
Simplified Multi-Cloud Management through the Zenko Orbit Portal
All of these features are powerful, but again the key question is how can we make this extremely simple to use? For this reason, we now have introduced Zenko Orbit, a cloud based portal that makes multi-cloud storage as simple as “point-n-click”.
Getting started is easy: just use your Gmail account to register and log in. The first thing we simplify is starting up a cloud instance of Zenko itself. If you already have a Zenko instance, just enter your InstanceID and Orbit will connect to it.
The Orbit dashboard provides useful information about your aggregate capacity utilization, and breaks it down across the various clouds. You can also easily manage your cloud storage accounts and credentials, plus monitor the Zenko instance resources and performance indicators.
Once these simple actions are completed, you are ready to use Zenko to access multi-cloud storage services. Orbit will soon offer an integrated data (object) browser, for easy upload/download of objects into these target clouds. In addition, we’re working on some very simple capabilities to provide additional business value from multi-cloud storage through Orbit.
In our next blog, we’ll explore a super interesting and high-potential use case: storing replicated objects across two clouds, what this can do for your data durability and availability, and of course, ultimately, what it means for cost.