detect: refactor the detect package

This refactors the detect package with the goal of making it more
similar to otel's `autoexport` package and splitting out the additional
functionality used by buildkit, like the trace recorder and delegated
tracer, to more explicit processors rather than implicit through
`autoexport`.

This removes the global variables for the trace provider and meter
provider along with the global variable for the exporters. This is
replaced with functions that create the exporters. The delegated tracer
has been removed from detect and moved into the normal tracing util
package. This is still used by the command line to send delegated
traces, but it's an explicit exporter that's added rather than implicit.

Some functions have been renamed, mostly to force dependent packages to
update their usage rather than risk silently incorrect usage now that
the semantics have changed.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
This commit is contained in:
Jonathan A. Sternberg
2024-03-26 14:07:27 -05:00
parent b3cee61a90
commit 228e250d77
10 changed files with 242 additions and 240 deletions

View File

@@ -12,7 +12,7 @@ import (
"time"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/moby/buildkit/util/tracing/delegated"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/trace"
@@ -68,16 +68,10 @@ func ResolveClient(c *cli.Context) (*client.Client, error) {
ctx := CommandContext(c)
var opts []client.ClientOpt
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
opts = append(opts, client.WithTracerProvider(span.TracerProvider()))
exp, _, err := detect.Exporter()
if err != nil {
return nil, err
}
if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
}
opts = append(opts,
client.WithTracerProvider(span.TracerProvider()),
client.WithTracerDelegate(delegated.DefaultExporter),
)
}
if caCert != "" {

View File

@@ -5,20 +5,28 @@ import (
"os"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/tracing/delegated"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
func AttachAppContext(app *cli.App) error {
ctx := appcontext.Context()
tp, err := detect.TracerProvider()
exp, err := detect.NewSpanExporter(ctx)
if err != nil {
return err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithResource(detect.Resource()),
sdktrace.WithBatcher(exp),
sdktrace.WithBatcher(delegated.DefaultExporter),
)
tracer := tp.Tracer("")
var span trace.Span
@@ -60,7 +68,7 @@ func AttachAppContext(app *cli.App) error {
if span != nil {
span.End()
}
return detect.Shutdown(context.TODO())
return tp.Shutdown(context.TODO())
}
return nil
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/version"

View File

@@ -23,6 +23,7 @@ import (
sddaemon "github.com/coreos/go-systemd/v22/daemon"
"github.com/docker/docker/pkg/reexec"
"github.com/gofrs/flock"
"github.com/hashicorp/go-multierror"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/cache/remotecache/azblob"
"github.com/moby/buildkit/cache/remotecache/gha"
@@ -63,7 +64,9 @@ import (
"github.com/urfave/cli"
"go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
@@ -217,6 +220,7 @@ func main() {
app.Flags = append(app.Flags, appFlags...)
app.Flags = append(app.Flags, serviceFlags()...)
var closers []func(ctx context.Context) error
app.Action = func(c *cli.Context) error {
// TODO: On Windows this always returns -1. The actual "are you admin" check is very Windows-specific.
// See https://github.com/golang/go/issues/28804#issuecomment-505326268 for the "short" version.
@@ -259,15 +263,17 @@ func main() {
}
}
tp, err := detect.TracerProvider()
tp, err := newTracerProvider(ctx)
if err != nil {
return err
}
closers = append(closers, tp.Shutdown)
mp, err := detect.MeterProvider()
mp, err := newMeterProvider(ctx)
if err != nil {
return err
}
closers = append(closers, mp.Shutdown)
statsHandler := tracing.ServerStatsHandler(
otelgrpc.WithTracerProvider(tp),
@@ -375,8 +381,13 @@ func main() {
return err
}
app.After = func(_ *cli.Context) error {
return detect.Shutdown(context.TODO())
app.After = func(_ *cli.Context) (err error) {
for _, c := range closers {
if e := c(context.TODO()); e != nil {
err = multierror.Append(err, e)
}
}
return err
}
profiler.Attach(app)
@@ -737,13 +748,19 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
return nil, err
}
tc, _, err := detect.Exporter()
if err != nil {
tc := make(tracing.MultiSpanExporter, 0, 2)
if detect.Recorder != nil {
tc = append(tc, detect.Recorder)
}
if exp, err := detect.NewSpanExporter(context.TODO()); err != nil {
return nil, err
} else if !detect.IsNoneSpanExporter(exp) {
tc = append(tc, exp)
}
var traceSocket string
if tc != nil {
if len(tc) > 0 {
traceSocket = cfg.OTEL.SocketPath
if err := runTraceController(traceSocket, tc); err != nil {
return nil, err
@@ -942,9 +959,45 @@ type traceCollector struct {
}
func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
if err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())); err != nil {
return nil, err
}
return &tracev1.ExportTraceServiceResponse{}, nil
}
// newTracerProvider builds the SDK tracer provider used by the daemon.
//
// The provider always carries detect.Resource(). detect.Recorder is attached
// as a synchronous exporter only when it is set, and the exporter returned by
// detect.NewSpanExporter is attached as a batcher unless it is absent or is
// the no-op "none" exporter. An error from exporter detection is returned
// unchanged.
func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	opts := []sdktrace.TracerProviderOption{
		sdktrace.WithResource(detect.Resource()),
	}

	// The recorder is optional (newController guards it the same way); do not
	// hand an unset recorder to the SDK as if it were a usable exporter.
	if detect.Recorder != nil {
		opts = append(opts, sdktrace.WithSyncer(detect.Recorder))
	}

	exp, err := detect.NewSpanExporter(ctx)
	if err != nil {
		return nil, err
	}
	// Detection can finish without selecting any exporter; a nil exporter is
	// not the "none" exporter, so it has to be excluded explicitly.
	if exp != nil && !detect.IsNoneSpanExporter(exp) {
		opts = append(opts, sdktrace.WithBatcher(exp))
	}
	return sdktrace.NewTracerProvider(opts...), nil
}
// newMeterProvider builds the SDK meter provider used by the daemon.
//
// The provider always carries detect.Resource(). A prometheus reader is added
// on a best-effort basis, and the exporter returned by
// detect.NewMetricExporter is wrapped in a periodic reader unless it is absent
// or is the no-op "none" exporter. An error from exporter detection is
// returned unchanged.
func newMeterProvider(ctx context.Context) (*sdkmetric.MeterProvider, error) {
	opts := []sdkmetric.Option{
		sdkmetric.WithResource(detect.Resource()),
	}

	if r, err := prometheus.New(); err != nil {
		// Log the error but do not fail if we could not configure the prometheus metrics.
		bklog.G(context.Background()).
			WithError(err).
			Error("failed prometheus metrics configuration")
	} else {
		opts = append(opts, sdkmetric.WithReader(r))
	}

	exp, err := detect.NewMetricExporter(ctx)
	if err != nil {
		return nil, err
	}
	// Trace-only detectors report no metric exporter at all (nil, nil). A nil
	// exporter is not the "none" exporter, so it must be excluded explicitly
	// instead of being wrapped in a periodic reader.
	if exp != nil && !detect.IsNoneMetricExporter(exp) {
		r := sdkmetric.NewPeriodicReader(exp)
		opts = append(opts, sdkmetric.WithReader(r))
	}
	return sdkmetric.NewMeterProvider(opts...), nil
}

2
go.mod
View File

@@ -85,7 +85,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0
go.opentelemetry.io/otel/exporters/prometheus v0.42.0
go.opentelemetry.io/otel/metric v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
@@ -165,6 +164,7 @@ require (
github.com/vishvananda/netns v0.0.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect

View File

@@ -4,19 +4,13 @@ import (
"context"
"sync"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/moby/buildkit/client"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
const maxBuffer = 256
var exp = &Exporter{}
func init() {
detect.Register("delegated", detect.TraceExporterDetector(func() (sdktrace.SpanExporter, error) {
return exp, nil
}), 100)
}
// DefaultExporter is a package-level, ready-to-use Exporter. It is not
// registered with the detect package; callers add it to their tracer provider
// and client options explicitly.
var DefaultExporter = &Exporter{}
type Exporter struct {
mu sync.Mutex
@@ -24,7 +18,10 @@ type Exporter struct {
buffer []sdktrace.ReadOnlySpan
}
var _ sdktrace.SpanExporter = &Exporter{}
var (
_ sdktrace.SpanExporter = (*Exporter)(nil)
_ client.TracerDelegate = (*Exporter)(nil)
)
func (e *Exporter) ExportSpans(ctx context.Context, ss []sdktrace.ReadOnlySpan) error {
e.mu.Lock()

View File

@@ -1,18 +0,0 @@
package delegated_test
import (
"testing"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDetectPreservesDelegateInterface(t *testing.T) {
exp, _, err := detect.Exporter()
require.NoError(t, err)
_, ok := exp.(client.TracerDelegate)
assert.True(t, ok, "delegated tracer expected to fulfill client.TracerDelegate interface")
}

View File

@@ -3,22 +3,13 @@ package detect
import (
"context"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"github.com/moby/buildkit/util/bklog"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)
type ExporterDetector interface {
@@ -31,18 +22,7 @@ type detector struct {
priority int
}
var ServiceName string
var detectors map[string]detector
var once sync.Once
var tp trace.TracerProvider
var mp metric.MeterProvider
var exporter struct {
SpanExporter sdktrace.SpanExporter
MetricExporter sdkmetric.Exporter
}
var closers []func(context.Context) error
var err error
func Register(name string, exp ExporterDetector, priority int) {
if detectors == nil {
@@ -64,32 +44,21 @@ func (fn TraceExporterDetector) DetectMetricExporter() (sdkmetric.Exporter, erro
return nil, nil
}
func detectExporters() (texp sdktrace.SpanExporter, mexp sdkmetric.Exporter, err error) {
texp, err = detectExporter("OTEL_TRACES_EXPORTER", func(d ExporterDetector) (sdktrace.SpanExporter, bool, error) {
exp, err := d.DetectTraceExporter()
return exp, exp != nil, err
})
if err != nil {
return nil, nil, err
}
mexp, err = detectExporter("OTEL_METRICS_EXPORTER", func(d ExporterDetector) (sdkmetric.Exporter, bool, error) {
exp, err := d.DetectMetricExporter()
return exp, exp != nil, err
})
if err != nil {
return nil, nil, err
}
return texp, mexp, nil
}
func detectExporter[T any](envVar string, fn func(d ExporterDetector) (T, bool, error)) (exp T, err error) {
// Read OTEL_IGNORE_ERROR from the environment. Parsing the variable's name
// instead of its value always fails and leaves ignoreErrors false. An unset or
// malformed value deliberately falls back to false, so the parse error is dropped.
ignoreErrors, _ := strconv.ParseBool(os.Getenv("OTEL_IGNORE_ERROR"))
if n := os.Getenv(envVar); n != "" {
d, ok := detectors[n]
if !ok {
return exp, errors.Errorf("unsupported opentelemetry exporter %v", n)
if !ignoreErrors {
err = errors.Errorf("unsupported opentelemetry exporter %v", n)
}
return exp, err
}
exp, _, err = fn(d.f)
if err != nil && ignoreErrors {
err = nil
}
return exp, err
}
@@ -104,7 +73,7 @@ func detectExporter[T any](envVar string, fn func(d ExporterDetector) (T, bool,
var ok bool
for _, d := range arr {
exp, ok, err = fn(d.f)
if err != nil {
if err != nil && !ignoreErrors {
return exp, err
}
@@ -115,164 +84,70 @@ func detectExporter[T any](envVar string, fn func(d ExporterDetector) (T, bool,
return exp, nil
}
func detect() error {
tp = noop.NewTracerProvider()
mp = sdkmetric.NewMeterProvider()
texp, mexp, err := detectExporters()
if err != nil {
return err
}
res := Resource()
if texp != nil || Recorder != nil {
// enable log with traceID when a valid exporter is used
bklog.EnableLogWithTraceID(true)
sdktpopts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}
if texp != nil {
sdktpopts = append(sdktpopts, sdktrace.WithBatcher(texp))
}
if Recorder != nil {
sp := sdktrace.NewSimpleSpanProcessor(Recorder)
sdktpopts = append(sdktpopts, sdktrace.WithSpanProcessor(sp))
}
sdktp := sdktrace.NewTracerProvider(sdktpopts...)
closers = append(closers, sdktp.Shutdown)
exporter.SpanExporter = texp
tp = sdktp
}
var readers []sdkmetric.Reader
if mexp != nil {
// Create a new periodic reader using any configured metric exporter.
readers = append(readers, sdkmetric.NewPeriodicReader(mexp))
}
if r, err := prometheus.New(); err != nil {
// Log the error but do not fail if we could not configure the prometheus metrics.
bklog.G(context.Background()).
WithError(err).
Error("failed prometheus metrics configuration")
} else {
// Register the prometheus reader if there was no error.
readers = append(readers, r)
}
if len(readers) > 0 {
opts := make([]sdkmetric.Option, 0, len(readers)+1)
opts = append(opts, sdkmetric.WithResource(res))
for _, r := range readers {
opts = append(opts, sdkmetric.WithReader(r))
}
sdkmp := sdkmetric.NewMeterProvider(opts...)
closers = append(closers, sdkmp.Shutdown)
exporter.MetricExporter = mexp
mp = sdkmp
}
return nil
}
func TracerProvider() (trace.TracerProvider, error) {
if err := detectOnce(); err != nil {
return nil, err
}
return tp, nil
}
func MeterProvider() (metric.MeterProvider, error) {
if err := detectOnce(); err != nil {
return nil, err
}
return mp, nil
}
func detectOnce() error {
once.Do(func() {
if err1 := detect(); err1 != nil {
b, _ := strconv.ParseBool(os.Getenv("OTEL_IGNORE_ERROR"))
if !b {
err = err1
}
}
})
return err
}
func Exporter() (sdktrace.SpanExporter, sdkmetric.Exporter, error) {
_, err := TracerProvider()
if err != nil {
return nil, nil, err
}
return exporter.SpanExporter, exporter.MetricExporter, nil
}
func Shutdown(ctx context.Context) error {
for _, c := range closers {
if err := c(ctx); err != nil {
return err
}
}
return nil
}
var (
detectedResource *resource.Resource
detectedResourceOnce sync.Once
)
func Resource() *resource.Resource {
detectedResourceOnce.Do(func() {
res, err := resource.New(context.Background(),
resource.WithDetectors(serviceNameDetector{}),
resource.WithFromEnv(),
resource.WithTelemetrySDK(),
)
if err != nil {
otel.Handle(err)
}
detectedResource = res
})
return detectedResource
}
// OverrideResource overrides the resource returned from Resource.
//
// This must be invoked before Resource is called otherwise it is a no-op.
func OverrideResource(res *resource.Resource) {
detectedResourceOnce.Do(func() {
detectedResource = res
// NewSpanExporter asks detectExporter for a span exporter, using the
// OTEL_TRACES_EXPORTER environment variable as the explicit selector. Each
// candidate detector is probed through DetectTraceExporter and counts as a
// match when it yields a non-nil exporter. The context argument is unused.
func NewSpanExporter(_ context.Context) (sdktrace.SpanExporter, error) {
	probe := func(d ExporterDetector) (sdktrace.SpanExporter, bool, error) {
		found, err := d.DetectTraceExporter()
		return found, found != nil, err
	}
	return detectExporter("OTEL_TRACES_EXPORTER", probe)
}
type serviceNameDetector struct{}
func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, error) {
return resource.StringDetector(
semconv.SchemaURL,
semconv.ServiceNameKey,
func() (string, error) {
if ServiceName != "" {
return ServiceName, nil
}
return filepath.Base(os.Args[0]), nil
},
).Detect(ctx)
// NewMetricExporter asks detectExporter for a metric exporter, using the
// OTEL_METRICS_EXPORTER environment variable as the explicit selector. Each
// candidate detector is probed through DetectMetricExporter and counts as a
// match when it yields a non-nil exporter. The context argument is unused.
func NewMetricExporter(_ context.Context) (sdkmetric.Exporter, error) {
	probe := func(d ExporterDetector) (sdkmetric.Exporter, bool, error) {
		found, err := d.DetectMetricExporter()
		return found, found != nil, err
	}
	return detectExporter("OTEL_METRICS_EXPORTER", probe)
}
type noneDetector struct{}
func (n noneDetector) DetectTraceExporter() (sdktrace.SpanExporter, error) {
return nil, nil
return noneSpanExporter{}, nil
}
func (n noneDetector) DetectMetricExporter() (sdkmetric.Exporter, error) {
return nil, nil
return noneMetricExporter{}, nil
}
// noneSpanExporter is a span exporter that discards everything it is given.
type noneSpanExporter struct{}

// ExportSpans drops the spans and reports success.
func (noneSpanExporter) ExportSpans(context.Context, []sdktrace.ReadOnlySpan) error {
	return nil
}

// Shutdown has nothing to release and reports success.
func (noneSpanExporter) Shutdown(context.Context) error {
	return nil
}
// IsNoneSpanExporter reports whether exp is the discard-everything span
// exporter defined in this package. A nil exporter is not considered "none".
func IsNoneSpanExporter(exp sdktrace.SpanExporter) bool {
	switch exp.(type) {
	case noneSpanExporter:
		return true
	default:
		return false
	}
}
// noneMetricExporter is a metric exporter that discards everything it is
// given while answering selector queries with the SDK defaults.
type noneMetricExporter struct{}

// Temporality defers to the SDK's default temporality selector.
func (noneMetricExporter) Temporality(k sdkmetric.InstrumentKind) metricdata.Temporality {
	return sdkmetric.DefaultTemporalitySelector(k)
}

// Aggregation defers to the SDK's default aggregation selector.
func (noneMetricExporter) Aggregation(k sdkmetric.InstrumentKind) sdkmetric.Aggregation {
	return sdkmetric.DefaultAggregationSelector(k)
}

// Export drops the metrics and reports success.
func (noneMetricExporter) Export(context.Context, *metricdata.ResourceMetrics) error {
	return nil
}

// ForceFlush has nothing buffered and reports success.
func (noneMetricExporter) ForceFlush(context.Context) error {
	return nil
}

// Shutdown has nothing to release and reports success.
func (noneMetricExporter) Shutdown(context.Context) error {
	return nil
}
// IsNoneMetricExporter reports whether exp is the discard-everything metric
// exporter defined in this package. A nil exporter is not considered "none".
func IsNoneMetricExporter(exp sdkmetric.Exporter) bool {
	switch exp.(type) {
	case noneMetricExporter:
		return true
	default:
		return false
	}
}
func init() {

View File

@@ -0,0 +1,58 @@
package detect
import (
"context"
"os"
"path/filepath"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
var (
	// ServiceName, when non-empty, is the service name reported by the
	// detected resource; otherwise the base name of os.Args[0] is used.
	ServiceName string
	// detectedResource holds the resource handed out by Resource.
	detectedResource *resource.Resource
	// detectedResourceOnce ensures detectedResource is assigned at most once,
	// by whichever of Resource or OverrideResource runs first.
	detectedResourceOnce sync.Once
)
// Resource returns the process-wide resource, building it on first use.
//
// The resource is assembled from the service name detector, the environment,
// and the telemetry SDK attributes. A construction error is passed to
// otel.Handle and whatever resource.New returned is still stored.
func Resource() *resource.Resource {
	build := func() {
		sources := []resource.Option{
			resource.WithDetectors(serviceNameDetector{}),
			resource.WithFromEnv(),
			resource.WithTelemetrySDK(),
		}
		built, err := resource.New(context.Background(), sources...)
		if err != nil {
			otel.Handle(err)
		}
		detectedResource = built
	}
	detectedResourceOnce.Do(build)
	return detectedResource
}
// OverrideResource replaces the resource that Resource hands out.
//
// It only takes effect when called before the first call to Resource (and
// before any earlier OverrideResource call); afterwards it does nothing.
func OverrideResource(res *resource.Resource) {
	assign := func() { detectedResource = res }
	detectedResourceOnce.Do(assign)
}
// serviceNameDetector is a resource detector that supplies the service name
// attribute.
type serviceNameDetector struct{}

// Detect returns a resource carrying the service name: ServiceName when it is
// set, otherwise the base name of the running executable (os.Args[0]).
func (serviceNameDetector) Detect(ctx context.Context) (*resource.Resource, error) {
	name := func() (string, error) {
		if ServiceName == "" {
			return filepath.Base(os.Args[0]), nil
		}
		return ServiceName, nil
	}
	d := resource.StringDetector(semconv.SchemaURL, semconv.ServiceNameKey, name)
	return d.Detect(ctx)
}

View File

@@ -0,0 +1,36 @@
package tracing
import (
"context"
"github.com/hashicorp/go-multierror"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// MultiSpanExporter is a list of span exporters that is itself usable as a
// single exporter: its methods call every element in order and collect the
// errors.
type MultiSpanExporter []sdktrace.SpanExporter
// ExportSpans hands the same spans to every exporter in order, continuing
// past failures. A single failure is returned as-is; further failures are
// combined with it through multierror.
func (m MultiSpanExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) (err error) {
	for i := range m {
		failure := m[i].ExportSpans(ctx, spans)
		switch {
		case failure == nil:
			// This exporter succeeded; nothing to record.
		case err == nil:
			// Keep the first error unwrapped.
			err = failure
		default:
			err = multierror.Append(err, failure)
		}
	}
	return err
}
// Shutdown shuts down every exporter in order, continuing past failures. A
// single failure is returned as-is; further failures are combined with it
// through multierror.
func (m MultiSpanExporter) Shutdown(ctx context.Context) (err error) {
	for i := range m {
		failure := m[i].Shutdown(ctx)
		switch {
		case failure == nil:
			// This exporter shut down cleanly; nothing to record.
		case err == nil:
			// Keep the first error unwrapped.
			err = failure
		default:
			err = multierror.Append(err, failure)
		}
	}
	return err
}