pubsub: add ordered keys

Pub/Sub ordered keys requires:

- Publish with a non-empty key gets added to a per-key queue, as implemented
by Bundler.HandlerLimit=1. This setting causes the bundler to only have one
outstanding bundle being handled at a time; further bundles form a queue.
- Receive with a message that has a non-empty key gets added to a per-key
queue, as implemented by a slice. When a worker operates on a key, it
processes all items in the key's slice until the slice is empty, at which point
it deletes the key to release the key's resources, and the worker returns to
the worker pool.

Functionally, for users, this should involve:

- Slightly slower processing speeds due to additional data structures
along the Publish and Receive paths. (we should do some performance
testing to vet this out)
- PublishSettings.NumGoroutines and ReceiveSettings.NumGoroutines should
both result in double the number of goroutines, since an additional data
structure (publish/receive scheduler) uses that value. Documentation has
been adjusted accordingly to describe it more as a scaling factor than
an exact number.
- Default ReceiveSettings.NumGoroutines is increased from 1 to 10. It's
generally a better experience to have multiple workers than one. And,
some tests - like TestStreamingPullFlowControl - implicitly require it.
- Since ordered keys require only a single outstanding RPC at once, it is
possible to send ordered key messages to Topic.Publish (and subsequently to
PublishScheduler.Add) faster than the bundler can publish them to the
Pub/Sub service, resulting in a backed up queue of Pub/Sub bundles. Each
item in the bundler queue is a goroutine, which means users may see thousands
of goroutines in debugging they might do.

TODO: add Resume method.

Change-Id: Ib710944d557970290cf12321d1fdbd9b1699231d
16 files changed
tree: 30d2f1fb7cd37bef9a7e52c08d3bde407febe792
  1. asset/
  2. automl/
  3. bigquery/
  4. bigtable/
  5. civil/
  6. cloudtasks/
  7. cmd/
  8. compute/
  9. container/
  10. containeranalysis/
  11. dataproc/
  12. datastore/
  13. debugger/
  14. dialogflow/
  15. dlp/
  16. errorreporting/
  17. expr/
  18. firestore/
  19. functions/
  20. grafeas/
  21. httpreplay/
  22. iam/
  23. internal/
  24. iot/
  25. irm/
  26. kms/
  27. language/
  28. logging/
  29. longrunning/
  30. monitoring/
  31. oslogin/
  32. phishingprotection/
  33. profiler/
  34. pubsub/
  35. recaptchaenterprise/
  36. recommender/
  37. redis/
  38. rpcreplay/
  39. scheduler/
  40. securitycenter/
  41. spanner/
  42. speech/
  43. storage/
  44. talent/
  45. texttospeech/
  46. trace/
  47. translate/
  48. videointelligence/
  49. vision/
  50. webrisk/
  51. CHANGES.md
  52. CODE_OF_CONDUCT.md
  53. CONTRIBUTING.md
  54. doc.go
  55. examples_test.go
  56. gapics.txt
  57. go.mod
  58. go.sum
  59. issue_template.md
  60. LICENSE
  61. license_test.go
  62. manuals.txt
  63. microgens.csv
  64. README.md
  65. regen-gapic.sh
  66. RELEASING.md
  67. tidyall.sh
  68. tools.go
README.md

Google Cloud Client Libraries for Go

GoDoc

Go packages for Google Cloud Platform services.

import "cloud.google.com/go"

To install the packages on your system, do not clone the repo. Instead use

$ go get -u cloud.google.com/go/...

NOTE: Some of these packages are under development, and may occasionally make backwards-incompatible changes.

NOTE: Github repo is a mirror of https://code.googlesource.com/gocloud.

Supported APIs

Google APIStatusPackage
Assetalphacloud.google.com/go/asset/v1beta
BigQuerystablecloud.google.com/go/bigquery
Bigtablestablecloud.google.com/go/bigtable
Cloudtasksstablecloud.google.com/go/cloudtasks/apiv2
Containerstablecloud.google.com/go/container/apiv1
ContainerAnalysisbetacloud.google.com/go/containeranalysis/apiv1beta1
Dataprocstablecloud.google.com/go/dataproc/apiv1
Datastorestablecloud.google.com/go/datastore
Debuggeralphacloud.google.com/go/debugger/apiv2
Dialogflowalphacloud.google.com/go/dialogflow/apiv2
Data Loss Preventionalphacloud.google.com/go/dlp/apiv2
ErrorReportingalphacloud.google.com/go/errorreporting
Firestorestablecloud.google.com/go/firestore
IAMstablecloud.google.com/go/iam
IoTalphacloud.google.com/iot/apiv1
KMSstablecloud.google.com/go/kms
Natural Languagestablecloud.google.com/go/language/apiv1
Loggingstablecloud.google.com/go/logging
Monitoringalphacloud.google.com/go/monitoring/apiv3
OS Loginalphacloud.google.com/go/oslogin/apiv1
Pub/Substablecloud.google.com/go/pubsub
Phishing Protectionalphacloud.google.com/go/phishingprotection/apiv1betad1
reCAPTCHA Enterprisealphacloud.google.com/go/recaptchaenterprise/apiv1betad1
Memorystorealphacloud.google.com/go/redis/apiv1
Schedulerstablecloud.google.com/go/scheduler/apiv1
Spannerstablecloud.google.com/go/spanner
Speechstablecloud.google.com/go/speech/apiv1
Storagestablecloud.google.com/go/storage
Talentalphacloud.google.com/go/talent/apiv4beta1
Text To Speechalphacloud.google.com/go/texttospeech/apiv1
Tracealphacloud.google.com/go/trace/apiv2
Translatestablecloud.google.com/go/translate
Video Intelligencealphacloud.google.com/go/videointelligence/apiv1beta1
Visionstablecloud.google.com/go/vision/apiv1

Alpha status: the API is still being actively developed. As a result, it might change in backward-incompatible ways and is not recommended for production use.

Beta status: the API is largely complete, but still has outstanding features and bugs to be addressed. There may be minor backwards-incompatible changes where necessary.

Stable status: the API is mature and ready for production use. We will continue addressing bugs and feature requests.

Documentation and examples are available at godoc.org/cloud.google.com/go

Go Versions Supported

We support the two most recent major versions of Go. If Google App Engine uses an older version, we support that as well.

Authorization

By default, each API will use Google Application Default Credentials for authorization credentials used in calling the API endpoints. This will allow your application to run in many environments without requiring explicit configuration.

client, err := storage.NewClient(ctx)

To authorize using a JSON key file, pass option.WithCredentialsFile to the NewClient function of the desired package. For example:

client, err := storage.NewClient(ctx, option.WithCredentialsFile("path/to/keyfile.json"))

You can exert more control over authorization by using the golang.org/x/oauth2 package to create an oauth2.TokenSource. Then pass option.WithTokenSource to the NewClient function: snip:# (auth-ts)

tokenSource := ...
client, err := storage.NewClient(ctx, option.WithTokenSource(tokenSource))

Contributing

Contributions are welcome. Please, see the CONTRIBUTING document for details. We‘re using Gerrit for our code reviews. Please don’t open pull requests against this repo, new pull requests will be automatically closed.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Contributor Code of Conduct for more information.