Compare commits
1 Commits
feature/us
...
add-read-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b97b676e69 |
24
.github/workflows/golang-test-freebsd.yml
vendored
24
.github/workflows/golang-test-freebsd.yml
vendored
@@ -13,7 +13,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-22.04
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Test in FreeBSD
|
||||
@@ -21,15 +21,19 @@ jobs:
|
||||
uses: vmactions/freebsd-vm@v1
|
||||
with:
|
||||
usesh: true
|
||||
copyback: false
|
||||
release: "14.1"
|
||||
prepare: |
|
||||
pkg install -y go
|
||||
pkg install -y curl
|
||||
pkg install -y git
|
||||
|
||||
# -x - to print all executed commands
|
||||
# -e - to faile on first error
|
||||
run: |
|
||||
set -e -x
|
||||
go build -o netbird client/main.go
|
||||
go test -timeout 5m -p 1 -failfast ./iface/...
|
||||
go test -timeout 5m -p 1 -failfast ./client/...
|
||||
set -x
|
||||
curl -o go.tar.gz https://go.dev/dl/go1.21.11.freebsd-amd64.tar.gz -L
|
||||
tar zxf go.tar.gz
|
||||
mv go /usr/local/go
|
||||
ln -s /usr/local/go/bin/go /usr/local/bin/go
|
||||
go mod tidy
|
||||
go test -timeout 5m -p 1 ./iface/...
|
||||
go test -timeout 5m -p 1 ./client/...
|
||||
cd client
|
||||
go build .
|
||||
cd ..
|
||||
@@ -31,8 +31,6 @@ var installCmd = &cobra.Command{
|
||||
configPath,
|
||||
"--log-level",
|
||||
logLevel,
|
||||
"--daemon-addr",
|
||||
daemonAddr,
|
||||
}
|
||||
|
||||
if managementURL != "" {
|
||||
|
||||
@@ -128,9 +128,6 @@ func (s *serviceViaListener) RuntimeIP() string {
|
||||
}
|
||||
|
||||
func (s *serviceViaListener) setListenerStatus(running bool) {
|
||||
s.listenerFlagLock.Lock()
|
||||
defer s.listenerFlagLock.Unlock()
|
||||
|
||||
s.listenerIsRunning = running
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//go:build darwin
|
||||
//go:build darwin || dragonfly || freebsd || netbsd || openbsd
|
||||
|
||||
package main
|
||||
|
||||
|
||||
19
go.mod
19
go.mod
@@ -36,12 +36,10 @@ require (
|
||||
github.com/cilium/ebpf v0.15.0
|
||||
github.com/coreos/go-iptables v0.7.0
|
||||
github.com/creack/pty v1.1.18
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3
|
||||
github.com/eko/gocache/v3 v3.1.1
|
||||
github.com/fsnotify/fsnotify v1.6.0
|
||||
github.com/getlantern/systray v1.2.1
|
||||
github.com/gliderlabs/ssh v0.3.4
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/godbus/dbus/v5 v5.1.0
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/google/go-cmp v0.6.0
|
||||
@@ -52,7 +50,6 @@ require (
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/base62 v0.1.2
|
||||
github.com/hashicorp/go-version v1.6.0
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/libp2p/go-netroute v0.2.1
|
||||
github.com/magiconair/properties v1.8.7
|
||||
github.com/mattn/go-sqlite3 v1.14.19
|
||||
@@ -60,7 +57,6 @@ require (
|
||||
github.com/miekg/dns v1.1.43
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
||||
github.com/nadoo/ipset v0.5.0
|
||||
github.com/nats-io/nats.go v1.36.0
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e
|
||||
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
||||
github.com/oschwald/maxminddb-golang v1.12.0
|
||||
@@ -79,7 +75,6 @@ require (
|
||||
github.com/things-go/go-socks5 v0.0.4
|
||||
github.com/yusufpapurcu/wmi v1.2.4
|
||||
github.com/zcalusic/sysinfo v1.0.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.14
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
|
||||
go.opentelemetry.io/otel v1.26.0
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
|
||||
@@ -115,14 +110,12 @@ require (
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/containerd/containerd v1.7.16 // indirect
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
github.com/coreos/go-semver v0.3.0 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
|
||||
github.com/cpuguy83/dockercfg v0.3.1 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/distribution/reference v0.6.0 // indirect
|
||||
github.com/docker/docker v27.0.3+incompatible // indirect
|
||||
github.com/docker/docker v26.1.3+incompatible // indirect
|
||||
github.com/docker/go-connections v0.5.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
@@ -138,6 +131,7 @@ require (
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-ole/go-ole v1.3.0 // indirect
|
||||
github.com/go-redis/redis/v8 v8.11.5 // indirect
|
||||
github.com/go-stack/stack v1.8.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/goki/freetype v0.0.0-20181231101311-fa8a33aabaff // indirect
|
||||
@@ -146,7 +140,6 @@ require (
|
||||
github.com/google/s2a-go v0.1.7 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
||||
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.2 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
@@ -168,9 +161,6 @@ require (
|
||||
github.com/moby/sys/user v0.1.0 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240801161502-2171ea0babf1 // indirect
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.0 // indirect
|
||||
@@ -194,15 +184,10 @@ require (
|
||||
github.com/tklauser/numcpus v0.8.0 // indirect
|
||||
github.com/vishvananda/netns v0.0.4 // indirect
|
||||
github.com/yuin/goldmark v1.4.13 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.14 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.26.0 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.6.0 // indirect
|
||||
go.uber.org/zap v1.17.0 // indirect
|
||||
golang.org/x/image v0.18.0 // indirect
|
||||
golang.org/x/text v0.16.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
|
||||
35
go.sum
35
go.sum
@@ -62,10 +62,6 @@ github.com/coocood/freecache v1.2.1 h1:/v1CqMq45NFH9mp/Pt142reundeBM0dVUD3osQBeu
|
||||
github.com/coocood/freecache v1.2.1/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk=
|
||||
github.com/coreos/go-iptables v0.7.0 h1:XWM3V+MPRr5/q51NuWSgU0fqMad64Zyxs8ZUoMsamr8=
|
||||
github.com/coreos/go-iptables v0.7.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q=
|
||||
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
|
||||
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
|
||||
github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
@@ -85,8 +81,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/docker v27.0.3+incompatible h1:aBGI9TeQ4MPlhquTQKq9XbK79rKFVwXNUAYz9aXyEBE=
|
||||
github.com/docker/docker v27.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/docker v26.1.3+incompatible h1:lLCzRbrVZrljpVNobJu1J2FHk8V0s4BawoZippkc+xo=
|
||||
github.com/docker/docker v26.1.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
@@ -94,8 +90,6 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
|
||||
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
|
||||
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
|
||||
github.com/eko/gocache/v3 v3.1.1 h1:r3CBwLnqPkcK56h9Do2CWw1kZ4TeKK0wDE1Oo/YZnhs=
|
||||
github.com/eko/gocache/v3 v3.1.1/go.mod h1:UpP/LyHAioP/a/dizgl0MpgZ3A3CkS4NbG/mWkGTQ9M=
|
||||
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
@@ -234,11 +228,8 @@ github.com/gopherjs/gopherjs v0.0.0-20220410123724-9e86199038b0 h1:fWY+zXdWhvWnd
|
||||
github.com/gopherjs/gopherjs v0.0.0-20220410123724-9e86199038b0/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.2-0.20240212192251-757544f21357 h1:Fkzd8ktnpOR9h47SXHe2AYPwelXLH2GjGsjlAloiWfo=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.2-0.20240212192251-757544f21357/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
@@ -338,12 +329,6 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
|
||||
github.com/nadoo/ipset v0.5.0 h1:5GJUAuZ7ITQQQGne5J96AmFjRtI8Avlbk6CabzYWVUc=
|
||||
github.com/nadoo/ipset v0.5.0/go.mod h1:rYF5DQLRGGoQ8ZSWeK+6eX5amAuPqwFkWjhQlEITGJQ=
|
||||
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
|
||||
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6Sf8uYFx/dMeqNOL90KUoRscdfpFZ3Im89uk=
|
||||
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ=
|
||||
github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e h1:PURA50S8u4mF6RrkYYCAvvPCixhqqEiEy3Ej6avh04c=
|
||||
@@ -352,8 +337,6 @@ github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e/go.mod h1:nykwWZnxb+sJz2Z//CEq45CMRWSHllH8pODKRB8eY7Y=
|
||||
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0 h1:hirFRfx3grVA/9eEyjME5/z3nxdJlN9kfQpvWWPk32g=
|
||||
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240801161502-2171ea0babf1 h1:YMPC80v0/bCWXjpCraSSpgh7/5lsNUZ1qoxVzzmRu5k=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240801161502-2171ea0babf1/go.mod h1:5/sjFmLb8O96B5737VCqhHyGRzNFIaN/Bu7ZodXc3qQ=
|
||||
github.com/netbirdio/systray v0.0.0-20231030152038-ef1ed2a27949 h1:xbWM9BU6mwZZLHxEjxIX/V8Hv3HurQt4mReIE4mY4DM=
|
||||
github.com/netbirdio/systray v0.0.0-20231030152038-ef1ed2a27949/go.mod h1:AecygODWIsBquJCJFop8MEQcJbWFfw/1yWbVabNgpCM=
|
||||
github.com/netbirdio/wireguard-go v0.0.0-20240105182236-6c340dd55aed h1:t0UADZUJDaaZgfKrt8JUPrOLL9Mg/ryjP85RAH53qgs=
|
||||
@@ -413,7 +396,6 @@ github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9
|
||||
github.com/pion/turn/v3 v3.0.1 h1:wLi7BTQr6/Q20R0vt/lHbjv6y4GChFtC33nkYbasoT8=
|
||||
github.com/pion/turn/v3 v3.0.1/go.mod h1:MrJDKgqryDyWy1/4NT9TWfXWGMC7UHT6pJIv1+gMeNE=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
@@ -516,12 +498,6 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
|
||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
github.com/zcalusic/sysinfo v1.0.2 h1:nwTTo2a+WQ0NXwo0BGRojOJvJ/5XKvQih+2RrtWqfxc=
|
||||
github.com/zcalusic/sysinfo v1.0.2/go.mod h1:kluzTYflRWo6/tXVMJPdEjShsbPpsFRyy+p1mBQPC30=
|
||||
go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0=
|
||||
go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI=
|
||||
go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg=
|
||||
go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk=
|
||||
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
|
||||
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
|
||||
@@ -546,12 +522,6 @@ go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2L
|
||||
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
|
||||
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
|
||||
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||
goauthentik.io/api/v3 v3.2023051.3 h1:NebAhD/TeTWNo/9X3/Uj+rM5fG1HaiLOlKTNLQv9Qq4=
|
||||
goauthentik.io/api/v3 v3.2023051.3/go.mod h1:nYECml4jGbp/541hj8GcylKQG1gVBsKppHy4+7G8u4U=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
@@ -777,7 +747,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gorm.io/driver/postgres v1.5.7 h1:8ptbNJTDbEmhdr62uReG5BGkdQyeasu/FZHxI0IMGnM=
|
||||
|
||||
@@ -770,6 +770,10 @@ func (a *Account) GetPeer(peerID string) *nbpeer.Peer {
|
||||
// SetJWTGroups updates the user's auto groups by synchronizing JWT groups.
|
||||
// Returns true if there are changes in the JWT group membership.
|
||||
func (a *Account) SetJWTGroups(userID string, groupsNames []string) bool {
|
||||
if len(groupsNames) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
user, ok := a.Users[userID]
|
||||
if !ok {
|
||||
return false
|
||||
|
||||
@@ -2219,13 +2219,6 @@ func TestAccount_SetJWTGroups(t *testing.T) {
|
||||
assert.Len(t, account.Users["user2"].AutoGroups, 1, "new group should be added")
|
||||
assert.Contains(t, account.Groups, account.Users["user2"].AutoGroups[0], "groups must contain group3 from user groups")
|
||||
})
|
||||
|
||||
t.Run("remove all JWT groups", func(t *testing.T) {
|
||||
updated := account.SetJWTGroups("user1", []string{})
|
||||
assert.True(t, updated, "account should be updated")
|
||||
assert.Len(t, account.Users["user1"].AutoGroups, 1, "only non-JWT groups should remain")
|
||||
assert.Contains(t, account.Users["user1"].AutoGroups, "group1", " group1 should still be present")
|
||||
})
|
||||
}
|
||||
|
||||
func TestAccount_UserGroupsAddToPeers(t *testing.T) {
|
||||
|
||||
@@ -666,26 +666,6 @@ func (s *FileStore) SaveInstallationID(ctx context.Context, ID string) error {
|
||||
return s.persist(ctx, s.storeFile)
|
||||
}
|
||||
|
||||
// SavePeer saves the peer in the account
|
||||
func (s *FileStore) SavePeer(_ context.Context, accountID string, peer *nbpeer.Peer) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newPeer := peer.Copy()
|
||||
|
||||
account.Peers[peer.ID] = newPeer
|
||||
|
||||
s.PeerKeyID2AccountID[peer.Key] = accountID
|
||||
s.PeerID2AccountID[peer.ID] = accountID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SavePeerStatus stores the PeerStatus in memory. It doesn't attempt to persist data to speed up things.
|
||||
// PeerStatus will be saved eventually when some other changes occur.
|
||||
func (s *FileStore) SavePeerStatus(accountID, peerID string, peerStatus nbpeer.PeerStatus) error {
|
||||
|
||||
@@ -7,11 +7,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/posture"
|
||||
"github.com/rs/xid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/posture"
|
||||
|
||||
"github.com/netbirdio/netbird/management/proto"
|
||||
"github.com/netbirdio/netbird/management/server/activity"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
@@ -453,17 +452,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
Location: peer.Location,
|
||||
}
|
||||
|
||||
if am.geo != nil && newPeer.Location.ConnectionIP != nil {
|
||||
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to get location for new peer realip: [%s]: %v", newPeer.Location.ConnectionIP.String(), err)
|
||||
} else {
|
||||
newPeer.Location.CountryCode = location.Country.ISOCode
|
||||
newPeer.Location.CityName = location.City.Names.En
|
||||
newPeer.Location.GeoNameID = location.City.GeonameID
|
||||
}
|
||||
}
|
||||
|
||||
// add peer to 'All' group
|
||||
group, err := account.GetGroupAll()
|
||||
if err != nil {
|
||||
@@ -546,12 +534,12 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
|
||||
}
|
||||
|
||||
if peerLoginExpired(ctx, peer, account.Settings) {
|
||||
return nil, nil, nil, status.NewPeerLoginExpiredError()
|
||||
return nil, nil, nil, status.Errorf(status.PermissionDenied, "peer login has expired, please log in once more")
|
||||
}
|
||||
|
||||
peer, updated := updatePeerMeta(peer, sync.Meta, account)
|
||||
if updated {
|
||||
err = am.Store.SavePeer(ctx, account.Id, peer)
|
||||
err = am.Store.SaveAccount(ctx, account)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
@@ -597,10 +585,21 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
|
||||
// we couldn't find this peer by its public key which can mean that peer hasn't been registered yet.
|
||||
// Try registering it.
|
||||
newPeer := &nbpeer.Peer{
|
||||
Key: login.WireGuardPubKey,
|
||||
Meta: login.Meta,
|
||||
SSHKey: login.SSHKey,
|
||||
Location: nbpeer.Location{ConnectionIP: login.ConnectionIP},
|
||||
Key: login.WireGuardPubKey,
|
||||
Meta: login.Meta,
|
||||
SSHKey: login.SSHKey,
|
||||
}
|
||||
if am.geo != nil && login.ConnectionIP != nil {
|
||||
location, err := am.geo.Lookup(login.ConnectionIP)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to get location for new peer realip: [%s]: %v", login.ConnectionIP.String(), err)
|
||||
} else {
|
||||
newPeer.Location.ConnectionIP = login.ConnectionIP
|
||||
newPeer.Location.CountryCode = location.Country.ISOCode
|
||||
newPeer.Location.CityName = location.City.Names.En
|
||||
newPeer.Location.GeoNameID = location.City.GeonameID
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return am.AddPeer(ctx, login.SetupKey, login.UserID, newPeer)
|
||||
@@ -610,17 +609,44 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
|
||||
return nil, nil, nil, status.Errorf(status.Internal, "failed while logging in peer")
|
||||
}
|
||||
|
||||
// when the client sends a login request with a JWT which is used to get the user ID,
|
||||
// it means that the client has already checked if it needs login and had been through the SSO flow
|
||||
// so, we can skip this check and directly proceed with the login
|
||||
if login.UserID == "" {
|
||||
err = am.checkIFPeerNeedsLoginWithoutLock(ctx, accountID, login)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
peer, err := am.Store.GetPeerByPeerPubKey(ctx, login.WireGuardPubKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, status.NewPeerNotRegisteredError()
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
accSettings, err := am.Store.GetAccountSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, status.Errorf(status.Internal, "failed to get account settings: %s", err)
|
||||
}
|
||||
|
||||
var isWriteLock bool
|
||||
|
||||
// duplicated logic from after the lock to have an early exit
|
||||
expired := peerLoginExpired(ctx, peer, accSettings)
|
||||
switch {
|
||||
case expired:
|
||||
if err := checkAuth(ctx, login.UserID, peer); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
isWriteLock = true
|
||||
log.WithContext(ctx).Debugf("peer login expired, acquiring write lock")
|
||||
|
||||
case peer.UpdateMetaIfNew(login.Meta):
|
||||
isWriteLock = true
|
||||
log.WithContext(ctx).Debugf("peer changed meta, acquiring write lock")
|
||||
|
||||
default:
|
||||
isWriteLock = false
|
||||
log.WithContext(ctx).Debugf("peer meta is the same, acquiring read lock")
|
||||
}
|
||||
|
||||
var unlock func()
|
||||
|
||||
if isWriteLock {
|
||||
unlock = am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
} else {
|
||||
unlock = am.Store.AcquireAccountReadLock(ctx, accountID)
|
||||
}
|
||||
defer func() {
|
||||
if unlock != nil {
|
||||
unlock()
|
||||
@@ -633,7 +659,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
peer, err := account.FindPeerByPubKey(login.WireGuardPubKey)
|
||||
peer, err = account.FindPeerByPubKey(login.WireGuardPubKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, status.NewPeerNotRegisteredError()
|
||||
}
|
||||
@@ -644,39 +670,53 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
|
||||
}
|
||||
|
||||
// this flag prevents unnecessary calls to the persistent store.
|
||||
shouldStorePeer := false
|
||||
shouldStoreAccount := false
|
||||
updateRemotePeers := false
|
||||
if peerLoginExpired(ctx, peer, account.Settings) {
|
||||
err = am.handleExpiredPeer(ctx, login, account, peer)
|
||||
err = checkAuth(ctx, login.UserID, peer)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
// If peer was expired before and if it reached this point, it is re-authenticated.
|
||||
// UserID is present, meaning that JWT validation passed successfully in the API layer.
|
||||
updatePeerLastLogin(peer, account)
|
||||
updateRemotePeers = true
|
||||
shouldStorePeer = true
|
||||
shouldStoreAccount = true
|
||||
|
||||
// sync user last login with peer last login
|
||||
user, err := account.FindUser(login.UserID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, status.Errorf(status.Internal, "couldn't find user")
|
||||
}
|
||||
user.updateLastLogin(peer.LastLogin)
|
||||
|
||||
am.StoreEvent(ctx, login.UserID, peer.ID, account.Id, activity.UserLoggedInPeer, peer.EventMeta(am.GetDNSDomain()))
|
||||
}
|
||||
|
||||
isRequiresApproval, isStatusChanged, err := am.integratedPeerValidator.IsNotValidPeer(ctx, account.Id, peer, account.GetPeerGroupsList(peer.ID), account.Settings.Extra)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
peer, updated := updatePeerMeta(peer, login.Meta, account)
|
||||
if updated {
|
||||
shouldStorePeer = true
|
||||
shouldStoreAccount = true
|
||||
}
|
||||
|
||||
if peer.SSHKey != login.SSHKey {
|
||||
peer.SSHKey = login.SSHKey
|
||||
shouldStorePeer = true
|
||||
peer, err = am.checkAndUpdatePeerSSHKey(ctx, peer, account, login.SSHKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
if shouldStorePeer {
|
||||
err = am.Store.SavePeer(ctx, accountID, peer)
|
||||
if shouldStoreAccount {
|
||||
if !isWriteLock {
|
||||
log.WithContext(ctx).Errorf("account %s should be stored but is not write locked", accountID)
|
||||
return nil, nil, nil, status.Errorf(status.Internal, "account should be stored but is not write locked")
|
||||
}
|
||||
err = am.Store.SaveAccount(ctx, account)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
unlock()
|
||||
unlock = nil
|
||||
|
||||
@@ -684,46 +724,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
|
||||
am.updateAccountPeers(ctx, account)
|
||||
}
|
||||
|
||||
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, account, peer)
|
||||
}
|
||||
|
||||
// checkIFPeerNeedsLoginWithoutLock checks if the peer needs login without acquiring the account lock. The check validate if the peer was not added via SSO
|
||||
// and if the peer login is expired.
|
||||
// The NetBird client doesn't have a way to check if the peer needs login besides sending a login request
|
||||
// with no JWT token and usually no setup-key. As the client can send up to two login request to check if it is expired
|
||||
// and before starting the engine, we do the checks without an account lock to avoid piling up requests.
|
||||
func (am *DefaultAccountManager) checkIFPeerNeedsLoginWithoutLock(ctx context.Context, accountID string, login PeerLogin) error {
|
||||
peer, err := am.Store.GetPeerByPeerPubKey(ctx, login.WireGuardPubKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if the peer was not added with SSO login we can exit early because peers activated with setup-key
|
||||
// doesn't expire, and we avoid extra databases calls.
|
||||
if !peer.AddedWithSSOLogin() {
|
||||
return nil
|
||||
}
|
||||
|
||||
settings, err := am.Store.GetAccountSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if peerLoginExpired(ctx, peer, settings) {
|
||||
return status.NewPeerLoginExpiredError()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, account *Account, peer *nbpeer.Peer) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error) {
|
||||
var postureChecks []*posture.Checks
|
||||
|
||||
if isRequiresApproval {
|
||||
emptyMap := &NetworkMap{
|
||||
Network: account.Network.Copy(),
|
||||
}
|
||||
return peer, emptyMap, nil, nil
|
||||
return peer, emptyMap, postureChecks, nil
|
||||
}
|
||||
|
||||
approvedPeersMap, err := am.GetValidatedPeers(account)
|
||||
@@ -735,30 +742,6 @@ func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, is
|
||||
return peer, account.GetPeerNetworkMap(ctx, peer.ID, am.dnsDomain, approvedPeersMap), postureChecks, nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) handleExpiredPeer(ctx context.Context, login PeerLogin, account *Account, peer *nbpeer.Peer) error {
|
||||
err := checkAuth(ctx, login.UserID, peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If peer was expired before and if it reached this point, it is re-authenticated.
|
||||
// UserID is present, meaning that JWT validation passed successfully in the API layer.
|
||||
updatePeerLastLogin(peer, account)
|
||||
|
||||
// sync user last login with peer last login
|
||||
user, err := account.FindUser(login.UserID)
|
||||
if err != nil {
|
||||
return status.Errorf(status.Internal, "couldn't find user")
|
||||
}
|
||||
|
||||
err = am.Store.SaveUserLastLogin(account.Id, user.Id, peer.LastLogin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
am.StoreEvent(ctx, login.UserID, peer.ID, account.Id, activity.UserLoggedInPeer, peer.EventMeta(am.GetDNSDomain()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkIfPeerOwnerIsBlocked(peer *nbpeer.Peer, account *Account) error {
|
||||
if peer.AddedWithSSOLogin() {
|
||||
user, err := account.FindUser(peer.UserID)
|
||||
@@ -775,11 +758,11 @@ func checkIfPeerOwnerIsBlocked(peer *nbpeer.Peer, account *Account) error {
|
||||
func checkAuth(ctx context.Context, loginUserID string, peer *nbpeer.Peer) error {
|
||||
if loginUserID == "" {
|
||||
// absence of a user ID indicates that JWT wasn't provided.
|
||||
return status.NewPeerLoginExpiredError()
|
||||
return status.Errorf(status.PermissionDenied, "peer login has expired, please log in once more")
|
||||
}
|
||||
if peer.UserID != loginUserID {
|
||||
log.WithContext(ctx).Warnf("user mismatch when logging in peer %s: peer user %s, login user %s ", peer.ID, peer.UserID, loginUserID)
|
||||
return status.Errorf(status.Unauthenticated, "can't login with this credentials")
|
||||
return status.Errorf(status.Unauthenticated, "can't login")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -799,6 +782,31 @@ func updatePeerLastLogin(peer *nbpeer.Peer, account *Account) {
|
||||
account.UpdatePeer(peer)
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) checkAndUpdatePeerSSHKey(ctx context.Context, peer *nbpeer.Peer, account *Account, newSSHKey string) (*nbpeer.Peer, error) {
|
||||
if len(newSSHKey) == 0 {
|
||||
log.WithContext(ctx).Debugf("no new SSH key provided for peer %s, skipping update", peer.ID)
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
if peer.SSHKey == newSSHKey {
|
||||
log.WithContext(ctx).Debugf("same SSH key provided for peer %s, skipping update", peer.ID)
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
peer.SSHKey = newSSHKey
|
||||
account.UpdatePeer(peer)
|
||||
|
||||
err := am.Store.SaveAccount(ctx, account)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// trigger network map update
|
||||
am.updateAccountPeers(ctx, account)
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
// UpdatePeerSSHKey updates peer's public SSH key
|
||||
func (am *DefaultAccountManager) UpdatePeerSSHKey(ctx context.Context, peerID string, sshKey string) error {
|
||||
if sshKey == "" {
|
||||
|
||||
@@ -31,10 +31,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
storeSqliteFileName = "store.db"
|
||||
idQueryCondition = "id = ?"
|
||||
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
||||
peerNotFoundFMT = "peer %s not found"
|
||||
storeSqliteFileName = "store.db"
|
||||
idQueryCondition = "id = ?"
|
||||
)
|
||||
|
||||
// SqlStore represents an account storage backed by a Sql DB persisted to disk
|
||||
@@ -273,38 +271,6 @@ func (s *SqlStore) GetInstallationID() string {
|
||||
return installation.InstallationIDValue
|
||||
}
|
||||
|
||||
func (s *SqlStore) SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error {
|
||||
// To maintain data integrity, we create a copy of the peer's to prevent unintended updates to other fields.
|
||||
peerCopy := peer.Copy()
|
||||
peerCopy.AccountID = accountID
|
||||
|
||||
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
// check if peer exists before saving
|
||||
var peerID string
|
||||
result := tx.Model(&nbpeer.Peer{}).Select("id").Find(&peerID, accountAndIDQueryCondition, accountID, peer.ID)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
if peerID == "" {
|
||||
return status.Errorf(status.NotFound, peerNotFoundFMT, peer.ID)
|
||||
}
|
||||
|
||||
result = tx.Model(&nbpeer.Peer{}).Where(accountAndIDQueryCondition, accountID, peer.ID).Save(peerCopy)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) SavePeerStatus(accountID, peerID string, peerStatus nbpeer.PeerStatus) error {
|
||||
var peerCopy nbpeer.Peer
|
||||
peerCopy.Status = &peerStatus
|
||||
@@ -315,14 +281,14 @@ func (s *SqlStore) SavePeerStatus(accountID, peerID string, peerStatus nbpeer.Pe
|
||||
}
|
||||
result := s.db.Model(&nbpeer.Peer{}).
|
||||
Select(fieldsToUpdate).
|
||||
Where(accountAndIDQueryCondition, accountID, peerID).
|
||||
Where("account_id = ? AND id = ?", accountID, peerID).
|
||||
Updates(&peerCopy)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
if result.RowsAffected == 0 {
|
||||
return status.Errorf(status.NotFound, peerNotFoundFMT, peerID)
|
||||
return status.Errorf(status.NotFound, "peer %s not found", peerID)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -336,7 +302,7 @@ func (s *SqlStore) SavePeerLocation(accountID string, peerWithLocation *nbpeer.P
|
||||
peerCopy.Location = peerWithLocation.Location
|
||||
|
||||
result := s.db.Model(&nbpeer.Peer{}).
|
||||
Where(accountAndIDQueryCondition, accountID, peerWithLocation.ID).
|
||||
Where("account_id = ? and id = ?", accountID, peerWithLocation.ID).
|
||||
Updates(peerCopy)
|
||||
|
||||
if result.Error != nil {
|
||||
@@ -344,7 +310,7 @@ func (s *SqlStore) SavePeerLocation(accountID string, peerWithLocation *nbpeer.P
|
||||
}
|
||||
|
||||
if result.RowsAffected == 0 {
|
||||
return status.Errorf(status.NotFound, peerNotFoundFMT, peerWithLocation.ID)
|
||||
return status.Errorf(status.NotFound, "peer %s not found", peerWithLocation.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -678,7 +644,7 @@ func (s *SqlStore) GetAccountSettings(ctx context.Context, accountID string) (*S
|
||||
func (s *SqlStore) SaveUserLastLogin(accountID, userID string, lastLogin time.Time) error {
|
||||
var user User
|
||||
|
||||
result := s.db.First(&user, accountAndIDQueryCondition, accountID, userID)
|
||||
result := s.db.First(&user, "account_id = ? and id = ?", accountID, userID)
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return status.Errorf(status.NotFound, "user %s not found", userID)
|
||||
|
||||
@@ -55,6 +55,37 @@ func TestSqlite_SaveAccount_Large(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// generatePeerMeta generates nbpeer.PeerSystemMeta with all fields and multiple NetworkAddresses
|
||||
func generatePeerMeta() nbpeer.PeerSystemMeta {
|
||||
return nbpeer.PeerSystemMeta{
|
||||
OS: "Linux",
|
||||
OSVersion: "5.4.0-1043-aws",
|
||||
NetworkAddresses: []nbpeer.NetworkAddress{
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
{NetIP: netip.MustParsePrefix("192.168.0.0/24"), Mac: "00:00:00:00:00:00"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func runLargeTest(t *testing.T, store Store) {
|
||||
t.Helper()
|
||||
|
||||
@@ -80,6 +111,7 @@ func runLargeTest(t *testing.T, store Store) {
|
||||
UserID: userID,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: time.Now()},
|
||||
SSHEnabled: false,
|
||||
Meta: generatePeerMeta(),
|
||||
}
|
||||
account.Peers[peerID] = peer
|
||||
group, _ := account.GetGroupAll()
|
||||
@@ -129,17 +161,24 @@ func runLargeTest(t *testing.T, store Store) {
|
||||
account.SetupKeys[setupKey.Key] = setupKey
|
||||
}
|
||||
|
||||
// display number of objects in the Account
|
||||
t.Logf("Account has %d Peers, %d Users, %d Routes, %d NameServerGroups, %d SetupKeys", len(account.Peers), len(account.Users), len(account.Routes), len(account.NameServerGroups), len(account.SetupKeys))
|
||||
|
||||
start := time.Now()
|
||||
err = store.SaveAccount(context.Background(), account)
|
||||
require.NoError(t, err)
|
||||
t.Logf("SaveAccount took %s", time.Since(start))
|
||||
|
||||
if len(store.GetAllAccounts(context.Background())) != 1 {
|
||||
t.Errorf("expecting 1 Accounts to be stored after SaveAccount()")
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
a, err := store.GetAccount(context.Background(), account.Id)
|
||||
if a == nil {
|
||||
t.Errorf("expecting Account to be stored after SaveAccount(): %v", err)
|
||||
}
|
||||
t.Logf("GetAccount took %s", time.Since(start))
|
||||
|
||||
if a != nil && len(a.Policies) != 1 {
|
||||
t.Errorf("expecting Account to have one policy stored after SaveAccount(), got %d", len(a.Policies))
|
||||
@@ -362,54 +401,6 @@ func TestSqlite_GetAccount(t *testing.T) {
|
||||
require.Equal(t, status.NotFound, parsedErr.Type(), "should return not found error")
|
||||
}
|
||||
|
||||
func TestSqlite_SavePeer(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("The SQLite store is not properly supported by Windows yet")
|
||||
}
|
||||
|
||||
store := newSqliteStoreFromFile(t, "testdata/store.json")
|
||||
|
||||
account, err := store.GetAccount(context.Background(), "bf1c8084-ba50-4ce7-9439-34653001fc3b")
|
||||
require.NoError(t, err)
|
||||
|
||||
// save status of non-existing peer
|
||||
peer := &nbpeer.Peer{
|
||||
Key: "peerkey",
|
||||
ID: "testpeer",
|
||||
SetupKey: "peerkeysetupkey",
|
||||
IP: net.IP{127, 0, 0, 1},
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "testingpeer"},
|
||||
Name: "peer name",
|
||||
Status: &nbpeer.PeerStatus{Connected: true, LastSeen: time.Now().UTC()},
|
||||
}
|
||||
ctx := context.Background()
|
||||
err = store.SavePeer(ctx, account.Id, peer)
|
||||
assert.Error(t, err)
|
||||
parsedErr, ok := status.FromError(err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, status.NotFound, parsedErr.Type(), "should return not found error")
|
||||
|
||||
// save new status of existing peer
|
||||
account.Peers[peer.ID] = peer
|
||||
|
||||
err = store.SaveAccount(context.Background(), account)
|
||||
require.NoError(t, err)
|
||||
|
||||
updatedPeer := peer.Copy()
|
||||
updatedPeer.Status.Connected = false
|
||||
updatedPeer.Meta.Hostname = "updatedpeer"
|
||||
|
||||
err = store.SavePeer(ctx, account.Id, updatedPeer)
|
||||
require.NoError(t, err)
|
||||
|
||||
account, err = store.GetAccount(context.Background(), account.Id)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := account.Peers[peer.ID]
|
||||
assert.Equal(t, updatedPeer.Status, actual.Status)
|
||||
assert.Equal(t, updatedPeer.Meta, actual.Meta)
|
||||
}
|
||||
|
||||
func TestSqlite_SavePeerStatus(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("The SQLite store is not properly supported by Windows yet")
|
||||
@@ -450,19 +441,7 @@ func TestSqlite_SavePeerStatus(t *testing.T) {
|
||||
|
||||
actual := account.Peers["testpeer"].Status
|
||||
assert.Equal(t, newStatus, *actual)
|
||||
|
||||
newStatus.Connected = true
|
||||
|
||||
err = store.SavePeerStatus(account.Id, "testpeer", newStatus)
|
||||
require.NoError(t, err)
|
||||
|
||||
account, err = store.GetAccount(context.Background(), account.Id)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual = account.Peers["testpeer"].Status
|
||||
assert.Equal(t, newStatus, *actual)
|
||||
}
|
||||
|
||||
func TestSqlite_SavePeerLocation(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("The SQLite store is not properly supported by Windows yet")
|
||||
|
||||
@@ -95,8 +95,3 @@ func NewUserNotFoundError(userKey string) error {
|
||||
func NewPeerNotRegisteredError() error {
|
||||
return Errorf(Unauthenticated, "peer is not registered")
|
||||
}
|
||||
|
||||
// NewPeerLoginExpiredError creates a new Error with PermissionDenied type for an expired peer
|
||||
func NewPeerLoginExpiredError() error {
|
||||
return Errorf(PermissionDenied, "peer login has expired, please log in once more")
|
||||
}
|
||||
|
||||
@@ -12,11 +12,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
nbgroup "github.com/netbirdio/netbird/management/server/group"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gorm.io/gorm"
|
||||
|
||||
nbgroup "github.com/netbirdio/netbird/management/server/group"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
|
||||
@@ -55,7 +54,6 @@ type Store interface {
|
||||
AcquireAccountReadLock(ctx context.Context, accountID string) func()
|
||||
// AcquireGlobalLock should attempt to acquire a global lock and return a function that releases the lock
|
||||
AcquireGlobalLock(ctx context.Context) func()
|
||||
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
|
||||
SavePeerStatus(accountID, peerID string, status nbpeer.PeerStatus) error
|
||||
SavePeerLocation(accountID string, peer *nbpeer.Peer) error
|
||||
SaveUserLastLogin(accountID, userID string, lastLogin time.Time) error
|
||||
|
||||
@@ -90,7 +90,7 @@ var (
|
||||
return err
|
||||
}
|
||||
|
||||
metricsServer, err := metrics.NewServer(metricsPort, "")
|
||||
metricsServer := metrics.NewServer(metricsPort, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("setup metrics: %v", err)
|
||||
}
|
||||
|
||||
@@ -26,10 +26,10 @@ type Metrics struct {
|
||||
}
|
||||
|
||||
// NewServer initializes and returns a new Metrics instance
|
||||
func NewServer(port int, endpoint string) (*Metrics, error) {
|
||||
func NewServer(port int, endpoint string) *Metrics {
|
||||
exporter, err := prometheus.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
|
||||
provider := metric.NewMeterProvider(metric.WithReader(exporter))
|
||||
@@ -57,7 +57,7 @@ func NewServer(port int, endpoint string) (*Metrics, error) {
|
||||
provider: provider,
|
||||
Endpoint: endpoint,
|
||||
Server: server,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown stops the metrics server
|
||||
|
||||
@@ -18,20 +18,16 @@ type Peer struct {
|
||||
|
||||
StreamID int64
|
||||
|
||||
// a gRpc connection stream to the Peer
|
||||
//a gRpc connection stream to the Peer
|
||||
Stream proto.SignalExchange_ConnectStreamServer
|
||||
|
||||
// registration time
|
||||
RegisteredAt time.Time
|
||||
}
|
||||
|
||||
// NewPeer creates a new instance of a connected Peer
|
||||
func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
|
||||
return &Peer{
|
||||
Id: id,
|
||||
Stream: stream,
|
||||
StreamID: time.Now().UnixNano(),
|
||||
RegisteredAt: time.Now(),
|
||||
Id: id,
|
||||
Stream: stream,
|
||||
StreamID: time.Now().UnixNano(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/signal-dispatcher/dispatcher"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
@@ -41,8 +40,8 @@ const (
|
||||
type Server struct {
|
||||
registry *peer.Registry
|
||||
proto.UnimplementedSignalExchangeServer
|
||||
dispatcher *dispatcher.Dispatcher
|
||||
metrics *metrics.AppMetrics
|
||||
|
||||
metrics *metrics.AppMetrics
|
||||
}
|
||||
|
||||
// NewServer creates a new Signal server
|
||||
@@ -52,14 +51,9 @@ func NewServer(meter metric.Meter) (*Server, error) {
|
||||
return nil, fmt.Errorf("creating app metrics: %v", err)
|
||||
}
|
||||
|
||||
dispatcher, err := dispatcher.NewDispatcher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating dispatcher: %v", err)
|
||||
}
|
||||
s := &Server{
|
||||
dispatcher: dispatcher,
|
||||
registry: peer.NewRegistry(appMetrics),
|
||||
metrics: appMetrics,
|
||||
registry: peer.NewRegistry(appMetrics),
|
||||
metrics: appMetrics,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@@ -67,26 +61,57 @@ func NewServer(meter metric.Meter) (*Server, error) {
|
||||
|
||||
// Send forwards a message to the signal peer
|
||||
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||
log.Debugf("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
if !s.registry.IsPeerRegistered(msg.Key) {
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotRegistered)))
|
||||
|
||||
if msg.RemoteKey == "dummy" {
|
||||
// Test message send during netbird status
|
||||
return &proto.EncryptedMessage{}, nil
|
||||
return nil, fmt.Errorf("peer %s is not registered", msg.Key)
|
||||
}
|
||||
|
||||
return s.dispatcher.SendMessage(ctx, msg)
|
||||
getRegistrationStart := time.Now()
|
||||
|
||||
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
||||
start := time.Now()
|
||||
//forward the message to the target peer
|
||||
if err := dstPeer.Stream.Send(msg); err != nil {
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
||||
//todo respond to the sender?
|
||||
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
||||
} else {
|
||||
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage)))
|
||||
s.metrics.MessagesForwarded.Add(context.Background(), 1)
|
||||
}
|
||||
} else {
|
||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||
//todo respond to the sender?
|
||||
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
||||
}
|
||||
return &proto.EncryptedMessage{}, nil
|
||||
}
|
||||
|
||||
// ConnectStream connects to the exchange stream
|
||||
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
|
||||
p, err := s.RegisterPeer(stream)
|
||||
p, err := s.connectPeer(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer s.DeregisterPeer(p)
|
||||
startRegister := time.Now()
|
||||
|
||||
// needed to confirm that the peer has been registered so that the client can proceed
|
||||
s.metrics.ActivePeers.Add(stream.Context(), 1)
|
||||
|
||||
defer func() {
|
||||
log.Infof("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||
s.registry.Deregister(p)
|
||||
|
||||
s.metrics.PeerConnectionDuration.Record(stream.Context(), int64(time.Since(startRegister).Seconds()))
|
||||
s.metrics.ActivePeers.Add(context.Background(), -1)
|
||||
}()
|
||||
|
||||
//needed to confirm that the peer has been registered so that the client can proceed
|
||||
header := metadata.Pairs(proto.HeaderRegistered, "1")
|
||||
err = stream.SendHeader(header)
|
||||
if err != nil {
|
||||
@@ -94,10 +119,11 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||
log.Infof("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||
|
||||
for {
|
||||
// read incoming messages
|
||||
|
||||
//read incoming messages
|
||||
msg, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
@@ -105,25 +131,44 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Received a response from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
|
||||
|
||||
s.dispatcher.SendMessage(stream.Context(), msg)
|
||||
getRegistrationStart := time.Now()
|
||||
|
||||
// lookup the target peer where the message is going to
|
||||
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
||||
s.metrics.GetRegistrationDelay.Record(stream.Context(), float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
||||
start := time.Now()
|
||||
//forward the message to the target peer
|
||||
if err := dstPeer.Stream.Send(msg); err != nil {
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
|
||||
//todo respond to the sender?
|
||||
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
||||
} else {
|
||||
// in milliseconds
|
||||
s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
|
||||
s.metrics.MessagesForwarded.Add(stream.Context(), 1)
|
||||
}
|
||||
} else {
|
||||
s.metrics.GetRegistrationDelay.Record(stream.Context(), float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
||||
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
|
||||
//todo respond to the sender?
|
||||
}
|
||||
}
|
||||
|
||||
<-stream.Context().Done()
|
||||
return stream.Context().Err()
|
||||
}
|
||||
|
||||
func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer) (*peer.Peer, error) {
|
||||
log.Debugf("registering new peer")
|
||||
// Handles initial Peer connection.
|
||||
// Each connection must provide an Id header.
|
||||
// At this moment the connecting Peer will be registered in the peer.Registry
|
||||
func (s Server) connectPeer(stream proto.SignalExchange_ConnectStreamServer) (*peer.Peer, error) {
|
||||
if meta, hasMeta := metadata.FromIncomingContext(stream.Context()); hasMeta {
|
||||
if id, found := meta[proto.HeaderId]; found {
|
||||
p := peer.NewPeer(id[0], stream)
|
||||
|
||||
s.registry.Register(p)
|
||||
s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
|
||||
|
||||
s.metrics.ActivePeers.Add(stream.Context(), 1)
|
||||
|
||||
return p, nil
|
||||
} else {
|
||||
@@ -135,38 +180,3 @@ func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer) (
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "missing connection stream meta")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) DeregisterPeer(p *peer.Peer) {
|
||||
log.Debugf("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||
s.registry.Deregister(p)
|
||||
|
||||
s.metrics.PeerConnectionDuration.Record(p.Stream.Context(), int64(time.Since(p.RegisteredAt).Seconds()))
|
||||
s.metrics.ActivePeers.Add(context.Background(), -1)
|
||||
}
|
||||
|
||||
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
|
||||
log.Debugf("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
|
||||
getRegistrationStart := time.Now()
|
||||
|
||||
// lookup the target peer where the message is going to
|
||||
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
||||
start := time.Now()
|
||||
// forward the message to the target peer
|
||||
if err := dstPeer.Stream.Send(msg); err != nil {
|
||||
log.Warnf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
||||
// todo respond to the sender?
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
||||
} else {
|
||||
// in milliseconds
|
||||
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
|
||||
s.metrics.MessagesForwarded.Add(ctx, 1)
|
||||
}
|
||||
} else {
|
||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||
// todo respond to the sender?
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user