index of Stackdriver timeseries

trying to find out what timeseries Stackdriver has in my project

As an engineer on Stackdriver’s monitoring API I’m always curious how easy or hard the API is to use. I decided to start with a very bad question: can you get a list of all of the time series Stackdriver has?

I know the timeseries.list endpoint requires a metric type in the filter, so at the very least I have to query for each metric type individually. I also know that each time series is identified by

  • a metric type,
  • some labels,
  • a resource type, and
  • some more labels.

Since the resource type is always defined, I can query by pairs of (resource type, metric type) instead of by metric type alone.
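For example, a filter for a single pair looks something like this (the specific resource and metric types here are placeholders, not necessarily ones in my project):

// A hypothetical filter for one (resource type, metric type) pair, in the
// same shape timeseries.list expects.
filter := `resource.type=gce_instance metric.type="compute.googleapis.com/instance/cpu/utilization"`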

I’ll do everything in Go because I know I want rate limiting, and I understand rate limiting in Go far better than in any other language. Please accept my apologies in advance for the state of my Go code. My most readable code is MATLAB :(.

Based on the decision to get pairs of (resource type, metric type), I start by fetching all the possible values for those two types:

func fetchResourceTypes(ctx context.Context, client *monitoring.MetricClient, projectID string) ([]string, error) {
  it := client.ListMonitoredResourceDescriptors(ctx,
    &monitoringpb.ListMonitoredResourceDescriptorsRequest{
      Name: fmt.Sprintf("projects/%s", projectID),
    },
  )
  types := []string{}
  for d, err := it.Next(); err != iterator.Done; d, err = it.Next() {
    if err != nil {
      return nil, errors.Wrap(err, "failed to fetch resource descriptor")
    }
    types = append(types, d.GetType())
  }
  return types, nil
}

func fetchMetricTypes(ctx context.Context, client *monitoring.MetricClient, projectID string) ([]string, error) {
  it := client.ListMetricDescriptors(ctx,
    &monitoringpb.ListMetricDescriptorsRequest{
      Name: fmt.Sprintf("projects/%s", projectID),
    },
  )
  types := []string{}
  for d, err := it.Next(); err != iterator.Done; d, err = it.Next() {
    if err != nil {
      return nil, errors.Wrap(err, "failed to fetch metric descriptor")
    }
    types = append(types, d.GetType())
  }
  return types, nil
}

Easy peasy. Next, I call timeseries.list for each pair. That’s over 100,000 pairs, so I need rate limiting to stay under the monitoring API’s query quota. Borrowing from the rate limiting Go example, I fetch, wait, fetch, wait, …

timeSeriesIDs := make(chan TimeSeriesID) // Results from every fetch land here.
go func() {
  for timeSeriesID := range timeSeriesIDs {
    log.Print(timeSeriesID)
  }
  log.Print("finished fetching time series IDs")
}() // Handle time series IDs as they come in.

end := time.Now()
ticker := time.NewTicker(100 * time.Millisecond)
for _, resourceType := range resourceTypes {
  for _, metricType := range metricTypes {
    go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, timeSeriesIDs)
    <-ticker.C
  }
}

But how do I wait for all of those to finish? With the help of a colleague I found sync.WaitGroup, which provides an incremental barrier:

barrier := sync.WaitGroup{}
for _, resourceType := range resourceTypes {
  for _, metricType := range metricTypes {
    barrier.Add(1) // Erect another barrier for the upcoming request.
    go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, timeSeriesIDs, &barrier)
    <-ticker.C
  }
}
barrier.Wait()       // Block until every request has torn down its barrier.
close(timeSeriesIDs) // Let the logging goroutine drain and finish.

Now I can write the fetcher, bringing down the barrier when each request is finished:

type TimeSeriesID struct {
  Metric   *metricpb.Metric
  Resource *monitoredrespb.MonitoredResource
}

func (id TimeSeriesID) String() string {
  return fmt.Sprintf("(%s, %s)",
    proto.MarshalTextString(id.Metric),
    proto.MarshalTextString(id.Resource))
}

func fetchTimeSeriesHeaders(ctx context.Context, client *monitoring.MetricClient,
  projectID, resourceType, metricType string, end time.Time,
  c chan<- TimeSeriesID, barrier *sync.WaitGroup) {
  defer barrier.Done() // Tear down another barrier when the response is used.

  it := client.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
    Name:   fmt.Sprintf("projects/%s", projectID),
    Filter: fmt.Sprintf("resource.type=%s metric.type=\"%s\"", resourceType, metricType),
    View:   monitoringpb.ListTimeSeriesRequest_HEADERS,
    Interval: &monitoringpb.TimeInterval{
      EndTime:   &googlepb.Timestamp{Seconds: end.Unix()},
      StartTime: &googlepb.Timestamp{Seconds: end.Unix() - 60},
    },
  })
  for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
    if err != nil {
      log.Print(errors.Wrapf(err, "failed to fetch %s %s", resourceType, metricType))
      break
    }
    c <- TimeSeriesID{Metric: h.GetMetric(), Resource: h.GetResource()}
  }
}

I then give it a whirl. The logs have a bunch of spam of the form “The supplied filter does not specify a valid combination of metric and monitored resource descriptors. The query will not return any time series.” Gross! So I special-case that error and skip the pair:

func fetchTimeSeriesHeaders(
  ...
  for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
    if s, ok := status.FromError(err); ok && s.Code() == codes.InvalidArgument &&
      s.Message() == "The supplied filter does not specify a valid combination of metric and monitored resource descriptors. The query will not return any time series." {
      break
    }
    ...
  }
}

Note: The Google APIs for Go guidance says to type-assert errors to *googleapi.Error, but I found that for the monitoring API you should actually use status.FromError, which translates the gRPC error correctly.
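As a sketch of the difference (assuming the usual googleapi and grpc status imports; the log lines are only illustrative):

// What the general guidance suggests, which suits the REST-based clients.
if gerr, ok := err.(*googleapi.Error); ok {
  log.Printf("HTTP %d: %s", gerr.Code, gerr.Message)
}

// What worked here: the monitoring client speaks gRPC, so translate with status.
if s, ok := status.FromError(err); ok {
  log.Printf("%s: %s", s.Code(), s.Message())
}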

Great, but my next whirl still took a really long time. To check the pipes with a specific pair that I know has data, I removed the fetchResourceTypes and fetchMetricTypes calls and hardcoded the pair:

resourceTypes = []string{"consumed_api"}
metricTypes = []string{"serviceruntime.googleapis.com/api/request_count"}

That works. So to reduce anxiety, I add a little bit of logging:

func fetchTimeSeriesHeaders(...) {
  ...
  numStreams := 0
  for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
    ...
    numStreams++
  }
  if numStreams > 0 {
    log.Printf("%d streams found for (%s, %s)", numStreams, resourceType, metricType)
  }
}

I’m not seeing the requests counted in Stackdriver’s Metrics Explorer, so I try to blow my QPS quota by increasing the QPS to 1000:

...
ticker := time.NewTicker(1 * time.Millisecond)
...

which did blow the quota but referenced some unknown project. Since I want the requests attributed to (and the QPS tracked in) my own project, I’ll use a downloaded service account credential, following the authentication section of the client library page:

export GOOGLE_APPLICATION_CREDENTIALS=/home/syed/credentials.json
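Setting the environment variable is enough for the client library’s default credential lookup; as an alternative sketch (reusing the same key file path), the credential can also be passed explicitly when constructing the client:

// Sketch: pass the service account key explicitly instead of relying on
// GOOGLE_APPLICATION_CREDENTIALS.
client, err := monitoring.NewMetricClient(ctx,
  option.WithCredentialsFile("/home/syed/credentials.json"))
if err != nil {
  log.Fatal(errors.Wrap(err, "failed to create monitoring client"))
}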

In the next run, some responses are INTERNAL errors, so I add some bad retry logic:

func fetchTimeSeriesHeaders(...) {
  ...
  for h, err := it.Next(); err != iterator.Done; h, err = it.Next() {
    if s, ok := status.FromError(err); ok && s.Code() == codes.Internal {
      barrier.Add(1) // Erect another barrier.
      go fetchTimeSeriesHeaders(ctx, client, projectID, resourceType, metricType, end, c, barrier)
      break // The retried call owns this pair now.
    }
    ...
  }
  ...
}

This retry logic is bad because there is no retry delay or backoff. I’ll deal with this later.
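When I do deal with it, the shape will be something like this sketch: a hypothetical helper with exponential backoff and a capped number of attempts (none of this exists in the scraper yet):

// Hypothetical retry helper with exponential backoff; not part of the scraper yet.
func withBackoff(attempts int, initial time.Duration, f func() error) error {
  delay := initial
  var err error
  for i := 0; i < attempts; i++ {
    if err = f(); err == nil {
      return nil
    }
    if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
      return err // Only INTERNAL errors are worth retrying here.
    }
    time.Sleep(delay)
    delay *= 2
  }
  return err
}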

This still ran for a long time, so I’m going to move the workload to Google Cloud Functions so that I can disconnect from Google Cloud Shell while the scrape runs. Google Cloud Functions deploys from a local folder, so I create a folder ~/scrape and move the code into ~/scrape/fn.go. I run

gcloud functions deploy scrape --runtime go111 --trigger-topic scrape

which fails due to missing dependencies. Two minutes of Googling got me to the docs for specifying Go dependencies, which is as simple as

go mod init scrape && go mod tidy

but I also need to restructure my Go code so that it can be treated as a package:

package scrape
...
func OnMessage(ctx context.Context, _ PubSubMessage) error { // Replaces main().
  // Make sure to remove ctx := context.Background(); I forgot to do that.
  ...
}

and the command changes to

gcloud functions deploy scrape --entry-point OnMessage --runtime go111 --trigger-topic scrape
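Once it deploys, kicking off a remote run is just publishing any message to the trigger topic; the function ignores the payload. Something like:

gcloud pubsub topics publish scrape --message=run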

For build verification, I took a cue from a Reddit post and created a dummy test, fn_test.go, that just invokes the entry point:

func TestDummy(_ *testing.T) {
  ctx := context.Background()
  log.Print(OnMessage(ctx, PubSubMessage{}))
}
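The PubSubMessage type that OnMessage and the test both take is the usual shape from the Cloud Functions Pub/Sub docs; a minimal definition (the scraper never reads it) is:

// PubSubMessage mirrors the JSON payload Cloud Functions delivers for a
// Pub/Sub trigger; the scraper ignores its contents.
type PubSubMessage struct {
  Data []byte `json:"data"`
}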

To clean up the upload to Google Cloud Functions, I create a file .gcloudignore inside scrape/ with:

.gcloudignore
*.wp
*_test.go

Now temporary files and test files won’t pollute my deployed function. The local trigger is just go test.

NOTE: I tried .* and that excluded everything, so don’t do that.

I also have to change all my early-exit log messages from log.Fatal(errors.Wrap()) to return errors.Wrap() because Google Cloud Functions prefers reporting errors to crashing programs.
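Concretely, an early exit that used to look like the first line below now looks like the second (the wrap message is just the one from earlier):

// Before: crashes the whole function instance.
log.Fatal(errors.Wrap(err, "failed to fetch metric descriptor"))

// After: hands the error back to Google Cloud Functions for reporting.
return errors.Wrap(err, "failed to fetch metric descriptor")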

I also found that Google Cloud Functions and Google Cloud Shell use different environment variables for the hosting project. Google Cloud Functions uses GCP_PROJECT while Google Cloud Shell uses GOOGLE_CLOUD_PROJECT. So I modify the project identification in my scraper:

...
func OnMessage(ctx context.Context, _ PubSubMessage) error {
  ...
  projectID := os.Getenv("GCP_PROJECT") // Set in Google Cloud Functions.
  if projectID == "" {
    projectID = os.Getenv("GOOGLE_CLOUD_PROJECT") // Set in Google Cloud Shell.
  }
  ...
}

When it runs in Google Cloud Functions, the requests get cancelled, which makes me think Google Cloud Functions doesn’t expect long-running functions. So next, I’ll work on trimming the queries down to what actually has data.
