A case of sizing and draining buffered Go channels

Update: Based on several comments posted on the Google+ Go community, I rewrote the solution presented here. See past the Conclusion section.

Recently, I was asked to write a small program for collecting product promotion information for a large collection of products. Given an input file of product identifiers, the following tasks had to be done:

  1. read all identifiers from an input file
  2. for each identifier, fetch product offer information in XML
  3. parse the offer XML into a DOM document
  4. extract all promotions from the offer document
  5. for each promotion, export its data into a CSV record

A straightforward implementation is to write a function for each task and call them in a nested loop. For this program, I created functions such that the basic flow is quite simple.

xml := fetchSellingOfferXml(id)
doc := parseStringXml(xml)
promos := extractPromotions(doc)
exportPromotion(promo) // called for each promo in promos

Next, I created the loops (reading identifiers and iterating the promotions) and the program was completed.
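
Put together, the sequential version looks roughly like this (a sketch, assuming that fetchSellingOfferXml returns an empty string when no offer is available and that extractPromotions returns a slice):

scanner := bufio.NewScanner(fileIn)
for scanner.Scan() {
	xml := fetchSellingOfferXml(scanner.Text())
	if xml == "" {
		continue // no offer available for this identifier
	}
	doc := parseStringXml(xml)
	for _, promo := range extractPromotions(doc) {
		exportPromotion(promo)
	}
}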

However, as predicted, some of these tasks take relatively more time to execute than others. Fetching product offer information requires an HTTP request to a remote web service. The next task, parsing the XML response that represents an offer with promotion information, also takes some time. Finally, given the requirement of processing 50,000 identifiers, I decided to use Go concurrency here.

Concurrency to the rescue

Again, a straightforward implementation of concurrency in Go is to use goroutines and channels. Channels are used to communicate input and output data between the task functions, each of which runs inside its own goroutine, chaining the functions together with channels.

Because one input (an identifier) can result in zero (no offer) or more promotions, and because some functions take more time than others, I decided to use buffered channels. To balance the time spent waiting for an offer XML against the time spent processing it, I expected to use multiple goroutines for fetching.

The following channels are initialized:

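// CHAN_CAP is a named constant for the channel buffer capacity; its value is discussed in the Sizing section below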
id_channel := make(chan string, CHAN_CAP)
xml_channel := make(chan string, CHAN_CAP)
offer_channel := make(chan *dom.Document, CHAN_CAP)
promotion_channel := make(chan *promotion, CHAN_CAP)

Next, I created new functions that wrap each task function in a loop. For example, fetchXmlLoop blocks on a receive from the id-channel. After receiving an identifier, it calls fetchSellingOfferXml. Unless the result is empty (no offer available), the result is sent on the xml-channel. Now my program can set up the chain of channels by spawning a goroutine for each loop.

go fetchXmlLoop(id_channel, xml_channel)
go parseXmlLoop(xml_channel, offer_channel)
go extractPromotionsLoop(offer_channel, promotion_channel)
go writePromotionsLoop(promotion_channel, quit_channel)
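
The quit_channel passed to the last loop is unbuffered; its role is explained in the Draining section below. It is created like the other channels, but without a capacity:

quit_channel := make(chan bool)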

Real challenges

Up to this point, the story is pretty straightforward and easy to understand without going into details. Now I will discuss the real challenges, because the program as described so far is not complete.

Draining

At some point in time, the program has to terminate gracefully. All identifiers have to be processed and, as a result, all available promotions have to be exported. Because the program uses multiple buffered channels, it can only terminate once all channels are drained; no data can be left unconsumed.

The simple method of counting cannot be used here: for a given identifier there might be no offer information, and when offer information is available it can contain zero or more promotions. Another method would be polling the length of each channel. This cannot be used either, as an empty xml-channel does not imply there is nothing more to receive in the future.

Fortunately, I found a solution that involves a termination value for each channel type. This termination value is sent through the chain of channels, “notifying” each goroutine loop that it should discontinue. After passing the termination value on to the next channel, a loop can stop.

For this program, after reading all product identifiers from the file, the terminating identifier (“eof”) is sent on the id-channel. The fetchXmlLoop receives this value and sends the terminating XML string (“eof”) on the xml-channel. For each channel datatype, a termination value is defined.

var (
	no_more_ids        = "eof"
	no_more_xml        = "eof"
	no_more_offers     = new(dom.Document)
	no_more_promotions = new(promotion)
)

The fetchXmlLoop tests each received identifier against the termination value and acts accordingly.

	func fetchXmlLoop(id_channel chan string, xml_channel chan string) {
		for {
			id := <-id_channel
			if no_more_ids == id {
				xml_channel <- no_more_xml
				break
			} else {
				xml := fetchSellingOfferXml(id)
				if xml != "" { // happens if no offer is found
					xml_channel <- xml
				}
			}
		}
	}
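
The same pattern is applied to all other loop functions. As an illustration, the parsing stage could look like this (a sketch, assuming parseStringXml from the basic flow returns a *dom.Document):

	func parseXmlLoop(xml_channel chan string, offer_channel chan *dom.Document) {
		for {
			xml := <-xml_channel
			if no_more_xml == xml {
				offer_channel <- no_more_offers
				break
			}
			offer_channel <- parseStringXml(xml)
		}
	}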

There is one caveat though. The goroutine that runs the main function must not exit until the termination value has been received by the last channel in the chain. To synchronize this, I introduced the unbuffered quit-channel. After reading all identifiers from the input file, the main goroutine performs a blocking send of a boolean value on the quit-channel. When writePromotionsLoop receives the termination value from the promotion-channel, it takes the boolean value from the quit-channel. Now the program terminates only after all work is done.

	// start reading and feeding the chain
	scanner := bufio.NewScanner(fileIn)
	for scanner.Scan() {
		id_channel <- scanner.Text()
		observer.Observe() // sample the current channel lengths (see the Sizing section below)
	}
	// send terminating identifier
	id_channel <- no_more_ids

	// and wait for the last channel to be drained
	quit_channel <- true
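
At the end of the chain, writePromotionsLoop is the counterpart that takes the boolean value from the quit-channel (a sketch, assuming an exportPromotion function as in the basic flow):

	func writePromotionsLoop(promotion_channel chan *promotion, quit_channel chan bool) {
		for {
			promo := <-promotion_channel
			if no_more_promotions == promo {
				<-quit_channel // releases the blocked main goroutine
				break
			}
			exportPromotion(promo)
		}
	}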

Sizing

The concurrent program now uses multiple buffered channels and can create multiple goroutines for each loop. In general, each channel must be given a capacity, and for each loop I need to decide how many goroutines can run in parallel. Even for this simple program, there are potentially 4 + 4 variables to size.

A practical way to find these values, such that the program runs as fast as possible, is to measure the use of each channel. If a channel's capacity is set too low, the sending goroutines have to wait. If too many goroutines are created for one loop function, the channel may not supply data fast enough to keep them all busy. For example, if the id-channel is constantly full while the xml-channel stays nearly empty, fetching is the bottleneck and more fetching goroutines are warranted.

For this purpose, I created a simple ChannelsObserver object. Each time it is asked to observe, it records the length of all registered channels, and it runs a goroutine that periodically logs basic statistics (min, max, average, ...). To compute the statistics, I am using the go-metrics package. Because in Go it is not possible to store differently typed channels as values in a single map, I defined the ChannelLengthReporter type.

type ChannelLengthReporter func() int

Now, each channel can be registered on the ChannelsObserver using a closure.

observer := ChannelsObserver{}
observer.Register("id", func() int { return len(id_channel) })
observer.Register("xml", func() int { return len(xml_channel) })
observer.Register("offer", func() int { return len(offer_channel) })
observer.Register("promotion", func() int { return len(promotion_channel) })
timeBetweenLogEntry, _ := time.ParseDuration("5s")
observer.Run(timeBetweenLogEntry)

In the loop for reading identifiers from the input file, I added the observer.Observe() call to sample the current length of all channels. The observer logs the metrics of all its channels every 5 seconds.

Channel Length Observer

	// ChannelsObserver records channel-length samples in histograms
	// and periodically logs their statistics.
	type ChannelsObserver struct {
		reporters  map[string]ChannelLengthReporter
		histograms map[string]metrics.Histogram
	}

	// Register adds a named channel-length reporter and creates a histogram for it.
	func (c *ChannelsObserver) Register(name string, reporter ChannelLengthReporter) {
		if c.reporters == nil {
			// lazy init
			c.reporters = make(map[string]ChannelLengthReporter)
			c.histograms = make(map[string]metrics.Histogram)
		}
		c.reporters[name] = reporter
		sample := metrics.NewUniformSample(100)
		c.histograms[name] = metrics.NewHistogram(sample)
		metrics.Register(name, c.histograms[name])
	}

	// Observe takes a snapshot of the current length of each registered channel.
	func (c ChannelsObserver) Observe() {
		for name, reporter := range c.reporters {
			c.histograms[name].Update(int64(reporter()))
		}
	}

	// Run starts a goroutine that logs the collected metrics at the given interval.
	func (c ChannelsObserver) Run(logEvery time.Duration) {
		go metrics.Log(metrics.DefaultRegistry, logEvery, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
	}

Conclusion

Using goroutines and channels, two of the Go concurrency features, I was able to apply concurrency to this program. Because the program uses buffered channels to decouple processing tasks that vary in duration, draining the channels correctly was solved by passing “termination” values through the chain. Finally, measuring channel usage is a valid method to size the capacity of each channel and to decide how many goroutines each processing task needs.

Update from feedback

First, given the solution described earlier and its measurement results, I observed that only the fetching stage takes significantly more time than any other processing. This is simply due to the network latency involved. Consequently, all but one channel remained empty; the id_channel was always saturated. Buffering the channels is not the right solution here.

Second, after studying the comments added by several Gophers on the Go community post, I decided to re-implement the solution using a pattern suggested and prototyped by Bryan Mills. Basically, his advice is: “You don’t need buffers for any of these: the goroutines themselves form all the buffer you need”. Using the standard WaitGroup synchronization type, this solution spawns goroutines for the different stages (fetch, parse, export) and makes them terminate and close the channels gracefully. I put in extra logging to show the actual flow; see the example output below the code.

As can be seen from the code, no buffered channels are used. Each stage is implemented using the same pattern: a WaitGroup is created, goroutines are spawned, and cleanup is done in a deferred function call. All channels are consumed using the range operator.


	// spawn 1 goroutine to export promotions
	var promoWG sync.WaitGroup
	defer func() {
		glog.V(1).Info("waiting for the promo group")
		promoWG.Wait()
	}()
	promos := make(chan *promotion)
	promoWG.Add(1)
	go func() {
		defer func() {
			glog.V(1).Infof("promo writer done")
			promoWG.Done()
		}()
		for promo := range promos {
			writePromotion(promo)
		}
	}()

	// waitgroup to synchronize finished goroutines for parsing
	var parseWG sync.WaitGroup
	defer func() {
		glog.V(1).Info("waiting for the parse group")
		parseWG.Wait()
		close(promos)
	}()

	// spawn NumCPU goroutines to process the XML documents
	xml := make(chan string)
	for i := 0; i < runtime.NumCPU(); i++ {
		parseWG.Add(1)
		go func(parser int) {
			defer func(parser int) {
				glog.V(1).Infof("parser %d done", parser)
				parseWG.Done()
			}(parser)
			for each := range xml {
				doc, err := dom.ParseStringXml(each)
				if err == nil {
					extractPromotions(doc, promos)
				} else {
					// log it
				}
			}
		}(i)
	}

	// waitgroup to synchronize finished goroutines for fetching
	var fetchWG sync.WaitGroup
	defer func() {
		glog.V(1).Info("waiting for the fetch group")
		fetchWG.Wait()
		close(xml)
	}()

	// spawn up to FETCHER_CAP goroutines to fetch offer XML for the identifiers read
	nFetchers := 0
	scanner := bufio.NewScanner(fileIn)
	id_channel := make(chan string)
	for scanner.Scan() {
		if nFetchers < FETCHER_CAP {
			nFetchers++
			fetchWG.Add(1)
			go func(fetcher int) {
				defer func() {
					glog.V(1).Infof("fetcher %d done", fetcher)
					fetchWG.Done()
				}()
				for id := range id_channel {
					xml <- fetchSellingOfferXml(id)
				}
			}(nFetchers)
		}
		id_channel <- scanner.Text()
	}
	close(id_channel)
	glog.V(1).Info("done scanning")
Example output:

I1015 11:09:38.632012 06942 getpromos_waitgroup.go:190] fetchSellingOffer: http://offer.services.xxxxx/slo/products/9200000019409664/offer
I1015 11:09:38.632309 06942 getpromos_waitgroup.go:190] fetchSellingOffer: http://offer.services.xxxxx/slo/products/9000000011756074/offer
I1015 11:09:38.632505 06942 getpromos_waitgroup.go:190] fetchSellingOffer: http://offer.services.xxxxx/slo/products/9200000011859583/offer
I1015 11:09:38.632543 06942 getpromos_waitgroup.go:123] done scanning
I1015 11:09:38.632551 06942 getpromos_waitgroup.go:97] waiting for the fetch group
I1015 11:09:38.694256 06942 getpromos_waitgroup.go:112] fetcher 1 done
I1015 11:09:38.694920 06942 getpromos_waitgroup.go:158] extractPromotions
I1015 11:09:38.694950 06942 getpromos_waitgroup.go:140] writePromotion
I1015 11:09:38.700120 06942 getpromos_waitgroup.go:112] fetcher 2 done
I1015 11:09:38.701317 06942 getpromos_waitgroup.go:158] extractPromotions
I1015 11:09:38.701367 06942 getpromos_waitgroup.go:140] writePromotion
I1015 11:09:38.701429 06942 getpromos_waitgroup.go:140] writePromotion
I1015 11:09:38.701535 06942 getpromos_waitgroup.go:112] fetcher 3 done
I1015 11:09:38.702002 06942 getpromos_waitgroup.go:158] extractPromotions
I1015 11:09:38.702019 06942 getpromos_waitgroup.go:69] waiting for the parse group
I1015 11:09:38.702029 06942 getpromos_waitgroup.go:140] writePromotion
I1015 11:09:38.702092 06942 getpromos_waitgroup.go:80] parser 3 done
I1015 11:09:38.702098 06942 getpromos_waitgroup.go:80] parser 4 done
I1015 11:09:38.702102 06942 getpromos_waitgroup.go:80] parser 5 done
I1015 11:09:38.702105 06942 getpromos_waitgroup.go:80] parser 6 done
I1015 11:09:38.702109 06942 getpromos_waitgroup.go:80] parser 7 done
I1015 11:09:38.702113 06942 getpromos_waitgroup.go:80] parser 0 done
I1015 11:09:38.702116 06942 getpromos_waitgroup.go:80] parser 1 done
I1015 11:09:38.702120 06942 getpromos_waitgroup.go:80] parser 2 done
I1015 11:09:38.702124 06942 getpromos_waitgroup.go:51] waiting for the promo group
I1015 11:09:38.702128 06942 getpromos_waitgroup.go:58] promo writer done