Wide Matcher

I had a sudden need to find things, so I went searching – and likely went a tad overboard this weekend.

I’ve actually spent most evenings in the past couple of weeks reading books, watching long overdue TV and doing other non-technological things. A list of said books will eventually pop up here (it’s been a good while since I posted book reviews), and who knows, I might even dish out some TV critique1, but on the whole I’ve been using leisure time as I otherwise should.

Until, of course, I found myself optimising a sort of Wide Finder “benchmark” (actually, a way to process a real dataset and count occurrences of specific patterns in it) using multi-threaded Python2 until 1AM yesterday.

Wheeeeeeeeeee!

The equivalent bit of Clojure followed suit after breakfast today. The parallel version is churning its way across the dataset as I type this, and I’m mostly pleased with the results – aside from it taking up 8GB of RAM and most CPU cores while running.

Python was slower (PyPy was a tad faster, but not by much), but it didn’t drive like a semi and leave a smell of burning rubber, so to speak.

Still, the code is pretty similar. Here’s a simplified version of what I’m doing (the hairy lambda parses groups of items in a line, and match_fun is a more complex function that does the actual matching):

import gzip
import json
from collections import defaultdict

# patterns and match_fun are defined elsewhere
counters = defaultdict(int)
def consume(filename):
   groups      = (set(map(lambda x: x.strip()[1:-1],line.strip()[1:-1].split(','))) for line in gzip.open(filename,'r'))
   interesting = (items for items in groups if len(items) > 1)
   for items in interesting:
       for p in patterns:
           if match_fun(p, items):
               counters[','.join(sorted(items))] += 1
consume('dataset.gz')
print json.dumps(counters)

…and the Clojure equivalent, which has a little more ceremony for outputting results, parsing arguments, etc.:

(defn -main 
 [& args]
 (let [filename (first args)
       counters (atom (transient (zipmap interesting (repeat (int 0)))))]
   (with-open [in (io/reader
                  (java.util.zip.GZIPInputStream.
                  (io/input-stream filename)))]
      (doseq [line (line-seq in)]
         (let [items (distinct (map string-clip (string/split (string-clip (string/trim line)) #",")))]
           (if (> (count items) 1)
              (doseq [p patterns]
                 (if (match_fun p items)
                    (swap! counters assoc! items (inc (@counters items)))))))))
    (println (json/write-str (persistent! @counters) :key-fn #(string/join "," %)))))

As you can see, thinking in terms of generators translates well into lazy seqs (even though the parsing bits are still ugly), and were it not for the trickery with transients and atoms (which give a fair boost in speed and pave the way for transactional access to counters, respectively), the Clojure version would arguably be more readable.
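
To make that analogy a bit more concrete, here’s a throwaway sketch – made-up data, not the real parser – of how a generator pipeline maps onto lazy seqs:

(require '[clojure.string :as string])

; made-up data, not the real parser: map and filter build lazy seqs,
; so nothing is actually parsed until the result is consumed
(let [lines       ["a,b,c" "x" "p,q"]
      groups      (map #(set (string/split % #",")) lines)
      interesting (filter #(> (count %) 1) groups)]
  (doall interesting))   ; => (#{"a" "b" "c"} #{"p" "q"})

The transients, for their part, are just mutable-ish scratch versions of the counter map that persistent! freezes back at the end (again a toy example, not the code above):

; bash on a transient locally, then persistent! freezes it back
; into an ordinary immutable map
(persistent! (reduce (fn [m k] (assoc! m k 0)) (transient {}) [:a :b :c]))
; => {:a 0, :b 0, :c 0}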

The threaded versions, however, are completely different. The Python one is far too long to print here, even though it’s quite readable – but the threaded Clojure version has only a couple more lines:

(defn -main 
  [& args]
  (let [filename (first args)
        counters (atom (zipmap interesting (repeat (int 0))))]
    (with-open [in (io/reader
                   (java.util.zip.GZIPInputStream.
                   (io/input-stream filename)))]
      (doall (pmap (fn [line]
          (let [items (distinct (map string-clip (string/split (string-clip (string/trim line)) #",")))]
            (if (> (count items) 1)
               (doseq [p patterns]
                  (if (match_fun p items)
                     (swap! counters assoc items (inc (@counters items)))))))) (line-seq in))))
    (println (json/write-str @counters :key-fn #(string/join "," %)))
    (shutdown-agents)))

The trick is in the (doall (pmap (fn ...))) wrapper, which wraps the processing bits and unravels the input sequence.
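
The doall matters because pmap is itself lazy: without it, nothing would actually get consumed before with-open closed the reader. A trivial sketch of the same idea:

; pmap is lazy, so doall (or dorun, if you don't care about the results)
; is what forces the whole sequence to be processed across threads
(doall (pmap #(* % %) (range 5)))   ; => (0 1 4 9 16)
(dorun (pmap println (range 3)))    ; forces the side effects, discards the results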

And it seems to be a pretty nice, low-impact way to get a quick speed boost, although I’m a little happier with the threaded Python version – it has a set of matchers reading chunks from a common record queue and posting results to a reducer via another queue, which lends itself better to running across multiple machines. It looks something like this (@task is part of my threading library, which emulates Celery):

from collections import defaultdict
from Queue import Queue, Empty  # Python 2's stdlib queue module
import gzip
import json

# @task, start, chunker, chunk_size, patterns, match_fun and the counter task live elsewhere
matchers = Queue()
counts = Queue()

@task
def producer(filename):
    global matchers
    groups      = (set(map(lambda x: x.strip()[1:-1],line.strip()[1:-1].split(','))) for line in gzip.open(filename,'r'))
    interesting = chunker((items for items in groups if len(items) > 1))
    for chunks in interesting:
        matchers.put(chunks)

@task
def matcher():
    global matchers, counts
    partials = defaultdict(int)
    i = 0
    while True:
        try:
            chunks = matchers.get()
            for items in chunks:
                i += 1
                for p in patterns:
                    if match_fun(p, items):
                        partials[','.join(sorted(items))] += 1
                if i % chunk_size == 0:
                    counts.put(partials)
                    partials = defaultdict(int)
            matchers.task_done()
        except Empty:
            break
    counts.put(partials)

# the counter simply adds up all the partials as they arrive from all the matchers, and that's too boring to clean up and paste here

producer.delay('dataset.gz')
for _ in range(0,24):
    matcher.delay()
counter.delay()

start()
print json.dumps(counters)

It’s a bit on the long side, I know. But it does work quite well, and it would probably look (and run) even better with Stackless – although I can’t use PyPy for this (yet).

Still, Clojure can do better. A whole lot better:

(defn -main 
 [& args]
 (let [filename (first args)
       counters (agent (zipmap interesting (repeat (int 0))))]
   (with-open [in (io/reader
                  (java.util.zip.GZIPInputStream.
                  (io/input-stream filename)))]
      (doseq [line (line-seq in)]
         (let [items (distinct (map string-clip (string/split (string-clip (string/trim line)) #",")))]
           (if (> (count items) 1)
              (doseq [pat patterns]
                (send counters 
                   (fn [a p i] 
                     (if (match_fun p i)
                        (update-in a [i] inc)
                       a)) pat items))
              (await counters)))))
    (println (json/write-str @counters :key-fn #(string/join "," %)))))

As it turns out, using agents is way faster, although it tends to do garbage collection every now and then.
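
For context (and nothing to do with the actual matcher), the send model boils down to queueing functions that the agent applies to its state on a thread pool, with await as the synchronisation point:

; a toy sketch of the send/await dance, not the matcher itself:
; each send queues a function that is applied to the agent's state off-thread
(def tally (agent {:foo 0 :bar 0}))
(send tally update-in [:foo] inc)   ; returns immediately
(await tally)                       ; blocks until every queued send has run
@tally                              ; => {:foo 1, :bar 0}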

But the main point here is that I’m getting into the ways of Clojure (barring the usual amount of stupid newbie mistakes), and I’ll certainly have a go at re-implementing this a few times in a couple of different ways (agents and core.async spring to mind, although it will take me a while yet to get to grips with those). That and re-casting this into Cascalog somewhat later on.

In the meantime, if you’re curious about doing this kind of thing in Clojure, I wholeheartedly recommend this post on optimising Wide Finder 2, which I suspect I’ll be re-visiting in the future for further hints.

And now, back to a good book.


  1. No, I’m not watching Breaking Bad. I might get around to it by 2020 or so, at this rate. ↩︎

  2. And yes, I wanted to use threads, and this was a situation where the global interpreter lock was a mere nuisance. ↩︎
