mirror of
https://github.com/containerd/containerd.git
synced 2026-07-01 12:19:48 +00:00
337 lines
9.5 KiB
Go
337 lines
9.5 KiB
Go
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
|
|
|
|
package erofs
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containerd/continuity/fs"
|
|
"github.com/containerd/errdefs"
|
|
"github.com/containerd/log"
|
|
digest "github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
"github.com/containerd/containerd/v2/core/diff"
|
|
"github.com/containerd/containerd/v2/core/images"
|
|
"github.com/containerd/containerd/v2/core/mount"
|
|
"github.com/containerd/containerd/v2/internal/erofsutils"
|
|
"github.com/containerd/containerd/v2/pkg/archive"
|
|
"github.com/containerd/containerd/v2/pkg/archive/compression"
|
|
"github.com/containerd/containerd/v2/pkg/epoch"
|
|
"github.com/containerd/containerd/v2/pkg/labels"
|
|
)
|
|
|
|
// emptyDesc is the zero-value descriptor returned alongside an error.
var emptyDesc ocispec.Descriptor
|
|
|
|
type differ interface {
|
|
diff.Applier
|
|
diff.Comparer
|
|
}
|
|
|
|
// erofsDiff does erofs comparison and application
|
|
type erofsDiff struct {
|
|
store content.Store
|
|
mkfsExtraOpts []string
|
|
}
|
|
|
|
func NewErofsDiffer(store content.Store, mkfsExtraOpts []string) differ {
|
|
return &erofsDiff{
|
|
store: store,
|
|
mkfsExtraOpts: mkfsExtraOpts,
|
|
}
|
|
}
|
|
|
|
// isErofsMediaType reports whether mt names an EROFS native layer.
//
// A valid EROFS native layer media type should end with ".erofs".
//
// Please avoid using any +suffix to list the algorithms used inside EROFS
// blobs, since:
//   - Each EROFS layer can use multiple compression algorithms;
//   - The suffixes should only indicate the corresponding preprocessor for
//     `images.DiffCompression`.
//
// Since `images.DiffCompression` doesn't support arbitrary media types,
// disallow non-empty suffixes for now. A media type without any "+" is
// accepted, and a bare trailing "+" (empty suffix) is tolerated.
func isErofsMediaType(mt string) bool {
	mediaType, ext, found := strings.Cut(mt, "+")
	// Only a non-empty +suffix disqualifies the media type; the absence of
	// a "+" separator must not, otherwise plain ".erofs" types never match.
	if found && ext != "" {
		return false
	}
	return strings.HasSuffix(mediaType, ".erofs")
}
|
|
|
|
func writeDiff(ctx context.Context, w io.Writer, lower []mount.Mount, upperRoot string) error {
|
|
var opts []archive.ChangeWriterOpt
|
|
|
|
return mount.WithTempMount(ctx, lower, func(lowerRoot string) error {
|
|
cw := archive.NewChangeWriter(w, upperRoot, opts...)
|
|
err := fs.DiffDirChanges(ctx, lowerRoot, upperRoot, fs.DiffSourceOverlayFS, cw.HandleChange)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create diff tar stream: %w", err)
|
|
}
|
|
return cw.Close()
|
|
})
|
|
}
|
|
|
|
// Compare creates a diff between the given mounts and uploads the result
|
|
// to the content store.
|
|
func (s erofsDiff) Compare(ctx context.Context, lower, upper []mount.Mount, opts ...diff.Opt) (d ocispec.Descriptor, err error) {
|
|
layer, err := erofsutils.MountsToLayer(upper)
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("unsupported layer for erofsDiff Compare method: %w", err)
|
|
}
|
|
|
|
var config diff.Config
|
|
for _, opt := range opts {
|
|
if err := opt(&config); err != nil {
|
|
return emptyDesc, err
|
|
}
|
|
}
|
|
if tm := epoch.FromContext(ctx); tm != nil && config.SourceDateEpoch == nil {
|
|
config.SourceDateEpoch = tm
|
|
}
|
|
|
|
if config.MediaType == "" {
|
|
config.MediaType = ocispec.MediaTypeImageLayerGzip
|
|
}
|
|
|
|
var compressionType compression.Compression
|
|
switch config.MediaType {
|
|
case ocispec.MediaTypeImageLayer:
|
|
compressionType = compression.Uncompressed
|
|
case ocispec.MediaTypeImageLayerGzip:
|
|
compressionType = compression.Gzip
|
|
case ocispec.MediaTypeImageLayerZstd:
|
|
compressionType = compression.Zstd
|
|
default:
|
|
return emptyDesc, fmt.Errorf("unsupported diff media type: %v: %w", config.MediaType, errdefs.ErrNotImplemented)
|
|
}
|
|
|
|
var newReference bool
|
|
if config.Reference == "" {
|
|
newReference = true
|
|
config.Reference = uniqueRef()
|
|
}
|
|
|
|
cw, err := s.store.Writer(ctx,
|
|
content.WithRef(config.Reference),
|
|
content.WithDescriptor(ocispec.Descriptor{
|
|
MediaType: config.MediaType, // most contentstore implementations just ignore this
|
|
}))
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to open writer: %w", err)
|
|
}
|
|
|
|
// errOpen is set when an error occurs while the content writer has not been
|
|
// committed or closed yet to force a cleanup
|
|
var errOpen error
|
|
defer func() {
|
|
if errOpen != nil {
|
|
cw.Close()
|
|
if newReference {
|
|
if abortErr := s.store.Abort(ctx, config.Reference); abortErr != nil {
|
|
log.G(ctx).WithError(abortErr).WithField("ref", config.Reference).Warnf("failed to delete diff upload")
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
if !newReference {
|
|
if errOpen = cw.Truncate(0); errOpen != nil {
|
|
return emptyDesc, errOpen
|
|
}
|
|
}
|
|
|
|
upperRoot := filepath.Join(layer, "fs")
|
|
if compressionType != compression.Uncompressed {
|
|
dgstr := digest.SHA256.Digester()
|
|
var compressed io.WriteCloser
|
|
if config.Compressor != nil {
|
|
compressed, errOpen = config.Compressor(cw, config.MediaType)
|
|
if errOpen != nil {
|
|
return emptyDesc, fmt.Errorf("failed to get compressed stream: %w", errOpen)
|
|
}
|
|
} else {
|
|
compressed, errOpen = compression.CompressStream(cw, compressionType)
|
|
if errOpen != nil {
|
|
return emptyDesc, fmt.Errorf("failed to get compressed stream: %w", errOpen)
|
|
}
|
|
}
|
|
errOpen = writeDiff(ctx, io.MultiWriter(compressed, dgstr.Hash()), lower, upperRoot)
|
|
compressed.Close()
|
|
if errOpen != nil {
|
|
return emptyDesc, fmt.Errorf("failed to write compressed diff: %w", errOpen)
|
|
}
|
|
|
|
if config.Labels == nil {
|
|
config.Labels = map[string]string{}
|
|
}
|
|
config.Labels[labels.LabelUncompressed] = dgstr.Digest().String()
|
|
} else {
|
|
err := writeDiff(ctx, cw, lower, upperRoot)
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to create diff tar stream: %w", err)
|
|
}
|
|
}
|
|
|
|
var commitopts []content.Opt
|
|
if config.Labels != nil {
|
|
commitopts = append(commitopts, content.WithLabels(config.Labels))
|
|
}
|
|
|
|
dgst := cw.Digest()
|
|
if errOpen = cw.Commit(ctx, 0, dgst, commitopts...); errOpen != nil {
|
|
if !errdefs.IsAlreadyExists(errOpen) {
|
|
return emptyDesc, fmt.Errorf("failed to commit: %w", errOpen)
|
|
}
|
|
errOpen = nil
|
|
}
|
|
|
|
info, err := s.store.Info(ctx, dgst)
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to get info from content store: %w", err)
|
|
}
|
|
if info.Labels == nil {
|
|
info.Labels = make(map[string]string)
|
|
}
|
|
// Set "containerd.io/uncompressed" label if digest already existed without label
|
|
if _, ok := info.Labels[labels.LabelUncompressed]; !ok {
|
|
info.Labels[labels.LabelUncompressed] = config.Labels[labels.LabelUncompressed]
|
|
if _, err := s.store.Update(ctx, info, "labels."+labels.LabelUncompressed); err != nil {
|
|
return emptyDesc, fmt.Errorf("error setting uncompressed label: %w", err)
|
|
}
|
|
}
|
|
|
|
return ocispec.Descriptor{
|
|
MediaType: config.MediaType,
|
|
Size: info.Size,
|
|
Digest: info.Digest,
|
|
}, nil
|
|
}
|
|
|
|
func (s erofsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispec.Descriptor, err error) {
|
|
t1 := time.Now()
|
|
defer func() {
|
|
if err == nil {
|
|
log.G(ctx).WithFields(log.Fields{
|
|
"d": time.Since(t1),
|
|
"digest": desc.Digest,
|
|
"size": desc.Size,
|
|
"media": desc.MediaType,
|
|
}).Debugf("diff applied")
|
|
}
|
|
}()
|
|
|
|
native := false
|
|
if isErofsMediaType(desc.MediaType) {
|
|
native = true
|
|
} else if _, err := images.DiffCompression(ctx, desc.MediaType); err != nil {
|
|
return emptyDesc, fmt.Errorf("currently unsupported media type: %s", desc.MediaType)
|
|
}
|
|
|
|
var config diff.ApplyConfig
|
|
for _, o := range opts {
|
|
if err := o(ctx, desc, &config); err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to apply config opt: %w", err)
|
|
}
|
|
}
|
|
|
|
layer, err := erofsutils.MountsToLayer(mounts)
|
|
if err != nil {
|
|
return emptyDesc, err
|
|
}
|
|
|
|
ra, err := s.store.ReaderAt(ctx, desc)
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err)
|
|
}
|
|
defer ra.Close()
|
|
|
|
layerBlobPath := path.Join(layer, "layer.erofs")
|
|
if native {
|
|
f, err := os.Create(layerBlobPath)
|
|
if err != nil {
|
|
return emptyDesc, err
|
|
}
|
|
_, err = io.Copy(f, content.NewReader(ra))
|
|
f.Close()
|
|
if err != nil {
|
|
return emptyDesc, err
|
|
}
|
|
return desc, nil
|
|
}
|
|
|
|
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
|
|
for {
|
|
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to get stream processor for %s: %w", desc.MediaType, err)
|
|
}
|
|
if processor.MediaType() == ocispec.MediaTypeImageLayer {
|
|
break
|
|
}
|
|
}
|
|
defer processor.Close()
|
|
|
|
digester := digest.Canonical.Digester()
|
|
rc := &readCounter{
|
|
r: io.TeeReader(processor, digester.Hash()),
|
|
}
|
|
|
|
err = erofsutils.ConvertTarErofs(ctx, rc, layerBlobPath, s.mkfsExtraOpts)
|
|
if err != nil {
|
|
return emptyDesc, fmt.Errorf("failed to convert erofs: %w", err)
|
|
}
|
|
|
|
// Read any trailing data
|
|
if _, err := io.Copy(io.Discard, rc); err != nil {
|
|
return emptyDesc, err
|
|
}
|
|
|
|
return ocispec.Descriptor{
|
|
MediaType: ocispec.MediaTypeImageLayer,
|
|
Size: rc.c,
|
|
Digest: digester.Digest(),
|
|
}, nil
|
|
}
|
|
|
|
// readCounter wraps an io.Reader and keeps a running total of the bytes
// read through it.
type readCounter struct {
	r io.Reader // underlying reader
	c int64     // number of bytes read so far
}

// Read reads from the wrapped reader into p and adds the number of bytes
// read to the counter, including bytes returned alongside an error.
func (rc *readCounter) Read(p []byte) (n int, err error) {
	n, err = rc.r.Read(p)
	rc.c += int64(n)
	return
}
|
|
|
|
// uniqueRef returns a reference string of the form "<unix-nanos>-<suffix>",
// where suffix is the URL-safe base64 encoding of three random bytes.
func uniqueRef() string {
	t := time.Now()
	var b [3]byte
	// Ignore read failures, just decreases uniqueness
	_, _ = rand.Read(b[:])
	return fmt.Sprintf("%d-%s", t.UnixNano(), base64.URLEncoding.EncodeToString(b[:]))
}
|