blob: 408e95b12e333d0a20d407c74431c9fd5d105a8c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package simplemr provides a simple map reduce framework for use by
// commandline and other tools and consequently can only be used from
// within a single process. It is specifically not intended to support
// large datasets, but mappers are run concurrently so that long running
// tasks (e.g. external shell commands will be run in parallel). The
// current implementation supoorts only a single reducer however future
// implementations are likely to run multiple reducers and hence reducers
// should be coded accordingly.
package simplemr
import (
"errors"
"fmt"
"runtime"
"sort"
"sync"
"time"
)
var ErrMRCancelled = errors.New("MR cancelled")
// Mapper is in the interface that must be implemented by all mappers.
type Mapper interface {
// Map is called by the framework for every key, value pair read
// from the specified input.
Map(mr *MR, key string, value interface{}) error
}
// Reducer is the interface that must be implemented by the reducer.
type Reducer interface {
// Reduce is called by the framework for every key and associated
// values that are emitted by the Mappers.
Reduce(mr *MR, key string, values []interface{}) error
}
// Record represents all input and output data.
type Record struct {
Key string
Values []interface{}
}
type store struct {
sync.Mutex
data map[string][]interface{}
}
func newStore() *store {
return &store{data: make(map[string][]interface{})}
}
func (s *store) sortedKeys() []string {
s.Lock()
defer s.Unlock()
keys := make([]string, 0, len(s.data))
for k := range s.data {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
func (s *store) insert(k string, v ...interface{}) {
s.Lock()
defer s.Unlock()
s.data[k] = append(s.data[k], v...)
}
func (s *store) lookup(k string) []interface{} {
s.Lock()
defer s.Unlock()
return s.data[k]
}
// MR represents the Map Reduction.
type MR struct {
input <-chan *Record
output chan<- *Record
cancel chan struct{}
cancelled bool
cancelled_mu sync.RWMutex // guards cancelled
err error
err_mu sync.RWMutex // guards err
data *store
// The number of conccurent mappers to use. A value of 0 instructs
// the implementation to use an appropriate number, such as the number
// of available CPUs.
NumMappers int
// The time to wait for the map reduce to complete. A value of 0 implies
// no timeout - i.e. an infinite wait.
Timeout time.Duration
}
// Error returns any error that was returned by the Run method. It is
// safe to read its value once the output channel passed to Run has been
// closed.
func (mr *MR) Error() error {
mr.err_mu.RLock()
defer mr.err_mu.RUnlock()
return mr.err
}
// MapOut outputs the key and associated values for subsequent
// processing by a Reducer. It should only be called from a mapper.
func (mr *MR) MapOut(key string, values ...interface{}) {
mr.data.insert(key, values...)
}
// ReduceOut outputs the key and associated values to the specified output
// stream. It should only be called from a reducer.
func (mr *MR) ReduceOut(key string, values ...interface{}) {
mr.output <- &Record{key, values}
}
// CancelCh returns a channel that will be closed when the Cancel
// method is called. It should only be called by a mapper or reducer.
func (mr *MR) CancelCh() <-chan struct{} {
return mr.cancel
}
// Cancel closes the channel intended to be used for monitoring
// cancellation requests. If Cancel is called before any reducers
// have been run then no reducers will be run. It can only be called
// after mr.Run has been called, generally by a mapper or a reducer.
func (mr *MR) Cancel() {
mr.cancelled_mu.Lock()
defer mr.cancelled_mu.Unlock()
if mr.cancelled {
return
}
close(mr.cancel)
mr.cancelled = true
}
// IsCancelled returns true if this MR has been cancelled.
func (mr *MR) IsCancelled() bool {
mr.cancelled_mu.RLock()
defer mr.cancelled_mu.RUnlock()
return mr.cancelled
}
func (mr *MR) runMapper(ch chan error, mapper Mapper) {
for {
rec := <-mr.input
if rec == nil {
ch <- nil
return
}
for _, v := range rec.Values {
if err := mapper.Map(mr, rec.Key, v); err != nil {
ch <- err
return
}
}
}
}
func (mr *MR) runMappers(mapper Mapper, timeout <-chan time.Time) error {
ch := make(chan error, mr.NumMappers)
for i := 0; i < mr.NumMappers; i++ {
go mr.runMapper(ch, mapper)
}
done := 0
for {
select {
case err := <-ch:
if err != nil {
// We should probably drain the channel.
return err
}
done++
if done == mr.NumMappers {
return nil
}
case <-mr.cancel:
return ErrMRCancelled
case <-timeout:
return fmt.Errorf("timed out mappers after %s", mr.Timeout)
}
}
}
func (mr *MR) runReducers(reducer Reducer, timeout <-chan time.Time) error {
ch := make(chan error, 1)
go func() {
for _, k := range mr.data.sortedKeys() {
v := mr.data.lookup(k)
if err := reducer.Reduce(mr, k, v); err != nil {
ch <- err
}
}
close(ch)
}()
var err error
select {
case err = <-ch:
case <-timeout:
err = fmt.Errorf("timed out reducers after %s", mr.Timeout)
}
return err
}
// Run runs the map reduction using the supplied mapper and reducer reading
// from input and writing to output. The caller must close the input channel
// when there is no more input data. The implementation of Run will close
// the output channel when the Reducer has processed all intermediate data.
// Run may only be called once per MR receiver.
func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error {
mr.input, mr.output, mr.data = input, output, newStore()
mr.cancel = make(chan struct{})
if mr.NumMappers == 0 {
// TODO(cnicolaou,toddw): consider using a new goroutine
// for every input record rather than fixing concurrency like
// this. Maybe an another option is to use the capacity of the
// input channel.
mr.NumMappers = runtime.NumCPU()
}
var timeout <-chan time.Time
if mr.Timeout > 0 {
timeout = time.After(mr.Timeout)
}
defer close(mr.output)
if err := mr.runMappers(mapper, timeout); err != nil {
mr.err_mu.Lock()
mr.err = err
mr.err_mu.Unlock()
return err
}
if mr.IsCancelled() {
return ErrMRCancelled
}
err := mr.runReducers(reducer, timeout)
mr.err_mu.Lock()
mr.err = err
mr.err_mu.Unlock()
if mr.IsCancelled() {
return ErrMRCancelled
}
return err
}