As an engineer on Stackdriver’s monitoring API, I’m always curious how easy or hard it is to use the API. I decided to start with a very bad question: can you get a list of all of the time series Stackdriver has?
I know the timeseries.list endpoint requires a metric type in the filter, so at the very least I have to query for each metric type individually. I also know that each time series is identified by
- a metric type,
- some labels,
- a resource type, and
- some more labels.
Since resource type is always defined, I can then query by pairs of (resource type, metric type) instead of just each metric type.
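Concretely, a timeseries.list filter for one such pair looks something like this (the resource and metric types here are just an illustrative example, not ones from my project):
resource.type = "gce_instance" AND metric.type = "compute.googleapis.com/instance/cpu/utilization"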
I’ll do everything in Go because I know I want rate limiting and I understand that in Go far more than in any other language. Please accept my apologies in advance for the state of my Go code. My most readable code is MatLab :(.
Based on the decision to get pairs of (resource type, metric type), I start by fetching all the possible values for those two types:
func fetchResourceTypes(ctx context.Context, client *monitoring.MetricClient, projectID string) ([]string, error) {
    it := client.ListMonitoredResourceDescriptors(ctx,
        &monitoringpb.ListMonitoredResourceDescriptorsRequest{
            Name: fmt.Sprintf("projects/%s", projectID),
        },
    )
    types := []string{}
    for d, err := it.Next(); err != iterator.Done; d, err = it.Next() {
        if err != nil {
            return nil, errors.Wrap(err, "failed to fetch resource descriptor")
        }
        types = append(types, d.GetType())
    }
    return types, nil
}
func fetchMetricTypes(ctx context.Context, client *monitoring.MetricClient, projectID string) ([]string, error) {
    it := client.ListMetricDescriptors(ctx,
        &monitoringpb.ListMetricDescriptorsRequest{
            Name: fmt.Sprintf("projects/%s", projectID),
        },
    )
    types := []string{}
    for d, err := it.Next(); err != iterator.Done; d, err = it.Next() {
        if err != nil {
            return nil, errors.Wrap(err, "failed to fetch metric descriptor")
        }
        types = append(types, d.GetType())
    }
    return types, nil
}
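Wiring those up looks roughly like this (a minimal sketch; the client construction, the projectID placeholder, and the error handling are my assumptions, not code lifted from the scraper):
ctx := context.Background()
// The client comes from cloud.google.com/go/monitoring/apiv3.
client, err := monitoring.NewMetricClient(ctx)
if err != nil {
    log.Fatal(errors.Wrap(err, "failed to create metric client"))
}
projectID := "my-project" // Placeholder; later the scraper reads this from the environment.
resourceTypes, err := fetchResourceTypes(ctx, client, projectID)
if err != nil {
    log.Fatal(err)
}
metricTypes, err := fetchMetricTypes(ctx, client, projectID)
if err != nil {
    log.Fatal(err)
}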
Easy peasy. Next, I call timeseries.list for each pair. This is over 100,000 pairs, so I need rate limiting to stay under the monitoring API’s query quota. Borrowing from the rate limiting Go example, I fetch, wait, fetch, wait, …
timeSeriesIDs := make(chan TimeSeriesID) // The channel the fetchers will write results to.
// Handle time series IDs as they come in.
go func() {
    for timeSeriesID := range timeSeriesIDs {
        log.Print(timeSeriesID)
    }
    log.Print("finished fetching time series IDs")
}()

end := time.Now()
ticker := time.NewTicker(100 * time.Millisecond)
for _, resourceType := range resourceTypes {
    for _, metricType := range metricTypes {
        go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, timeSeriesIDs)
        <-ticker.C
    }
}
But how do I wait for all of those to finish? With the help of a colleague I found the sync package, which provides an incremental barrier in sync.WaitGroup:
barrier := sync.WaitGroup{}
for _, resourceType := range resourceTypes {
    for _, metricType := range metricTypes {
        barrier.Add(1) // Erect another barrier for the upcoming request.
        go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, timeSeriesIDs, &barrier)
        <-ticker.C
    }
}
barrier.Wait()       // Block until every fetcher has torn its barrier down.
close(timeSeriesIDs) // Let the logging goroutine finish.
Now I can write the fetcher, bringing down the barrier when each request is finished:
type TimeSeriesID struct {
    Metric   *metricpb.Metric
    Resource *monitoredrespb.MonitoredResource
}

func (id TimeSeriesID) String() string {
    return fmt.Sprintf("(%s, %s)",
        proto.MarshalTextString(id.Metric),
        proto.MarshalTextString(id.Resource))
}

func fetchTimeSeriesHeaders(ctx context.Context, client *monitoring.MetricClient,
    projectID, resourceType, metricType string, end time.Time,
    c chan<- TimeSeriesID, barrier *sync.WaitGroup) {
    defer barrier.Done() // Tear down another barrier when the response is used.
    it := client.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
        Name:   fmt.Sprintf("projects/%s", projectID),
        Filter: fmt.Sprintf("resource.type=%s metric.type=\"%s\"", resourceType, metricType),
        View:   monitoringpb.ListTimeSeriesRequest_HEADERS,
        Interval: &monitoringpb.TimeInterval{
            EndTime:   &googlepb.Timestamp{Seconds: end.Unix()},
            StartTime: &googlepb.Timestamp{Seconds: end.Unix() - 60},
        },
    })
    for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
        if err != nil {
            log.Print(errors.Wrapf(err, "failed to fetch %s %s", resourceType, metricType))
            break
        }
        c <- TimeSeriesID{Metric: h.GetMetric(), Resource: h.GetResource()}
    }
}
and then I give it a whirl. The logs have a bunch of spam of the form “The supplied filter does not specify a valid combination of metric and monitored resource descriptors. The query will not return any time series.” Gross! So I quietly skip the pairs that produce that error:
func fetchTimeSeriesHeaders(...) {
    ...
    for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
        if s, ok := status.FromError(err); ok &&
            s.Code() == codes.InvalidArgument &&
            s.Message() == "The supplied filter does not specify a valid combination of metric and monitored resource descriptors. The query will not return any time series." {
            break
        }
        ...
    }
}
Note: The Google APIs for Go guidance says to type-assert errors to googleapi.Error, but I found that for the monitoring API you should actually use status.FromError, which translates correctly.
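In other words, roughly (the specific codes checked below are just illustrative):
// The general guidance: type-assert to *googleapi.Error (google.golang.org/api/googleapi).
if gErr, ok := err.(*googleapi.Error); ok {
    log.Printf("HTTP %d: %s", gErr.Code, gErr.Message)
}

// What worked here: the monitoring client speaks gRPC, so status.FromError
// (google.golang.org/grpc/status) recovers the gRPC status instead.
if s, ok := status.FromError(err); ok && s.Code() == codes.InvalidArgument {
    log.Print(s.Message())
}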
Great, but my next whirl still took a really long time. To check for a specific pair that I know has data, I removed the fetchResourceTypes and fetchMetricTypes calls just to check the pipes:
resourceTypes = []string{"consumed_api"}
metricTypes = []string{"serviceruntime.googleapis.com/api/request_count"}
That works. So to reduce anxiety, I add a little bit of logging:
func fetchTimeSeriesHeaders(...) {
    ...
    numStreams := 0
    for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
        ...
        numStreams++
    }
    if numStreams > 0 {
        log.Printf("%d streams found for (%s, %s)", numStreams, resourceType, metricType)
    }
}
I’m not seeing the requests counted in Stackdriver’s Metrics Explorer, so I try to blow my QPS quota by increasing the QPS to 1000:
...
ticker := time.NewTicker(1 * time.Millisecond)
...
which did blow the quota but referenced some unknown project. Since I want to track my QPS, I’ll use a downloaded service account credential, based on the authentication part of the client library page.
export GOOGLE_APPLICATION_CREDENTIALS=/home/syed/credentials.json
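(If you still need to create such a credential, one way is with gcloud; the service account name below is a placeholder, not my actual account:)
gcloud iam service-accounts keys create ~/credentials.json \
  --iam-account my-scraper@my-project.iam.gserviceaccount.com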
In the next run, some responses are INTERNAL errors, so I add bad retry logic:
func fetchTimeSeriesHeaders(...) {
    ...
    for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
        if s, ok := status.FromError(err); ok && s.Code() == codes.Internal {
            barrier.Add(1) // Erect another barrier.
            go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, c, barrier)
        }
        ...
    }
    ...
}
This retry logic is bad because there is no retry delay or backoff. I’ll deal with this later.
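When I do deal with it, a minimal sketch of delay-plus-backoff retries might look something like this (the attempt parameter and the cap of five tries are placeholders I made up):
func fetchTimeSeriesHeaders(..., attempt int) {
    ...
    if s, ok := status.FromError(err); ok && s.Code() == codes.Internal && attempt < 5 {
        barrier.Add(1) // Erect another barrier for the retry.
        go func() {
            // Back off exponentially before retrying: 1s, 2s, 4s, ...
            time.Sleep(time.Duration(1<<uint(attempt)) * time.Second)
            fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, c, barrier, attempt+1)
        }()
    }
    ...
}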
This still ran a long time, so I’m going to move the workload to Google Cloud Functions so that I can disconnect from Google Cloud Shell while the scrape runs. Google Cloud Functions deploys from a local folder, so I create a folder ~/scrape and move the code into ~/scrape/fn.go. I run
gcloud functions deploy scrape --runtime go111 --trigger-topic scrape
which fails due to missing dependencies. Two minutes of Googling got me to the docs for specifying Go dependencies, which is as simple as
go mod init scrape && go mod tidy
but I also need to reformat my Go function to be treated like a package:
package scrape
...
func OnMessage(ctx context.Context, _ PubSubMessage) error { // Replaces main().
    // Make sure to remove ctx := context.Background(); I forgot to do that.
    ...
}
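The OnMessage signature references a PubSubMessage type. Mine follows the shape the Cloud Functions documentation uses for Pub/Sub-triggered background functions (this declaration is my own addition, not part of the snippet above):
// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
    Data []byte `json:"data"`
}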
With that in place, the deploy command changes to
gcloud functions deploy scrape --entry-point OnMessage --runtime go111 --trigger-topic scrape
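Once deployed, publishing anything to the trigger topic kicks off a run; for example (the message body is arbitrary):
gcloud pubsub topics publish scrape --message go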
For build verification, I took a cue from a Reddit post and created a dummy test fn_test.go that just triggers the message:
func TestDummy(_ *testing.T) {
    ctx := context.Background()
    log.Print(OnMessage(ctx, PubSubMessage{}))
}
To clean up the upload to Google Cloud Functions, I create a file .gcloudignore inside scrape/ with:
.gcloudignore
*.wp
*_test.go
Now temporary files and test files won’t pollute my deployed function. The local trigger is just go test.
NOTE: I tried .* and that excluded everything, so don’t do that.
I also have to change all my early-exit log messages from log.Fatal(errors.Wrap()) to return errors.Wrap(), because Google Cloud Functions prefers reporting errors to crashing programs.
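For example (a trivial before/after; the wrap message is just a made-up placeholder):
// Before, as a standalone binary:
log.Fatal(errors.Wrap(err, "failed to fetch resource types"))

// After, inside the Cloud Function:
return errors.Wrap(err, "failed to fetch resource types")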
I also found that Google Cloud Functions and Google Cloud Shell use different environment variables for the hosting project: Google Cloud Functions uses GCP_PROJECT while Google Cloud Shell uses GOOGLE_CLOUD_PROJECT. So I modify the project identification in my scraper:
...
func OnMessage(ctx context.Context, _ PubSubMessage) error {
    ...
    projectID := os.Getenv("GCP_PROJECT") // Set in Google Cloud Functions.
    if projectID == "" {
        projectID = os.Getenv("GOOGLE_CLOUD_PROJECT") // Set in Google Cloud Shell.
    }
    ...
}
On the Google Cloud Functions run, requests get cancelled, which makes me think that Google Cloud Functions doesn’t expect long-running functions. So next, I’ll work on trimming the queries down to what actually has data.