A Random Asynchronous Walk

It’s been a while since I last wrote anything interesting inside a code editor, but spurred on by a few personal itches I decided I’d have another stab at it - and behold, snippets of code sprung forth, nearly unbidden. Time is as scarce as ever and my current role lends itself more to crossing out and re-drawing things on whiteboards than to coding them from scratch, but nevertheless I’ve been having fun with an unlikely combination of , Cognitive Services and asyncio.

The Problem

It all started out innocuously enough late last year when I needed a sizable corpus of text to try out some NLP techniques and build a small proof of concept - I just grabbed my RSS subscription list, did some brute force fetching, tossed it into DocumentDB, hooked up a Jupyter Notebook to it, and that was it.

Later on, after the meeting was long past, I realized that it would be pretty nice to revisit RSS aggregation from a text mining perspective, and started cleaning things up for kicks. For instance, here’s the smallest OPML feed parser you’ll see all day:

from xml.etree import ElementTree

def feeds_from_opml(filename):
    tree = ElementTree.parse(filename)
    for feed in tree.findall('.//outline'):
        if feed.get('xmlUrl'):
            yield {'title': feed.get('title'),
                   'url': feed.get('xmlUrl')}

[{'title': 'Open Culture', 'url': 'http://feeds2.feedburner.com/OpenCulture'},
 {'title': 'The Kid Should See This.',
  'url': 'http://feeds.feedburner.com/TheKidShouldSeeThis'}]
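If you want to poke at the parser without exporting a subscription list first, here is a minimal sketch that runs the same logic against an inline OPML string. The sample document and the feeds_from_opml_string name are made up for illustration; only the outline/xmlUrl handling mirrors the function above.

```python
from xml.etree import ElementTree

# Hypothetical sample document; a real export from a feed reader has the same shape.
SAMPLE_OPML = """<opml version="1.0">
  <body>
    <outline text="Culture">
      <outline title="Open Culture" type="rss"
               xmlUrl="http://feeds2.feedburner.com/OpenCulture"/>
    </outline>
    <outline title="A folder without a feed"/>
  </body>
</opml>"""


def feeds_from_opml_string(text):
    """Yield title/url pairs for every outline that carries an xmlUrl."""
    root = ElementTree.fromstring(text)
    for feed in root.findall('.//outline'):
        if feed.get('xmlUrl'):
            yield {'title': feed.get('title'), 'url': feed.get('xmlUrl')}


feeds = list(feeds_from_opml_string(SAMPLE_OPML))
print(feeds)
```

Folders (outlines without an xmlUrl attribute) are skipped, and nested feeds are still found because of the './/outline' path.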

Going Async

I started out running the whole thing as a single script, but fetching multiple URLs is best done in parallel, so I moved to multiprocessing - only to realize that fetcher processes were still getting tied up waiting for responses, and that this was an opportunity to take advantage of aiohttp and async/await in 3.x.

As it turned out, moving to aiohttp sped things up so much that I had to add a semaphore to avoid exhausting TCP connections on my Mac:

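from asyncio import Semaphore, ensure_future, gather, sleep
from datetime import datetime, timedelta
from aiohttp import ClientSession

# fetch_one, connect_pipeline, log and the upper-case settings are defined elsewhere (not shown)
async def throttle(sem, session, feed, client, database):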
    """Throttle number of simultaneous requests"""

    async with sem:
        res = await fetch_one(session, feed, client, database)
        log.info("%s: %d", res[0]['url'], res[1])

async def fetcher(database):
    """Fetch all the feeds"""

    sem = Semaphore(MAX_CONCURRENT_REQUESTS)

    while True:
        client = await connect_pipeline(connect=ENTRY_PARSER)
        tasks = []
        threshold = datetime.now() - timedelta(seconds=FETCH_INTERVAL)

        async with ClientSession() as session:
            log.info("Beginning run.")
            async for feed in database.feeds.find({}):
                log.debug("Checking %s", feed['url'])
                last_fetched = feed.get('last_fetched', threshold)
                if last_fetched <= threshold:
                    task = ensure_future(throttle(sem, session, feed, client, database))
                    # keep track of the task so gather() below actually waits for it
                    tasks.append(task)

            responses = gather(*tasks)
            await responses
            log.info("Run complete, sleeping %ds...", CHECK_INTERVAL)
            await sleep(CHECK_INTERVAL)
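To see the throttling pattern in isolation, here is a minimal sketch that swaps the HTTP request for a short sleep and records how many "requests" overlap. The fake_fetch helper, the state dictionary, the example URLs and the limit of 3 are all assumptions made for the demo, not part of the fetcher above.

```python
import asyncio

MAX_CONCURRENT_REQUESTS = 3  # illustrative value, not the real setting


async def fake_fetch(url, state):
    """Stand-in for an HTTP request: tracks how many calls overlap."""
    state['active'] += 1
    state['peak'] = max(state['peak'], state['active'])
    await asyncio.sleep(0.01)  # pretend to wait on the network
    state['active'] -= 1
    return url, 200


async def throttled(sem, url, state):
    """Only MAX_CONCURRENT_REQUESTS callers get past the semaphore at once."""
    async with sem:
        return await fake_fetch(url, state)


async def run(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    state = {'active': 0, 'peak': 0}
    tasks = [asyncio.ensure_future(throttled(sem, url, state)) for url in urls]
    results = await asyncio.gather(*tasks)
    return results, state['peak']


urls = ['http://example.com/feed/%d' % i for i in range(10)]
results, peak = asyncio.run(run(urls))
print(len(results), "responses, at most", peak, "in flight")
```

All ten tasks are scheduled immediately, but the peak never exceeds the semaphore's limit, which is the property that keeps the connection count under control.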

However, running all this on a single process got old pretty quickly, so I soon had multiple processes fetching, parsing and storing feeds and communicating over aiozmq.

This works fine when you have one or two kinds of workers, but soon enough the next step was obvious - I needed a proper task queue. But I wanted speed and the ability to share arbitrary chunks of data rather than “pure” queueing, so I decided to roll my own atop Redis.

Redis Queues

I love Redis, and I’m flabbergasted at how few people take advantage of it for task queueing and keep thinking it’s just another flavour of memcache.

With a little help from bson, I was soon able to toss just about anything into a queue with a few tiny, straightforward wrappers:

from asyncio import get_event_loop
from aioredis import create_redis
from config import log, REDIS_SERVER, REDIS_NAMESPACE
from json import dumps, loads
from bson import json_util

async def connect_redis(loop=None):
    """Connect to a Redis server"""
    if not loop:
        loop = get_event_loop()
    return await create_redis(REDIS_SERVER.split(':'), loop=loop)

async def enqueue(server, queue_name, data):
    """Enqueue an object in a given redis queue"""
    return await server.rpush(REDIS_NAMESPACE + queue_name, dumps(data, default=json_util.default))

async def dequeue(server, queue_name):
    """Blocking dequeue from Redis"""
    _, data = await server.blpop(REDIS_NAMESPACE + queue_name, 0)
    return loads(data, object_hook=json_util.object_hook)

It bears noting at this point that I had bson handy because I was using the MongoDB protocol to talk to DocumentDB, and that it is doubly useful: not only does it serialize datetime objects in a sensible way, it would also let me toss complete document “trees” into Redis if I really wanted to.
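For readers without bson installed, the default/object_hook mechanism that makes this work can be sketched with the standard library alone. The two hooks below are hand-rolled stand-ins and the tagged '$date' layout is only loosely modelled on what json_util does; it should not be taken as bson's actual output format.

```python
import json
from datetime import datetime, timezone


def default(obj):
    """Encode datetimes as a tagged dict (a simplified stand-in for json_util.default)."""
    if isinstance(obj, datetime):
        return {'$date': obj.isoformat()}
    raise TypeError('Cannot serialize %r' % type(obj))


def object_hook(dct):
    """Turn tagged dicts back into datetimes when decoding."""
    if set(dct) == {'$date'}:
        return datetime.fromisoformat(dct['$date'])
    return dct


# Made-up queue item with a timestamp that plain json.dumps would reject
item = {'url': 'http://example.com/feed',
        'last_fetched': datetime(2017, 4, 2, 19, 56, 10, tzinfo=timezone.utc)}

payload = json.dumps(item, default=default)               # what would go into the queue
restored = json.loads(payload, object_hook=object_hook)   # what a worker gets back
print(payload)
```

The round trip leaves the datetime intact, which is the whole point of passing the hooks to dumps and loads in the wrappers above.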

Which I didn’t - given that only the feed and item parsers did database writes (and that the processing between each write gave the database more than enough breathing room), my write cycles are low enough that passing around document IDs (and having them individually retrieved from the database by each worker) is a tenable approach.
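The "pass IDs, not documents" approach can be sketched without Redis or a database at all. In this toy version an asyncio.Queue plays the role of the Redis list and a dictionary plays the role of the database; the document contents, the IDs and the None sentinel are all invented for the example.

```python
import asyncio

# Stand-in for the database: documents keyed by ID (all values are made up).
DOCUMENTS = {
    'a1': {'title': 'First item', 'body': 'x' * 1000},
    'b2': {'title': 'Second item', 'body': 'y' * 1000},
}


async def producer(queue):
    """Enqueue only the IDs, keeping queue payloads tiny."""
    for doc_id in DOCUMENTS:
        await queue.put(doc_id)
    await queue.put(None)  # sentinel: no more work


async def worker(queue):
    """Dequeue an ID and look the full document up only when it is needed."""
    titles = []
    while True:
        doc_id = await queue.get()
        if doc_id is None:
            return titles
        titles.append(DOCUMENTS[doc_id]['title'])


async def main():
    queue = asyncio.Queue()  # plays the role of the Redis list
    _, titles = await asyncio.gather(producer(queue), worker(queue))
    return titles


titles = asyncio.run(main())
print(titles)
```

The trade-off is one extra read per worker in exchange for small queue entries, which only makes sense while the database has headroom to spare.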

Packaging the Runtime

With inter-process communication sorted, I then began to think about deploying and running the whole thing across multiple machines. Piku was great for deploying via git and iterating quickly, but it’s pretty obvious that I had to consider moving to sooner or later.

And since I’ve been maintaining my own Python images for a while now, I decided to move my development process to docker-compose, which had two interesting side-effects:

  • I moved back from a cloud setting into a laptop-only deployment (using local Redis and MongoDB containers).
  • I spent a good while rebuilding my containers - not just to target 3.6.1 (which I needed so I could yield from inside a coroutine) but also because there are no wheels for musl.

This last bit was the interesting one for me - most of the stuff on my requirements.txt is instantly downloadable as precompiled wheels for glibc, but if you want to use Alpine as a container base, then you have to wait until pip rebuilds the whole shebang locally - which was OK when I was relying solely on external APIs for text processing, but which soon devolved into the old classic when I added NLTK to the mix:

That time is, thankfully, quickly offset by the ease with which I get the whole stack up and running:

!docker-compose up
Starting newsfeedcorpus_db_1
Starting newsfeedcorpus_redis_1
Starting newsfeedcorpus_parser_1
Starting newsfeedcorpus_scheduler_1
Starting newsfeedcorpus_importer_1
Starting newsfeedcorpus_web_1
Starting newsfeedcorpus_fetcher_1
Attaching to newsfeedcorpus_db_1, newsfeedcorpus_redis_1, newsfeedcorpus_scheduler_1, newsfeedcorpus_fetcher_1, newsfeedcorpus_web_1, newsfeedcorpus_parser_1, newsfeedcorpus_importer_1
db_1         | WARNING: no logs are available with the 'none' log driver
redis_1      | WARNING: no logs are available with the 'none' log driver
scheduler_1  | 2017-04-02 19:56:10 INFO Configuration loaded.
fetcher_1    | 2017-04-02 19:56:10 INFO Configuration loaded.
fetcher_1    | 2017-04-02 19:56:10 INFO Beginning run.
web_1        | 2017-04-02 19:56:10 INFO Configuration loaded.
web_1        | 2017-04-02 19:56:10 WARNING cPickle module not found, using pickle
importer_1   | 2017-04-02 19:56:11 INFO Configuration loaded.
parser_1     | 2017-04-02 19:56:11 INFO Configuration loaded.
parser_1     | 2017-04-02 19:56:12 INFO 'pattern' package not found; tag filters are not available for English
newsfeedcorpus_importer_1 exited with code 0
parser_1     | 2017-04-02 19:56:13 INFO Beginning run.
fetcher_1    | 2017-04-02 19:56:15 INFO http://feeds.feedburner.com/TheKidShouldSeeThis: 200
fetcher_1    | 2017-04-02 19:56:15 INFO http://feeds2.feedburner.com/OpenCulture: 200
fetcher_1    | 2017-04-02 19:56:15 INFO http://feedproxy.google.com/taoofmac/full: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://www.alistapart.com/site/rss: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://2kindsofpeople.tumblr.com/rss: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://littlebigdetails.com/rss: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://feeds.feedburner.com/well-formed_data: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://css3wizardry.com/feed/: 304
scheduler_1  | 2017-04-02 19:56:16 INFO Run complete, sleeping 3600s...
fetcher_1    | 2017-04-02 19:56:16 INFO https://unsplash.com/rss: 200
fetcher_1    | 2017-04-02 19:56:16 INFO http://blogs.technet.com/b/machinelearning/rss.aspx: 304
newsfeedcorpus_fetcher_1 exited with code 137
newsfeedcorpus_scheduler_1 exited with code 137
newsfeedcorpus_web_1 exited with code 137
newsfeedcorpus_redis_1 exited with code 0
newsfeedcorpus_db_1 exited with code 0

…and scaling it at will:

!docker-compose scale fetcher=4
Creating and starting newsfeedcorpus_fetcher_2 ... 
Creating and starting newsfeedcorpus_fetcher_3 ... 
Creating and starting newsfeedcorpus_fetcher_4 ...
!docker-compose scale fetcher=1
Stopping and removing newsfeedcorpus_fetcher_2 ... 
Stopping and removing newsfeedcorpus_fetcher_3 ... 
Stopping and removing newsfeedcorpus_fetcher_4 ...

Obviously that works for me in Piku as well, but it’s nice to think that I’ll be able to do this across a bunch of machines later on (and I’ve got just the place to do it in).

Doing Actual NLP

Although I’m pretty happy with Cognitive Services, I decided to strip them out from the code for the time being, for a few reasons:

  • I exhausted my personal API request quota over a weekend (duh!)
  • A lot of the stuff I need (like language detection and basic analysis) can be done in-process more efficiently
  • More importantly, I can learn a lot more by doing it myself while using off-the-shelf stuff like NLTK

So it made sense to take care of the low-hanging fruit and leave the toughest things (like natural language understanding and document clustering) to Cognitive Services.

Soon enough I had a couple of basic amenities (like keyword extraction) going, so I started a little module I call langkit:

from langkit import extract_keywords

extract_keywords("""Manually Throttle the Bandwidth of a Linux Network Interface: In complex service oriented application stacks, some bugs only manifest themselves on congested or slow networking interfaces. Consider a web-service running on a generic Linux box with a single networking interface, eth0. If eth0 is busy enough to completely saturate its networking link, a web-service running on the host behind that link may experience odd behavior when things “slowdown”.""", language="en", scores=True)
[('complex service oriented application stacks', 25.0),
 ('link may experience odd behavior', 23.5),
 ('generic linux box', 9.0),
 ('linux network interface', 9.0),
 ('slow networking interfaces', 8.666666666666666),
 ('single networking interface', 8.666666666666666),
 ('networking link', 6.166666666666666),
 ('webservice running', 4.0),
 ('things slowdown', 4.0),
 ('completely saturate', 4.0),
 ('busy enough', 4.0),
 ('manually throttle', 4.0),
 ('host behind', 4.0),
 ('bandwidth', 1.0),
 ('eth0', 1.0),
 ('congested', 1.0),
 ('consider', 1.0),
 ('manifest', 1.0),
 ('bugs', 1.0)]

So far it’s mostly about the little RAKE keyword extractor you can see working above, but I have more plans for it - in particular, I expect to try my hand at doing TF-IDF and document clustering “by hand” once I can find a couple of vacant hours one evening.
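The scores in the output above are consistent with the usual RAKE word score of degree divided by frequency, summed over each phrase. What follows is a minimal sketch of that scoring, not the langkit code: the stopword list is deliberately tiny, and the rake and candidate_phrases names are assumptions made for this example.

```python
import re
from collections import defaultdict

# Tiny illustrative stopword list; a real extractor needs a proper per-language one.
STOPWORDS = {'a', 'an', 'and', 'the', 'of', 'on', 'in', 'with', 'is', 'to'}


def candidate_phrases(text):
    """Split on punctuation, then on stopwords, to get runs of content words."""
    phrases = []
    for fragment in re.split(r'[^\w\s]+', text.lower()):
        current = []
        for word in fragment.split():
            if word in STOPWORDS:
                if current:
                    phrases.append(current)
                current = []
            else:
                current.append(word)
        if current:
            phrases.append(current)
    return phrases


def rake(text):
    """Score each phrase as the sum of degree/frequency over its words."""
    phrases = candidate_phrases(text)
    freq = defaultdict(int)
    degree = defaultdict(int)
    for phrase in phrases:
        for word in phrase:
            freq[word] += 1
            degree[word] += len(phrase)  # co-occurrences, counting the word itself
    scores = {' '.join(p): sum(degree[w] / freq[w] for w in p) for p in phrases}
    # Highest score first, ties broken alphabetically
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


result = rake("Slow networking interfaces and a networking link on a generic Linux box.")
print(result)
```

Words that only ever appear inside long phrases score highest, while a word shared with a shorter phrase (here "networking") has its score pulled down, which is why long, distinctive phrases float to the top.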

In the meantime, given the limited time I have, I decided to go low-brow for a bit and take a little time to start building a minimal web UI and examining what I could do differently with the new web stacks now surfacing around asyncio.

Gotta Go Fast

It didn’t take me much time to get around to trying Sanic, which leverages uvloop to achieve an impressive 25,000 requests per second on my (an i5 clocking in at 2.9GHz):

$ wrk http://localhost:8000/test
Running 10s test @ http://localhost:8000/test
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   435.44us  422.41us  12.70ms   93.40%
    Req/Sec    12.57k     1.70k   15.23k    71.78%
  252720 requests in 10.10s, 29.64MB read
Requests/sec:  25021.80
Transfer/sec:      2.94MB

However, this is where I’ve started getting seriously annoyed at async/await, not just because it has a fair amount of impact on program structure (Sanic is OK, but I miss the simpler structure of Bottle, as well as all the nice stuff I built to use with it over the years), but also because it can be a right pain to remember to await a future when it’s not immediately obvious that what you’re trying to serve up to the browser isn’t a “normal” result.
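The forgotten-await trap is easy to reproduce outside any web framework. In this small sketch (the handler and load_items names are invented, and nothing here is Sanic-specific) the broken handler hands back a coroutine object instead of the data it was meant to return.

```python
import asyncio
import inspect


async def load_items():
    """Stand-in for a database query that must be awaited."""
    await asyncio.sleep(0)
    return ['first', 'second']


async def broken_handler():
    # Missing await: this returns a coroutine object, not the list.
    return load_items()


async def fixed_handler():
    return await load_items()


async def main():
    wrong = await broken_handler()
    is_coroutine = inspect.iscoroutine(wrong)
    wrong.close()  # dispose of it so Python does not warn that it was never awaited
    right = await fixed_handler()
    return is_coroutine, right


is_coroutine, right = asyncio.run(main())
print(is_coroutine, right)
```

Nothing fails at the point of the mistake; the error only shows up later, when something tries to render or serialize the coroutine object.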

The Internet Of Things Intrudes

Prompted by my frustrations with async/await, I decided to take a break and do something a little closer to the real world this weekend.

Since I’m going to be doing some IoT stuff over the coming days, I decided I’d revisit (which I haven’t written in a while) and write a minimal Azure IoT Hub client able to manage device registrations and send device-to-cloud messages (which is the bare minimum you need to get started).

I’ve been playing around with Azure IoT Hub ever since I contributed a little Python snippet for the docs, but using is a lot more fun, for a number of reasons:

  • I enjoy the relatively low-level abstractions, which make a nice change
  • Encoding and hashing are different enough to be a challenge (and there are very few samples of them out there)
  • The final static binary, with everything baked in and UPX-compressed, weighs in at around 1MB for arm7

Besides, the whole experience has a good feel to it (no need for an IDE, none of the turtles-all-the-way-down abstractions typical of the enterprise stuff I have to wade through on occasion, and raw, blistering speed when running, testing, and iterating).

It’s not going to win any awards for beauty, though:

// Needs crypto/hmac, crypto/sha256, encoding/base64, fmt, strconv, text/template and time
func buildSasToken(hub *IoTHub, uri string) string {
    timestamp := time.Now().Unix() + int64(tokenValidSecs)
    encodedUri := template.URLQueryEscaper(uri)
    toSign := encodedUri + "\n" + strconv.FormatInt(timestamp, 10)
    binKey, _ := base64.StdEncoding.DecodeString(hub.SharedAccessKey)
    mac := hmac.New(sha256.New, binKey)
    mac.Write([]byte(toSign)) // feed the string to sign into the HMAC before taking the sum
    encodedSignature := template.URLQueryEscaper(base64.StdEncoding.EncodeToString(mac.Sum(nil)))
    return fmt.Sprintf("SharedAccessSignature sr=%s&sig=%s&se=%d&skn=%s", encodedUri, encodedSignature, timestamp, hub.SharedAccessKeyName)
}
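For comparison, the same signing steps can be sketched in Python with nothing but the standard library. This is an illustration rather than the snippet from the docs: the function name, its parameters, the hub name and the key are all hypothetical, and quote_plus is assumed to be a close enough match for the Go escaper for simple hostnames.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote_plus


def build_sas_token(uri, key_b64, key_name, valid_secs=3600, now=None):
    """Build a SharedAccessSignature string: escape, sign with HMAC-SHA256, escape again."""
    expiry = int(now if now is not None else time.time()) + valid_secs
    encoded_uri = quote_plus(uri)
    to_sign = (encoded_uri + "\n" + str(expiry)).encode('utf-8')
    key = base64.b64decode(key_b64)  # the shared key is stored base64-encoded
    digest = hmac.new(key, to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest).decode('ascii'))
    return "SharedAccessSignature sr=%s&sig=%s&se=%d&skn=%s" % (
        encoded_uri, signature, expiry, key_name)


# Hypothetical hub name and key, for illustration only.
fake_key = base64.b64encode(b"not-a-real-key").decode('ascii')
token = build_sas_token("example-hub.azure-devices.net", fake_key,
                        "iothubowner", now=1491162970)
print(token)
```

Passing a fixed now value makes the token reproducible for testing; in normal use it would be left out so the expiry is computed from the current time.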

Speaking of beauty in code, all I need now is some time to get back into in earnest, and I can probably get a sense of achievement (and aesthetics) back into my neural pathways, which are sorely worn out with slides and meetings all over the place…