Java's Stream implementation with Go Generics

June 25, 2020

The Go team recently published a new post on the Go blog, «The Next Step in Generics», and updated the (rather long) «Type Parameters» design draft.

In this post I’m going to describe how to port Java’s Stream API to Go 2 generics.

Let’s get straight to it and define the Stream:

type Stream(type Type) struct {
	slice    []Type
	parallel bool
}

Here we declare a new generic struct type, parameterized by Type, that holds a slice of elements ([]Type) and a parallel flag.

And let’s have a constructor:

func Of(type Type)(slice []Type) *Stream(Type) {
	return &Stream(Type){
		slice: slice,
	}
}

The (type Type) clause after the function name, and in the type declaration, is the type parameter list. A generic function or type must declare its type parameters there before it can use them.
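For comparison, here is a minimal sketch of the same type and constructor in the syntax that eventually shipped in Go 1.18, where type parameters go in square brackets and each one needs a constraint (any, an alias for interface{}, here). It is a translation of the draft snippets above, not code from the goava repository.

```go
package main

import "fmt"

// Stream mirrors the draft's Stream(type Type) in Go 1.18+ syntax.
type Stream[T any] struct {
	slice    []T
	parallel bool
}

// Of wraps a slice in a Stream. T is inferred from the argument,
// so callers can write Of([]int{1, 2, 3}) instead of Of[int](...).
func Of[T any](slice []T) *Stream[T] {
	return &Stream[T]{slice: slice}
}

func main() {
	s := Of([]string{"a", "b"})
	fmt.Println(len(s.slice), s.parallel) // prints "2 false"
}
```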

Now we can implement the core operations of a typical stream:

  • Filter
  • Map
  • Reduce
  • ForEach
  • Collect
  • AnyMatch/AllMatch/NoneMatch

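    // Filter keeps only the items that satisfy predicate.
    // It replaces the stream's slice in place and returns the same stream.
    func (stream *Stream(Type)) Filter(predicate func(Type) bool) *Stream(Type) {
    	filtered := []Type{}
    	for _, item := range stream.slice {
    		if predicate(item) {
    			filtered = append(filtered, item)
    		}
    	}
    	stream.slice = filtered
    	return stream
    }
    
    // Map converts every item with mapper. It is a top-level function
    // because it introduces a second type parameter, R.
    func Map(type Type, R)(stream *Stream(Type), mapper func(Type) R) *Stream(R) {
    	slice := make([]R, len(stream.slice))
    	for i, item := range stream.slice {
    		slice[i] = mapper(item)
    	}
    	mapped := Of(slice)
    	mapped.parallel = stream.parallel // carry parallel mode over to the new stream
    	return mapped
    }
    
    // Reduce folds all items into a single R, starting from R's zero value.
    func Reduce(type Type, R)(stream *Stream(Type), accumulator func(R, Type) R) R {
    	var result R
    	for _, item := range stream.slice {
    		result = accumulator(result, item)
    	}
    	return result
    }
    
    // ForEach calls action for every item and returns the stream unchanged.
    func (stream *Stream(Type)) ForEach(action func(Type)) *Stream(Type) {
    	for _, item := range stream.slice {
    		action(item)
    	}
    	return stream
    }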
    

Notice that Map and Reduce are not declared as methods on Stream(Type). This is one of the unsolved problems of the current generics design draft: parametrized methods are not permitted.

This means we can have a function that receives a value of type A and returns a value of type B:

func foo(type A, B) (a A) B

But we can’t declare a parametrized method on type Struct(A) like this:

func (s Struct(A)) foo(type B) (a A) B

As the updated design draft puts it: «This design draft does not permit methods to declare type parameters that are specific to the method. The receiver may have type parameters, but the method may not add any type parameters.»
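The restriction only concerns extra type parameters: a method may still use the receiver's own type parameter. AnyMatch, AllMatch and NoneMatch from the list above need nothing beyond Type, so they can stay methods. The article does not show them; here is a minimal sketch in Go 1.18+ syntax (which kept the same rule against method-specific type parameters), assuming Java's semantics for empty streams.

```go
package main

import "fmt"

type Stream[T any] struct {
	slice []T
}

func Of[T any](slice []T) *Stream[T] { return &Stream[T]{slice: slice} }

// AnyMatch reports whether at least one item satisfies predicate.
// It only uses the receiver's T, so it is allowed as a method.
func (s *Stream[T]) AnyMatch(predicate func(T) bool) bool {
	for _, item := range s.slice {
		if predicate(item) {
			return true // short-circuit on the first match
		}
	}
	return false
}

// AllMatch reports whether every item satisfies predicate
// (true for an empty stream, as in Java).
func (s *Stream[T]) AllMatch(predicate func(T) bool) bool {
	for _, item := range s.slice {
		if !predicate(item) {
			return false
		}
	}
	return true
}

// NoneMatch reports whether no item satisfies predicate.
func (s *Stream[T]) NoneMatch(predicate func(T) bool) bool {
	return !s.AnyMatch(predicate)
}

func main() {
	s := Of([]int{3, 5, 8})
	isEven := func(n int) bool { return n%2 == 0 }
	fmt.Println(s.AnyMatch(isEven), s.AllMatch(isEven), s.NoneMatch(isEven))
	// prints "true false false"
}
```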

Now it’s time for the cherry on top: Collect().

The signature of the Collect function is pretty simple:

func Collect(type Type, Col)(
	stream *Stream(Type),
	collector collector.Collector(Type, Col),
) Col

Notice that now we have two type parameters: one for the element type the stream was created with, and another for the resulting collection.

Here is the interface for a typical collector, which consists of four methods:

  • Supply(): creates and returns a new result container.
  • Accumulate(): folds a value into a mutable result container.
  • Combine(): accepts two partial results and merges them; it is used only in parallel mode.
  • Finish(): in Java this method converts the intermediate accumulation type into the final result type, but to keep the implementation simple, here it returns the same type.

    type Collector(type Type, Col) interface {
    	Supply() Col
    	Accumulate(Col, Type) Col
    	Finish(Col) Col
    	Combine(Col, Col) Col
    }
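The article does not show the body of Collect. A minimal sequential sketch, assuming it simply drives Supply, Accumulate and Finish in order and leaves Combine to the parallel mode, might look like this in Go 1.18+ syntax. toSlice is a hypothetical collector added only to exercise it, and the explicit Collect[string, []string] instantiation keeps the sketch independent of how much type inference a given Go version performs.

```go
package main

import "fmt"

type Stream[T any] struct {
	slice    []T
	parallel bool
}

func Of[T any](slice []T) *Stream[T] { return &Stream[T]{slice: slice} }

// Collector mirrors the draft interface in Go 1.18+ syntax.
type Collector[T, C any] interface {
	Supply() C
	Accumulate(C, T) C
	Finish(C) C
	Combine(C, C) C
}

// Collect is a hypothetical sequential implementation: one container,
// every item accumulated in order, then Finish. Combine is not used here.
func Collect[T, C any](s *Stream[T], collector Collector[T, C]) C {
	result := collector.Supply()
	for _, item := range s.slice {
		result = collector.Accumulate(result, item)
	}
	return collector.Finish(result)
}

// toSlice is a hypothetical collector that gathers items into a []T.
type toSlice[T any] struct{}

func (c toSlice[T]) Supply() []T { return nil }

func (c toSlice[T]) Accumulate(acc []T, item T) []T { return append(acc, item) }

func (c toSlice[T]) Finish(acc []T) []T { return acc }

func (c toSlice[T]) Combine(a, b []T) []T { return append(a, b...) }

func main() {
	got := Collect[string, []string](Of([]string{"x", "y"}), toSlice[string]{})
	fmt.Println(got) // prints "[x y]"
}
```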
    

Now that we have the interface, we need an implementation. Since we want developers to be able to add their own collectors without declaring new types, we need a constructor that accepts plain functions. Think of it like http.HandlerFunc, which is often used to wrap a function so that it satisfies http.Handler.

As we did for Stream(Type), we declare a constructor for Collector(Type, Col). It returns an unexported implementation of Collector(Type, Col) that simply calls the given functions:

func Of(type Type, Col)(
	supply func() Col,
	accumulate func(Col, Type) Col,
	finish func(Col) Col,
	combine func(Col, Col) Col,
) Collector(Type, Col) {
	return impl(Type, Col){
		supply:     supply,
		accumulate: accumulate,
		finish:     finish,
		combine:    combine,
	}
}

Notice that we have to specify (Type, Col) for the Collector return type even though we specify it again later, in impl(Type, Col).

The impl(Type, Col) type is a holder of functions that implements the Collector(Type, Col) interface:

type impl(type Type, Col) struct {
	supply     func() Col
	accumulate func(Col, Type) Col
	finish     func(Col) Col
	combine    func(Col, Col) Col
}

func (collector impl(Type, Col)) Supply() Col {
	return collector.supply()
}

func (collector impl(Type, Col)) Accumulate(collection Col, item Type) Col {
	return collector.accumulate(collection, item)
}

func (collector impl(Type, Col)) Finish(collection Col) Col {
	if collector.finish == nil {
		return collection
	}
	return collector.finish(collection)
}

func (collector impl(Type, Col)) Combine(collection1, collection2 Col) Col {
	return collector.combine(collection1, collection2)
}
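Here is the same function-backed collector in Go 1.18+ syntax, as a sketch that follows the draft code above. The counting collector in main is hypothetical; it passes nil for finish to exercise the identity fallback in Finish.

```go
package main

import "fmt"

type Collector[T, C any] interface {
	Supply() C
	Accumulate(C, T) C
	Finish(C) C
	Combine(C, C) C
}

// impl holds plain functions and satisfies Collector by delegating to them.
type impl[T, C any] struct {
	supply     func() C
	accumulate func(C, T) C
	finish     func(C) C
	combine    func(C, C) C
}

func (c impl[T, C]) Supply() C { return c.supply() }

func (c impl[T, C]) Accumulate(acc C, item T) C { return c.accumulate(acc, item) }

func (c impl[T, C]) Combine(a, b C) C { return c.combine(a, b) }

// Finish falls back to the identity function when no finisher was given.
func (c impl[T, C]) Finish(acc C) C {
	if c.finish == nil {
		return acc
	}
	return c.finish(acc)
}

// Of builds a Collector from functions. Returning the interface type
// keeps the unexported impl hidden from callers.
func Of[T, C any](
	supply func() C,
	accumulate func(C, T) C,
	finish func(C) C,
	combine func(C, C) C,
) Collector[T, C] {
	return impl[T, C]{supply: supply, accumulate: accumulate, finish: finish, combine: combine}
}

func main() {
	// A hypothetical collector that counts items; finish is nil on purpose.
	count := Of[string, int](
		func() int { return 0 },
		func(n int, _ string) int { return n + 1 },
		nil,
		func(a, b int) int { return a + b },
	)
	acc := count.Supply()
	for _, s := range []string{"a", "b", "c"} {
		acc = count.Accumulate(acc, s)
	}
	fmt.Println(count.Finish(acc)) // prints "3"
}
```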

Good. Now let’s declare a simple collector, ToMap, that converts a stream of Type into a map[Key]Value:

func ToMap(type Type interface{}, Key comparable, Value interface{})(
	mapper func(Type) (Key, Value),
	accumulator func(Value, Value) Value,
) Collector(Type, map[Key]Value) {
	return Of(
		func() map[Key]Value {
			return map[Key]Value{}
		},
		func(table map[Key]Value, item Type) map[Key]Value {
			key, value := mapper(item)
			now := table[key] // a missing key yields Value's zero value
			table[key] = accumulator(now, value)
			return table
		},
		func(table map[Key]Value) map[Key]Value {
			return table
		},
		func(table1, table2 map[Key]Value) map[Key]Value {
			for k2, v2 := range table2 {
				v1 := table1[k2] // zero value if k2 is absent from table1
				table1[k2] = accumulator(v1, v2)
			}
			return table1
		},
	)
}
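One subtlety in ToMap: table[key] returns Value's zero value for a missing key, so the first value for every key is combined with that zero value instead of being stored as-is (the Combine step behaves the same way). That is harmless for sums, but an accumulator such as minimum would see 0 first and report 0 for every non-negative score. The sketch below shows a hypothetical helper, accumulateInto, that uses the comma-ok form to avoid this; it is an alternative, not what the article's ToMap does.

```go
package main

import "fmt"

// accumulateInto is a hypothetical helper: the first value for a key is
// stored as-is, and only later values are combined with accumulator.
func accumulateInto[K comparable, V any](table map[K]V, key K, value V, accumulator func(V, V) V) {
	if now, ok := table[key]; ok {
		table[key] = accumulator(now, value)
	} else {
		table[key] = value
	}
}

func main() {
	minimum := func(a, b int) int {
		if a < b {
			return a
		}
		return b
	}
	table := map[string]int{}
	votes := []struct {
		name  string
		score int
	}{{"A", 5}, {"A", 3}, {"B", 9}}
	for _, v := range votes {
		accumulateInto(table, v.name, v.score, minimum)
	}
	fmt.Println(table) // prints "map[A:3 B:9]"
}
```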

An interesting thing about this function is that while it has three separate type parameters, it returns Collector(Type, map[Key]Value), which uses only two type parameters.

Note that Key needs the comparable type constraint; without it, Key couldn’t be used as a map key and the code wouldn’t compile.

ToMap expects two functions as input:

  • Mapper func(Type) (Key, Value): receives an original element and returns two values; the first is used as the map key and the second as the value stored under that key.
  • Accumulator func(now Value, update Value) Value: receives the current value of map[key] (now) and the new value (update), and returns the result of accumulating update into now.

Let’s see how it works out in practice:

  1. Declare a simple type that describes a movie and the score a user gave it:

    type MovieVote struct {
    	Name  string
    	Score int
    }
    
  2. Create a stream from a slice of movie votes:

    items := stream.Of(
    	[]MovieVote{
    		{Name: "A", Score: 1},
    		{Name: "A", Score: 2},
    		{Name: "B", Score: 9},
    		{Name: "B", Score: 10},
    		{Name: "B", Score: 8},
    		{Name: "C", Score: 7},
    		{Name: "C", Score: 8},
    		{Name: "C", Score: 7},
    	},
    )
    
  3. Now let’s convert them into a map from movie name to total score (map[string]int):

    result := stream.Collect(
    	items,
    	collector.ToMap(MovieVote, string, int)(
    		func(movie MovieVote) (string, int) {
    			return movie.Name, movie.Score
    		},
    		func(total, score int) int {
    			return total + score
    		},
    	),
    )
    fmt.Println(result)
    
  • The mapper returns the movie name as the key and the score as the value
  • The accumulator adds the current score to the running total

Output:

map[A:3 B:27 C:22]

By the way, this collector also works in parallel mode; check it out in the source code, which is available on GitHub: github.com/reconquest/goava

And a big shout-out to Matt Layher for his article about building a hashtable with new Go generics!

Cool stuff, heh!
