I previously blogged about a Cassandra anti-pattern: distributed joins. This commonly happens when people move from a relational database to Cassandra. I'm going to use the same example to show how to use Spark to migrate data that previously required joins into a denormalised model in Cassandra.
So let's start with a simple set of tables in MySQL that store customer events, where each event references a staff member and a store held in separate tables.
Insert a few rows (or a few million)
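A minimal sketch of such a schema, written as Scala over plain JDBC so it stays in the same language as the rest of the migration. Everything here is an assumption made for illustration: the database name `customer_events`, the port, the table and column names, and the sample rows. It assumes the MySQL JDBC driver is on the classpath.

```scala
import java.sql.DriverManager

object CreateMySqlSchema extends App {
  // Assumed URL: the database name and port are illustrative only.
  val url =
    "jdbc:mysql://192.168.10.11:3306/customer_events?user=root&password=password"

  // Hypothetical schema: events reference a store and a staff member by name.
  val statements = Seq(
    """CREATE TABLE IF NOT EXISTS store (
      |  store_name VARCHAR(255) PRIMARY KEY,
      |  location   VARCHAR(255),
      |  store_type VARCHAR(255))""".stripMargin,
    """CREATE TABLE IF NOT EXISTS staff (
      |  name      VARCHAR(255) PRIMARY KEY,
      |  job_title VARCHAR(255))""".stripMargin,
    """CREATE TABLE IF NOT EXISTS customer_events (
      |  id         BIGINT AUTO_INCREMENT PRIMARY KEY,
      |  customer   VARCHAR(255),
      |  time       DATETIME,
      |  event_type VARCHAR(255),
      |  store      VARCHAR(255),
      |  staff      VARCHAR(255),
      |  FOREIGN KEY (store) REFERENCES store(store_name),
      |  FOREIGN KEY (staff) REFERENCES staff(name))""".stripMargin,
    // Made-up sample rows; INSERT IGNORE keeps reruns from failing on duplicates.
    "INSERT IGNORE INTO store (store_name, location, store_type) " +
      "VALUES ('ManchesterDesktop', 'Manchester', 'WEBSITE')",
    "INSERT IGNORE INTO staff (name, job_title) VALUES ('Charlie', 'Marketer')",
    "INSERT INTO customer_events (customer, time, event_type, store, staff) " +
      "VALUES ('chbatey', NOW(), 'BUY_MOVIE', 'ManchesterDesktop', 'Charlie')"
  )

  val connection = DriverManager.getConnection(url)
  try {
    val statement = connection.createStatement()
    statements.foreach(sql => statement.executeUpdate(sql))
  } finally {
    connection.close()
  }
}
```

The auto-incrementing `id` matters later: it gives Spark a numeric column to partition the table on.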
Okay, so we only have a few rows, but imagine we had many millions of customer events and in the order of hundreds of staff members and stores.
Now let's see how we can migrate it to Cassandra with a few lines of Spark code :)
Spark has built-in support for databases that have a JDBC driver via the JdbcRDD. Cassandra has great support for Spark via DataStax's open source connector. We'll be using the two together to migrate data from MySQL to Cassandra. Prepare to be shocked at how easy this is...
Assuming you have Spark and the connector on your classpath, you'll need these imports:
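A plausible set of imports for the steps that follow, assuming Spark core, the DataStax Spark Cassandra connector and the MySQL JDBC driver are all available. The exact list depends on which pieces you end up using.

```scala
// JDBC types used to open MySQL connections and read result rows.
import java.sql.{DriverManager, ResultSet}

// Brings in the implicit Cassandra methods on SparkContext and RDDs
// (for example saveToCassandra) plus helpers such as SomeColumns.
import com.datastax.spark.connector._
// Only needed if you create the keyspace and table from Scala.
import com.datastax.spark.connector.cql.CassandraConnector

import org.apache.spark.{SparkConf, SparkContext}
// The RDD that reads from any database with a JDBC driver.
import org.apache.spark.rdd.JdbcRDD
```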
Then we can create our SparkContext. Importing the connector also adds the Cassandra methods to the context and to RDDs.
My MySQL server is running on IP 192.168.10.11 and I am connecting very securely with user root and password password.
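A minimal sketch of that setup. The application name, the `local[4]` master, the Cassandra host `127.0.0.1`, the MySQL port and the database name `customer_events` are assumptions for illustration; only the MySQL IP, user and password come from the text above.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object MigrationContext {
  val conf = new SparkConf()
    .setAppName("mysql-to-cassandra")                      // assumed name
    .setMaster("local[4]")                                 // assumed: local run with 4 threads
    .set("spark.cassandra.connection.host", "127.0.0.1")   // assumed Cassandra contact point

  // With the connector import in scope, sc and its RDDs gain the Cassandra methods.
  val sc = new SparkContext(conf)

  // Credentials in the URL, exactly as insecure as described above.
  val mysqlJdbcUrl =
    "jdbc:mysql://192.168.10.11:3306/customer_events?user=root&password=password"
}
```

On a real cluster you would normally leave the master out of the code and supply it when submitting the job.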
Next we'll create the new Cassandra table. If yours already exists, skip this part.
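One way to do this from Scala is through the connector's `CassandraConnector`, sketched below. The keyspace `test`, the table name, the columns and the primary key are assumptions: a denormalised table keyed by customer, with the store and staff details copied onto every event.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

object CreateCassandraTable extends App {
  // Assumed contact point for the Cassandra cluster.
  val conf = new SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")

  CassandraConnector(conf).withSessionDo { session =>
    // Replication settings suitable for a single-node test cluster only.
    session.execute(
      "CREATE KEYSPACE IF NOT EXISTS test WITH replication = " +
        "{'class': 'SimpleStrategy', 'replication_factor': 1}")

    // Hypothetical denormalised model: one partition per customer,
    // events ordered by time, store and staff columns stored inline.
    session.execute(
      """CREATE TABLE IF NOT EXISTS test.customer_events (
        |  customer_id    text,
        |  time           timestamp,
        |  id             bigint,
        |  event_type     text,
        |  store_name     text,
        |  store_type     text,
        |  store_location text,
        |  staff_name     text,
        |  staff_title    text,
        |  PRIMARY KEY ((customer_id), time, id))""".stripMargin)
  }
}
```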
Then it is time for the migration!
We first create a JdbcRDD, allowing MySQL to do the join. You need to give Spark a way to partition the MySQL table, so you give it a statement containing two placeholders, along with a starting index and a final index. You also tell Spark how many partitions to split the range into. You want this to be greater than the number of cores in your Spark cluster so that the queries can run concurrently.
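A sketch of such a JdbcRDD. The table and column names, the `CustomerEvent` case class and the `joinedEvents` helper are all hypothetical; what comes from Spark is the constructor shape: a connection factory, a query with exactly two `?` placeholders, the lower and upper bound, the partition count and a row-mapping function.

```scala
import java.sql.{DriverManager, ResultSet}
import java.util.Date

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object JoinedEventsRdd {
  // Hypothetical denormalised row: the event plus its store and staff details.
  case class CustomerEvent(id: Long, customer: String, time: Date, eventType: String,
                           storeName: String, storeType: String, storeLocation: String,
                           staffName: String, staffTitle: String)

  // MySQL performs the join; Spark fills the two ? with each partition's id range.
  val joinQuery =
    """SELECT ce.id, ce.customer, ce.time, ce.event_type,
      |       s.store_name, s.store_type, s.location,
      |       st.name, st.job_title
      |FROM customer_events ce
      |JOIN store s  ON ce.store = s.store_name
      |JOIN staff st ON ce.staff = st.name
      |WHERE ce.id >= ? AND ce.id <= ?""".stripMargin

  def joinedEvents(sc: SparkContext, jdbcUrl: String,
                   minId: Long, maxId: Long, numPartitions: Int): JdbcRDD[CustomerEvent] =
    new JdbcRDD[CustomerEvent](
      sc,
      () => DriverManager.getConnection(jdbcUrl), // a fresh connection per partition
      joinQuery,
      minId, maxId, numPartitions,
      (r: ResultSet) => CustomerEvent(
        r.getLong("id"), r.getString("customer"),
        new Date(r.getTimestamp("time").getTime), // assumes time is never NULL
        r.getString("event_type"),
        r.getString("store_name"), r.getString("store_type"), r.getString("location"),
        r.getString("name"), r.getString("job_title")))
}
```

The Spark API documentation gives this example of the split: with a lower bound of 1, an upper bound of 20 and 2 partitions, the query runs twice, once with (1, 10) and once with (11, 20). Gaps in the id sequence are fine, they just make some partitions smaller than others.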
Finally we save it to Cassandra. The chances are this migration will be bottlenecked by the queries to MySQL. If the Store and Staff tables are relatively small, it would be worth bringing them completely into memory, either as an RDD or as an actual map, so that MySQL doesn't have to join for every partition.
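A sketch of the in-memory variant, ending with the save. It assumes the hypothetical schema `store(store_name, store_type, location)`, `staff(name, job_title)` and `customer_events(id, customer, time, event_type, store, staff)` in MySQL, and a Cassandra table `test.customer_events` with the columns listed in `SomeColumns`. The `loadMap` and `migrate` helpers are illustrative names, not part of any library.

```scala
import java.sql.{DriverManager, ResultSet}
import java.util.Date

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object BroadcastMigration {
  // Reads a small lookup table into a map on the driver.
  def loadMap[V](jdbcUrl: String, sql: String)(read: ResultSet => (String, V)): Map[String, V] = {
    val connection = DriverManager.getConnection(jdbcUrl)
    try {
      val rs = connection.createStatement().executeQuery(sql)
      val entries = Map.newBuilder[String, V]
      while (rs.next()) entries += read(rs)
      entries.result()
    } finally connection.close()
  }

  def migrate(sc: SparkContext, jdbcUrl: String,
              minId: Long, maxId: Long, numPartitions: Int): Unit = {
    // Ship the small tables to every executor once instead of joining in MySQL.
    val stores = sc.broadcast(
      loadMap(jdbcUrl, "SELECT store_name, store_type, location FROM store") { r =>
        r.getString(1) -> ((r.getString(2), r.getString(3)))
      })
    val staff = sc.broadcast(
      loadMap(jdbcUrl, "SELECT name, job_title FROM staff") { r =>
        r.getString(1) -> r.getString(2)
      })

    // MySQL now only serves a range scan over the events table.
    val events = new JdbcRDD[(Long, String, Date, String, String, String)](
      sc,
      () => DriverManager.getConnection(jdbcUrl),
      "SELECT id, customer, time, event_type, store, staff " +
        "FROM customer_events WHERE id >= ? AND id <= ?",
      minId, maxId, numPartitions,
      (r: ResultSet) => (r.getLong(1), r.getString(2),
        new Date(r.getTimestamp(3).getTime), // assumes time is never NULL
        r.getString(4), r.getString(5), r.getString(6)))

    events.map { case (id, customer, time, eventType, storeName, staffName) =>
      // Lookups throw if a key is missing; the foreign keys are assumed to prevent that.
      val (storeType, storeLocation) = stores.value(storeName)
      (customer, time, id, eventType, storeName, storeType, storeLocation,
        staffName, staff.value(staffName))
    }.saveToCassandra("test", "customer_events",
      SomeColumns("customer_id", "time", "id", "event_type", "store_name",
        "store_type", "store_location", "staff_name", "staff_title"))
  }
}
```

The same `saveToCassandra` call works on an RDD produced by a join query in MySQL; only the source of the store and staff columns changes.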
Assuming your Spark workers are running on the same servers as your Cassandra nodes, the partitions will be spread out and inserted locally to every node in your cluster.
This will obviously hammer the MySQL server so beware :)
The full source file is on GitHub.