This post is by Jerry Chen, our Lead Engineer.
And of course, we did! Check out the TweetReach Academy Awards Explorer.
On the other end of the stack, we were revisiting Apache Cassandra. Since last we took a look at the datastore, it graduated from Incubator, got counters, hit a 1.0 version milestone, and continued to capture the hearts (and columns) of millions. We knew our chart data would be broken down by a time component, so this project would be a great fit for Cassandra.
After a few sketches about what to show and how to show it, we decided to capture tweets containing any mention of the Oscars, and then break them down by a few categories and nominees. For each minute we would measure the volume about a particular nominee, and provide a slider so the user could view the exact volume at a particular minute in time.
But first, how is datta formed?
From the beginning
Our journey begins, as with many things on the Internet, with text. We wrote Flamingo to consume the Twitter Streaming API (and later on, Gnip PowerTrack). Incoming tweets get appended to an event log, and optionally resque jobs are scheduled based on subscriptions. Normally, we use the latter for our larger pipeline (which includes search, OLAP, contributor and reach calculations), but for this special project we fork the events log and stream it to a separate server.
For moving log files around, there’s Apache Flume, Facebook Scribe, and maybe even time-tested syslog (here’s a great post by Urban Airship), but in the spirit of getting the job done, we can get away with tailing over SSH (and maybe wrapping that in a
nibbler$ tail -F /var/log/flamingod/events.log | pv -l | ssh -C parabox 'cat - >> /var/log/events.log'
(We use the capital -F flag for tail so that it follows the file by name and keeps up even if a symlink's destination changes, and pv is a great utility which will be explained shortly.) Meanwhile, on the destination server, we employ tail again and stream the events log into a Ruby script which reads from STDIN, for the actual data insertion into Cassandra.
The schema is simple. For each tweet, we see if there are any matching terms. If there are, we extract the timestamp of the tweet, convert it into its minute-resolution “time bucket” format (YYYYMMDDHHmm) and insert it into Cassandra. The schema ends up like this:
Optionally, we keep the available time buckets in a special super column called “index.” This is preferable to trying to list all the super columns under the row key. Thus, using the Ruby cassandra gem, an insertion looks like the following:
i64() is a function that packs 64-bit unsigned integers, which in this case is the tweet ID.
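As a rough illustration of the layout described above, here is a minimal sketch that needs no running cluster. It is not the cassandra gem's API: the nested Hash is a stand-in for the column family, the record helper and the empty column values are hypothetical, and the big-endian byte order in i64 is an assumption about how the original helper packs IDs.

```ruby
# Pack an unsigned 64-bit integer into 8 big-endian bytes.
# Assumption: the post's i64() helper is not shown; this is one plausible version.
def i64(n)
  [n].pack("Q>")
end

# In-memory stand-in for the layout: row key (term) -> super column
# (time bucket) -> column (packed tweet ID). A plain nested Hash, not Cassandra.
def new_volume_store
  Hash.new { |rows, term| rows[term] = Hash.new { |buckets, b| buckets[b] = {} } }
end

# Hypothetical helper: record one tweet under a term and time bucket,
# and note the bucket in the special "index" super column.
def record(volume, term, bucket, tweet_id)
  volume[term][bucket][i64(tweet_id)] = ""  # column value left empty (assumption)
  volume[term]["index"][bucket] = ""        # keeps the list of known buckets
end

volume = new_volume_store
record(volume, "hugo", "201202241201", 173_000_000_000_000_001)
record(volume, "hugo", "201202241201", 173_000_000_000_000_002)

# Volume for a minute is the number of columns in that bucket.
puts volume["hugo"]["201202241201"].size
```

Because the column name is the tweet ID, recording the same tweet twice does not inflate the count.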
To get the volume at a given minute, count the columns:
>> client.count_columns(:volume, "hugo", "201202241201", :count => MAX_COLUMNS)
305
By default, :count is 100, so if we have a magnitude greater than 100, it’ll get capped. I’ve set MAX_COLUMNS to something high like
Streaming Insertions with Ruby
The actual processing task is straightforward, but the script is optimized to do the least amount of work possible. This is the key to high throughput: don’t waste time, and if you can correctly get away with skipping a line, skip it. Based on the nominees/terms we’re filtering for, we define the group of regular expressions to match against, and then combine them, e.g. [/hugo/, /artist/] becomes /hugo|artist/. Using the group regular expression as a first pass means not having to parse JSON unless we absolutely must.
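The first-pass idea can be sketched as follows. This is a minimal illustration, not the original insertion script: it assumes each log line is a single tweet encoded as JSON with a "text" field, and the names TERMS, GROUP and matching_terms are hypothetical.

```ruby
require "json"

# Hypothetical term list; the real script's nominees and patterns are not shown.
TERMS = [/hugo/, /artist/]

# Join the pattern sources so the combined regexp is literally /hugo|artist/.
GROUP = Regexp.new(TERMS.map(&:source).join("|"))

# Return the sources of the terms that match a raw log line.
def matching_terms(line)
  # Cheap first pass on the raw string: skip JSON parsing if nothing can match.
  return [] unless GROUP.match?(line)

  tweet = JSON.parse(line)
  text = tweet["text"].to_s
  TERMS.select { |term| term.match?(text) }.map(&:source)
rescue JSON::ParserError
  [] # a malformed line is skipped rather than stopping the stream
end

puts matching_terms('{"text":"hugo was great"}').inspect
```

The second pass still checks each term against the tweet text, because the group match on the raw line can also hit other JSON fields, such as a user name.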
The crux of the code uses the tweet’s created_at timestamp (e.g. "Thu Mar 06 10:26:58 +0000 2008") to determine the time bucket, e.g. "200803061026". Since consecutive tweets are likely to be close in time, and perhaps in the same time bucket, we take the substring of the created_at timestamp up to the minute and memoize the time bucket. In other words, if both the current and last tweets had created_at strings beginning with "Thu Mar 06 10:26", then we skip parsing the timestamp and reuse the last time bucket. While this may seem like a micro-optimization, it’s with this mindset that we can maintain a processing rate of hundreds of tweets per second.
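Here is a minimal sketch of that memoization, assuming created_at always arrives in the format shown above. The TimeBucketer class and its parses counter are hypothetical names used for illustration, not the original script.

```ruby
require "time"

# Memoizes the minute-resolution time bucket across consecutive tweets.
class TimeBucketer
  FORMAT = "%a %b %d %H:%M:%S %z %Y"
  # Length of the created_at prefix up to the minute, e.g. "Thu Mar 06 10:26".
  PREFIX_LENGTH = "Thu Mar 06 10:26".length

  attr_reader :parses # number of full timestamp parses, for illustration only

  def initialize
    @last_prefix = nil
    @last_bucket = nil
    @parses = 0
  end

  def bucket_for(created_at)
    prefix = created_at[0, PREFIX_LENGTH]
    # Same minute as the previous tweet: reuse the bucket, skip parsing.
    # Note the prefix omits the year and offset, which is fine for a live
    # stream of consecutive tweets but not for arbitrary input.
    return @last_bucket if prefix == @last_prefix

    @parses += 1
    @last_prefix = prefix
    @last_bucket = Time.strptime(created_at, FORMAT).utc.strftime("%Y%m%d%H%M")
  end
end

bucketer = TimeBucketer.new
puts bucketer.bucket_for("Thu Mar 06 10:26:58 +0000 2008")
```

Comparing a 16-character prefix is much cheaper than a full parse, which is the point of the trick when most tweets in a stream share a minute with their predecessor.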
How do we measure performance? We could use Ruby’s Benchmark module and measure timing between various points. For a larger picture by way of throughput, we write the insertion script to consume STDIN and combine it with the incredibly handy utility called Pipe Viewer (pv), which provides information like throughput about anything that’s being piped:
$ pv -l event.log | ruby insert.rb
26.3k 0:01:27 [303.4/s ] [===============> ] 0:01:30
In this example, pv starts off by counting the lines (-l), and then keeps track of lines seen, the duration and the rate. So far, 26.3k lines have been processed at a rate of about 303 lines per second, and pv estimates about 1m30 left.
It also works in streaming mode, which is how we use it with a live stream of tweets:
$ tail -F event.log | pv -l | ruby insert.rb
26.3k 0:01:27 [303.4/s ] [ <=> ]
Meeting in the Middle
All in all, it was a whirlwind expedition with two great pieces of open source — Cassandra and d3 — the latter of which deserves its own blog post. Cassandra took a hearty portion of memory but barely broke a sweat handling both insertions and queries.
Oh and by the way, if you want to build visualizations like this or wrangle terabytes of data, we’re hiring!