index of Stackdriver timeseries revisited

decreasing the number of queries in a Stackdriver monitoring scan

In the previous post I ran into trouble trying to handle 120,000 timeseries.list queries. Let’s cut that down a bit. I have no choice but to iterate over all the metric types, because I don’t know which ones are available. But I don’t have to iterate over all the resource types, because timeseries.list doesn’t require a resource type restriction. So let’s get rid of the old fetchResourceTypes and instead scan for the resource types associated with each metric type. Now there are at least three steps:

  • fetch all metric types,
  • for each metric type, fetch all resource types with data, and
  • for each metric type and resource type, fetch all time series IDs.

That’s enough steps that we should start treating everything as channels with message passing. The first step is easy enough: fetch all metric types and feed them into a channel. It’s a small modification of fetchMetricTypes, plus a begin channel so the scan doesn’t start until everything downstream is wired up:

func fetchMetricTypes(ctx context.Context, client *monitoring.MetricClient, projectID string,
	begin <-chan bool, out chan<- *metricpb.MetricDescriptor, e chan<- error) {
	<-begin // Wait until the whole pipeline is wired up before issuing queries.
	it := client.ListMetricDescriptors(ctx,
		&monitoringpb.ListMetricDescriptorsRequest{
			Name: fmt.Sprintf("projects/%s", projectID),
		},
	)
	for descriptor, err := it.Next(); err != iterator.Done; descriptor, err = it.Next() {
		if err != nil {
			e <- errors.Wrap(err, "failed to fetch metric descriptor")
			break
		}
		out <- descriptor
	}
	close(out)
	log.Print("finished fetching metric types")
}

Then I get all the resource types with data for each metric type. This is the killer reduction, because most metric types are associated with only a handful of resource types. The result of this operation should represent the (resource type, metric type) pairs that have data. However, Stackdriver allows multiple projects in a workspace, each with different pairs, so the result is really a triplet:

type TimeSeriesDescriptor struct {
	Project  string
	Metric   string
	Resource string
}

func (descriptor TimeSeriesDescriptor) String() string {
	return fmt.Sprintf("(%s, %s, %s)", descriptor.Project, descriptor.Resource, descriptor.Metric)
}

With that object in a good-enough state, fetching resource types for a metric descriptor is a bunch of timeseries.list queries that aggregate everything away except for the project ID.

func fetchResourceTypes(ctx context.Context, client *monitoring.MetricClient,
	project string, start, end time.Time, descriptor *metricpb.MetricDescriptor,
	out chan<- TimeSeriesDescriptor, barrier *sync.WaitGroup, e chan<- error) {
	defer barrier.Done()

	// The descriptor name looks like projects/PROJECT/metricDescriptors/TYPE,
	// so pieces[1] is the project that owns the descriptor.
	pieces := strings.SplitN(descriptor.GetName(), "/", 3)
	request := monitoringpb.ListTimeSeriesRequest{
		Name:   fmt.Sprintf("projects/%s", project),
		Filter: fmt.Sprintf("project=%s metric.type=\"%s\"", pieces[1], descriptor.GetType()),
		Interval: &monitoringpb.TimeInterval{
			StartTime: &googlepb.Timestamp{Seconds: start.Unix()},
			EndTime:   &googlepb.Timestamp{Seconds: end.Unix()},
		},
		View: monitoringpb.ListTimeSeriesRequest_HEADERS,
		Aggregation: &monitoringpb.Aggregation{
			GroupByFields: []string{"resource.labels.project_id"},
		},
	}
	if descriptor.GetMetricKind() == metricpb.MetricDescriptor_GAUGE {
		// ALIGN_NEXT_OLDER is only valid for gauge metrics; see the notes below.
		request.Aggregation.PerSeriesAligner = monitoringpb.Aggregation_ALIGN_NEXT_OLDER
	}

	it := client.ListTimeSeries(ctx, &request)
	for header, err := it.Next(); err != iterator.Done; header, err = it.Next() {
		if err != nil {
			e <- errors.Wrapf(err, "failed to fetch resource types for %s", descriptor.GetType())
			break
		}
		timeSeriesDescriptor := TimeSeriesDescriptor{
			Project:  header.GetResource().GetLabels()["project_id"],
			Metric:   descriptor.GetType(),
			Resource: header.GetResource().GetType(),
		}
		log.Printf("sending out time series descriptor %s", timeSeriesDescriptor)
		out <- timeSeriesDescriptor
	}
}
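
As a concrete example, with a hypothetical project ID and one of the standard Compute Engine metric types, the filter built above comes out as:

project=my-project metric.type="compute.googleapis.com/instance/cpu/utilization"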

Three things in the above require Stackdriver-specific context:

  • There is no per-series aligner that works for gauge, cumulative, and delta metrics alike, so we have to choose an appropriate one per metric kind or we’ll get an INVALID_ARGUMENT error (see the sketch after this list).
  • Grouping by resource.labels.project_id handles the case where two projects monitored by the same Stackdriver workspace have the same metric type but different metric descriptors (one team added a label and another didn’t, for example).
  • The rest of the aggregation parameters (per-series alignment, cross-series reduction, alignment periods) don’t matter because we set view to HEADERS.
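
For the first point, here’s a minimal sketch of how the aligner choice could be extended to all three metric kinds. The code above only special-cases gauges; picking ALIGN_DELTA for delta and cumulative metrics is my assumption, not something I’ve verified against the API:

func alignerFor(kind metricpb.MetricDescriptor_MetricKind) monitoringpb.Aggregation_Aligner {
	switch kind {
	case metricpb.MetricDescriptor_GAUGE:
		// ALIGN_NEXT_OLDER moves the most recent point to the end of the
		// alignment period; it is only valid for gauge metrics.
		return monitoringpb.Aggregation_ALIGN_NEXT_OLDER
	case metricpb.MetricDescriptor_DELTA, metricpb.MetricDescriptor_CUMULATIVE:
		return monitoringpb.Aggregation_ALIGN_DELTA
	default:
		return monitoringpb.Aggregation_ALIGN_NONE
	}
}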

One thing is Go-specific: because fetchResourceTypes takes a single descriptor and pushes its results to a channel, I can launch the calls at a smooth QPS:

go func(in <-chan *metricpb.MetricDescriptor, out chan<- TimeSeriesDescriptor, e chan<- error) {
	barrier := sync.WaitGroup{}
	// One fetch every 10ms keeps the scan just under the 100 QPS quota.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for descriptor := range in {
		barrier.Add(1)
		go fetchResourceTypes(ctx, client, projectID, start, end, descriptor, out, &barrier, e)
		<-ticker.C
	}
	barrier.Wait()
	log.Print("finished fetching time series descriptors")
	close(out)
}(metricDescriptors, timeSeriesDescriptors, stop)

This gets me close to the 100 QPS my quota allows. I know I’m nearing the point where these stages should share an interface, but I’ll get there the slow way.
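
As a taste of what that could look like, here’s a minimal sketch of the repeated fan-out block as one helper. It uses Go 1.18+ generics, which are newer than this code, and the name forward is mine:

func forward[In, Out any](in <-chan In, out chan<- Out, e chan<- error,
	interval time.Duration, fetch func(In, chan<- Out, *sync.WaitGroup, chan<- error)) {
	barrier := sync.WaitGroup{}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for item := range in {
		barrier.Add(1)
		go fetch(item, out, &barrier, e) // Same rate-limited fan-out as above.
		<-ticker.C
	}
	barrier.Wait()
	close(out)
}

Each stage would then be a closure over ctx, client, and the time range, passed in as the fetch argument.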

Once I have a project, metric type, and resource type, I can get all the time series IDs. A time series ID is a metric type, resource type, metric labels, and resource labels (which already include the project), so the object is

type TimeSeriesID struct {
	Metric   *metricpb.Metric
	Resource *monitoredrespb.MonitoredResource
}

func (id TimeSeriesID) String() string {
	return fmt.Sprintf("(%s, %s, %s, %s)",
		id.Resource.GetType(), id.Metric.GetType(),
		id.Resource.GetLabels(), id.Metric.GetLabels())
}
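
For illustration, with made-up label values, a CPU utilization stream on a Compute Engine instance would print as:

(gce_instance, compute.googleapis.com/instance/cpu/utilization, map[instance_id:1234 project_id:my-project zone:us-central1-a], map[])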

The request that produces it, given the triplet of project, resource type, and metric type, is a small modification of the request in fetchResourceTypes: add the resource type as a restriction and drop the aggregation.

func fetchTimeSeriesIDs(ctx context.Context, client *monitoring.MetricClient,
	project string, start, end time.Time, descriptor TimeSeriesDescriptor,
	out chan<- TimeSeriesID, barrier *sync.WaitGroup, e chan<- error) {
	defer barrier.Done()

	it := client.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
		Name: fmt.Sprintf("projects/%s", project),
		Filter: fmt.Sprintf("project=%s resource.type=%s metric.type=\"%s\"",
			descriptor.Project, descriptor.Resource, descriptor.Metric),
		Interval: &monitoringpb.TimeInterval{
			StartTime: &googlepb.Timestamp{Seconds: start.Unix()},
			EndTime:   &googlepb.Timestamp{Seconds: end.Unix()},
		},
		View: monitoringpb.ListTimeSeriesRequest_HEADERS,
	})

	for id, err := it.Next(); err != iterator.Done; id, err = it.Next() {
		if s, ok := status.FromError(err); ok && s.Code() == codes.Internal {
			// Transient server error: restart the listing from scratch. The
			// retry needs its own slot in the wait group, because the deferred
			// Done above fires as soon as this invocation returns.
			barrier.Add(1)
			go fetchTimeSeriesIDs(ctx, client, project, start, end, descriptor, out, barrier, e)
			return
		} else if err != nil {
			e <- errors.Wrapf(err, "failed to fetch %s %s", descriptor.Resource, descriptor.Metric)
			return
		}
		timeSeriesID := TimeSeriesID{Metric: id.GetMetric(), Resource: id.GetResource()}
		log.Printf("sending out time series ID %s", timeSeriesID)
		out <- timeSeriesID
	}
}

That’s actually it. All that’s left is a dumb receiver that counts the streams, plus the asynchronous message forwarding I described above, wired up for all the functions:

type PubSubMessage struct {
	Data        []byte            `json:"data"`
	Attributes  map[string]string `json:"attributes"`
	MessageID   string            `json:"messageId"`
	PublishTime string            `json:"publishTime"`
}

func OnMessage(ctx context.Context, _ PubSubMessage) error {
	end := time.Now()
	start := end.Add(-1 * time.Hour)
	projectID := os.Getenv("GCP_PROJECT") // Set in Google Cloud Functions.
	if projectID == "" {
		projectID = os.Getenv("GOOGLE_CLOUD_PROJECT") // Set in Google Cloud Shell.
	}
	log.Printf("Project ID: %s\n", projectID)

	client, err := monitoring.NewMetricClient(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to create client")
	}

	begin := make(chan bool)
	stop := make(chan error)
	metricDescriptors := make(chan *metricpb.MetricDescriptor, 2000)
	go fetchMetricTypes(ctx, client, projectID, begin, metricDescriptors, stop)

	timeSeriesDescriptors := make(chan TimeSeriesDescriptor, 2000)
	go func(in <-chan *metricpb.MetricDescriptor, out chan<- TimeSeriesDescriptor, e chan<- error) {
		barrier := sync.WaitGroup{}
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for descriptor := range in {
			barrier.Add(1)
			go fetchResourceTypes(ctx, client, projectID, start, end, descriptor, out, &barrier, e)
			<-ticker.C
		}
		barrier.Wait()
		log.Print("finished fetching time series descriptors")
		close(out)
	}(metricDescriptors, timeSeriesDescriptors, stop)

	timeSeriesIDs := make(chan TimeSeriesID, 2000)
	go func(in <-chan TimeSeriesDescriptor, out chan<- TimeSeriesID, e chan<- error) {
		barrier := sync.WaitGroup{}
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for descriptor := range in {
			barrier.Add(1)
			go fetchTimeSeriesIDs(ctx, client, projectID, start, end, descriptor, out, &barrier, e)
			<-ticker.C
		}
		barrier.Wait()
		log.Print("finished fetching time series IDs")
		close(out)
	}(timeSeriesDescriptors, timeSeriesIDs, stop)

	numStreams := 0
	go func(in <-chan TimeSeriesID, e chan<- error) {
		for range in {
			numStreams++
		}
		log.Print("finished counting time series IDs")
		// Every upstream stage has finished by the time in is closed, so
		// nothing else can send on e.
		e <- nil
		close(e)
	}(timeSeriesIDs, stop)

	tick := time.Now()
	begin <- true
	err = <-stop
	log.Printf("elapsed time: %s for %d streams", time.Since(tick), numStreams)
	return err
}

In a local run on Google Cloud Shell, this took 14s for a project that has as little as possible in it (27 streams, mostly from API call tracking). I think the right way to get this onto Google Cloud Functions is to make each fetch its own function, all of them talking through Pub/Sub. That way I keep the high QPS I need when the data gets bigger.
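
A minimal sketch of that hand-off, assuming the cloud.google.com/go/pubsub client and JSON-encoded messages (the topic and the publishDescriptor helper are hypothetical):

func publishDescriptor(ctx context.Context, topic *pubsub.Topic, d TimeSeriesDescriptor) error {
	payload, err := json.Marshal(d)
	if err != nil {
		return err
	}
	// Publish is asynchronous; Get blocks until the server acknowledges.
	_, err = topic.Publish(ctx, &pubsub.Message{Data: payload}).Get(ctx)
	return err
}

The next function in the chain would unmarshal the TimeSeriesDescriptor from its PubSubMessage.Data and call fetchTimeSeriesIDs.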
