diff --git a/daemon/libnetwork/networkdb/cluster.go b/daemon/libnetwork/networkdb/cluster.go index 3c7ceb7c34..d8f542dc60 100644 --- a/daemon/libnetwork/networkdb/cluster.go +++ b/daemon/libnetwork/networkdb/cluster.go @@ -597,7 +597,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { // Bulk sync all the table entries belonging to a set of networks to a // single peer node. It can be unsolicited or can be in response to an // unsolicited bulk sync -func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error { +func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) (retErr error) { var msgs [][]byte var unsolMsg string @@ -651,7 +651,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b compound := makeCompoundMessage(msgs) bsm := BulkSyncMessage{ - LTime: nDB.tableClock.Time(), + LTime: nDB.tableClock.Increment(), Unsolicited: unsolicited, NodeName: nDB.config.NodeID, Networks: networks, @@ -663,33 +663,58 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b return fmt.Errorf("failed to encode bulk sync message: %v", err) } - nDB.Lock() - ch := make(chan struct{}) - nDB.bulkSyncAckTbl[node] = ch - nDB.Unlock() + // Wait on a response only if we are sending an unsolicited bulk sync, + // as only unsolicited bulk syncs trigger a response. + if unsolicited { + nDB.Lock() + ch := make(chan struct{}) + nDB.bulkSyncAckTbl[node] = append(nDB.bulkSyncAckTbl[node], bulkSyncSubscription{ + LTime: bsm.LTime, + Done: ch, + }) + nDB.Unlock() + + unsubscribe := func() { + nDB.Lock() + defer nDB.Unlock() + subscriptions := nDB.bulkSyncAckTbl[node] + for i, sub := range subscriptions { + if sub.Done == ch { + subscriptions = slices.Delete(subscriptions, i, i+1) + break + } + } + if len(subscriptions) > 0 { + nDB.bulkSyncAckTbl[node] = subscriptions + } else { + delete(nDB.bulkSyncAckTbl, node) + } + } + + defer func() { + if retErr != nil { + unsubscribe() + return + } + + startTime := time.Now() + t := time.NewTimer(30 * time.Second) + select { + case <-t.C: + log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node) + nDB.bulkSyncAckTimeouts.Add(1) + unsubscribe() + case <-ch: + log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime)) + } + t.Stop() + }() + } err = nDB.memberlist.SendReliable(&mnode.Node, buf) if err != nil { - nDB.Lock() - delete(nDB.bulkSyncAckTbl, node) - nDB.Unlock() - return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err) } - - // Wait on a response only if it is unsolicited. - if unsolicited { - startTime := time.Now() - t := time.NewTimer(30 * time.Second) - select { - case <-t.C: - log.G(context.TODO()).Errorf("Bulk sync to node %s timed out", node) - case <-ch: - log.G(context.TODO()).Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime)) - } - t.Stop() - } - return nil } diff --git a/daemon/libnetwork/networkdb/delegate.go b/daemon/libnetwork/networkdb/delegate.go index 9b363eb545..b026e4a792 100644 --- a/daemon/libnetwork/networkdb/delegate.go +++ b/daemon/libnetwork/networkdb/delegate.go @@ -357,28 +357,35 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) { nDB.handleMessage(bsm.Payload, true) - // Don't respond to a bulk sync which was not unsolicited - if !bsm.Unsolicited { - nDB.Lock() - ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName] - if ok { - close(ch) - delete(nDB.bulkSyncAckTbl, bsm.NodeName) + nDB.Lock() + acks := nDB.bulkSyncAckTbl[bsm.NodeName] + var pendingAcks []bulkSyncSubscription + for _, ack := range acks { + if bsm.LTime > ack.LTime { + close(ack.Done) + } else { + pendingAcks = append(pendingAcks, ack) } - nDB.Unlock() - - return } - - var nodeAddr net.IP - nDB.RLock() - if node, ok := nDB.nodes[bsm.NodeName]; ok { - nodeAddr = node.Addr + if len(pendingAcks) > 0 { + nDB.bulkSyncAckTbl[bsm.NodeName] = pendingAcks + } else { + delete(nDB.bulkSyncAckTbl, bsm.NodeName) } - nDB.RUnlock() + nDB.Unlock() - if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil { - log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err) + // Only respond to an unsolicited bulk sync. + if bsm.Unsolicited { + var nodeAddr net.IP + nDB.RLock() + if node, ok := nDB.nodes[bsm.NodeName]; ok { + nodeAddr = node.Addr + } + nDB.RUnlock() + + if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil { + log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err) + } } } diff --git a/daemon/libnetwork/networkdb/networkdb.go b/daemon/libnetwork/networkdb/networkdb.go index c30076d173..1d46c8309f 100644 --- a/daemon/libnetwork/networkdb/networkdb.go +++ b/daemon/libnetwork/networkdb/networkdb.go @@ -79,9 +79,13 @@ type NetworkDB struct { // network. The key is a network ID. networkNodes map[string][]string - // A table of ack channels for every node from which we are - // waiting for an ack. - bulkSyncAckTbl map[string]chan struct{} + // A table of subscriptions for every node from which we are waiting for + // a bulk-sync ack. + bulkSyncAckTbl map[string][]bulkSyncSubscription + + // A count of the number of times we have timed out waiting for a bulk + // sync ack from any peer node. + bulkSyncAckTimeouts atomic.Uint64 // Broadcast queue for network event gossip. networkBroadcasts *memberlist.TransmitLimitedQueue @@ -261,6 +265,14 @@ type entry struct { reapTime time.Duration } +// bulkSyncSubscription is a subscription to the bulk-sync progress for a peer node. +// Done is closed when a bulk sync from the node is received with an LTime +// greater than the subscription's LTime. +type bulkSyncSubscription struct { + LTime serf.LamportTime + Done chan<- struct{} +} + // DefaultConfig returns a NetworkDB config with default values func DefaultConfig() *Config { hostname, _ := os.Hostname() @@ -310,7 +322,7 @@ func newNetworkDB(c *Config) *NetworkDB { failedNodes: make(map[string]*node), leftNodes: make(map[string]*node), networkNodes: make(map[string][]string), - bulkSyncAckTbl: make(map[string]chan struct{}), + bulkSyncAckTbl: make(map[string][]bulkSyncSubscription), broadcaster: events.NewBroadcaster(), rng: rand.New(rand.NewChaCha8(rngSeed)), //gosec:disable G404 -- not used in a security sensitive context } diff --git a/daemon/libnetwork/networkdb/networkdb_test.go b/daemon/libnetwork/networkdb/networkdb_test.go index 96cb09c078..fb7eae1ab1 100644 --- a/daemon/libnetwork/networkdb/networkdb_test.go +++ b/daemon/libnetwork/networkdb/networkdb_test.go @@ -17,6 +17,7 @@ import ( "github.com/docker/go-events" "github.com/hashicorp/memberlist" "github.com/moby/moby/v2/daemon/internal/stringid" + "golang.org/x/sync/errgroup" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" "gotest.tools/v3/poll" @@ -464,6 +465,55 @@ func TestNetworkDBBulkSync(t *testing.T) { closeNetworkDBInstances(t, dbs) } +// Regression test for https://github.com/moby/moby/issues/51701 +func TestNetworkDBBulkSyncNodeConcurrent(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig()) + defer closeNetworkDBInstances(t, dbs) + + err := dbs[0].JoinNetwork("network1") + assert.NilError(t, err) + + dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true) + + err = dbs[1].JoinNetwork("network1") + assert.NilError(t, err) + + dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true) + + for i := range 50 { + err = dbs[0].CreateEntry("test_table", "network1", + fmt.Sprintf("key%d", i), + fmt.Appendf(nil, "val%d", i)) + assert.NilError(t, err) + } + + const N = 4 + start := make(chan struct{}) + var eg errgroup.Group + for i := range N { + eg.Go(func() error { + <-start + err := dbs[1].bulkSyncNode( + []string{"network1"}, dbs[0].config.NodeID, true) + if err != nil { + return fmt.Errorf("[%d] %w", i, err) + } + return nil + }) + } + close(start) + assert.NilError(t, eg.Wait()) + + // No bulk sync should have timed out. On broken code this is N-1. + assert.Equal(t, dbs[1].bulkSyncAckTimeouts.Load(), uint64(0), + "expected 0 bulk sync timeouts, but some ack channels were orphaned") + + dbs[1].RLock() + defer dbs[1].RUnlock() + assert.Assert(t, is.Len(dbs[1].bulkSyncAckTbl, 0), + "bulkSyncAckTbl should be empty after all syncs complete") +} + func TestNetworkDBCRUDMediumCluster(t *testing.T) { n := 5