Merge pull request #52862 from corhere/fix-networkdb-bulksync-concurrency

libn/networkdb: fix waiting for many bulkSync ACKs
This commit is contained in:
Paweł Gronowski
2026-06-15 16:32:44 +02:00
committed by GitHub
4 changed files with 140 additions and 46 deletions

View File

@@ -597,7 +597,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
// Bulk sync all the table entries belonging to a set of networks to a
// single peer node. It can be unsolicited or can be in response to an
// unsolicited bulk sync
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) (retErr error) {
var msgs [][]byte
var unsolMsg string
@@ -651,7 +651,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
compound := makeCompoundMessage(msgs)
bsm := BulkSyncMessage{
LTime: nDB.tableClock.Time(),
LTime: nDB.tableClock.Increment(),
Unsolicited: unsolicited,
NodeName: nDB.config.NodeID,
Networks: networks,
@@ -663,33 +663,58 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
return fmt.Errorf("failed to encode bulk sync message: %v", err)
}
nDB.Lock()
ch := make(chan struct{})
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()
// Wait on a response only if we are sending an unsolicited bulk sync,
// as only unsolicited bulk syncs trigger a response.
if unsolicited {
nDB.Lock()
ch := make(chan struct{})
nDB.bulkSyncAckTbl[node] = append(nDB.bulkSyncAckTbl[node], bulkSyncSubscription{
LTime: bsm.LTime,
Done: ch,
})
nDB.Unlock()
unsubscribe := func() {
nDB.Lock()
defer nDB.Unlock()
subscriptions := nDB.bulkSyncAckTbl[node]
for i, sub := range subscriptions {
if sub.Done == ch {
subscriptions = slices.Delete(subscriptions, i, i+1)
break
}
}
if len(subscriptions) > 0 {
nDB.bulkSyncAckTbl[node] = subscriptions
} else {
delete(nDB.bulkSyncAckTbl, node)
}
}
defer func() {
if retErr != nil {
unsubscribe()
return
}
startTime := time.Now()
t := time.NewTimer(30 * time.Second)
select {
case <-t.C:
log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node)
nDB.bulkSyncAckTimeouts.Add(1)
unsubscribe()
case <-ch:
log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
}
t.Stop()
}()
}
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
nDB.Unlock()
return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
}
// Wait on a response only if it is unsolicited.
if unsolicited {
startTime := time.Now()
t := time.NewTimer(30 * time.Second)
select {
case <-t.C:
log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node)
case <-ch:
log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
}
t.Stop()
}
return nil
}

View File

@@ -357,28 +357,35 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) {
nDB.handleMessage(bsm.Payload, true)
// Don't respond to a bulk sync which was not unsolicited
if !bsm.Unsolicited {
nDB.Lock()
ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
if ok {
close(ch)
delete(nDB.bulkSyncAckTbl, bsm.NodeName)
nDB.Lock()
acks := nDB.bulkSyncAckTbl[bsm.NodeName]
var pendingAcks []bulkSyncSubscription
for _, ack := range acks {
if bsm.LTime > ack.LTime {
close(ack.Done)
} else {
pendingAcks = append(pendingAcks, ack)
}
nDB.Unlock()
return
}
var nodeAddr net.IP
nDB.RLock()
if node, ok := nDB.nodes[bsm.NodeName]; ok {
nodeAddr = node.Addr
if len(pendingAcks) > 0 {
nDB.bulkSyncAckTbl[bsm.NodeName] = pendingAcks
} else {
delete(nDB.bulkSyncAckTbl, bsm.NodeName)
}
nDB.RUnlock()
nDB.Unlock()
if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
// Only respond to an unsolicited bulk sync.
if bsm.Unsolicited {
var nodeAddr net.IP
nDB.RLock()
if node, ok := nDB.nodes[bsm.NodeName]; ok {
nodeAddr = node.Addr
}
nDB.RUnlock()
if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
}
}
}

View File

@@ -79,9 +79,13 @@ type NetworkDB struct {
// network. The key is a network ID.
networkNodes map[string][]string
// A table of ack channels for every node from which we are
// waiting for an ack.
bulkSyncAckTbl map[string]chan struct{}
// A table of subscriptions for every node from which we are waiting for
// a bulk-sync ack.
bulkSyncAckTbl map[string][]bulkSyncSubscription
// A count of the number of times we have timed out waiting for a bulk
// sync ack from any peer node.
bulkSyncAckTimeouts atomic.Uint64
// Broadcast queue for network event gossip.
networkBroadcasts *memberlist.TransmitLimitedQueue
@@ -261,6 +265,14 @@ type entry struct {
reapTime time.Duration
}
// bulkSyncSubscription is a subscription to the bulk-sync progress for a peer node.
// Done is closed when a bulk sync from the node is received with an LTime
// greater than the subscription's LTime.
//
// Subscriptions are kept per peer node in NetworkDB.bulkSyncAckTbl, which is
// read and modified while holding the NetworkDB lock.
type bulkSyncSubscription struct {
// LTime is the Lamport time carried by the bulk sync message this
// subscription is waiting to have acknowledged.
LTime serf.LamportTime
// Done is the send-only end of the waiter's channel: the holder of the
// subscription can close it to wake the waiter but cannot receive from it.
Done chan<- struct{}
}
// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
hostname, _ := os.Hostname()
@@ -310,7 +322,7 @@ func newNetworkDB(c *Config) *NetworkDB {
failedNodes: make(map[string]*node),
leftNodes: make(map[string]*node),
networkNodes: make(map[string][]string),
bulkSyncAckTbl: make(map[string]chan struct{}),
bulkSyncAckTbl: make(map[string][]bulkSyncSubscription),
broadcaster: events.NewBroadcaster(),
rng: rand.New(rand.NewChaCha8(rngSeed)), //gosec:disable G404 -- not used in a security sensitive context
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/moby/moby/v2/daemon/internal/stringid"
"golang.org/x/sync/errgroup"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"gotest.tools/v3/poll"
@@ -464,6 +465,55 @@ func TestNetworkDBBulkSync(t *testing.T) {
closeNetworkDBInstances(t, dbs)
}
// TestNetworkDBBulkSyncNodeConcurrent is a regression test for
// https://github.com/moby/moby/issues/51701
//
// It launches several unsolicited bulk syncs from one node to the same peer
// at the same moment and checks that every one of them is acknowledged, and
// that no ack subscriptions are left behind afterwards.
func TestNetworkDBBulkSyncNodeConcurrent(t *testing.T) {
	const (
		netID           = "network1"
		concurrentSyncs = 4
	)

	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
	defer closeNetworkDBInstances(t, dbs)
	// peer owns the table entries; syncer is the node that initiates the
	// concurrent bulk syncs towards peer.
	peer, syncer := dbs[0], dbs[1]

	// Have both instances join the network and wait until each one has
	// observed the other's membership.
	assert.NilError(t, peer.JoinNetwork(netID))
	syncer.verifyNetworkExistence(t, peer.config.NodeID, netID, true)
	assert.NilError(t, syncer.JoinNetwork(netID))
	peer.verifyNetworkExistence(t, syncer.config.NodeID, netID, true)

	// Populate a table on the peer.
	for n := range 50 {
		key := fmt.Sprintf("key%d", n)
		value := fmt.Appendf(nil, "val%d", n)
		assert.NilError(t, peer.CreateEntry("test_table", netID, key, value))
	}

	// All goroutines block on the gate so that the bulk syncs are released
	// together once it is closed.
	gate := make(chan struct{})
	var eg errgroup.Group
	for i := range concurrentSyncs {
		eg.Go(func() error {
			<-gate
			if err := syncer.bulkSyncNode([]string{netID}, peer.config.NodeID, true); err != nil {
				return fmt.Errorf("[%d] %w", i, err)
			}
			return nil
		})
	}
	close(gate)
	assert.NilError(t, eg.Wait())

	// Every sync must have been acked. On broken code all but one of the
	// concurrent syncs time out.
	assert.Equal(t, syncer.bulkSyncAckTimeouts.Load(), uint64(0),
		"expected 0 bulk sync timeouts, but some ack channels were orphaned")

	syncer.RLock()
	defer syncer.RUnlock()
	assert.Assert(t, is.Len(syncer.bulkSyncAckTbl, 0),
		"bulkSyncAckTbl should be empty after all syncs complete")
}
func TestNetworkDBCRUDMediumCluster(t *testing.T) {
n := 5