mirror of
https://github.com/moby/moby.git
synced 2026-06-30 19:58:03 +00:00
@@ -45,8 +45,7 @@ func newRingLogger(driver Logger, logInfo Info, maxSize int64) *ringLogger {
|
||||
l: driver,
|
||||
logInfo: logInfo,
|
||||
}
|
||||
l.wg.Add(1)
|
||||
go l.run()
|
||||
l.wg.Go(l.run)
|
||||
return l
|
||||
}
|
||||
|
||||
@@ -121,7 +120,6 @@ func (r *ringLogger) Close() error {
|
||||
// logger.
|
||||
// This is run in a goroutine when the ringLogger is created
|
||||
func (r *ringLogger) run() {
|
||||
defer r.wg.Done()
|
||||
for {
|
||||
if r.closed() {
|
||||
return
|
||||
@@ -169,21 +167,18 @@ func (r *messageRing) Enqueue(m *Message) error {
|
||||
mSize := int64(len(m.Line))
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if r.closed {
|
||||
r.mu.Unlock()
|
||||
return errClosed
|
||||
}
|
||||
if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
|
||||
r.wait.Signal()
|
||||
r.mu.Unlock()
|
||||
PutMessage(m)
|
||||
return nil
|
||||
}
|
||||
|
||||
r.queue = append(r.queue, m)
|
||||
r.sizeBytes += mSize
|
||||
r.wait.Signal()
|
||||
r.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -192,19 +187,18 @@ func (r *messageRing) Enqueue(m *Message) error {
|
||||
// If the buffer is closed, it will return immediately.
|
||||
func (r *messageRing) Dequeue() (*Message, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
for len(r.queue) == 0 && !r.closed {
|
||||
r.wait.Wait()
|
||||
}
|
||||
|
||||
if r.closed {
|
||||
r.mu.Unlock()
|
||||
return nil, errClosed
|
||||
}
|
||||
|
||||
msg := r.queue[0]
|
||||
r.queue = r.queue[1:]
|
||||
r.sizeBytes -= int64(len(msg.Line))
|
||||
r.mu.Unlock()
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
@@ -214,24 +208,23 @@ var errClosed = errors.New("closed")
|
||||
// Any callers waiting to dequeue a message will be woken up.
|
||||
func (r *messageRing) Close() {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if r.closed {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
r.closed = true
|
||||
r.wait.Broadcast()
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Drain drains all messages from the queue.
|
||||
// This can be used after `Close()` to get any remaining messages that were in queue.
|
||||
func (r *messageRing) Drain() []*Message {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
ls := make([]*Message, 0, len(r.queue))
|
||||
ls = append(ls, r.queue...)
|
||||
r.sizeBytes = 0
|
||||
r.queue = r.queue[:0]
|
||||
r.mu.Unlock()
|
||||
return ls
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user