Exploring the Oscars with d3, Cassandra and the command line

This post is by Jerry Chen, our Lead Engineer. 

Here at TweetReach we love data. But what we love more is making data understandable, useful and maybe even a little bit fun. When we saw all the amazing visualizations people have done with the d3.js library, we were inspired to do something with the millions of tweets that flow through our system every day. Fortunately, this amazing data-driven JavaScript framework does most of the heavy lifting and fluently speaks SVG and CSS. As a proof of concept, we put something together for the Grammys. It was a good first step, but we knew we could do much better for the Oscars.

And of course, we did! Check out the TweetReach Academy Awards Explorer.

On the other end of the stack, we were revisiting Apache Cassandra. Since we last looked at the datastore, it had graduated from the Apache Incubator, gained counters, reached its 1.0 milestone, and continued to capture the hearts (and columns) of millions. We knew our chart data would be broken down by time, so this project would be a great fit for Cassandra.

After a few sketches about what to show and how to show it, we decided to capture tweets containing any mention of the Oscars, and then break them down by a few categories and nominees. For each minute we would measure the volume about a particular nominee, and provide a slider so the user could view the exact volume at a particular minute in time.

The Starting Point: the Academy Awards Explorer whiteboard

But first, how is datta formed?

From the beginning

Our journey begins, as do many things on the Internet, with text. We wrote Flamingo to consume the Twitter Streaming API (and later, Gnip PowerTrack). Incoming tweets are appended to an event log, and Resque jobs are optionally scheduled based on subscriptions. Normally we use those jobs to feed our larger pipeline (which includes search, OLAP, contributor and reach calculations), but for this special project we fork the event log and stream it to a separate server.

For moving log files around, there’s Apache Flume, Facebook Scribe, and maybe even time-tested syslog (here’s a great post by Urban Airship), but in the spirit of getting the job done, we can get away with tailing over SSH (and maybe wrapping that in a screen session):

    nibbler$ tail -F /var/log/flamingod/events.log \
             | pv -l \
             | ssh -C parabox 'cat - >> /var/log/events.log'


(We use the capital -F flag so that tail follows the file by name and keeps going even if the log is rotated or its symlink is repointed; pv is a great utility that we'll explain shortly.) Meanwhile, on the destination server, we use tail again to stream the event log into a Ruby script that reads from STDIN and performs the actual insertion into Cassandra.

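The schema is simple. For each tweet, we check for any matching terms. If there are any, we extract the tweet's timestamp, convert it into its minute-resolution “time bucket” (YYYYMMDDHHmm), and insert it into Cassandra. The schema ends up like this: in the volume column family, each term (such as “hugo”) is a row key, each time bucket is a super column within that row, and each matching tweet is a column, named by its tweet ID, within that bucket.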

Optionally, we keep the list of available time buckets in a special super column called “index”. This is preferable to listing all the super columns under the row key, which would also pull back every tweet column inside them. Using the Ruby cassandra gem, an insertion looks like the following:



where i64() is a function that packs a 64-bit unsigned integer (in this case, the tweet ID) into bytes.
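To make that layout concrete without a running cluster, here is a minimal sketch that models the volume column family as nested Ruby hashes (row key, then super column, then column). The `VOLUME` constant, the `record` helper, and the empty-string column values are hypothetical, and `pack("Q>")` is just one plausible way to write `i64()` as an 8-byte big-endian integer; none of this is the cassandra gem's API.

```ruby
# Hypothetical in-memory model of the :volume column family:
#   row key (term) => super column (time bucket or "index") => { column name => value }
VOLUME = Hash.new { |rows, term| rows[term] = Hash.new { |scs, sc| scs[sc] = {} } }

# One plausible i64(): pack a 64-bit unsigned integer as 8 big-endian bytes.
def i64(n)
  [n].pack("Q>")
end

# Record one matching tweet: a column named by its packed ID inside the
# time-bucket super column, plus the bucket name in the "index" super column.
# The empty-string values are an assumption; only the column names matter here.
def record(term, bucket, tweet_id)
  VOLUME[term][bucket][i64(tweet_id)] = ""
  VOLUME[term]["index"][bucket] = ""
end

record("hugo", "201202241201", 1)
record("hugo", "201202241201", 2)
record("hugo", "201202241201", 2) # same tweet again: overwrites, not double-counted
record("hugo", "201202241202", 3)

puts VOLUME["hugo"]["201202241201"].size       # volume for that minute
puts VOLUME["hugo"]["index"].keys.sort.inspect # buckets we know exist
```

Because the tweet ID is the column name, writing the same tweet twice rewrites one column instead of adding a second, the same way a repeated Cassandra column insert overwrites rather than appends.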

To get the volume at a given minute, count the columns:

    >> client.count_columns(:volume,"hugo","201202241201",:count=>MAX_COLUMNS)


The default :count is 100, so any minute with more than 100 matching tweets would be capped at 100. I've set MAX_COLUMNS to something high, like 999999.

Streaming Insertions with Ruby

The actual processing task is straightforward, but the script is optimized to do as little work as possible. This is the key to high throughput: don't waste time, and if you can correctly get away with skipping a line, skip it. Based on the nominees and terms we're filtering for, we define a group of regular expressions to match against and then combine them; for example, [/hugo/, /artist/] becomes /hugo|artist/. Using the combined regular expression as a cheap first pass over the raw line means we only parse JSON when we absolutely must.
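A minimal sketch of that two-pass filter, assuming each log line holds one tweet as JSON. The `TERMS` map, its case-insensitive patterns, and `matching_terms` are hypothetical; `Regexp.union` builds the combined pattern, and each term is re-checked against the parsed tweet text because the raw line also contains fields such as the user's name.

```ruby
require "json"

# Hypothetical term => pattern map (case-insensitivity is our assumption).
TERMS = { "hugo" => /hugo/i, "artist" => /artist/i }

# Combined first-pass pattern; matches the same lines as /hugo|artist/i.
ANY_TERM = Regexp.union(TERMS.values)

# Returns the terms whose pattern matches the tweet text.
def matching_terms(line)
  # Cheap first pass on the raw line: most lines are rejected without parsing.
  return [] unless ANY_TERM.match?(line)

  # Only now pay for JSON parsing, then confirm each term against the text itself.
  text = JSON.parse(line).fetch("text", "").to_s
  TERMS.select { |_term, pattern| pattern.match?(text) }.keys
rescue JSON::ParserError
  [] # a line that matched but isn't valid JSON is skipped, not fatal
end

p matching_terms('{"text":"Go Hugo!"}')                  # => ["hugo"]
p matching_terms('{"text":"hi","user":{"name":"hugo"}}') # => []
p matching_terms("not json, no terms")                   # => []
```

The second line shows why the first pass alone is not enough: it matches on the raw line, so a term that appears only in metadata still costs one JSON parse, but never produces a false count.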

The crux of the code uses tweet.created_at (e.g., "Thu Mar 06 10:26:58 +0000 2008") to determine the time bucket, e.g. "200803061026". Since consecutive tweets are likely to be close in time, and often in the same time bucket, we take the substring of the created_at timestamp up to the minute and memoize the time bucket. In other words, if both the current and the previous tweet have created_at strings beginning with "Thu Mar 06 10:26", we skip parsing the timestamp and reuse the previous time bucket. While this may seem like a micro-optimization, it's this mindset that lets us maintain a processing rate of hundreds of tweets per second.
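Here is one way to write that memoization, as a sketch that assumes created_at always uses the "Thu Mar 06 10:26:58 +0000 2008" layout. The `TimeBucketer` class and its `parses` counter are hypothetical; the parsing itself uses Ruby's standard `Time.strptime`.

```ruby
require "time"

# Hypothetical helper that turns created_at strings into YYYYMMDDHHmm buckets,
# reusing the previous bucket when the minute prefix has not changed.
class TimeBucketer
  attr_reader :parses # how many times we actually parsed a timestamp

  def initialize
    @last_prefix = nil
    @last_bucket = nil
    @parses = 0
  end

  def bucket(created_at)
    # "Thu Mar 06 10:26:58 +0000 2008"[0, 16] is "Thu Mar 06 10:26"
    prefix = created_at[0, 16]
    return @last_bucket if prefix == @last_prefix

    @parses += 1
    @last_prefix = prefix
    parsed = Time.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
    @last_bucket = parsed.utc.strftime("%Y%m%d%H%M")
  end
end

bucketer = TimeBucketer.new
puts bucketer.bucket("Thu Mar 06 10:26:58 +0000 2008") # => 200803061026
puts bucketer.bucket("Thu Mar 06 10:26:59 +0000 2008") # same minute, no parse
puts bucketer.parses                                   # => 1
```

The prefix ignores the seconds, and also the year; that relies on consecutive lines in a live stream being seconds apart, which is exactly the assumption that makes the memo pay off.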

How do we measure performance? We could use Ruby's Benchmark module to time various points in the script. For the bigger picture, though, we look at throughput: we write the insertion script to consume STDIN and combine it with an incredibly handy utility called Pipe Viewer (pv), which reports progress and throughput for anything piped through it:

    $ pv -l event.log | ruby insert.rb
    26.3k 0:01:27 [303.4/s ] [===============>              ] 0:01:30


In this example, pv runs in line mode (-l), so it tracks lines seen rather than bytes, along with the elapsed time and the rate. So far, about 26.3k lines have been processed in 1m27s at roughly 303 lines per second, and pv estimates about 1m30s left.

It also works on an unbounded stream, which is how we use it with live tweets. With no known input size, pv shows a moving indicator instead of a progress bar and ETA:

    $ tail -F event.log | pv -l | ruby insert.rb
    26.3k 0:01:27 [303.4/s ] [                <=>                   ]
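For the finer-grained Benchmark approach, a small harness can compare the memoized bucketing against parsing every timestamp. This is a sketch on a synthetic workload (10,000 created_at-style strings, 50 per second, spanning a little over three minutes); the `naive` and `memoized` helpers are hypothetical, and the timings you see will depend on your machine.

```ruby
require "benchmark"
require "time"

FORMAT = "%a %b %d %H:%M:%S %z %Y"

# Synthetic workload: 10,000 created_at-style strings, 50 per second.
START = Time.utc(2012, 2, 26, 20, 0, 0)
STAMPS = (0...10_000).map { |i| (START + i / 50).strftime(FORMAT) }

# Hypothetical baseline: parse every timestamp.
def naive(stamps)
  stamps.map { |s| Time.strptime(s, FORMAT).utc.strftime("%Y%m%d%H%M") }
end

# Hypothetical memoized version: parse only when the minute prefix changes.
def memoized(stamps)
  last_prefix = nil
  last_bucket = nil
  stamps.map do |s|
    prefix = s[0, 16]
    if prefix != last_prefix
      last_prefix = prefix
      last_bucket = Time.strptime(s, FORMAT).utc.strftime("%Y%m%d%H%M")
    end
    last_bucket
  end
end

# Prints user, system, total and real time for each variant.
Benchmark.bm(9) do |x|
  x.report("naive:") { naive(STAMPS) }
  x.report("memoized:") { memoized(STAMPS) }
end
```

Both variants must produce identical buckets; the memoized one simply calls `Time.strptime` once per minute instead of once per line.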


Meeting in the Middle

Once we have the data in Cassandra, how do we get it out and onto a webpage? If we’re in Ruby, a sane stack might be a Sinatra or Rails app that serves well-formatted JSON right from Cassandra. Given the static nature and finite data set of the visualization though, it was easier to write a script to generate JSON — nay, pure JavaScript! — that provided the series data in a global variable.
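Generating that file can be as small as serializing a hash with Ruby's standard JSON library and wrapping it in a variable declaration. This sketch assumes a `{ term => [[time_bucket, volume], ...] }` shape and a global named `OSCAR_SERIES`; both are hypothetical, as are the dummy values.

```ruby
require "json"

# Hypothetical: expose the series as a global JavaScript variable so the page
# can load it with an ordinary <script> tag instead of calling an API.
def series_js(series, var_name = "OSCAR_SERIES")
  "var #{var_name} = #{JSON.generate(series)};\n"
end

# Dummy values standing in for real volumes.
series = { "hugo" => [["201202262001", 1], ["201202262002", 2]] }
print series_js(series)
```

Redirecting the output to a file (for example `ruby generate_series.rb > series.js`, with a hypothetical script name) gives the frontend a static file it can load before any real data exists.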

While generating JavaScript from Ruby may seem inelegant, sacrilegious or downright insane, working with static files meant we could initially populate the frontend with dummy data and figure out the format d3 required. In parallel, we worked out how to retrieve the correct data from Cassandra and, finally, generated it in the format d3 needed. Luckily, we also had last year's dataset to work with, which proved invaluable for testing and sanity checks.
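One formatting detail matters for a per-minute slider: buckets are only written when a matching tweet arrives, so a quiet minute has no bucket at all and the series would otherwise have holes. A minimal sketch, assuming missing buckets should read as zero; `bucket_to_time` and `dense_series` are hypothetical helpers.

```ruby
# Hypothetical: parse a YYYYMMDDHHmm bucket string into a UTC Time.
def bucket_to_time(b)
  Time.utc(b[0, 4].to_i, b[4, 2].to_i, b[6, 2].to_i, b[8, 2].to_i, b[10, 2].to_i)
end

# Hypothetical: expand sparse { bucket => count } into one [bucket, count]
# pair per minute from first_bucket to last_bucket, filling gaps with 0.
def dense_series(counts, first_bucket, last_bucket)
  t = bucket_to_time(first_bucket)
  stop = bucket_to_time(last_bucket)
  series = []
  while t <= stop
    b = t.strftime("%Y%m%d%H%M")
    series << [b, counts.fetch(b, 0)]
    t += 60 # advance one minute
  end
  series
end

p dense_series({ "201202262001" => 3, "201202262003" => 5 }, "201202262001", "201202262003")
```

Walking real time values rather than incrementing the bucket string keeps hour, day and month rollovers correct.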

All in all, it was a whirlwind expedition with two great pieces of open source — Cassandra and d3 — the latter of which deserves its own blog post. Cassandra took a hearty portion of memory but barely broke a sweat handling both insertions and queries.

The Finished Product

Oh, and by the way, if you want to build visualizations like this or wrangle terabytes of data, we're hiring!