feat(pubsublite): Periodic background task (#3152)
* feat(pubsublite): Periodic background task
* Add comment for Start called multiple times
* Fixed: Ticker.Reset is not supported before 1.15
diff --git a/pubsublite/internal/wire/periodic_task.go b/pubsublite/internal/wire/periodic_task.go
new file mode 100644
index 0000000..4b5810b
--- /dev/null
+++ b/pubsublite/internal/wire/periodic_task.go
@@ -0,0 +1,70 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import (
+ "time"
+)
+
+// periodicTask manages a recurring background task.
+type periodicTask struct {
+ period time.Duration
+ task func()
+ ticker *time.Ticker
+ stop chan struct{}
+}
+
+func newPeriodicTask(period time.Duration, task func()) *periodicTask {
+ return &periodicTask{
+ period: period,
+ task: task,
+ }
+}
+
+// Start the polling goroutine. No-op if the goroutine is already running.
+// The task is executed after the polling period.
+func (pt *periodicTask) Start() {
+ if pt.ticker != nil {
+ return
+ }
+
+ pt.ticker = time.NewTicker(pt.period)
+ pt.stop = make(chan struct{})
+ go pt.poll(pt.ticker, pt.stop)
+}
+
+// Stop/pause the periodic task.
+func (pt *periodicTask) Stop() {
+ if pt.ticker == nil {
+ return
+ }
+
+ pt.ticker.Stop()
+ close(pt.stop)
+
+ pt.ticker = nil
+ pt.stop = nil
+}
+
+func (pt *periodicTask) poll(ticker *time.Ticker, stop chan struct{}) {
+ for {
+ select {
+ case <-stop:
+ // Ends the goroutine.
+ return
+ case <-ticker.C:
+ pt.task()
+ }
+ }
+}
diff --git a/pubsublite/internal/wire/periodic_task_test.go b/pubsublite/internal/wire/periodic_task_test.go
new file mode 100644
index 0000000..c3b04ab
--- /dev/null
+++ b/pubsublite/internal/wire/periodic_task_test.go
@@ -0,0 +1,67 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestPeriodicTask(t *testing.T) {
+ var callCount int32
+ values := make(chan int32)
+ task := func() {
+ values <- atomic.AddInt32(&callCount, 1)
+ }
+ ptask := newPeriodicTask(10*time.Millisecond, task)
+
+ t.Run("Start1", func(t *testing.T) {
+ ptask.Start()
+ ptask.Start() // Tests duplicate start
+
+ if got, want := <-values, int32(1); got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
+ })
+
+ t.Run("Stop1", func(t *testing.T) {
+ ptask.Stop()
+ ptask.Stop() // Tests duplicate stop
+
+ select {
+ case got := <-values:
+ t.Errorf("got unexpected value %d", got)
+ default:
+ }
+ })
+
+ t.Run("Start2", func(t *testing.T) {
+ ptask.Start()
+
+ if got, want := <-values, int32(2); got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
+ })
+
+ t.Run("Stop2", func(t *testing.T) {
+ ptask.Stop()
+
+ select {
+ case got := <-values:
+ t.Errorf("got unexpected value %d", got)
+ default:
+ }
+ })
+}