diff --git a/.travis.yml b/.travis.yml index 6f14ad5..6b8d8b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.5.1 + - 1.x notifications: email: @@ -10,11 +10,17 @@ notifications: on_success: change on_failure: change +env: + global: + - GO111MODULE=on + #COVERALLS_TOKEN + - secure: "Sem13/2A1jUUMrhgC5w8oPaVVKU0iPs7LZ1QqlHjSpCX03WcEvhwUloXoFtKqMjoe1hHRplUE5rO/gquQa+t34i9PSVp00qiLO5eec4M0rJJ/Uz1PG8BEfAlEaiuSZTQ+JM4pNvldc2gbmYQ+dz7Ans5lA0Hv5j5L7A9///O+IJ/3Ob6yhG9YGXx/ohrF0US6PkHwl6+8Vu9iMOLBpnxeV1pRPny5yjVbtvTmAEnQc6ZMLMX0LgwcIuTJIvS8JTJgqyO6vygh0ZxEzmCB1gqf4h4ZkxTn7wxlY9UaH2P7F0agInZLP+Pt6Opg/YvIVreh9gOGy1u6utN8CGkUstkz9W8zuPpxV0PGdhhR7nyWoj1072qPYTk/m/ED5flKsmlW8tfq0HbgcpumWoZov/UoECgWltT9nQb8XVHXMQApaNnhh/EThlC0/noNfehtJsT/zNEK6fSZK+ZHOqyxalViZsEznTbo7RzpP1qD59+dBWx544PCAghSemLg+XYoEJALn0SocSVpYiW+F1cHhxL4c0imMhHGzzk/id5ZAJ3RLFzOzH8Ker1Ssp8TkCqlHoksv/IkkEW0IQ3O7vfrNPjBCcI2foAPfdv3EivfuI3L5JPgiteQ8LyZfzCO2xTGnCqUoBBcIiM6471aaQtADdnZMcwW3tAzdgrSgXrTWGcobI=" + install: - - ./bin/travis/install.sh + - ./bin/travis/install script: - - ./bin/travis/test_coverage.sh + - ./bin/travis/test-coverage after_success: - - ./bin/coveralls/push.sh + - ./bin/coveralls/push diff --git a/README.md b/README.md index 22fe385..f6da8e5 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,3 @@ # timequeue -timequeue provides a TimeQueue type that releases arbitrary messages at given +timequeue provides a TimeQueue type that releases arbitrary Messages at given time.Times. - -#### Status -[![Build Status](https://travis-ci.org/gogolfing/timequeue.svg?branch=master)](https://travis-ci.org/gogolfing/timequeue) -[![Coverage Status](https://coveralls.io/repos/github/gogolfing/timequeue/badge.svg?branch=master)](https://coveralls.io/github/gogolfing/timequeue?branch=master) -[![Go Report Card](https://goreportcard.com/badge/github.com/gogolfing/timequeue)](https://goreportcard.com/report/github.com/gogolfing/timequeue) - -### Documentation and Usage -Full documentation and examples can be found at [![GoDoc](https://godoc.org/github.com/gogolfing/timequeue?status.svg)](https://godoc.org/github.com/gogolfing/timequeue) diff --git a/bin/bump_version.sh b/bin/bump_version.sh deleted file mode 100755 index 835696e..0000000 --- a/bin/bump_version.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/sh - -#ensure we are on the master branch. -branch=$(git branch | grep "*" | cut -d " " -f 2) -if [ "$branch" != "master" ]; then - >&2 echo "you must be on master branch to run this command" - exit 1 -fi - -#grab the version argument. -version="$1" - -if [ -z "$version" ]; then - >&2 echo "usage: first argument supplied must be a non-empty version" - exit 2 -fi - -#set tag on new version commit. -git tag -a "$version" -did_tag=$? - -if [ $did_tag -ne 0 ]; then - exit 3 -fi - -#push master branch and new tag to origin. -git push origin master tag "$version" -did_push=$? - -echo "\n" - -if [ $did_push -eq 0 ]; then - echo "Version bump, commit, tag, and push to origin/master successful!!" -else - >&2 echo "failed to push new tag to origin/master" - exit 4 -fi diff --git a/bin/coveralls/push.sh b/bin/coveralls/push similarity index 100% rename from bin/coveralls/push.sh rename to bin/coveralls/push diff --git a/bin/travis/install.sh b/bin/travis/install similarity index 84% rename from bin/travis/install.sh rename to bin/travis/install index 6a2a7de..a6a4829 100755 --- a/bin/travis/install.sh +++ b/bin/travis/install @@ -2,4 +2,5 @@ go get github.com/axw/gocov/gocov go get github.com/ericelsken/goveralls -go get -t ./... + +go get ./... diff --git a/bin/travis/test_coverage.sh b/bin/travis/test-coverage similarity index 100% rename from bin/travis/test_coverage.sh rename to bin/travis/test-coverage diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1e598c8 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/gogolfing/timequeue + +go 1.12 diff --git a/message.go b/message.go index 70f5eb5..95a0fa9 100644 --- a/message.go +++ b/message.go @@ -2,143 +2,169 @@ package timequeue import ( "container/heap" - "fmt" "time" ) -//sentinel value that says a Message is not in a messageHeap. -const notInIndex = -1 +const ( + //indexNotInHeap is a sentinel for a Message.index that indicates the Message + //is not a in a messageHeap. + indexNotInHeap = -1 +) -//Message is a simple holder struct for a time.Time (the time the Message -//will be released from the queue) and a Data payload of type interface{}. -// -//A Message is not safe for modification from multiple go-routines. -//The Time field is used to calculate when the Message should be released from -//a TimeQueue, and thus changing its value while the Message is still referenced -//by a TimeQueue could have unknown side-effects. -//The Data field is never modified by a TimeQueue. +//Message is a container type that associates a Time with some +//arbitrary data. +//A Message is "released" from a TimeQueue as close to Time At as possible. // -//It is up to client code to ensure that Data is always of the same underlying -//type if that is desired. +//Message zero values are not in a valid state. You should use NewMessage to create +//Message instances. type Message struct { - time.Time - Data interface{} + //at is the Time at which to release this Message. + at time.Time + + //data is any arbitrary data that you can put in a Message and retrieve when + //the Message is released. + data interface{} - //reference to the messageHeap that this Message is in. used for removal safety. - mh *messageHeap - //the index of this Message in mh. used to remove a Message from a messageHeap. + //messageHeap is the messageHeap that this Message is in. + //A nil value means that Message is not in a messageHeap. + *messageHeap + + //index is the index at which this Message resides in messageHeap. index int } -//String returns the standard string representation of a struct. -func (m *Message) String() string { - return fmt.Sprintf("&timequeue.Message{%v %v}", m.Time, m.Data) +//NewMessage returns a Message with at and data set on their corresponding fields. +// +//You should use this function to create Messages instead of using a struct initializer. +func NewMessage(at time.Time, data interface{}) *Message { + return &Message{ + at: at, + data: data, + messageHeap: nil, + index: indexNotInHeap, + } } -//messageHeap is a heap.Interface implementation for Messages. -//The peekMessage(), pushMessage(), popMessage(), and removeMessage() methods -//should be used over Push() and Pop() because they provide logic for emprty heaps, -//nil Messages, and removal. -//A messageHeap has no guarantees for correctness if they are not used. -//messageHeap is not safe for use by multiple go-routines. -type messageHeap struct { - messages []*Message +//At returns the Time at which m is scheduled to be released. +func (m *Message) At() time.Time { + return m.at } -//newMessageHeap creates a messageHeap with messages added to the heap. -//heap.Init() is called before the value is returned. -func newMessageHeap() *messageHeap { - mh := &messageHeap{ - messages: []*Message{}, - } - heap.Init(mh) - return mh +//Data returns the data associated with m. +// +//This will usually be used after receiving a Message from a TimeQueue in order +//to process the Message appropriately. +func (m *Message) Data() interface{} { + return m.data } -//Len returns the number of Messages in the heap. -func (mh *messageHeap) Len() int { - return len(mh.messages) +//less returns whether or not m is "less than" other. +//This is used to determined the order in which Messages are released from a TimeQueue. +// +//It returns true if m.at is before other.at. +func (m *Message) less(other *Message) bool { + return m.at.Before(other.at) } -//Less determines whether or not the Message at index i is less than that at index -//j. -//This is determined by the (message at i.Time).Before(message at j.Time). -func (mh *messageHeap) Less(i, j int) bool { - return mh.messages[i].Time.Before(mh.messages[j].Time) +//isHead returns whether or not m is at the head of a messageHeap, i.e. the next +//one to be released. +func (m *Message) isHead() bool { + return m.messageHeap != nil && m.index == 0 } -//Swap swaps the messages at indices i and j. -func (mh *messageHeap) Swap(i, j int) { - mh.messages[i], mh.messages[j] = mh.messages[j], mh.messages[i] - mh.messages[i].index = i - mh.messages[j].index = j +func (m *Message) withoutHeap() Message { + m.messageHeap = nil + m.index = indexNotInHeap + return *m } -//Push is the heap.Interface Push method that adds value to the heap. -//Appends value to the internal slice. -func (mh *messageHeap) Push(value interface{}) { - mh.messages = append(mh.messages, value.(*Message)) +//messageHeap is a slice of Messages with methods that satisfy the heap.Interface. +// +//messageHeaps can be used with the heap package to push and pop Messages ordered +//by Message.less. +// +//messageHeaps are not safe for use by multiple goroutines. +// +//We let Go manage how increasing size and capacity works when appending to a +//messageHeap. +type messageHeap []*Message + +//Len is the heap.Interface implementation. +//It returns len(mh). +func (mh messageHeap) Len() int { + return len(mh) } -//Pop is the heap.Interface Pop method that removes the "smallest" Message from the heap. -func (mh *messageHeap) Pop() interface{} { - n := len(mh.messages) - result := (mh.messages)[n-1] - mh.messages = (mh.messages)[0 : n-1] - return result +//Less is the heap.Interface implementation. +func (mh messageHeap) Less(i, j int) bool { + return mh[i].less(mh[j]) } -//peekMessage returns the "smallest" Message in the heap (without removal) or -//nil if the heap is empty. -func (mh *messageHeap) peekMessage() *Message { - if mh.Len() > 0 { - return mh.messages[0] - } - return nil +//Swap is the heap.Interface implementation. +func (mh messageHeap) Swap(i, j int) { + mh[i], mh[j] = mh[j], mh[i] + mh[i].index = i + mh[j].index = j } -//pushMessageValues creates and adds a Message with values t and data in the -//appropriate index to mh. -//The created message is returned. -func (mh *messageHeap) pushMessageValues(t time.Time, data interface{}) *Message { - message := &Message{ - Time: t, - Data: data, - index: mh.Len(), - mh: mh, - } - heap.Push(mh, message) - return message +//pushMessage is a helper that calls the heap.Push package function with mh and m. +func pushMessage(mh *messageHeap, m *Message) { + heap.Push(mh, m) } -//popMessage returns the "smallest" Message in the heap (after removal) or nil -//if the heap is empty. -func (mh *messageHeap) popMessage() *Message { - if mh.Len() == 0 { - return nil +//Push is the heap.Interface implementation. +func (mh *messageHeap) Push(x interface{}) { + n := len(*mh) + m := x.(*Message) + m.messageHeap, m.index = mh, n + *mh = append(*mh, m) +} + +//popMessage is a helper that calls the heap.Pop package function with mh. +func popMessage(mh *messageHeap) *Message { + return heap.Pop(mh).(*Message) +} + +//Pop is the heap.Interface implementation. +func (mh *messageHeap) Pop() interface{} { + old := *mh + n := len(old) + m := old[n-1] + m.messageHeap, m.index = nil, indexNotInHeap + *mh = old[0 : n-1] + return m +} + +//peek returns the next Message to be released, or nil if mh is empty. +func (mh *messageHeap) peek() *Message { + if mh.Len() > 0 { + return (*mh)[0] } - result := heap.Pop(mh).(*Message) - beforeRemoval(result) - return result + return nil } -//removeMessage removes the message from mh. -//If mh is empty, message is nil, or message is not in mh, then this is a nop -//and returns false. -//Returns true or false indicating whether or not message was actually removed -//from mh. -func (mh *messageHeap) removeMessage(message *Message) bool { - if mh.Len() == 0 || message == nil || message.index == notInIndex || message.mh != mh { +//remove attemps to remove m from mh. +// +//It returns true if m is actually stored in mh and was actually removed, false +//if m is not in mh. +func (mh *messageHeap) remove(m *Message) bool { + if m.messageHeap != mh { return false } - result := heap.Remove(mh, message.index).(*Message) - beforeRemoval(result) + + heap.Remove(mh, m.index) return true } -//beforeRemoval sets the index and mh fields of message to indicate that it is -//no longer in a messageHeap. -func beforeRemoval(message *Message) { - message.index = notInIndex - message.mh = nil +func (mh *messageHeap) drain() []Message { + old := *mh + + result := make([]Message, len(old)) + for i, m := range old { + result[i] = m.withoutHeap() + } + + *mh = old[0:0] + + return result } diff --git a/message_test.go b/message_test.go index 6b88017..89698b8 100644 --- a/message_test.go +++ b/message_test.go @@ -1,222 +1,250 @@ package timequeue import ( - "fmt" + "container/heap" + "reflect" + "sort" "testing" "time" ) -func TestMessage_String(t *testing.T) { +var ( + _ heap.Interface = new(messageHeap) +) + +func TestNewMessage(t *testing.T) { now := time.Now() - message := &Message{now, "test_data", nil, notInIndex} - want := "&timequeue.Message{" + now.String() + " test_data}" - if result := message.String(); result != want { - t.Errorf("message.String() = %v WANT %v", result, want) - } -} + var data interface{} = t.Name() -func TestNewMessageHeap(t *testing.T) { - mh := newMessageHeap() - if mh.messages == nil { - t.Errorf("mh.messages = nil WANT non-nil") + m := NewMessage(now, data) + + if !m.at.Equal(now) { + t.Fatal("at") } - if size := len(mh.messages); size != 0 { - t.Errorf("len(mh.messages) = %v WANT %v", size, 0) + if !reflect.DeepEqual(m.data, data) { + t.Fatal("data") } -} -func TestMessageHeap_Len(t *testing.T) { - tests := []struct { - messages []*Message - result int - }{ - {nil, 0}, - {[]*Message{}, 0}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}, 2}, + if m.messageHeap != nil { + t.Fatal("messageHeap") } - for _, test := range tests { - if result := (&messageHeap{test.messages}).Len(); result != test.result { - t.Errorf("messageHeap.Len() = %v WANT %v", result, test.result) - } + if m.index != indexNotInHeap { + t.Fatal("index") } } -func TestMessageHeap_Less(t *testing.T) { +func TestMessage_less(t *testing.T) { now := time.Now() - tests := []struct { - a *Message - b *Message + + cases := []struct { + a Message + b Message result bool }{ - {&Message{now.Add(-1), 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, true}, - {&Message{now, 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, false}, - {&Message{now.Add(1), 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, false}, - } - for _, test := range tests { - //do this so the heap.Init() is not called and messes with the ordering we want. - mh := &messageHeap{ - messages: []*Message{test.a, test.b}, - } - if result := mh.Less(0, 1); result != test.result { - t.Errorf("mh.Less(%v, %v) = %v WANT %v", mh.messages[0], mh.messages[1], result, test.result) + { + Message{at: now}, + Message{at: now.Add(-1)}, + false, + }, + { + Message{at: now.Add(-1)}, + Message{at: now}, + true, + }, + { + Message{at: now}, + Message{at: now}, + false, + }, + } + + for i, tc := range cases { + result := tc.a.less(&tc.b) + + if result != tc.result { + t.Errorf("%d: %v less %v = %v WANT %v", i, tc.a, tc.b, result, tc.result) } } } -func TestMessageHeap_Swap(t *testing.T) { - mh := newMessageHeap() - a := mh.pushMessageValues(time.Now(), 0) - b := mh.pushMessageValues(time.Now(), 0) - mh.Swap(0, 1) - if mh.messages[0] != b || mh.messages[1] != a { - t.Errorf("mh.Swap(0, 1) should equal b, a") - } - if a.index != 1 { - t.Errorf("mh.Swap() a.index = %v WANT %v", a.index, 1) - } - if b.index != 0 { - t.Errorf("mh.Swap() b.index = %v WANT %v", b.index, 0) +func TestMessage_isHead_NewMessagesShouldNotBeHeads(t *testing.T) { + m := NewMessage(time.Now(), nil) + if m.isHead() { + t.Fatal() } } -func TestMessageHeap_Push(t *testing.T) { - mh := newMessageHeap() - message := &Message{time.Now(), 0, nil, notInIndex} - mh.Push(message) - if mh.Len() != 1 || mh.messages[0] != message { - t.Errorf("mh.Len(), mh[0] = %v, %v WANT %v, %v", mh.Len(), 1, mh.messages[0], message) +func TestMessage_isHead_MessagesInLenOneHeapsAreHeads(t *testing.T) { + mh := messageHeap([]*Message{}) + m := NewMessage(time.Now(), nil) + + pushMessage(&mh, m) + + if !m.isHead() { + t.Fatal() } } -func TestMessageHeap_Pop(t *testing.T) { - mh := newMessageHeap() - message := mh.pushMessageValues(time.Now(), 0) - if result := mh.Pop(); result != message { - t.Errorf("mh.Pop() = %v WANT %v", result, message) - } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) +func TestMessageHeap_Len(t *testing.T) { + mh := messageHeap([]*Message{}) + if mh.Len() != 0 { + t.Fatal() } -} -func TestMessageHeap_peekMessage_empty(t *testing.T) { - mh := newMessageHeap() - if message := mh.peekMessage(); message != nil { - t.Errorf("mh.peekMessage() = non-nil WANT nil") + mh = messageHeap(make([]*Message, 1234)) + if mh.Len() != 1234 { + t.Fatal() } } -func TestMessageHeap_peekMessage_nonEmpty(t *testing.T) { - mh := newMessageHeap() - want := mh.pushMessageValues(time.Now(), nil) - mh.pushMessageValues(time.Now(), nil) - if actual := mh.peekMessage(); actual != want { - t.Errorf("mh.peekMessage() = %v WANT %v", actual, want) +func TestMessageHeap_Less_DefersToTheMessageLessMethod(t *testing.T) { + now := time.Now() + m1 := NewMessage(now, nil) + m2 := NewMessage(now.Add(1), nil) + + mh := messageHeap([]*Message{m1, m2}) + + if !mh.Less(0, 1) { + t.Fatal() } - if size := mh.Len(); size != 2 { - t.Errorf("mh.Len() = %v WANT %v", size, 2) + if mh.Less(1, 0) { + t.Fatal() } } -func TestMessageHeap_pushMessageValues(t *testing.T) { - mh := newMessageHeap() - for i := 0; i < 10; i++ { - data := fmt.Sprintf("data_%v", i) - now := time.Now() - message := mh.pushMessageValues(now, data) - if message.Time != now { - t.Errorf("message.Time = %v WANT %v", message.Time, now) - } - if message.Data != data { - t.Errorf("message.Data = %v WANT %v", message.Data, data) - } - if message.mh != mh { - t.Errorf("message.mh = %v WANT %v", message.mh, mh) - } - if message.index != i { - t.Errorf("message.index = %v WANT %v", message.index, i) - } +func TestMessageHeap_Swap_UpdatesReferencesAndIndices(t *testing.T) { + now := time.Now() + m1 := NewMessage(now, nil) + m2 := NewMessage(now, nil) + + mh := messageHeap([]*Message{m1, m2}) + + mh.Swap(0, 1) + + //Messages weren't pushed, so there isn't information on them. + //We can check to make sure the index is updated. + + if mh[0] != m2 || m2.index != 0 { + t.Fatal() + } + if mh[1] != m1 || m1.index != 1 { + t.Fatal() } } -func TestMessageHeap_popMessage_empty(t *testing.T) { - mh := newMessageHeap() - if message := mh.popMessage(); message != nil { - t.Errorf("mh.popMessage() = non-nil WANT nil") +func TestMessageHeap_Push_SetsTheMessageHeapFieldOnMessage(t *testing.T) { + m := NewMessage(time.Now(), nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, m) + + if m.messageHeap != &mh { + t.Fatal() } } -func TestMessageHeap_popMessage_nonEmpty(t *testing.T) { - mh := newMessageHeap() - want := mh.pushMessageValues(time.Now(), 0) - actual := mh.popMessage() - if actual != want { - t.Errorf("mh.popMessage() = %v WANT %v", actual, want) +func TestMessageHeap_PushAndPopResultInTheCorrectOrdering(t *testing.T) { + now := time.Now() + + mh := messageHeap([]*Message{}) + + want := []*Message{} + for i := 0; i < 100; i++ { + m := NewMessage(now.Add(time.Duration(i)), nil) + want = append(want, m) + + pushMessage(&mh, m) } - if actual.mh != nil || actual.index != notInIndex { - t.Errorf("popped message mh, index = %v, %v WANT %v, %v", actual.mh, actual.index, nil, notInIndex) + sort.Sort(messageHeap(want)) + + result := []*Message{} + for mh.Len() > 0 { + result = append(result, popMessage(&mh)) } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) + + //Do a loop here to check equality of pointer values. + for i, m := range result { + if m != want[i] { + t.Fatal() + } } } -func TestMessageHeap_removeMessage_empty(t *testing.T) { - mh := newMessageHeap() - if result := mh.removeMessage(nil); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) +func TestMessageHeap_peek_EmptyReturnsNil(t *testing.T) { + mh := messageHeap([]*Message{}) + + if r := mh.peek(); r != nil { + t.Fatal() } } -func TestMessageHeap_removeMessage_messageNil(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(nil); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) +func TestMessageHeap_peek_ReturnsMessageAtIndexZero(t *testing.T) { + m := NewMessage(time.Now(), nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, m) + + if peeked := mh.peek(); peeked != mh[0] { + t.Fatal() } } -func TestMessageHeap_removeMessage_notInIndex(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - mh.pushMessageValues(time.Now(), nil) - message := mh.popMessage() - if result := mh.removeMessage(message); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) +func TestMessageHeap_remove_ReturnsFalseWithoutAssociation(t *testing.T) { + m := NewMessage(time.Now(), nil) + + mh := messageHeap([]*Message{}) + + if ok := mh.remove(m); ok { + t.Fatal() } } -func TestMessageHeap_removeMessage_notInMh(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - other := newMessageHeap().pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(other); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) +func TestMessageHeap_remove_ReturnsTrueAndModifiesMessage(t *testing.T) { + m := NewMessage(time.Now(), nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, m) + + if m.messageHeap == nil || m.index < 0 { + t.Fatal() + } + + if ok := mh.remove(m); !ok { + t.Fatal() } + + assertDisassociated(t, *m) } -func TestMessageHeap_removeMessage_success(t *testing.T) { - mh := newMessageHeap() - message := mh.pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(message); !result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, true) +func TestMessageHeap_drain_ReturnsEqualLengthSliceOfMessagesNotInAHeapAndSetsLengthToZero(t *testing.T) { + mh := messageHeap([]*Message{}) + + for i := 0; i < 100; i++ { + m := NewMessage(time.Now(), i) + pushMessage(&mh, m) } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) + + drained := mh.drain() + + assertDisassociated(t, drained...) + + if len(drained) != 100 { + t.Fatal() } - if message.mh != nil || message.index != notInIndex { - t.Errorf("popped message mh, index = %v, %v WANT %v, %v", message.mh, message.index, nil, notInIndex) + if mh.Len() != 0 { + t.Fatal() } } -func TestBeforeRemoval(t *testing.T) { - mh := newMessageHeap() - message := &Message{time.Now(), nil, mh, 1} - beforeRemoval(message) - if message.mh != nil { - t.Errorf("message.mh = non-nil WANT nil") - } - if message.index != notInIndex { - t.Errorf("message.index = %v WANT %v", message.index, notInIndex) +func assertDisassociated(t *testing.T, messages ...Message) { + t.Helper() + + for _, m := range messages { + if m.messageHeap != nil || m.index >= 0 { + t.Error("Message is not disassociated", m) + } } } diff --git a/timequeue.go b/timequeue.go index 4405179..067c13b 100644 --- a/timequeue.go +++ b/timequeue.go @@ -1,35 +1,3 @@ -//Package timequeue provides the TimeQueue type that is a queue of Messages. -//Each Message contains a time.Time that determines the time at which the Message -//should be released from the queue. -//Message types also have a Data field of type interface{} that should be used -//as the payload of the Message. -//TimeQueue is safe for use by multiple go-routines. -// -//Messages need only be pushed to the queue, and then when their time passes, -//they will be sent on the channel returned by Messages(). -//See below for examples. -// -//TimeQueue uses a single go-routine, spawned from Start() that returns from Stop(), -//that processes the Messages as their times pass. -//When a Message is pushed to the queue, the earliest Message in the queue is -//used to determine the next time the running go-routine should wake. -//The running go-routine knows when to wake because the earliest time is used -//to make a channel via time.After(). Receiving on that channel wakes the -//running go-routine if a call to Stop() has not happened prior. -//Upon waking, that Message is removed from the queue and released on the channel -//returned from Messages(). -//Then the newest remaining Message is used to determine when to wake, etc. -//If a Message with a time before any other in the queue is inserted, then that -//Message is pushed to the front of the queue and released appropriately. -// -//Messages that are "released", i.e. sent on the Messages() channel, are always -//released from a newly spawned go-routine so that other go-routines are not -//paused waiting for a receive from Messages(). -// -//Messages with the same Time value will be "flood-released" from the same -//separately spawned go-routine. -//Additionally, Messages that are pushed with times before time.Now() will -//immediately be released from the queue. package timequeue import ( @@ -38,415 +6,240 @@ import ( ) const ( - //DefaultCapacity is the default capacity used for Messages() channels in New(). - DefaultCapacity = 1 + DefaultCapacity = 0 ) -//TimeQueue is a queue of Messages that releases its Messages when their -//Time fields pass. -// -//When Messages are pushed to a TimeQueue, the earliest Message is used to -//determine when the TimeQueue should wake. Upon waking, that earliest Message -//is "released" from the TimeQueue by being sent on the channel returned by -//Messages(). -// -//Messages may be pushed and popped from a TimeQueue whether or not the TimeQueue -//is running or not. Start() and Stop() may be called as many times as desired, -//but Messsages will be released only between calls to Start() and Stop(), i.e. -//while the TimeQueue is running and IsRunning() returns true. -// -//Calls to Pop(), PopAll(), and PopAllUntil() may be called to remove Messages -//from a TimeQueue, but this is required for normal use. -// -//One of the New*() functions should be used to create a TimeQueue. A zero-value -//TimeQueue is not in a valid or working state. type TimeQueue struct { - //the goal is to have only one go-routine "inside" a TimeQueue at once. - //this is achieved by locking on lock in all exported methods and - //requiring the TimeQueue be locked in all unexported methods and - //before all use of unexported fields. - - //protects all other members of a TimeQueue. - lock *sync.Mutex - - //the heap of Messages in the TimeQueue. - messages *messageHeap - - //flag determining if the TimeQueue is running. - //should be true between calls to Start() and Stop() and false otherwise. - running bool - //signal that sends to stopChan or wakeChan to wake or stop the running go-routine. - wakeSignal *wakeSignal - - //the channel to send released Messages on. should be receive only in client code. - messageChan chan *Message - //send to this channel to wake the running go-routine and release Messages. - wakeChan chan time.Time - //send to this channel to stop the running go-routine. - stopChan chan struct{} + timer *time.Timer + + out chan Message + + lock *sync.Mutex + messageHeap messageHeap + stopChan chan chan struct{} + pauseChan chan chan struct{} } -//New creates a new *TimeQueue with a call to New(DefaultCapacity). -func New() *TimeQueue { - return NewCapacity(DefaultCapacity) +func NewTimeQueue() *TimeQueue { + return NewTimeQueueCapacity(DefaultCapacity) } -//NewCapacity creates a new *TimeQueue where the channel returned from Messages() -//has the capacity given by capacity. -//The new TimeQueue is in the stopped state and has no Messages in it. -func NewCapacity(capacity int) *TimeQueue { - return &TimeQueue{ +func NewTimeQueueCapacity(c int) *TimeQueue { + tq := &TimeQueue{ + timer: newExpiredTimer(), + out: make(chan Message, c), lock: &sync.Mutex{}, - messages: newMessageHeap(), - running: false, - wakeSignal: nil, - messageChan: make(chan *Message, capacity), - wakeChan: make(chan time.Time), - stopChan: make(chan struct{}), + messageHeap: messageHeap([]*Message{}), + stopChan: nil, + pauseChan: make(chan chan struct{}), //Must not have capacity to ensure only only goroutine is able to pause the run loop. } -} -//Push creates and adds a Message to q with t and data. The created Message is returned. -func (q *TimeQueue) Push(t time.Time, data interface{}) *Message { - q.lock.Lock() - defer q.lock.Unlock() - message := q.messages.pushMessageValues(t, data) - q.afterHeapUpdate() - return message -} + tq.Start() -//Peek returns (without removing) the Time and Data fields from the earliest -//Message in q. -//If q is empty, then the zero Time and nil are returned. -func (q *TimeQueue) Peek() (time.Time, interface{}) { - message := q.PeekMessage() - if message == nil { - return time.Time{}, nil - } - return message.Time, message.Data + return tq } -//PeekMessage returns (without removing) the earliest Message in q or nil if q -//is empty. -func (q *TimeQueue) PeekMessage() *Message { - q.lock.Lock() - defer q.lock.Unlock() - return q.peekMessage() -} +func newExpiredTimer() *time.Timer { + timer := time.NewTimer(0) + <-timer.C -//peekMessage is the unexported version of PeekMessage(). -//It should only be called when q is locked. -func (q *TimeQueue) peekMessage() *Message { - return q.messages.peekMessage() + return timer } -//Pop removes and returns the earliest Message in q or nil if q is empty. -//If release is true, then the Message (if not nil) will also be sent on the -//channel returned from Messages(). -func (q *TimeQueue) Pop(release bool) *Message { - q.lock.Lock() - defer q.lock.Unlock() - message := q.messages.popMessage() - if message == nil { - return nil - } - if release { - q.releaseMessage(message) - } - q.afterHeapUpdate() - return message +func (tq *TimeQueue) Messages() <-chan Message { + return tq.out } -//PopAll removes and returns a slice of all Messages in q. -//The returned slice will be non-nil but empty if q is itseld empty. -//If release is true, then all returned Messages will also be sent on the channel -//returned from Messages(). -func (q *TimeQueue) PopAll(release bool) []*Message { - q.lock.Lock() - defer q.lock.Unlock() - result := make([]*Message, 0, q.messages.Len()) - for message := q.messages.popMessage(); message != nil; message = q.messages.popMessage() { - result = append(result, message) - } - if release { - q.releaseCopyToChan(result) - } - q.afterHeapUpdate() - return result -} +func (tq *TimeQueue) Start() bool { + tq.lock.Lock() + defer tq.lock.Unlock() -//PopAllUntil removes and returns a slice of Messages in q with Time fields before, -//but not equal to, until. -//If release is true, then all returned Messages will also be sent on the channel -//returned from Messages(). -func (q *TimeQueue) PopAllUntil(until time.Time, release bool) []*Message { - q.lock.Lock() - defer q.lock.Unlock() - return q.popAllUntil(until, release) + return tq.start() } -//popAllUntil is the unexported verson of PopAllUntil. -//It should only be called when q is locked. -func (q *TimeQueue) popAllUntil(until time.Time, release bool) []*Message { - result := make([]*Message, 0, q.messages.Len()) - for message := q.messages.peekMessage(); message != nil && message.Before(until); message = q.messages.peekMessage() { - result = append(result, q.messages.popMessage()) - } - if release { - q.releaseCopyToChan(result) +func (tq *TimeQueue) start() bool { + if !tq.isStopped() { + return false } - q.afterHeapUpdate() - return result + + tq.stopChan = make(chan chan struct{}) + tq.run() + return true } -//Remove removes message from q. -//If q is empty, message is nil, or message is not in q, then Remove is a nop -//and returns false. -//Returns true or false indicating whether or not message was actually removed from q. -//If release is true and message was actually removed, then message will also be -//sent on the channel returned by Messages(). -func (q *TimeQueue) Remove(message *Message, release bool) bool { - q.lock.Lock() - defer q.lock.Unlock() - removed := q.messages.removeMessage(message) - if removed && release { - q.releaseMessage(message) - } - q.afterHeapUpdate() - return removed +func (tq *TimeQueue) run() { + go func() { + for { + select { + case <-tq.timer.C: + tq.releaseNextMessage() + + case resultChan := <-tq.pauseChan: + resultChan <- struct{}{} + <-resultChan + + case resultChan := <-tq.stopChan: + resultChan <- struct{}{} + return + } + + select { + case resultChan := <-tq.stopChan: + resultChan <- struct{}{} + return + + default: + } + } + }() } -//afterHeapUpdate ensures the earliest time is in the next wake signal, if q is running. -//It should only be called when q is locked. -func (q *TimeQueue) afterHeapUpdate() { - if q.isRunning() { - q.updateAndSpawnWakeSignal() - } +func (tq *TimeQueue) releaseNextMessage() { + //TODO document how we are the only goroutine with access to the messageHeap. + + m := popMessage(&tq.messageHeap) + tq.dispatch(m) + + tq.maybeResetTimerToHead() } -//Messages returns the receive only channel that all Messages are released on. -//The returned channel will be the same instance on every call, and this value -//will never be closed. -// -//In order to receive Messages when they are earliest available a go-routine should -//be spawned to drain the channel of all Messages. -// q := timequeue.New() -// q.Start() -// go func() { -// message := <-q.Messages() -// }() -// //push Messages to q. -func (q *TimeQueue) Messages() <-chan *Message { - return q.messageChan +func (tq *TimeQueue) dispatch(m *Message) { + //We don't need to call m.withoutHeap becuase the prior pop operation already does that. + tq.out <- *m } -//Size returns the number of Messages in q. This is the number of Messages that -//have yet to be released (or waiting to be sent on Messages()) in q. -//Therefore, there could still be Messages that q has reference to that are waiting -//to be released or in the Messages() channel buffer. -// -//To obtain the number of total Messages that q still has references to add this value -//and the length of Messages(): -// q.Size() + len(q.Messages()) -func (q *TimeQueue) Size() int { - q.lock.Lock() - defer q.lock.Unlock() - return q.messages.Len() +func (tq *TimeQueue) Stop() bool { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.stop() } -//Start spawns a new go-routine to listen for wake times of Messages and sets the -//state to running. -//If q is already running, then Start is a nop. -func (q *TimeQueue) Start() { - q.lock.Lock() - defer q.lock.Unlock() - if q.isRunning() { - return +func (tq *TimeQueue) stop() bool { + if tq.isStopped() { + return false } - q.setRunning(true) - go q.run() - q.updateAndSpawnWakeSignal() -} -//IsRunning returns whether or not q is running. E.g. in between calls to Start() -//and Stop(). -//If IsRunning returns true, then it is possible that Messages are being released. -func (q *TimeQueue) IsRunning() bool { - q.lock.Lock() - defer q.lock.Unlock() - return q.isRunning() + resultChan := make(chan struct{}) + tq.stopChan <- resultChan + <-resultChan + + tq.stopChan = nil + return true } -//isRunning is the unexported version of IsRunning. -//It should only be called when q is locked. -func (q *TimeQueue) isRunning() bool { - return q.running +func (tq *TimeQueue) Drain() []Message { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.drain() } -//run is the run loop of a TimeQueue. -//It is an infinite loop that selects between q.wakeChan and q.stopChan. -//If q.wakeChan is selected, then q.onWake() is called. -//If q.wakeStop is selected, then this method returns. -//Note that this method does not spawn a new go-routine. -//That should be done outside of run. -//run does not have the precondition that q must be locked. -//This is a function that should execute in its own go-routine and thus cannot -//lock any other parts of q. -func (q *TimeQueue) run() { - for { - select { - case wakeTime := <-q.wakeChan: - q.onWake(wakeTime) - case <-q.stopChan: - return - } +func (tq *TimeQueue) drain() []Message { + unpause := tq.pause() + defer unpause() + + if tq.messageHeap.Len() > 0 { + defer tq.stopTimer() } -} -//onWake should be called when q receives a value on q.wakeChan. -//Because onWake will be called from a go-routine that we spawned, we lock and -//defer unlock on q since this acts like an exported method of sorts in that -//it starts execution of unexported code from an outside go-routine. -func (q *TimeQueue) onWake(wakeTime time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - q.popAllUntil(wakeTime, true) - q.updateAndSpawnWakeSignal() -} + //We start with the drained Messages from our heap. + result := tq.messageHeap.drain() -//releaseMessage is a utility method that spawns a go-routine to send message on -//q.messageChan so that that calling go-routine does not have to wait. -func (q *TimeQueue) releaseMessage(message *Message) { - go func() { - q.messageChan <- message - }() + //If there are Messages on our output channel, then drain the channel. + //Messages on this channel are already disassociated with a heap. + for len(tq.out) > 0 { + result = append(result, <-tq.out) + } + + return result } -//releaseCopyToChan is a utility method that copies messages to a new, buffered -//channel, and empties that new channel by sending every messsage on q.messageChan. -func (q *TimeQueue) releaseCopyToChan(messages []*Message) { - copyChan := make(chan *Message, len(messages)) - for _, message := range messages { - copyChan <- message - } - q.releaseChan(copyChan) - close(copyChan) +func (tq *TimeQueue) isStopped() bool { + return tq.stopChan == nil } -//releaseChan is a utility method that spawns a go-routine to send every message -//in messages on q.messageChan. -//Note that releaseChan reads from messages until it is closed, thus messages must -//be closed by the calling function. -func (q *TimeQueue) releaseChan(messages <-chan *Message) { - go func() { - for message := range messages { - q.messageChan <- message - } - }() +func (tq *TimeQueue) Remove(m *Message) bool { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.remove(m) } -//updateAndSpawnSignal kills the current wake signal if it exists -//and creates and spawns the next wake signal if there are any messages left in q. -//Returns true if a new wakeSignal was spawned, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) updateAndSpawnWakeSignal() bool { - q.killWakeSignal() - message := q.peekMessage() - if message == nil { - return false +func (tq *TimeQueue) remove(m *Message) bool { + unpause := tq.pause() + defer unpause() + + isHead := m.isHead() + ok := tq.messageHeap.remove(m) + + if ok && isHead { + tq.stopTimer() + tq.maybeResetTimerToHead() } - q.setWakeSignal(newWakeSignal(q.wakeChan, message.Time)) - return q.spawnWakeSignal() -} -//setWakeSignal sets q.wakeSignal to wakeSignal. -//It should only be called when q is locked. -func (q *TimeQueue) setWakeSignal(wakeSignal *wakeSignal) { - q.wakeSignal = wakeSignal + return ok } -//spawnWakeSignal calls spawn() on q.wakeSignal if it is not nil. -//Returns true if spawn was called, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) spawnWakeSignal() bool { - if q.wakeSignal != nil { - q.wakeSignal.spawn() - return true - } - return false +func (tq *TimeQueue) Push(at time.Time, data interface{}) *Message { + m := NewMessage(at, data) + tq.PushAll(m) + return m } -//killWakeSignal call kill() and sets q.wakeSignal to nil if it is not nil. -//Returns true if the old wakeSignal is not nil, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) killWakeSignal() bool { - if q.wakeSignal != nil { - q.wakeSignal.kill() - q.wakeSignal = nil - return true +func (tq *TimeQueue) PushAll(messages ...*Message) { + tq.lock.Lock() + defer tq.lock.Unlock() + + unpause := tq.pause() + defer unpause() + + hasNewHead := false + + for _, m := range messages { + pushMessage(&tq.messageHeap, m) + + if m.isHead() { + hasNewHead = true + } } - return false -} -//Stop tells the running go-routine to stop running. -//This results in no more Messages being released (until a subsequent call to Start()) -//and the state to be set to not running. -//If q is already stopped, then Stop is a nop. -func (q *TimeQueue) Stop() { - q.lock.Lock() - defer q.lock.Unlock() - if !q.isRunning() { - return + if hasNewHead { + if len(messages) < tq.messageHeap.Len() { + //TODO displaced docs + tq.stopTimer() + } + tq.maybeResetTimerToHead() } - q.killWakeSignal() - q.setRunning(false) - go func() { - q.stopChan <- struct{}{} - }() } -//setRunning is the unexported version of SetRunning. Sets q.running to running. -//It should only be called when q is locked. -func (q *TimeQueue) setRunning(running bool) { - q.running = running -} +func (tq *TimeQueue) pause() func() { + if tq.isStopped() { + return func() {} + } -//wakeSignal represents a signal that sends a time.Time value after that time has passed. -//wakeSignals can be killed, which will prevent the signal from sending its value. -type wakeSignal struct { - dst chan time.Time - src <-chan time.Time - stop chan struct{} + resultChan := make(chan struct{}) + tq.pauseChan <- resultChan + <-resultChan + return func() { + resultChan <- struct{}{} + } } -//newWakeSignal create a wakeSignal that sends wakeTime on dst when wakeTime passes. -//this function should be used to create wakeSignals. -//the zero value wakeSignal is not valid. -func newWakeSignal(dst chan time.Time, wakeTime time.Time) *wakeSignal { - return &wakeSignal{ - dst: dst, - src: time.After(wakeTime.Sub(time.Now())), - stop: make(chan struct{}), +func (tq *TimeQueue) stopTimer() { + if !tq.timer.Stop() { + <-tq.timer.C } } -//spawn starts a new go-routine that selects on w.src and w.stop. -//If w.src is selected, the received value is sent on w.dst. -//If w.stop is selected, then the function stops selecting. -//In both cases, w.src is set to nil and the function returns. -func (w *wakeSignal) spawn() { - go func() { - select { - case wakeTime := <-w.src: - w.dst <- wakeTime - case <-w.stop: - } - w.src = nil - }() +func (tq *TimeQueue) maybeResetTimerToHead() { + peeked := tq.messageHeap.peek() + + if peeked != nil { + tq.resetTimerTo(peeked.At()) + } } -//kill closes the w.stop channel. -//This is NOT idempotent. I.e. kill should only be called once a single wakeSignal. -func (w *wakeSignal) kill() { - close(w.stop) +func (tq *TimeQueue) resetTimerTo(t time.Time) { + tq.timer.Reset(time.Until(t)) } diff --git a/timequeue_accept_test.go b/timequeue_accept_test.go new file mode 100644 index 0000000..d804a28 --- /dev/null +++ b/timequeue_accept_test.go @@ -0,0 +1,273 @@ +package timequeue_test + +import ( + "context" + "math/rand" + "sort" + "testing" + "time" + + "github.com/gogolfing/timequeue" +) + +func messagesLessFunc(messages []timequeue.Message) func(i, j int) bool { + return func(i, j int) bool { + return messages[i].At().Before(messages[j].At()) + } +} + +func timeWithinDurationFunc(t time.Time, d time.Duration) func() time.Time { + return func() time.Time { + return t.Add(time.Duration(rand.Int63n(int64(d)))) + } +} + +func TestTimeQueue_SinglePublisherAndConsumerRetrievesMessagesInOrder(t *testing.T) { + tq := timequeue.NewTimeQueue() + + const count = 10000 + + go func() { + now := time.Now() + atFunc := timeWithinDurationFunc(now, time.Second) + + toPush := make([]*timequeue.Message, count) + for i := 0; i < count; i++ { + toPush[i] = timequeue.NewMessage(atFunc(), i) + } + + tq.PushAll(toPush...) + }() + + messages := make([]timequeue.Message, 0, count) + + for i := 0; i < count; i++ { + messages = append(messages, <-tq.Messages()) + } + + sorted := sort.SliceIsSorted(messages, messagesLessFunc(messages)) + if !sorted { + t.Fatal(sorted) + } +} + +func TestTimeQueue_FullOnUsage(t *testing.T) { + if testing.Short() { + t.Skip("skipping for time") + } + + //now is used a reference for the start time of the test. + now := time.Now() + goroutineCount := 10 + messagesPerGoroutine := 10000 + duration := time.Duration(10) * time.Second + atFunc := timeWithinDurationFunc(now, duration) + pauseCount := 100 + + tq := timequeue.NewTimeQueueCapacity(messagesPerGoroutine) + + consumingCtx, consumingCtxCancel := context.WithCancel(context.Background()) + consumedCountChan := consumeMessages(consumingCtx, tq, goroutineCount) + + producingCtx, producingCtxCancel := context.WithCancel(context.Background()) + producedCountChan, messagesToRemove := produceMessages( + producingCtx, + tq, + goroutineCount, + messagesPerGoroutine, + atFunc, + 0.15, + ) + + removingCtx := producingCtx + removedCountChan := removeMessages(removingCtx, tq, 1, messagesToRemove) + + //Wait to get through a portion of the allotted duration. + deadline := now.Add(duration * 3 / 4) + + stopCtx, stop := context.WithDeadline(context.Background(), deadline) + defer stop() + + //Send intermittent Stop then Start signals. + pauseThroughoutDeadline(stopCtx, tq, deadline, pauseCount) + + <-stopCtx.Done() //Actually wait until the deadline hits. + producingCtxCancel() //Stop producing. We may already be done producing depending on the number of messages sent. + + producedCount := <-producedCountChan + //Getting here means that we are done producing. All producers have returned. + + tq.Stop() //We call Stop while there are still consumers running. This is a requirement. + consumingCtxCancel() //Now we stop consuming. + + removedCount := <-removedCountChan //All removers have returned. + consumedCount := <-consumedCountChan //All consumers have returned. We know there are no more receive attempts from tq.Messages(). + + //From above, we know we can call Drain(). + + drainedCount := len(tq.Drain()) + + totalReceived := removedCount + consumedCount + drainedCount + + if producedCount != totalReceived { + t.Fatalf( + "produced %v ; removed + consumed + drained = %v + %v + %v = %v", + producedCount, + removedCount, + consumedCount, + drainedCount, + totalReceived, + ) + } +} + +func consumeMessages(ctx context.Context, tq *timequeue.TimeQueue, grc int) <-chan int { + aggChan := make(chan int) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + for { + select { + case <-ctx.Done(): + aggChan <- count + return + + case <-tq.Messages(): + count++ + } + } + }() + } + + result := make(chan int, 1) + + go func() { + defer close(aggChan) + defer close(result) + + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan + } + result <- sum + }() + + return result +} + +func produceMessages(ctx context.Context, tq *timequeue.TimeQueue, grc, mpg int, atFunc func() time.Time, removeRate float64) (<-chan int, <-chan *timequeue.Message) { + aggChan := make(chan int) + removeChan := make(chan *timequeue.Message, mpg) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + + loop: + for count < mpg { + select { + case <-ctx.Done(): + break loop + + default: + m := tq.Push(atFunc(), i) + count++ + + if rand.Float64() < removeRate { + removeChan <- m + } + } + } + + aggChan <- count + }() + } + + result := make(chan int, 1) + + go func() { + defer close(aggChan) + defer close(removeChan) + defer close(result) + + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan + } + result <- sum + }() + + return result, removeChan +} + +func removeMessages(ctx context.Context, tq *timequeue.TimeQueue, grc int, toRemove <-chan *timequeue.Message) <-chan int { + aggChan := make(chan int) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + + loop: + for m := range toRemove { + select { + case <-ctx.Done(): + break loop + + default: + if ok := tq.Remove(m); ok { + count++ + } + } + } + + aggChan <- count + }() + } + + result := make(chan int, 1) + + go func() { + defer close(aggChan) + defer close(result) + + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan + } + result <- sum + }() + + return result +} + +func pauseThroughoutDeadline(ctx context.Context, tq *timequeue.TimeQueue, deadline time.Time, count int) { + defer tq.Start() + + done := make(chan struct{}) + + go func() { + ticker := time.NewTicker(time.Until(deadline) / time.Duration(count)) + defer ticker.Stop() + + start := false + + for { + select { + case <-ctx.Done(): + close(done) + return + + case <-ticker.C: + if start { + tq.Start() + } else { + tq.Stop() + } + start = !start + } + } + }() + + <-done +} diff --git a/timequeue_acceptance_test.go b/timequeue_acceptance_test.go deleted file mode 100644 index 8ebc170..0000000 --- a/timequeue_acceptance_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package timequeue_test - -import ( - "testing" - "time" - - "github.com/gogolfing/timequeue" -) - -func TestTimeQueue_acceptance_messageAddedBeforeStart(t *testing.T) { - tq := timequeue.New() - tq.Push(time.Now(), "now") - tq.Start() - defer tq.Stop() - if message := <-tq.Messages(); message.Data != "now" { - t.Errorf("message was not released") - } -} - -func TestTimeQueue_acceptance_startAndStopStress(t *testing.T) { - const count = 100000 - tq := timequeue.NewCapacity(100) - tq.Start() - defer tq.Stop() - for i := 0; i < count; i++ { - tq.Push(time.Now().Add(time.Duration(i)*time.Nanosecond), i) - } - go func() { - for i := 0; i < count; i++ { - tq.Stop() - tq.Start() - } - }() - for i := 0; i < count; i++ { - <-tq.Messages() - } - if size := tq.Size(); size != 0 { - t.Errorf("size = %v WANT %v", size, 0) - } -} - -func TestTimeQueue_acceptance_millionMessagesSameTime(t *testing.T) { - const count = 1000000 - tq := timequeue.NewCapacity(100) - tq.Start() - defer tq.Stop() - now := time.Now() - for i := 0; i < count; i++ { - tq.Push(now, i) - } - for i := 0; i < count; i++ { - <-tq.Messages() - } - if size := tq.Size(); size != 0 { - t.Errorf("size = %v WANT %v", size, 0) - } -} diff --git a/timequeue_example_test.go b/timequeue_example_test.go index 56bc7b6..579e859 100644 --- a/timequeue_example_test.go +++ b/timequeue_example_test.go @@ -1,82 +1,72 @@ package timequeue_test import ( + "context" "fmt" "time" "github.com/gogolfing/timequeue" ) -func Example() { - tq := timequeue.New() - tq.Start() - //this would normally be a long-running process, - //and not stop at the return of a function call. - defer tq.Stop() - - startTime := time.Now() - - tq.Push(startTime, "this will be released immediately") - - //adding Messages in chronological order. - for i := 1; i <= 4; i++ { - tq.Push( - startTime.Add(time.Duration(i)*time.Second), - fmt.Sprintf("message at second %v", i), - ) - } - //adding Messages in reverse chronological order. - for i := 8; i >= 5; i-- { - tq.Push( - startTime.Add(time.Duration(i)*time.Second), - fmt.Sprintf("message at second %v", i), - ) - } - - //receive all 9 Messages that were pushed. - for i := 0; i < 9; i++ { - message := <-tq.Messages() - fmt.Println(message.Data) - } - - fmt.Printf("there are %v messages left in the queue\n", tq.Size()) - - endTime := time.Now() - if endTime.Sub(startTime) > time.Duration(8)*time.Second { - fmt.Println("releasing all messages took more than 8 seconds") - } else { - fmt.Println("releasing all messages took less than 8 seconds") - } +func ExampleTimeQueue() { + now := time.Now() + tq := timequeue.NewTimeQueue() - //Output: - //this will be released immediately - //message at second 1 - //message at second 2 - //message at second 3 - //message at second 4 - //message at second 5 - //message at second 6 - //message at second 7 - //message at second 8 - //there are 0 messages left in the queue - //releasing all messages took more than 8 seconds -} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() -func ExampleTimeQueue_PopAllUntil() { - tq := timequeue.New() - now := time.Now() - for i := 0; i < 4; i++ { - tq.Push(now.Add(time.Duration(i)*time.Second), i) - } - tq.PopAllUntil(now.Add(time.Duration(2)*time.Second), true) - for i := 0; i < 2; i++ { - message := <-tq.Messages() - fmt.Println(message.Data) - } - fmt.Println("messages left:", tq.Size()) + stopped := make(chan struct{}) + go func() { + defer close(stopped) + + <-ctx.Done() + + tq.Stop() + }() + + doneProducing := make(chan struct{}) + go func() { + defer close(doneProducing) + + const count = 10 + + toPush := make([]*timequeue.Message, count) + for i := 0; i < count; i++ { + m := timequeue.NewMessage(now.Add(time.Duration(i)), i+1) + toPush[i] = m + } + + tq.PushAll(toPush...) + }() + + doneConsuming := make(chan struct{}) + go func() { + defer close(doneConsuming) + + for { + select { + case <-stopped: + return + + case m := <-tq.Messages(): + fmt.Println(m.Data().(int)) + } + } + }() + + <-doneProducing + <-stopped + <-doneConsuming //Output: - //0 //1 - //messages left: 2 + //2 + //3 + //4 + //5 + //6 + //7 + //8 + //9 + //10 } diff --git a/timequeue_test.go b/timequeue_test.go index 3aa4809..464ff96 100644 --- a/timequeue_test.go +++ b/timequeue_test.go @@ -1,598 +1,213 @@ package timequeue import ( - "reflect" - "sort" "testing" "time" ) -func TestNew(t *testing.T) { - q := New() - if cap(q.messageChan) != DefaultCapacity { - t.Errorf("cap(messageChan) = %v WANT %v", cap(q.messageChan), DefaultCapacity) - } -} +func TestTimeQueue_New_CreatesAMessageChannelWithDefaultCapacity(t *testing.T) { + tq := NewTimeQueue() + defer tq.Stop() -func TestNewCapacity(t *testing.T) { - q := NewCapacity(2) - if size := q.messages.Len(); size != 0 { - t.Errorf("NewSize() q.messges.Len() = %v WANT %v", size, 0) - } - if q.lock == nil { - t.Errorf("NewSize() lock should be non-nil") - } - if q.running == true { - t.Errorf("NewSize() running = %v WANT %v", q.running, false) - } - if q.wakeSignal != nil { - t.Errorf("NewSize() wakeSignal should be nil") - } - if cap(q.messageChan) != 2 { - t.Errorf("NewSize() cap(messageChan) = %v WANT %v", cap(q.messageChan), 2) - } - if q.wakeChan == nil || q.stopChan == nil { - t.Errorf("NewSize() wakeChan and stopChan should be non-nil") + if cap(tq.Messages()) != DefaultCapacity { + t.Fatal() } } -func TestTimeQueue_Push(t *testing.T) { - q := New() - message := q.Push(time.Time{}, "test_data") - size := q.messages.Len() - if size != 1 { - t.Errorf("q.messages.Len() = %v WANT %v", size, 1) - } - if message == nil { - t.Errorf("message = nil WANT non-nil") - } - if message != q.messages.peekMessage() { - t.Errorf("return message should equal peek message") - } - if !message.Time.Equal(time.Time{}) { - t.Errorf("message.Time = %v WANT %v", message.Time, time.Time{}) - } - if message.Data != "test_data" { - t.Errorf("message.Data = %v WANT %v", message.Data, "test_data") - } -} +func TestTimeQueue_NewCapacity_CreatesAMessagesChannelWithDesiredCapacity(t *testing.T) { + tq := NewTimeQueueCapacity(1234) + defer tq.Stop() -func TestTimeQueue_Peek_nil(t *testing.T) { - q := New() - peekTime, data := q.Peek() - if !peekTime.IsZero() || data != nil { - t.Errorf("q.Peek() = %v, %v WANT %v, %v", peekTime, data, time.Time{}, nil) + if cap(tq.Messages()) != 1234 { + t.Fatal() } } -func TestTimeQueue_Peek_nonNil(t *testing.T) { - q := New() - now := time.Now() - q.Push(now, "test_data") - peekTime, data := q.Peek() - if !peekTime.Equal(now) || data != "test_data" { - t.Errorf("q.Peek() = %v, %v WANT %v, %v", peekTime, data, now, "test_data") - } -} +func TestTimeQueue_Start_ReturnsFalseOnARunningTimeQueue(t *testing.T) { + tq := NewTimeQueue() + defer tq.Stop() -func TestTimeQueue_PeekMessage_nil(t *testing.T) { - q := New() - message := q.PeekMessage() - if message != nil { - t.Errorf("q.PeekMessage() = non-nil WANT nil") + for i := 0; i < 10; i++ { + if tq.Start() { + t.Fatal() + } } } -func TestTimeQueue_PeekMessage_nonNil(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.PeekMessage() - if actual == nil || actual != want { - t.Errorf("q.PeekMessage() = %v WANT %v", actual, want) - } -} +func TestTimeQueue_Stop_ReturnsTrueWhileRunningAndFalseWhileStopped(t *testing.T) { + tq := NewTimeQueue() -func TestTimeQueue_Pop_empty(t *testing.T) { - q := New() - message := q.Pop(false) - if message != nil { - t.Errorf("q.Pop() is non-nil WANT nil") + if !tq.Stop() { + t.Fatal("first Stop") } -} -func TestTimeQueue_Pop_nonEmptyRelease(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.Pop(true) - if actual != want { - t.Errorf("q.Pop() return = %v WANT %v", actual, want) - } - actual = <-q.Messages() - if actual != want { - t.Errorf("q.Pop() Messages() = %v WANT %v", actual, want) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", len(q.Messages()), 0) + for i := 0; i < 10; i++ { + if tq.Stop() { + t.Fatal() + } } } -func TestTimeQueue_Pop_nonEmptyNonRelease(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.Pop(true) - if actual != want { - t.Errorf("q.Pop() return = %v WANT %v", actual, want) - } -} +func TestTimeQueue_Start_ReturnsTrueWhileStopped(t *testing.T) { + tq := NewTimeQueue() + tq.Stop() -func TestTimeQueue_PopAll(t *testing.T) { - now := time.Now() - tests := []struct { - messageValues []*testMessageValue - release bool - }{ - {[]*testMessageValue{}, false}, - {[]*testMessageValue{}, true}, - {[]*testMessageValue{{now, 0}}, false}, - {[]*testMessageValue{{now, 0}}, true}, - {[]*testMessageValue{{now, 0}, {now.Add(1), 1}, {now.Add(2), 2}}, true}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true}, - } - for _, test := range tests { - q := New() - want := []*Message{} - for _, mv := range test.messageValues { - message := q.Push(mv.Time, mv.Data) - want = append(want, message) - } - sort.Sort(&messageHeap{want}) - result := q.PopAll(test.release) - if !areMessagesEqual(result, want) { - t.Errorf("q.PopAll() messages sorted = %v WANT %v", result, want) - } - if test.release && !areChannelMessagesEqual(q.Messages(), want) { - t.Errorf("q.PopAll() Messages() sorted WANT %v", want) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages() = %v WANT %v", len(q.Messages()), 0) - } + if !tq.Start() { + t.Fatal() } + + defer tq.Stop() } -func TestTimeQueue_PopAllUntil(t *testing.T) { +func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileRunning(t *testing.T) { + tq := NewTimeQueueCapacity(10) + defer tq.Stop() + now := time.Now() - tests := []struct { - messageValues []*testMessageValue - release bool - untilTime time.Time - untilCount int - }{ - {[]*testMessageValue{}, false, now.Add(10), 0}, - {[]*testMessageValue{}, true, now.Add(-10), 0}, - {[]*testMessageValue{{now, 0}}, true, now, 0}, - {[]*testMessageValue{{now, 0}, {now.Add(1), 1}, {now.Add(2), 2}}, true, now.Add(2), 2}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true, now.Add(3), 3}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(-1), -1}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true, now.Add(3), 4}, + for i := 0; i < 10; i++ { + tq.Push(now, i) } - for _, test := range tests { - q := New() - want := []*Message{} - for _, mv := range test.messageValues { - message := q.Push(mv.Time, mv.Data) - want = append(want, message) - } - sort.Sort(&messageHeap{want}) - want = want[:test.untilCount] - result := q.PopAllUntil(test.untilTime, test.release) - if !areMessagesEqual(result, want) { - t.Errorf("q.PopAllUntil() messages sorted = %v WANT %v", result, want) - } - if test.release && !areChannelMessagesEqual(q.Messages(), want) { - t.Errorf("q.PopAllUntil() Messages() sorted WANT %v", want) - } - if q.messages.Len() != len(test.messageValues)-test.untilCount { - t.Errorf("len(q.messages) = %v WANT %v", q.messages.Len(), len(test.messageValues)-test.untilCount) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", len(q.Messages()), 0) - } - } -} -func TestTimeQueue_Remove_empty(t *testing.T) { - q := New() - if result := q.Remove(nil, true); result { - t.Errorf("q.Remove() = %v WANT %v", result, false) - } - if size := len(q.Messages()); size != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", size, 0) + drained := tq.Drain() + if len(drained) != 10 { + t.Fatal() } + assertDisassociated(t, drained...) } -func TestTimeQueue_Remove_nonEmpty(t *testing.T) { - tests := []struct { - release bool - }{ - {true}, - {false}, - } - for _, test := range tests { - q := New() - want := q.Push(time.Now(), nil) - if result := q.Remove(want, test.release); !result { - t.Errorf("q.Remove() = %v WANT %v", result, true) - } - if test.release { - if actual := <-q.Messages(); actual != want { - t.Errorf("<-q.Messages() = %v WANT %v", actual, want) - } - } - if size := q.Size(); size != 0 { - t.Errorf("t.Size() = %v WANT %v", size, 0) - } - if size := len(q.Messages()); size != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", size, 0) - } - } -} +func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileStopped(t *testing.T) { + tq := NewTimeQueueCapacity(10) -func TestTimeQueue_Remove_notIn(t *testing.T) { - q := New() - q.Push(time.Now(), nil) - other := New().Push(time.Now(), nil) - if result := q.Remove(other, true); result { - t.Errorf("q.Remove(other) = %v WANT %v", result, false) + now := time.Now() + for i := 0; i < 10; i++ { + tq.Push(now, i) } -} -func TestTimeQueue_afterHeapUpdate_notRunning(t *testing.T) { - q := New() - q.afterHeapUpdate() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} + tq.Stop() -func TestTimeQueue_afterHeapUpdate_running(t *testing.T) { - q := New() - q.setRunning(true) - q.afterHeapUpdate() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") + drained := tq.Drain() + if len(drained) != 10 { + t.Fatal() } + assertDisassociated(t, drained...) } -func TestTimeQueue_Messages(t *testing.T) { - q := New() - if q.Messages() != q.messageChan { - t.Errorf("q.Messages() != q.messageChan") - } -} +func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t *testing.T) { + tq := NewTimeQueue() + defer tq.Stop() -func TestTimeQueue_Size(t *testing.T) { - q := New() - q.Push(time.Now(), 0) - if q.Size() != 1 { - t.Errorf("q.Size() = %v WANT %v", q.Size(), 1) - } -} + m := tq.Push(time.Now().Add(time.Hour), nil) -func TestTimeQueue_Start_notRunning(t *testing.T) { - q := New() - q.setRunning(true) - q.Start() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") + if ok := tq.Remove(m); !ok { + t.Fatal(ok) } -} -func TestTimeQueue_Start_running(t *testing.T) { - q := New() - message := q.Push(time.Now().Add(time.Duration(200)*time.Millisecond), "test_data") - q.Start() - defer q.Stop() - if q.wakeSignal == nil { - t.Errorf("q.wakeSignal = nil WANT non-nil") - } - if running := q.IsRunning(); !running { - t.Errorf("running = %v WANT %v", running, true) - } - if result := <-q.Messages(); result != message { - t.Errorf("message = %v WANT %v", result, message) + if len(tq.Drain()) != 0 { + t.Fatal() } } -func TestTimeQueue_run(t *testing.T) { - q := New() - go func() { - q.wakeChan <- time.Now() - q.stopChan <- struct{}{} - }() - q.run() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } - if count := len(q.messageChan); count != 0 { - t.Errorf("len(q.messageChan) = %v WANT %v", count, 0) - } -} +func TestTimeQueue_Remove_CanRemoveAMessageWhileStoppedAndNothingGetsDrained(t *testing.T) { + tq := NewTimeQueue() -func TestTimeQueue_onWake(t *testing.T) { - q := New() - now := time.Now() - for i := 0; i < 4; i++ { - q.Push(now.Add(time.Duration(i)), i) - } - q.onWake(now.Add(4)) - for i := 0; i < 4; i++ { - message := <-q.Messages() - if message.Data != i { - t.Errorf("message.Data = %v WANT %v", message.Data, i) - } - } - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} + m := tq.Push(time.Now().Add(time.Hour), nil) -func TestTimeQueue_popAllUntil(t *testing.T) { - q := New() - now := time.Now() - for i := 4; i >= 0; i-- { - q.Push(now.Add(time.Duration(i)), i) - } - q.popAllUntil(now.Add(5), true) - for i := 0; i <= 4; i++ { - message := <-q.Messages() - if message.Data != i { - t.Errorf("message.Data = %v WANT %v", message.Data, i) - } - } - if size := q.Size(); size != 0 { - t.Errorf("q.Size() = %v WANT %v", size, 0) - } - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} + tq.Stop() -func TestTimeQueue_releaseMessage(t *testing.T) { - q := New() - q.releaseMessage(&Message{time.Now(), 0, nil, notInIndex}) - if message := <-q.Messages(); message.Data != 0 { - t.Errorf("message.Data = %v WANT %v", message.Data, 0) + if ok := tq.Remove(m); !ok { + t.Fatal(ok) } -} -func TestTimeQueue_releaseCopyToChan(t *testing.T) { - tests := []struct { - messages []*Message - }{ - {nil}, - {[]*Message{}}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}}, - } - for _, test := range tests { - q := New() - q.releaseCopyToChan(test.messages) - for _, wantMessage := range test.messages { - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) - } - } + if len(tq.Drain()) != 0 { + t.Fatal() } } -func TestTimeQueue_releaseChan(t *testing.T) { - tests := []struct { - messages []*Message - }{ - {nil}, - {[]*Message{}}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}}, - } - for _, test := range tests { - q := New() - out := make(chan *Message) - go func() { - for _, message := range test.messages { - out <- message - } - close(out) - }() - q.releaseChan(out) - for _, wantMessage := range test.messages { - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) - } - } - } -} +func TestTimeQueue_Remove_CallingRemoveWithAMessageFromAnotherQueueReturnsFalse(t *testing.T) { + tq1 := NewTimeQueueCapacity(1) //We need capacity so we don't block. + defer tq1.Stop() -func TestTimeQueue_updateAndSpawnWakeSignal_empty(t *testing.T) { - q := New() - if result := q.updateAndSpawnWakeSignal(); result != false { - t.Errorf("q.updateAndSpawnWakeSignal() = %v WANT %v", result, false) - } -} + tq2 := NewTimeQueueCapacity(1) //We need capacity so we don't block. + defer tq2.Stop() + + tq1.Push(time.Now(), nil) + m2 := tq2.Push(time.Now(), nil) -func TestTimeQueue_updateAndSpawnWakeSignal_nonEmpty(t *testing.T) { - q := New() - wantMessage := q.Push(time.Now().Add(time.Duration(250)*time.Millisecond), 0) - if result := q.updateAndSpawnWakeSignal(); result != true { - t.Fatalf("q.updateAndSpawnWakeSignal() = %v WANT %v", result, true) + if tq1.Remove(m2) { + t.Fatal() } - if q.wakeSignal == nil { - t.Errorf("q.wakeSignal = nil WANT non-nil") + + if len(tq1.Drain()) != 1 { + t.Fatal() } - go q.run() - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) + if len(tq2.Drain()) != 1 { + t.Fatal() } } -func TestTimeQueue_setWakeSignal(t *testing.T) { - q := New() - ws := newWakeSignal(q.wakeChan, time.Now()) - q.setWakeSignal(ws) - if q.wakeSignal != ws { - t.Errorf("q.wakeSignal = %v WANT %v", q.wakeSignal, ws) - } -} +func TestTimeQueue_Push_WeCanPushAsManyMessagesAsWeWantAtNowWhileStoppedWithoutCapacity(t *testing.T) { + tq := NewTimeQueue() + tq.Stop() -func TestTimeQueue_spawnWakeSignal_nil(t *testing.T) { - q := New() - if result := q.spawnWakeSignal(); result != false { - t.Errorf("q.spawnWakeSignal() = %v WANT %v", result, false) + now := time.Now() + for i := 0; i < 100; i++ { + tq.Push(now, i) } } -func TestTimeQueue_spawnWakeSignal_nonNil(t *testing.T) { - q := New() - ws := newWakeSignal(q.wakeChan, time.Now().Add(time.Duration(1)*time.Second)) - ws.kill() - q.setWakeSignal(ws) - if result := q.spawnWakeSignal(); result != true { - t.Errorf("q.spawnWakeSignal() = %v WANT %v", result, true) - } -} +func TestTimeQueue_PushAll_WeCanPushAllWithAsManyMessagesAsWeWantAtNowWhileRunningWithoutCapacity(t *testing.T) { + done := make(chan struct{}) -func TestTimeQueue_killWakeSignal_nil(t *testing.T) { - q := New() - if result := q.killWakeSignal(); result != false { - t.Errorf("q.killWakeSignal() = %v WANT %v", result, false) - } -} + tq := NewTimeQueue() + defer close(done) + defer tq.Stop() -func TestTimeQueue_killWakeSignal_nonNil(t *testing.T) { - q := New() - q.setWakeSignal(newWakeSignal(q.wakeChan, time.Now().Add(time.Duration(1)*time.Second))) - if result := q.killWakeSignal(); result != true { - t.Errorf("q.killWakeSignal() = %v WANT %v", result, true) + now := time.Now() + messages := []*Message{} + for i := 0; i < 100; i++ { + messages = append(messages, NewMessage(now, i)) } -} -func TestTimeQueue_Stop_notRunning(t *testing.T) { - q := New() - q.Stop() -} + tq.PushAll(messages...) -func TestTimeQueue_Stop_running(t *testing.T) { - q := New() - q.setRunning(true) - q.Stop() - q.run() - if result := q.IsRunning(); result != false { - t.Errorf("q.IsRunning() = %v WANT %v", result, false) - } -} + go func() { + for { + select { + case <-tq.Messages(): -func TestTimeQueue_IsRunning(t *testing.T) { - tests := []struct { - value bool - }{ - {true}, - {false}, - } - for _, test := range tests { - q := New() - q.running = test.value - if result := q.IsRunning(); result != test.value { - t.Errorf("q.IsRunning() = %v WANT %v", result, test.value) + case <-done: + return + } } - } + }() } -func TestTimeQueue_setRunning(t *testing.T) { - tests := []struct { - value bool - }{ - {false}, - {true}, - } - for _, test := range tests { - q := New() - q.setRunning(test.value) - if result := q.running; result != test.value { - t.Errorf("q.running = %v WANT %v", result, test.value) - } - } -} +func TestTimeQueue_PushAll_CorrectlyStopsThenResetsTheTimerWhenWeAddANewHeadWithMessagesAlreadyInTheQueue(t *testing.T) { + tq := NewTimeQueueCapacity(2) //We need capacity so we don't block. -func TestNewWakeSignal(t *testing.T) { - dst := make(chan time.Time) - wakeTime := time.Now() - ws := newWakeSignal(dst, wakeTime) - if ws.dst != dst { - t.Errorf("ws.dst = %v WANT %v", ws.dst, dst) - } - if ws.src == nil { - t.Errorf("ws.src = nil WANT non-nil") - } - if cap(ws.src) != 1 { - t.Errorf("cap(ws.src) = %v WANT %v", cap(ws.src), 1) - } - if ws.stop == nil { - t.Errorf("ws.stop = nil WANT non-nil") - } - if cap(ws.stop) != 0 { - t.Errorf("cap(ws.stop) = %v WANT %v", cap(ws.stop), 0) - } -} + defer tq.Stop() //Need to be running to check the timer stop. So defer. -func TestWakeSignal_spawn_wake(t *testing.T) { - dst := make(chan time.Time) now := time.Now() - ws := newWakeSignal(dst, now) - ws.spawn() - result := <-dst - time.Sleep(time.Duration(250) * time.Millisecond) - diff := result.Sub(now) - if diff < 0 { - diff = -diff - } - if diff > time.Duration(1)*time.Millisecond { - t.Errorf("<-ws.dst too far away from desired : %v WANT %v", result, now) - } - if ws.src != nil { - t.Errorf("ws.src = nil WANT non-nil") - } -} + tq.PushAll( + NewMessage(now.Add(time.Hour), nil), + ) -func TestWakeSignal_spawn_stop(t *testing.T) { - ws := newWakeSignal(nil, time.Now().Add(time.Duration(1)*time.Second)) - ws.spawn() - ws.stop <- struct{}{} - time.Sleep(time.Duration(250) * time.Millisecond) - if ws.src != nil { - t.Errorf("ws.src = nil WANT non-nil") - } + tq.PushAll( + NewMessage(now, nil), + ) } -func TestWakeSignal_kill(t *testing.T) { - ws := newWakeSignal(nil, time.Now()) - ws.kill() - defer func() { - if result := recover(); result == nil { - t.Errorf("kill() kill() recover() = nil WANT non-nil") - } - }() - ws.kill() -} +func TestTimeQueue_CanSuccessfullyReceiveMessageFromTimerAfterResumingFromStopped(t *testing.T) { + tq := NewTimeQueueCapacity(1) -type testMessageValue struct { - time.Time - Data interface{} -} + tq.Stop() -func areChannelMessagesEqual(actualChan <-chan *Message, want []*Message) bool { - actual := []*Message{} - for i := 0; i < len(want); i++ { - actual = append(actual, <-actualChan) - } - return areMessagesEqual(actual, want) -} + tq.Push(time.Now().Add(time.Second/2), "my message") -func areMessagesEqual(actual, want []*Message) bool { - return (len(actual) == 0 && len(want) == 0) || reflect.DeepEqual(actual, want) + tq.Start() + + m := <-tq.Messages() + if m.Data().(string) != "my message" { + t.Fatal() + } }