As part of Zenko’s commitment to enabling multi-cloud data control, we evaluated Apache Spark as the tool to perform metadata search across multiple clouds. Spark promises ad-hoc queries on arbitrary data without prior indexing, which sounded perfect for the needs of petabyte-scale multi-cloud storage. However, we came to realize that Spark expects semi-structured data, which is not exactly arbitrary data and gets in the way of truly ad-hoc queries. In our tests, Spark could not handle our raw metadata, forcing us to post-process it into Parquet files, and those files then had to be updated whenever the metadata changed. We realized that we were using Spark the wrong way and decided to redesign the Zenko metadata system around MongoDB.
Evaluate Spark Performance For Zenko Metadata Search
The Zenko architecture team set up a series of tests to evaluate whether Spark’s performance was sufficient to power Zenko’s metadata search. The lab was set up on our internal OpenStack-powered Scality Cloud, with five 8-core instances called swarm1, swarm2, swarm3, swarm4 and swarm5 running CentOS 7.2. Each instance has 50 GB of memory, except swarm1, which has 64 GB. The machines run the Scality S3 Connector, and Docker Swarm is used to launch the Spark 2.1.0 master and workers. For the tests we used Hadoop 2.7.3.
Process:
- Put x number of objects on a separate S3 instance using bucketfile.
- Use a command-line tool to convert the LevelDB SST and log files (our metadata engine uses LevelDB) to line-delimited JSON files.
- Put the converted files on the S3 Connector:
  - For metadata of 1 million objects: s3a://onemillion/stuff (about 100 files totalling 823 MB)
  - For metadata of 10 million objects: s3a://gobigger/stuff (about 1,000 files totalling 8.5 GB)
  - For metadata of 100 million objects: s3a://gobiggest/stuff (about 8,043 files totalling 79 GB)
- Set the executor memory for each worker to 40 GB with 1 worker, 14 GB with 5 workers (40 GB for the 100 million workload), and 25 GB with 10 workers (2 workers per server, not 10 independent servers).
- Launch the Spark shell, replacing X with the executor memory in GB:

      ./bin/spark-shell --master spark://master:7077 --executor-memory Xg
To test load time:
- Call spark.read.json with the proper s3a path, provide the schema, and call cache().
- Run one SQL query and call show() so that an action triggers the actual read.
- Spark evaluates lazily, so some action is needed for the data to actually be read. In a little testing, calling count() as the action took about the same time as running a query and calling show(), so we ran the test with a query and show().
- Command:

      val t0 = System.currentTimeMillis()
      // Lazily define the cached DataFrame; nothing is read until an action runs.
      val translatedDF = spark.read.schema(sstSchema).json("s3a://BUCKETNAME/stuff").cache()
      translatedDF.createOrReplaceTempView("translatedMD")
      val getcertain = spark.sql("SELECT * FROM translatedMD WHERE userMd['x-amz-meta-someotherkey']='someOtherMetadatapartof100millionput977927'")
      // show() is the action that triggers the read from S3.
      getcertain.show(10, false)
      val t1 = System.currentTimeMillis()
      println("Elapsed time: " + (t1 - t0) + "ms")
      translatedDF.unpersist()
To test query time:
- Run a SQL query and call show().
- Command:

      // Repeat the query 100 times and print the elapsed time of each run.
      for (x <- 1 to 100) {
        val t2 = System.currentTimeMillis()
        val getcertain2 = spark.sql("SELECT * FROM translatedMD WHERE userMd['x-amz-meta-someotherkey']='someOtherMetadatapartof100millionput977926'")
        getcertain2.show(10, true)
        val t3 = System.currentTimeMillis()
        println("Elapsed time: " + (t3 - t2) + "ms")
      }
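Both commands repeat the same pattern: take a timestamp, run an action, take a second timestamp, print the difference. The sketch below factors that pattern into a small helper and adds a mean over several runs, since the results are reported as averages. It is plain Scala with no Spark dependency; the names Timing, timed and meanMillis are hypothetical and are not part of the original test harness.

```scala
object Timing {
  /** Runs `block` once and returns its result with the elapsed wall-clock time in ms. */
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }

  /** Runs `block` `runs` times and returns the mean elapsed time in ms. */
  def meanMillis(runs: Int)(block: => Unit): Double = {
    require(runs > 0, "runs must be positive")
    // `block` is passed by name, so it is re-evaluated on every iteration.
    val samples = (1 to runs).map(_ => timed(block)._2)
    samples.sum.toDouble / runs
  }
}
```

In the Spark shell, the body passed to Timing.meanMillis would be the spark.sql call followed by show(), so that each sample covers the same work as the loop above.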
Caveats:
- The SST and log files are pre-translated to line-delimited JSON, so the time to do that work is not accounted for.
- We are not filtering for duplicates based on the incrementer, so the time to do that work (which will likely involve a shuffle) is not accounted for.
- If workers are lost while loading data, subsequent queries take longer. For instance, with 100 million objects, we lost 2 workers during the read, and the subsequent query then took around 60 seconds.
- Numbers reflect accessing data through one S3 instance.
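To illustrate why the duplicate filtering mentioned in the caveats would likely need a shuffle, here is a minimal sketch of the logic using plain Scala collections rather than Spark. The record shape (MdRecord with key, incrementer and userMd) and the rule that the highest incrementer wins are assumptions made for illustration; the actual metadata layout is not described here.

```scala
object Dedup {
  // Hypothetical record shape: `key` identifies the object, `incrementer`
  // orders successive versions of its metadata. Both names are assumptions.
  final case class MdRecord(key: String, incrementer: Long, userMd: Map[String, String])

  /** Keeps, for each key, only the record with the highest incrementer. */
  def latestPerKey(records: Seq[MdRecord]): Seq[MdRecord] =
    records
      .groupBy(_.key)               // all versions of a key must be brought together
      .values
      .map(_.maxBy(_.incrementer))  // assumed rule: highest incrementer wins
      .toSeq
      .sortBy(_.key)                // deterministic output order
}
```

The groupBy step is the costly part: on a cluster, bringing every version of a key together means moving records between executors, which is the shuffle the caveat refers to.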
Results
Metadata for 1 million objects (all numbers are average of 10 runs)
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time | 14.24 sec | 15.59 sec | 12.66 sec | 17.63 sec |
| Query time | 260 ms | 197 ms | 196 ms | 258 ms |
| Memory used on worker for read | 4.82 GB | 1.66 GB | 1.23 GB | 1.15 GB |
| Memory used on master for read | < 1 GB | 1.14 GB | 1.11 GB | 1.21 GB |
Metadata for 10 million objects (all numbers are average of 10 runs)
Note that the read times may be artificially low here, since the first read after starting the Spark shell takes longer. For this workload, that first read took 44 seconds (1 worker), 41 seconds (5 workers) and 52 seconds (10 workers). For the tests of 1 million and 100 million objects, the numbers all reflect a read upon starting the Spark shell.
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time | 34.1 sec | 30.42 sec | 18.507 sec | 37.76 sec |
| Query time | 575 ms | 376 ms | 342 ms | 360 ms |
| Memory used on worker for read | 20.09 GB | 6.43 GB | 5.11 GB | 5.97 GB |
| Memory used on master for read | < 1 GB | 1.21 GB | 1.47 GB | 1.3 GB |
Metadata for 100 million objects (all numbers are average of 10 runs)
| | 1 worker | 5 workers | 5 workers (Parquet) | 10 workers |
|---|---|---|---|---|
| Read time | 348.6 sec (5.81 min) | 184.02 sec (3.07 min) | 73.842 sec (1.23 min) | 281.4 sec (4.69 min) |
| Query time | 72,110 ms (72.110 sec) | 1,745 ms (1.745 sec) | 1,784 ms (1.784 sec) | 2,165 ms (2.165 sec) |
| Memory used on worker for read | 36.76 GB | 24.37 GB | 23.86 GB | 14.03 GB |
| Memory used on master for read | < 1 GB | 1.18 GB | 1.13 GB | 1.28 GB |
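To put the 100 million object numbers in proportion, the following sketch computes two ratios directly from the table above: the read-time gain of Parquet over line-delimited JSON with 5 workers, and the query-time gain of 5 workers over 1 worker. It is plain Scala arithmetic; the object and value names are hypothetical, and the inputs are copied from the table rather than measured again.

```scala
object Speedup {
  /** Ratio of a baseline measurement to an improved one (same units). */
  def speedup(baseline: Double, improved: Double): Double = {
    require(improved > 0, "improved measurement must be positive")
    baseline / improved
  }

  // Read times in seconds for 100 million objects, copied from the table.
  val readJson5Workers = 184.02
  val readParquet5Workers = 73.842

  // Query times in milliseconds for 100 million objects, copied from the table.
  val query1Worker = 72110.0
  val query5Workers = 1745.0

  def main(args: Array[String]): Unit = {
    val readGain = speedup(readJson5Workers, readParquet5Workers)
    val queryGain = speedup(query1Worker, query5Workers)
    println(f"Parquet vs JSON read, 5 workers: $readGain%.2fx")
    println(f"5 workers vs 1 worker query: $queryGain%.1fx")
  }
}
```

The ratios come out at roughly 2.5x for the Parquet read and roughly 41x for the query, which suggests that at this scale a single worker was far from enough for the queries, while the file format mattered mainly for load time.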
Conclusions
These numbers convinced us that we were on the wrong path and that the team needed to go back to the drawing board. The tests showed that Spark needed a lot of resources to function properly in our scenario, and that it imposed very high latency because all Parquet files had to be reloaded every time there was a change. We realized that we were trying to force Spark into an unnatural role, so we switched Zenko to a new design based on MongoDB.