diff --git a/pubsublite/rpc.go b/pubsublite/rpc.go
new file mode 100644
index 0000000..94ac1d7
--- /dev/null
+++ b/pubsublite/rpc.go
@@ -0,0 +1,100 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+	"time"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	gax "github.com/googleapis/gax-go/v2"
+)
+
+// streamRetryer implements the retry policy for establishing gRPC stream
+// connections.
+type streamRetryer struct {
+	bo       gax.Backoff
+	deadline time.Time
+}
+
+func newStreamRetryer(timeout time.Duration) *streamRetryer {
+	return &streamRetryer{
+		bo: gax.Backoff{
+			Initial:    10 * time.Millisecond,
+			Max:        10 * time.Second,
+			Multiplier: 2,
+		},
+		deadline: time.Now().Add(timeout),
+	}
+}
+
+func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
+	if time.Now().After(r.deadline) {
+		return 0, false
+	}
+	if isRetryableSendError(err) {
+		return r.bo.Pause(), true
+	}
+	return 0, false
+}
+
+func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
+	if time.Now().After(r.deadline) {
+		return 0, false
+	}
+	if isRetryableRecvError(err) {
+		return r.bo.Pause(), true
+	}
+	return 0, false
+}
+
+func isRetryableSendCode(code codes.Code) bool {
+	switch code {
+	// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
+	// smaller set of retryable codes.
+	case codes.DeadlineExceeded, codes.Unavailable:
+		return true
+	default:
+		return false
+	}
+}
+
+func isRetryableRecvCode(code codes.Code) bool {
+	switch code {
+	// Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java
+	case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown:
+		return true
+	default:
+		return false
+	}
+}
+
+func isRetryableSendError(err error) bool {
+	return isRetryableStreamError(err, isRetryableSendCode)
+}
+
+func isRetryableRecvError(err error) bool {
+	return isRetryableStreamError(err, isRetryableRecvCode)
+}
+
+func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
+	s, ok := status.FromError(err)
+	if !ok {
+		// Includes io.EOF, normal stream close.
+		// Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go
+		return true
+	}
+	return isEligible(s.Code())
+}
diff --git a/pubsublite/streams.go b/pubsublite/streams.go
new file mode 100644
index 0000000..2e1cb32
--- /dev/null
+++ b/pubsublite/streams.go
@@ -0,0 +1,338 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+	"context"
+	"io"
+	"reflect"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc"
+
+	gax "github.com/googleapis/gax-go/v2"
+)
+
+// streamStatus is the status of a retryableStream. A stream starts off
+// uninitialized. While it is active, it can transition between reconnecting and
+// connected due to retryable errors. When a permanent error occurs, the stream
+// is terminated and cannot be reconnected.
+type streamStatus int
+
+const (
+	streamUninitialized streamStatus = 0
+	streamReconnecting  streamStatus = 1
+	streamConnected     streamStatus = 2
+	streamTerminated    streamStatus = 3
+)
+
+// streamHandler provides hooks for different Pub/Sub Lite streaming APIs
+// (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream.
+// All Pub/Sub Lite streaming APIs implement a similar handshaking protocol,
+// where an initial request and response must be transmitted before other
+// requests can be sent over the stream.
+//
+// streamHandler methods must not be called while holding retryableStream.mu in
+// order to prevent the streamHandler calling back into the retryableStream and
+// deadlocking.
+type streamHandler interface {
+	// newStream implementations must create the client stream with the given
+	// (cancellable) context.
+	newStream(context.Context) (grpc.ClientStream, error)
+	initialRequest() interface{}
+	validateInitialResponse(interface{}) error
+
+	// onStreamStatusChange is used to notify stream handlers when the stream has
+	// changed state. In particular, the `streamTerminated` state must be handled.
+	// retryableStream.Error() returns the error that caused the stream to
+	// terminate. Stream handlers should perform any necessary reset of state upon
+	// `streamConnected`.
+	onStreamStatusChange(streamStatus)
+	// onResponse forwards a response received on the stream to the stream
+	// handler.
+	onResponse(interface{})
+}
+
+// retryableStream is a wrapper around a bidirectional gRPC client stream to
+// handle automatic reconnection when the stream breaks.
+//
+// A retryableStream cycles between the following goroutines:
+//   Start() --> reconnect() <--> listen()
+// terminate() can be called at any time, either by the client to force stream
+// closure, or as a result of an unretryable error.
+//
+// Safe to call capitalized methods from multiple goroutines. All other methods
+// are private implementation.
+type retryableStream struct {
+	// Immutable after creation.
+	ctx          context.Context
+	handler      streamHandler
+	responseType reflect.Type
+	timeout      time.Duration
+
+	// Guards access to fields below.
+	mu sync.Mutex
+
+	// The current connected stream.
+	stream grpc.ClientStream
+	// Function to cancel the current stream (which may be reconnecting).
+	cancelStream context.CancelFunc
+	status       streamStatus
+	finalErr     error
+}
+
+// newRetryableStream creates a new retryable stream wrapper. `timeout` is the
+// maximum duration for reconnection. `responseType` is the type of the response
+// proto received on the stream.
+func newRetryableStream(ctx context.Context, handler streamHandler, timeout time.Duration, responseType reflect.Type) *retryableStream {
+	return &retryableStream{
+		ctx:          ctx,
+		handler:      handler,
+		responseType: responseType,
+		timeout:      timeout,
+	}
+}
+
+// Start establishes a stream connection. It is a no-op if the stream has
+// already started.
+func (rs *retryableStream) Start() {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+
+	if rs.status != streamUninitialized {
+		return
+	}
+	go rs.reconnect()
+}
+
+// Stop gracefully closes the stream without error.
+func (rs *retryableStream) Stop() {
+	rs.terminate(nil)
+}
+
+// Send attempts to send the request to the underlying stream and returns true
+// if successfully sent. Returns false if an error occurred or a reconnection is
+// in progress.
+func (rs *retryableStream) Send(request interface{}) (sent bool) {
+	rs.mu.Lock()
+
+	if rs.stream != nil {
+		err := rs.stream.SendMsg(request)
+		// Note: if SendMsg returns an error, the stream is aborted.
+		switch {
+		case err == nil:
+			sent = true
+		case err == io.EOF:
+			// If SendMsg returns io.EOF, RecvMsg will return the status of the
+			// stream. Nothing to do here.
+			break
+		case isRetryableSendError(err):
+			go rs.reconnect()
+		default:
+			rs.mu.Unlock() // terminate acquires the mutex.
+			rs.terminate(err)
+			return
+		}
+	}
+
+	rs.mu.Unlock()
+	return
+}
+
+// Status returns the current status of the retryable stream.
+func (rs *retryableStream) Status() streamStatus {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+	return rs.status
+}
+
+// Error returns the error that caused the stream to terminate. Can be nil if it
+// was initiated by Stop().
+func (rs *retryableStream) Error() error {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+	return rs.finalErr
+}
+
+func (rs *retryableStream) currentStream() grpc.ClientStream {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+	return rs.stream
+}
+
+// clearStream must be called with the retryableStream.mu locked.
+func (rs *retryableStream) clearStream() {
+	if rs.cancelStream != nil {
+		// If the stream did not already abort due to error, this will abort it.
+		rs.cancelStream()
+		rs.cancelStream = nil
+	}
+	if rs.stream != nil {
+		rs.stream = nil
+	}
+}
+
+func (rs *retryableStream) setCancel(cancel context.CancelFunc) {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+	rs.cancelStream = cancel
+}
+
+// reconnect attempts to establish a valid connection with the server. Due to
+// the potential high latency, initNewStream() should not be done while holding
+// retryableStream.mu. Hence we need to handle the stream being force terminated
+// during reconnection.
+//
+// Intended to be called in a goroutine. It ends once the connection has been
+// established or the stream terminated.
+func (rs *retryableStream) reconnect() {
+	canReconnect := func() bool {
+		rs.mu.Lock()
+		defer rs.mu.Unlock()
+
+		if rs.status == streamReconnecting {
+			// There can only be 1 goroutine reconnecting.
+			return false
+		}
+		if rs.status == streamTerminated {
+			return false
+		}
+		rs.status = streamReconnecting
+		rs.clearStream()
+		return true
+	}
+	if !canReconnect() {
+		return
+	}
+	rs.handler.onStreamStatusChange(streamReconnecting)
+
+	newStream, cancelFunc, err := rs.initNewStream()
+	if err != nil {
+		rs.terminate(err)
+		return
+	}
+
+	connected := func() bool {
+		rs.mu.Lock()
+		defer rs.mu.Unlock()
+
+		if rs.status == streamTerminated {
+			rs.clearStream()
+			return false
+		}
+		rs.status = streamConnected
+		rs.stream = newStream
+		rs.cancelStream = cancelFunc
+		go rs.listen(newStream)
+		return true
+	}
+	if !connected() {
+		return
+	}
+	rs.handler.onStreamStatusChange(streamConnected)
+}
+
+func (rs *retryableStream) initNewStream() (newStream grpc.ClientStream, cancelFunc context.CancelFunc, err error) {
+	r := newStreamRetryer(rs.timeout)
+	for {
+		backoff, shouldRetry := func() (time.Duration, bool) {
+			defer func() {
+				if err != nil && cancelFunc != nil {
+					cancelFunc()
+					cancelFunc = nil
+					newStream = nil
+				}
+			}()
+
+			var cctx context.Context
+			cctx, cancelFunc = context.WithCancel(rs.ctx)
+			// Store the cancel func to quickly cancel reconnecting if the stream is
+			// terminated.
+			rs.setCancel(cancelFunc)
+
+			newStream, err = rs.handler.newStream(cctx)
+			if err != nil {
+				return r.RetryRecv(err)
+			}
+			if err = newStream.SendMsg(rs.handler.initialRequest()); err != nil {
+				return r.RetrySend(err)
+			}
+			response := reflect.New(rs.responseType).Interface()
+			if err = newStream.RecvMsg(response); err != nil {
+				return r.RetryRecv(err)
+			}
+			if err = rs.handler.validateInitialResponse(response); err != nil {
+				// An unexpected initial response from the server is a permanent error.
+				return 0, false
+			}
+
+			// We have a valid connection and should break from the outer loop.
+			return 0, false
+		}()
+
+		if !shouldRetry {
+			break
+		}
+		if err = gax.Sleep(rs.ctx, backoff); err != nil {
+			break
+		}
+	}
+	return
+}
+
+// listen receives responses from the current stream. It initiates reconnection
+// upon retryable errors or terminates the stream upon permanent error.
+//
+// Intended to be called in a goroutine. It ends when recvStream has closed.
+func (rs *retryableStream) listen(recvStream grpc.ClientStream) {
+	for {
+		response := reflect.New(rs.responseType).Interface()
+		err := recvStream.RecvMsg(response)
+
+		// If the current stream has changed while listening, any errors or messages
+		// received now are obsolete. Discard and end the goroutine. Assume the
+		// stream has been cancelled elsewhere.
+		if rs.currentStream() != recvStream {
+			break
+		}
+		if err != nil {
+			if isRetryableRecvError(err) {
+				go rs.reconnect()
+			} else {
+				rs.terminate(err)
+			}
+			break
+		}
+		rs.handler.onResponse(response)
+	}
+}
+
+// terminate forces the stream to terminate with the given error (can be nil)
+// Is a no-op if the stream has already terminated.
+func (rs *retryableStream) terminate(err error) {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+
+	if rs.status == streamTerminated {
+		return
+	}
+	rs.status = streamTerminated
+	rs.finalErr = err
+	rs.clearStream()
+
+	// terminate can be called from within a streamHandler method with a lock
+	// held. So notify from a goroutine to prevent deadlock.
+	go rs.handler.onStreamStatusChange(streamTerminated)
+}
