Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type config struct {
// TODO: run loadtests using these flags to determine optimal default values.
MaxIdleProxyConns int `split_words:"true" default:"1000"`
MaxIdleProxyConnsPerHost int `split_words:"true" default:"100"`

ProbeTimeout string `split_words:"true" default:"300ms"`
ProbeFrequency string `split_words:"true" default:"200ms"`
}

func main() {
Expand Down Expand Up @@ -158,7 +161,8 @@ func main() {
// transport so that throttler probe connections can be reused after probing
// (via keep-alive) to send real requests, avoiding needing an extra
// reconnect for the first request after the probe succeeds.
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d", env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d, ProbeTimeout: %s, ProbeFrequency: %s",
env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, env.ProbeTimeout, env.ProbeFrequency)
transport := pkgnet.NewProxyAutoTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)

// Fetch networking configuration to determine whether EnableMeshPodAddressability
Expand Down Expand Up @@ -189,6 +193,16 @@ func main() {
transport = pkgnet.NewProxyAutoTLSTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, certCache.TLSContext())
}

probeTimeout, err := time.ParseDuration(env.ProbeTimeout)
if err != nil {
logger.Fatalw("Failed to parse PROBE_TIMEOUT", zap.String("value", env.ProbeTimeout), zap.Error(err))
}
probeFrequency, err := time.ParseDuration(env.ProbeFrequency)
if err != nil {
logger.Fatalw("Failed to parse PROBE_FREQUENCY", zap.String("value", env.ProbeFrequency), zap.Error(err))
}
activatornet.SetProbeSettings(probeTimeout, probeFrequency)

// Start throttler.
throttler := activatornet.NewThrottler(ctx, env.PodIP)
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)
Expand Down
35 changes: 17 additions & 18 deletions pkg/activator/net/revision_backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ import (
"knative.dev/serving/pkg/reconciler/serverlessservice/resources/names"
)

var (
// probeTimeout is the timeout for individual probe requests.
probeTimeout = 300 * time.Millisecond

// probeFrequency is the frequency at which probes are performed.
probeFrequency = 200 * time.Millisecond
)

// SetProbeSettings sets the probe timeout and frequency.
func SetProbeSettings(timeout, frequency time.Duration) {
probeTimeout = timeout
probeFrequency = frequency
}

// revisionDestsUpdate contains the state of healthy l4 dests for talking to a revision and is the
// primary output from the RevisionBackendsManager system. If a healthy ClusterIP is found then
// ClusterIPDest will be set to non empty string and Dests will be nil. Otherwise Dests will be set
Expand Down Expand Up @@ -87,11 +101,6 @@ func (d dests) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return nil
}

const (
probeTimeout time.Duration = 300 * time.Millisecond
defaultProbeFrequency time.Duration = 200 * time.Millisecond
)

// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
// to supply revisionDestsUpdate events on updateCh
type revisionWatcher struct {
Expand Down Expand Up @@ -408,7 +417,7 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
}
}

func (rw *revisionWatcher) run(probeFrequency time.Duration) {
func (rw *revisionWatcher) run() {
defer close(rw.done)

var curDests, prevDests dests
Expand Down Expand Up @@ -459,19 +468,10 @@ type revisionBackendsManager struct {
usePassthroughLb bool
meshMode netcfg.MeshCompatibilityMode
logger *zap.SugaredLogger
probeFrequency time.Duration
}

// NewRevisionBackendsManager returns a new RevisionBackendsManager with default
// probe time out.
// newRevisionBackendsManager returns a new RevisionBackendsManager.
func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) *revisionBackendsManager {
return newRevisionBackendsManagerWithProbeFrequency(ctx, tr, usePassthroughLb, meshMode, defaultProbeFrequency)
}

// newRevisionBackendsManagerWithProbeFrequency creates a fully spec'd RevisionBackendsManager.
func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.RoundTripper,
usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeFreq time.Duration,
) *revisionBackendsManager {
rbm := &revisionBackendsManager{
ctx: ctx,
revisionLister: revisioninformer.Get(ctx).Lister(),
Expand All @@ -482,7 +482,6 @@ func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.R
usePassthroughLb: usePassthroughLb,
meshMode: meshMode,
logger: logging.FromContext(ctx),
probeFrequency: probeFreq,
}
endpointsInformer := endpointsinformer.Get(ctx)
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
Expand Down Expand Up @@ -567,7 +566,7 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names
destsCh := make(chan dests)
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger)
rbm.revisionWatchers[revID] = rw
go rw.run(rbm.probeFrequency)
go rw.run()
return rw, nil
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/activator/net/revision_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
testNamespace = "test-namespace"
testRevision = "test-revision"

probeFreq = 50 * time.Millisecond
updateTimeout = 16 * probeFreq
probeFreq = 50 * time.Millisecond
updateTimeout = 16 * probeFreq
defaultProbeTimeout = 300 * time.Millisecond

meshErrorStatusCode = http.StatusServiceUnavailable
)
Expand Down Expand Up @@ -544,6 +545,7 @@ func TestRevisionWatcher(t *testing.T) {
close(updateCh)
}()

SetProbeSettings(defaultProbeTimeout, probeFreq)
rw := newRevisionWatcher(
ctx,
revID,
Expand All @@ -562,7 +564,7 @@ func TestRevisionWatcher(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
rw.run(probeFreq)
rw.run()
}()

destsCh <- tc.dests
Expand Down Expand Up @@ -993,7 +995,8 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
t.Fatal("Failed to start informers:", err)
}

rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
SetProbeSettings(defaultProbeTimeout, probeFreq)
rbm := newRevisionBackendsManager(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1456,7 +1459,8 @@ func TestRevisionDeleted(t *testing.T) {
ri.Informer().GetIndexer().Add(rev)

fakeRT := activatortest.FakeRoundTripper{}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
SetProbeSettings(defaultProbeTimeout, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1512,7 +1516,8 @@ func TestServiceDoesNotExist(t *testing.T) {
}},
},
}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
SetProbeSettings(defaultProbeTimeout, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1576,7 +1581,8 @@ func TestServiceMoreThanOne(t *testing.T) {
}},
},
}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
SetProbeSettings(defaultProbeTimeout, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1848,6 +1854,7 @@ func TestProbePodIPs(t *testing.T) {
}

// Minimally constructed revisionWatcher just to have what is needed for probing
SetProbeSettings(defaultProbeTimeout, probeFreq)
rw := &revisionWatcher{
rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision},
logger: TestLogger(t),
Expand Down
Loading