
Commit 070507b

matoval and tyraziel authored

Add env vars for the start duration and amount of retries for kube api retries (#1425)

Co-authored-by: Andrew Potozniak <tyraziel@gmail.com>

1 parent c670d8d · commit 070507b

3 files changed: 378 additions & 92 deletions


docs/source/user_guide/configuration_options.rst

Lines changed: 39 additions & 0 deletions
@@ -942,7 +942,46 @@ Work Kubernetes
   work-kubernetes:
   - worktype: cat
 
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Kubernetes Environment Variables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+The following environment variables can be used to configure Kubernetes worker behavior:
+
+.. note::
+   The environment variable ``RECEPTOR_OPEN_LOGSTREAM_TIMEOUT`` has been replaced with ``RECEPTOR_KUBE_TIMEOUT_START``. The new variable controls the initial sleep duration for all Kubernetes API retry operations using Fibonacci backoff, not just log stream timeouts.
+
+.. list-table:: Kubernetes Environment Variables
+   :header-rows: 1
+   :widths: auto
+
+   * - Variable
+     - Description
+     - Default value
+     - Valid range
+     - Type
+   * - ``RECEPTOR_KUBE_TIMEOUT_START``
+     - Initial timeout duration between Kubernetes API retry attempts. Valid time units: "ns", "ms", "s", "m", "h"
+     - 1s
+     - Any valid duration up to 1m
+     - string (duration)
+   * - ``RECEPTOR_KUBE_RETRY_COUNT``
+     - Number of retry attempts for Kubernetes API operations. Uses exponential backoff with Fibonacci-like sequence.
+     - 5
+     - 1-100
+     - int
+
+**Important Notes:**
+
+- **Fibonacci Backoff**: Retry delays increase by Fibonacci increments. For example, with ``RECEPTOR_KUBE_TIMEOUT_START=1s``, retry delays will be: 1s, 2s, 3s, 5s, 8s, etc.
+- **Timeout Start Limit**: ``RECEPTOR_KUBE_TIMEOUT_START`` values exceeding 1 minute will be capped at the maximum of 1 minute.
+- **Maximum Sleep Duration**: Individual sleep durations are capped at 5 minutes to prevent extremely long waits.
+- **Performance Impact**: High retry counts can result in very long wait times. Consider the total time impact when setting these values.
+
+.. code-block:: bash
+
+   export RECEPTOR_KUBE_TIMEOUT_START=500ms
+   export RECEPTOR_KUBE_RETRY_COUNT=3
 
 
 ^^^^^^^^^^^
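The backoff the docs describe is easy to see in isolation. The following is a minimal standalone sketch (not part of the commit) that reproduces the documented schedule: Fibonacci multipliers applied to a configurable start duration, with the start value capped at 1 minute and each individual sleep capped at 5 minutes. The hard-coded `start` and `retries` values stand in for the two environment variables.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := 1 * time.Second // stands in for RECEPTOR_KUBE_TIMEOUT_START
	retries := 5             // stands in for RECEPTOR_KUBE_RETRY_COUNT

	if start > time.Minute {
		start = time.Minute // documented cap on the start duration
	}
	maxSleep := 5 * time.Minute

	prev, cur := 1, 1 // RunWorkUsingLogger seeds its delay pair at (1, 1)
	for i := 1; i <= retries; i++ {
		sleep := start * time.Duration(cur)
		if sleep > maxSleep {
			sleep = maxSleep // documented cap on each individual sleep
		}
		fmt.Printf("retry %d: sleep %s\n", i, sleep)
		prev, cur = cur, prev+cur
	}
}
```

Running it with these defaults prints 1s, 2s, 3s, 5s, 8s, matching the example in the Fibonacci Backoff note above.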

pkg/workceptor/kubernetes.go

Lines changed: 69 additions & 23 deletions
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"net"
 	"net/url"
 	"os"
@@ -226,23 +227,68 @@ func podRunningAndReady(kw KubeUnit) func(event watch.Event) (bool, error) {
 	return inner
 }
 
-func GetTimeoutOpenLogstream(kw *KubeUnit) int {
-	// RECEPTOR_OPEN_LOGSTREAM_TIMEOUT
-	// default: 1
-	openLogStreamTimeout := 1
-	envTimeout := os.Getenv("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT")
+func (kw *KubeUnit) GetKubeTimeoutStart() time.Duration {
+	// RECEPTOR_KUBE_TIMEOUT_START
+	// default: 1 second
+	kubeTimeoutStart := 1 * time.Second
+	envTimeout := os.Getenv("RECEPTOR_KUBE_TIMEOUT_START")
 	if envTimeout != "" {
 		var err error
-		openLogStreamTimeout, err = strconv.Atoi(envTimeout)
-		if err != nil || openLogStreamTimeout < 1 {
+		kubeTimeoutStart, err = time.ParseDuration(envTimeout)
+		if err != nil || kubeTimeoutStart <= 0 {
 			// ignore error, use default
-			kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %s. Ignoring", envTimeout)
-			openLogStreamTimeout = 1
+			kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_KUBE_TIMEOUT_START: %s. Ignoring", envTimeout)
+			kubeTimeoutStart = 1 * time.Second
 		}
+		// ignore if exceeds limit, use max
+		if kubeTimeoutStart > time.Minute*1 {
+			kw.GetWorkceptor().nc.GetLogger().Warning("RECEPTOR_KUBE_TIMEOUT_START of: %d is larger than the max timeout of 1m. Max of 1m will be used", kubeTimeoutStart)
+			kubeTimeoutStart = time.Minute * 1
+		}
+	}
+	kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_KUBE_TIMEOUT_START: %s", kubeTimeoutStart)
+
+	return kubeTimeoutStart
+}
+
+func (kw *KubeUnit) GetKubeRetryCount() int {
+	// RECEPTOR_KUBE_RETRY_COUNT
+	// default: 5
+	kubeRetryCount := 5
+	envRetryCount := os.Getenv("RECEPTOR_KUBE_RETRY_COUNT")
+	if envRetryCount != "" {
+		var err error
+		kubeRetryCount, err = strconv.Atoi(envRetryCount)
+		if err != nil || kubeRetryCount < 1 {
+			// ignore error, use default
+			kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_KUBE_RETRY_COUNT: %s. Default of 5 will be used", envRetryCount)
+			kubeRetryCount = 5
+		}
+		// ignore if exceeds limit, use max retry
+		if kubeRetryCount > 100 {
+			kw.GetWorkceptor().nc.GetLogger().Warning("RECEPTOR_KUBE_RETRY_COUNT of: %d is larger than the max retry count of 100. Retry count of 100 will be used", kubeRetryCount)
+			kubeRetryCount = 100
+		}
+	}
+	kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_KUBE_RETRY_COUNT: %d", kubeRetryCount)
+
+	return kubeRetryCount
+}
+
+func (kw *KubeUnit) GetSleepDuration(multipler int) time.Duration {
+	maxSleepDuration := time.Minute * 5
+	baseTimeout := int64(kw.GetKubeTimeoutStart())
+
+	if baseTimeout > 0 && int64(multipler) > math.MaxInt64/baseTimeout {
+		return maxSleepDuration
+	}
+
+	sleepDuration := kw.GetKubeTimeoutStart() * time.Duration(multipler)
+	if sleepDuration > maxSleepDuration {
+		return maxSleepDuration
 	}
-	kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %d", openLogStreamTimeout)
 
-	return openLogStreamTimeout
+	return sleepDuration
 }
 
 func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time.Time) (io.ReadCloser, error) {
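One detail worth calling out in `GetSleepDuration` above: `time.Duration` is an int64 count of nanoseconds, so multiplying a large Fibonacci multiplier by the base timeout could wrap negative. The new code guards by checking `multipler > math.MaxInt64/baseTimeout` before multiplying, and clamps to the 5-minute ceiling instead. Here is a standalone sketch of the same clamp-instead-of-overflow pattern (the `sleepFor` helper is hypothetical, not the commit's code):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// sleepFor clamps to maxSleep instead of letting the int64
// nanosecond multiplication overflow and wrap negative.
func sleepFor(base time.Duration, multiplier int) time.Duration {
	const maxSleep = 5 * time.Minute
	if base > 0 && int64(multiplier) > math.MaxInt64/int64(base) {
		return maxSleep // base * multiplier would overflow int64
	}
	d := base * time.Duration(multiplier)
	if d > maxSleep {
		return maxSleep // each individual sleep is capped at 5m
	}
	return d
}

func main() {
	fmt.Println(sleepFor(time.Second, 3))           // 3s
	fmt.Println(sleepFor(time.Second, 600))         // 10m, clamped to 5m0s
	fmt.Println(sleepFor(time.Minute, 200_000_000)) // would overflow, returns 5m0s
}
```

Without the guard, the third call would produce a negative duration, and `time.Sleep` treats negative durations as zero, silently turning the backoff into a busy retry loop.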
@@ -261,7 +307,7 @@ func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time
 
 	logReq := kw.KubeAPIWrapperInstance.GetLogs(kw.clientset, podNamespace, podName, podOptions)
 	// get logstream, with retry
-	for retries := 5; retries > 0; retries-- {
+	for retries := kw.GetKubeRetryCount(); retries > 0; retries-- {
 		logStream, err = logReq.Stream(kw.GetContext())
 		if err == nil {
 			break
@@ -273,7 +319,7 @@ func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time
 			retries,
 			err,
 		)
-		time.Sleep(time.Duration(GetTimeoutOpenLogstream(kw)) * time.Second)
+		time.Sleep(kw.GetKubeTimeoutStart())
 	}
 	if err != nil {
 		errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err)
@@ -320,7 +366,7 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 	podNamespace := kw.Pod.Namespace
 	podName := kw.Pod.Name
 
-	retries := 5
+	retries := kw.GetKubeRetryCount()
 	prevDelay, curDelay := 0, 1
 	prevPodDelay, curPodDelay := 0, 1
 	prevContainerDelay, curContainerDelay := 0, 1
@@ -346,7 +392,7 @@ mainLoop:
 				retryGetPod,
 				err,
 			)
-			time.Sleep(time.Second * time.Duration(curPodDelay))
+			time.Sleep(kw.GetSleepDuration(curPodDelay))
 			prevPodDelay, curPodDelay = curPodDelay, prevPodDelay+curPodDelay
 		}
 		if err != nil {
@@ -404,7 +450,7 @@ mainLoop:
 				retryGetLogStream,
 			)
 
-			time.Sleep(time.Second * time.Duration(curDelay))
+			time.Sleep(kw.GetSleepDuration(curDelay))
 			prevDelay, curDelay = curDelay, prevDelay+curDelay
 
 			continue mainLoop
@@ -469,7 +515,7 @@ mainLoop:
 				retryGetLogStream,
 			)
 
-			time.Sleep(time.Second * time.Duration(curContainerDelay))
+			time.Sleep(kw.GetSleepDuration(curContainerDelay))
 			prevContainerDelay, curContainerDelay = curContainerDelay, prevContainerDelay+curContainerDelay
 
 			continue mainLoop
@@ -919,7 +965,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 			kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
 			streamWait.Done()
 		} else {
-			retryCount := 5
+			retryCount := kw.GetKubeRetryCount()
 			prevPodDelay, curPodDelay := 1, 1
 			prevContainerDelay, curContainerDelay := 1, 1
 		podLoop:
@@ -933,7 +979,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 				if retryCount > 0 {
 					kw.GetWorkceptor().nc.GetLogger().Debug("Error getting pod while trying to attach stdin: '%s' , continuing try to get pod up to %v more times.", kubeErr, retryCount)
 
-					time.Sleep(time.Second * time.Duration(curPodDelay))
+					time.Sleep(kw.GetSleepDuration(curPodDelay))
 					prevPodDelay, curPodDelay = curPodDelay, prevPodDelay+curPodDelay
 
 					continue
@@ -944,7 +990,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 
 					return
 				}
-				retryCount = 5
+				retryCount = kw.GetKubeRetryCount()
 
 				var containerState corev1.ContainerState
 				foundContainer := false
@@ -974,7 +1020,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 					if retryCount > 0 {
 						kw.GetWorkceptor().nc.GetLogger().Debug("Container in %s pod is waiting, will retry %v more times.", podName, retryCount)
 
-						time.Sleep(time.Second * time.Duration(curContainerDelay))
+						time.Sleep(kw.GetSleepDuration(curContainerDelay))
 						prevContainerDelay, curContainerDelay = curContainerDelay, prevContainerDelay+curContainerDelay
 
 						continue podLoop
@@ -995,7 +1041,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 					if retryCount > 0 {
 						kw.GetWorkceptor().nc.GetLogger().Debug("%s is in an unexpected container state %s. This is unexpected. Will retry %v more times.", podName, containerState, retryCount)
 
-						time.Sleep(time.Second * time.Duration(curContainerDelay))
+						time.Sleep(kw.GetSleepDuration(curContainerDelay))
 						prevContainerDelay, curContainerDelay = curContainerDelay, prevContainerDelay+curContainerDelay
 
 						continue podLoop
@@ -1018,7 +1064,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
 			})
 
 			var err error
-			for retries := 5; retries > 0; retries-- {
+			for retries := kw.GetKubeRetryCount(); retries > 0; retries-- {
 				err = kw.KubeAPIWrapperInstance.StreamWithContext(kw.GetContext(), exec, remotecommand.StreamOptions{
 					Stdin: stdin,
 					Tty:   false,
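Taken together, the call sites above determine the worst-case wait. The reconnect loops in `KubeLoggingWithReconnect` seed their delay pairs at (0, 1), so with the defaults (1s start, 5 retries) the sleeps are 1s, 1s, 2s, 3s, 5s, roughly 12 seconds total before giving up; `RunWorkUsingLogger` seeds at (1, 1), giving 1s, 2s, 3s, 5s, 8s, about 19 seconds. The log-stream open loop in `kubeLoggingConnectionHandler` instead sleeps the flat start duration between attempts. At the extremes (`RECEPTOR_KUBE_TIMEOUT_START` at its 1m cap and `RECEPTOR_KUBE_RETRY_COUNT=100`), individual sleeps hit the 5-minute ceiling within a few iterations, so the total wait can approach 100 × 5m, which is what the docs' performance-impact note warns about.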
