Friday, July 18, 2014

Clustering Medical Procedure Codes with Scalding


A colleague pointed out that there is an inverse use-case to finding outliers in medical claims: grouping procedure codes into clusters, or what health insurance companies call Episode Treatment Groups (ETGs). Essentially, an ETG is a way to cluster a group of services (procedures) into a medically relevant unit.

The CMS.gov dataset provides slightly under 16 million anonymized outpatient claims for Medicare/Medicaid patients. Each outpatient record can have up to 6 ICD-9 procedure codes, up to 10 ICD-9 diagnosis codes and up to 45 HCPCS codes. So just like the outlier case, we can derive a measure of similarity between a pair of codes as how often they co-occur within claims across the dataset.
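
As a quick toy illustration (the claims and codes below are made up, not taken from the CMS data), the pair counting that yields this similarity can be sketched in plain Scala like so:

// Toy sketch of co-occurrence based similarity: each claim contributes one
// co-occurrence for every unordered pair of codes it contains; the total
// count of a pair across all claims is its similarity.
object CooccurrenceDemo extends App {
  val claims = List(
    List("HCPCS:85025", "HCPCS:36415", "ICD9:4019"),
    List("HCPCS:85025", "HCPCS:36415"),
    List("HCPCS:85025", "ICD9:4019"))
  val sims: Map[(String, String), Int] = claims
    .flatMap(codes => for (a <- codes; b <- codes if a < b) yield (a, b))
    .groupBy(identity)
    .map { case (pair, occurrences) => (pair, occurrences.size) }
  sims.foreach { case ((a, b), s) =>
    println(s"$a / $b -> similarity=$s")
  }
}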

I decided to use a variant of the DBSCAN clustering algorithm. This post provides some tips on how to implement DBSCAN in a distributed manner, and I used its ideas to develop my implementation. The intuition behind my clustering algorithm goes something like this.

We calculate the similarity s_AB between a pair of codes A and B as the number of times they co-occur in the corpus. Clustering algorithms need a distance measure, so we treat the distance d_AB as the reciprocal of their similarity, i.e. 1/s_AB. The DBSCAN clustering algorithm works by selecting, for each point, the other points that lie within a specified distance ε of it. Candidate cluster centroids are those that have at least MinPoints codes within this distance ε. My algorithm deviates from DBSCAN at this point: instead of finding density-reachable codes, I simply rank the candidate centroids by density and keep the Top-N. Density is calculated as the number of codes within a circular area of the mean radius, i.e. N² / (π Σᵢ dᵢ²), where the sum runs over the N codes within ε. These top N densest code clusters are our derived ETGs.
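
Before the full job, here is a small sketch of the density score for a single candidate code. The distances, Epsilon and MinPoints values are made up for illustration; the count²/mean² score mirrors what the Scalding job below computes, with the constant factor π dropped.

// Toy sketch of the density score for one candidate code: keep neighbors
// within Epsilon, require at least MinPoints of them, then score the code
// as count^2 / mean^2 (the constant factor pi is dropped, as in the job below).
object DensityDemo extends App {
  val Epsilon = 0.3
  val MinPoints = 3
  // made-up distances (reciprocal co-occurrence counts) from one code to others
  val dists = List(1.0 / 20, 1.0 / 10, 1.0 / 10, 1.0 / 5, 1.0 / 2)
  val withinEps = dists.filter(_ < Epsilon)
  if (withinEps.size >= MinPoints) {
    val mean = withinEps.sum / withinEps.size
    val density = math.pow(withinEps.size, 2) / math.pow(mean, 2)
    println(f"count=${withinEps.size}%d, mean=$mean%.3f, density=$density%.1f")
  }
}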

The Scalding code below does just this. We simplify a bit by dropping constant factors such as π, but otherwise the code is quite faithful to the algorithm described above.

// Source: src/main/scala/com/mycompany/cmspp/clusters/CodeCluster.scala
package com.mycompany.cmspp.clusters

import com.twitter.scalding.Job
import com.twitter.scalding.Args
import com.twitter.scalding.TextLine
import com.twitter.scalding.Tsv
import scala.io.Source

class CodeCluster(args: Args) extends Job(args) {

  def extractPairs(line: String): List[(String,String)] = {
    val cols = line.split(",").toList
    val codes = (cols.slice(22, 27)  // ICD9 procedure code cols
      .map(x => if (x.isEmpty) x else "ICD9:" + x) 
      ::: cols.slice(31, 75)         // HCPCS (CPT4) procedure cols
      .map(x => if (x.isEmpty) x else "HCPCS:" + x))          
      .filter(x => (! x.isEmpty))
    val cjoin = for {codeA <- codes; codeB <- codes} yield (codeA, codeB)
    cjoin.filter(x => x._1 < x._2)
  }

  val Epsilon = args("epsilon").toDouble
  val MinPoints = args("minpoints").toInt
  val NumClusters = args("nclusters").toInt
  
  val output = Tsv(args("output"))

  val dists = TextLine(args("input"))
    .read
    // compute pair-wise distances between procedure codes
    .flatMapTo('line -> ('codeA, 'codeB)) { line: String => extractPairs(line) }
    .groupBy('codeA, 'codeB) { group => group.size('sim) }
    .map('sim -> 'radius) { x: Int => (1.0D / x) }
    .discard('sim)
    // group by codeA and retain only records which are within epsilon distance
    .groupBy('codeA) { group => group.sortBy('radius).reverse }
    .filter('radius) { x: Double => x < Epsilon }
    
  val codeCounts = dists
    .groupBy('codeA) { group => 
      group.sizeAveStdev('radius -> ('count, 'mean, 'std)) 
    }
    // only retain codes that have at least MinPoints points within Epsilon
    .filter('count) { x: Int => x > MinPoints }
    .discard('std)

  val densities = dists.joinWithSmaller(('codeA -> 'codeA), codeCounts)
    // density ~ count^2 / mean^2 (constant factor of pi dropped)
    .map(('mean, 'count) -> 'density) { x: (Double,Int) =>
      1.0D * Math.pow(x._2, 2) / Math.pow(x._1, 2)
    }
    .discard('radius, 'count)
    
  // sort the result by density descending and find the top N clusters
  val densestCodes = densities.groupAll { group => 
      group.sortBy('density).reverse }
    .unique('codeA)
    .limit(NumClusters)
    
  // join code densities with densest codes to find final clusters
  densities.joinWithTiny(('codeA -> 'codeA), densestCodes)
    .groupBy('codeA) { group => group.mkString('codeB, ",")}
    .write(output)
}

object CodeCluster {
  def main(args: Array[String]): Unit = {
    // run the job in local mode and print the resulting clusters
    new CodeCluster(Args(List(
      "--local", "",
      "--epsilon", "0.3",
      "--minpoints", "10",
      "--nclusters", "10",
      "--input", "data/outpatient_claims.csv",
      "--output", "data/clusters.csv"
    ))).run
    Source.fromFile("data/clusters.csv")
      .getLines()
      .foreach(Console.println(_))
  }
}

I ran this locally with 1 million claims (out of the 16 million claims in my dataset) and got results like this:

HCPCS:00142    HCPCS:85025,  HCPCS:36415, HCPCS:93005, HCPCS:80053, ...
HCPCS:00300    HCPCS:85025,  HCPCS:80053, HCPCS:36415, HCPCS:99284, ...
HCPCS:00400    HCPCS:85025,  HCPCS:93005, HCPCS:80053, HCPCS:36415, ...
HCPCS:00532    HCPCS:85025,  HCPCS:36415, HCPCS:80048, HCPCS:G0378, ...
HCPCS:0073T    HCPCS:77417,  HCPCS:77336, HCPCS:77427, HCPCS:77280, ...
HCPCS:00740    HCPCS:85025,  HCPCS:36415, HCPCS:93005, HCPCS:85610, ...
HCPCS:00750    HCPCS:36415,  HCPCS:85025, HCPCS:85610, HCPCS:J3010, ...
HCPCS:00790    HCPCS:36415,  HCPCS:85025, HCPCS:80048, HCPCS:80053, ...
HCPCS:00810    HCPCS:36415,  HCPCS:85025, HCPCS:93005, HCPCS:80053, ...
HCPCS:00830    HCPCS:36415,  HCPCS:85025, HCPCS:80048, HCPCS:93005, ...
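
Each output row is a candidate centroid code followed by a comma-separated list of the codes grouped with it. Here is a minimal sketch (assuming the local run above, which writes tab-separated rows to data/clusters.csv) that parses this output back into a Map for inspection:

// Minimal sketch: parse the job's output (centroid TAB comma-separated members)
// into a Map from centroid code to its member codes.
import scala.io.Source

object ReadClusters extends App {
  val clusters: Map[String, List[String]] = Source.fromFile("data/clusters.csv")
    .getLines()
    .map { line =>
      val Array(centroid, members) = line.split("\t")
      centroid -> members.split(",").map(_.trim).toList
    }
    .toMap
  clusters.foreach { case (centroid, members) =>
    println(s"$centroid -> ${members.take(5).mkString(", ")} ...")
  }
}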

[Edit: 07/22/2014: This approach does not produce proper (disjoint) clusters. Notice in the data that the same code HCPCS:85025 is part of the first 3 clusters, which is obviously not wanted. I will implement the last part of the DBSCAN algorithm and update this page when I am done.]

And that's all I have for today. I'd like to point out a new book on Scalding, Programming MapReduce with Scalding by Antonios Chalkiopoulos. I was quite impressed by this book; you can read my review on Amazon if you are interested.

4 comments (moderated to prevent spam):

Anonymous said...

Great post. Why are you using the reciprocal of the similarity as a distance measure? Can you provide any more information on this approach? Why could you not just use the similarity / co-occurrence count directly?

Sujit Pal said...

Thank you. I am using the reciprocal because DBSCAN needs a distance measure to cluster, so it tries to group points with smaller distances between them. Similarity or co-occurrence count gets larger the closer the points are, so it wouldn't work directly as a distance.

RamA said...

Hi, have you updated this algorithm please (as you indicated at the bottom)? Thank you

Sujit Pal said...

Unfortunately no, I guess I got distracted by the next shiny object, sorry about that. But now that you point it out, I still don't know how to implement a distributed clustering algorithm from scratch (Spark ML and possibly other frameworks offer out-of-the-box approaches, which I would most likely use), but it might be something worth looking into again. Thanks for jogging my memory.