Skip to content

Commit 246f066

Browse files
committed
Add option to disable auth on streaming endpoint
1 parent 2556a60 commit 246f066

5 files changed

Lines changed: 46 additions & 27 deletions

File tree

common/authorization/interceptor.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type Interceptor struct {
8989
authHeaderName string
9090
authExtraHeaderName string
9191
enableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
92+
disableStreamingAuthorizer dynamicconfig.BoolPropertyFn
9293
}
9394

9495
// NewInterceptor creates an authorization interceptor.
@@ -102,6 +103,7 @@ func NewInterceptor(
102103
authHeaderName string,
103104
authExtraHeaderName string,
104105
enableCrossNamespaceCommands dynamicconfig.BoolPropertyFn,
106+
disableStreamingAuthorizer dynamicconfig.BoolPropertyFn,
105107
) *Interceptor {
106108
return &Interceptor{
107109
claimMapper: claimMapper,
@@ -113,6 +115,7 @@ func NewInterceptor(
113115
authExtraHeaderName: cmp.Or(authExtraHeaderName, defaultAuthExtraHeaderName),
114116
audienceGetter: audienceGetter,
115117
enableCrossNamespaceCommands: enableCrossNamespaceCommands,
118+
disableStreamingAuthorizer: disableStreamingAuthorizer,
116119
}
117120
}
118121

@@ -174,34 +177,37 @@ func (a *Interceptor) InterceptStream(
174177
handler grpc.StreamHandler,
175178
) error {
176179
ctx := ss.Context()
177-
tlsConnection := TLSInfoFromContext(ctx)
178-
179-
authInfo := a.GetAuthInfo(tlsConnection, headers.NewGRPCHeaderGetter(ctx), func() string {
180-
// JWTAudienceMapper only supports UnaryServerInfo; no request is available at stream init.
181-
return ""
182-
})
183-
184-
var claims *Claims
185-
if authInfo != nil {
186-
var err error
187-
claims, err = a.GetClaims(authInfo)
188-
if err != nil {
189-
a.logger.Error("Authorization error", tag.Error(err))
190-
return errUnauthorized
180+
bypassAuth := a.disableStreamingAuthorizer()
181+
if !bypassAuth {
182+
tlsConnection := TLSInfoFromContext(ctx)
183+
184+
authInfo := a.GetAuthInfo(tlsConnection, headers.NewGRPCHeaderGetter(ctx), func() string {
185+
// JWTAudienceMapper only supports UnaryServerInfo; no request is available at stream init.
186+
return ""
187+
})
188+
189+
var claims *Claims
190+
if authInfo != nil {
191+
var err error
192+
claims, err = a.GetClaims(authInfo)
193+
if err != nil {
194+
a.logger.Error("Authorization error", tag.Error(err))
195+
return errUnauthorized
196+
}
197+
ctx = a.EnhanceContext(ctx, authInfo, claims)
191198
}
192-
ctx = a.EnhanceContext(ctx, authInfo, claims)
193-
}
194199

195-
if a.authorizer != nil {
196-
// Namespace is not available in the stream handshake (no initial request body).
197-
ct := &CallTarget{
198-
Namespace: "",
199-
APIName: info.FullMethod,
200-
Request: nil,
201-
}
202-
if err := a.Authorize(ctx, claims, ct); err != nil {
203-
a.logger.Error("Authorization error", tag.Error(err))
204-
return err
200+
if a.authorizer != nil {
201+
// Namespace is not available in the stream handshake (no initial request body).
202+
ct := &CallTarget{
203+
Namespace: "",
204+
APIName: info.FullMethod,
205+
Request: nil,
206+
}
207+
if err := a.Authorize(ctx, claims, ct); err != nil {
208+
a.logger.Error("Authorization error", tag.Error(err))
209+
return err
210+
}
205211
}
206212
}
207213

common/authorization/interceptor_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (s *authorizerInterceptorSuite) SetupTest() {
8282
"",
8383
"",
8484
dynamicconfig.GetBoolPropertyFn(false), // enableCrossNamespaceCommands
85+
dynamicconfig.GetBoolPropertyFn(false), // disableStreamingAuthorizer
8586
)
8687
s.handler = func(ctx context.Context, req interface{}) (interface{}, error) { return true, nil }
8788
}
@@ -165,6 +166,7 @@ func (s *authorizerInterceptorSuite) TestNoopClaimMapperWithoutTLS() {
165166
"",
166167
"",
167168
dynamicconfig.GetBoolPropertyFn(false), // enableCrossNamespaceCommands
169+
dynamicconfig.GetBoolPropertyFn(false), // disableStreamingAuthorizer
168170
)
169171
_, err := interceptor.Intercept(ctx, describeNamespaceRequest, describeNamespaceInfo, s.handler)
170172
s.NoError(err)
@@ -181,6 +183,7 @@ func (s *authorizerInterceptorSuite) TestAlternateHeaders() {
181183
"custom-header",
182184
"custom-extra-header",
183185
dynamicconfig.GetBoolPropertyFn(false), // enableCrossNamespaceCommands
186+
dynamicconfig.GetBoolPropertyFn(false), // disableStreamingAuthorizer
184187
)
185188

186189
cases := []struct {
@@ -288,7 +291,8 @@ func (s *authorizerInterceptorSuite) newCrossNamespaceInterceptor(namespaces ...
288291
nil,
289292
"",
290293
"",
291-
dynamicconfig.GetBoolPropertyFn(true), // enableCrossNamespaceCommands
294+
dynamicconfig.GetBoolPropertyFn(true), // enableCrossNamespaceCommands
295+
dynamicconfig.GetBoolPropertyFn(false), // disableStreamingAuthorizer
292296
)
293297
}
294298

@@ -605,6 +609,7 @@ func (s *authorizerInterceptorSuite) TestInterceptStream_AuthDisabled() {
605609
"",
606610
"",
607611
dynamicconfig.GetBoolPropertyFn(false),
612+
dynamicconfig.GetBoolPropertyFn(false),
608613
)
609614

610615
handlerCalled := false
@@ -630,6 +635,7 @@ func (s *authorizerInterceptorSuite) TestInterceptStream_InvalidToken() {
630635
"",
631636
"",
632637
dynamicconfig.GetBoolPropertyFn(false),
638+
dynamicconfig.GetBoolPropertyFn(false),
633639
)
634640

635641
// Provide an incoming context with an auth token so GetAuthInfo returns non-nil.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ for signal / start / signal with start API if namespace is not active`,
148148
true,
149149
`EnableCrossNamespaceCommands is the key to enable commands for external namespaces`,
150150
)
151+
DisableStreamingAuthorizer = NewGlobalBoolSetting(
152+
"system.disableStreamingAuthorizer",
153+
false,
154+
`DisableStreamingAuthorizer is the key to disable the auth on streaming endpoint`,
155+
)
151156
ClusterMetadataRefreshInterval = NewGlobalDurationSetting(
152157
"system.clusterMetadataRefreshInterval",
153158
time.Minute,

service/frontend/fx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ func AuthorizationInterceptorProvider(
171171
cfg.Global.Authorization.AuthHeaderName,
172172
cfg.Global.Authorization.AuthExtraHeaderName,
173173
dynamicconfig.EnableCrossNamespaceCommands.Get(dc),
174+
dynamicconfig.DisableStreamingAuthorizer.Get(dc),
174175
)
175176
}
176177

service/frontend/nexus_handler_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func newOperationContext(options contextOptions) *operationContext {
128128
"",
129129
"",
130130
dynamicconfig.GetBoolPropertyFn(false), // enableCrossNamespaceCommands
131+
dynamicconfig.GetBoolPropertyFn(false), // disableStreamingAuthorizer
131132
)
132133
oc.namespaceConcurrencyLimitInterceptor = interceptor.NewConcurrentRequestLimitInterceptor(
133134
nil,

0 commit comments

Comments
 (0)